posts - 20, comments - 32, trackbacks - 0, articles - 63
  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

2020年3月25日

异步方法注解@Async

在SpringBoot中进行异步处理,可以使用异步注解@Async和@EnableAsync。
@Async注解表示异步,如:@Async("asyncTaskExecutor"),
后面的参数asyncServiceExecutor对应于自定义的线程池配置类(在以下例子中为ExecutorConfig) 中的线程池方法名
如果不写后面的参数,直接用@Async,则是使用默认的线程池。

Future实现类获取异步处理结果

如果想要获取异步处理的结果,可以通过Future接口的实现类调用get()方法获得。
Future接口的常见实现类有FutureTask。
在SpringBoot中,一般用 AsyncResult作为异步结果.

Future 缺点:

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。下面会详细解释:

示例:spring boot工程  异步处理业务类
1.AsyncTaskManager

@Service
@EnableAsync
public class AsyncTaskManager {
/**
* 这个业务注入的类
*/
@Autowired
private MessageDao messageDao;

/**
* @Async注解表示异步,后面的参数对应于线程池配置类ExecutorConfig中的方法名asyncServiceExecutor()
* 如果不写后面的参数,直接使用@Async注解,则是使用默认的线程池
* Future<String>为异步返回的结果。可以通过get()方法获取结果。
* @param s
* @throws Exception
*/
@Async(value = "asyncTaskExecutor")
public void transTask(String s) throws Exception {
messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+"--"+s+" ;time="+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
TimeUnit.SECONDS.sleep(6);
}

/**
* 异步调用,有返回值,必须是Future类型,不然报错
* 如果不写后面的参数,直接用@Async,则是使用默认的线程池。
* 使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待
* @param s
* @return
*/
@Async(value = "asyncTaskExecutor")
public Future<String> transTaskForFuture(String s) {
String result=null;
try {
result=messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+" 子线程开始执行...result=" + result);
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
return AsyncResult.forExecutionException(e);
}
return AsyncResult.forValue(result);
}

/**
* 基于回调的listenableFuture比上种子线程直接返回Future优质是,主线程不用等待,任务在完成后会自动执行回调代码。
* 因此在调用时要注册回调代码,包括成功回调和失败回调
* @param s
* @return
*/
@Async(value = "asyncTaskExecutor")
public ListenableFuture<String> transTaskForCallback(String s) {
String result=null;
try {
result=messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+" 子线程开始执行...result=" + result);
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException e) {
return AsyncResult.forExecutionException(e);
}
return AsyncResult.forValue(result);
}

/**
* 从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法
* 最主要是可以提供复杂的
* CompletableFuture可以指定异步处理流程:
* thenAccept()处理正常结果;
* exceptional()处理异常结果;
* thenApplyAsync()用于串行化另一个CompletableFuture;
* anyOf()和allOf()用于并行化多个CompletableFuture。
* 详解请看 https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
* @param s
* @return
*/
@Async(value = "asyncTaskExecutor")
public CompletableFuture<Object> transTaskForCompletableFuture(String s) {
Object result=null;
try {
result=messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+" 子线程开始执行...result=" + result);
TimeUnit.SECONDS.sleep(6);
} catch (Exception e) {
return AsyncResult.forExecutionException(e).completable();
}
return AsyncResult.forValue(result).completable();
}
@Async(value = "asyncTaskExecutor")
public CompletableFuture<Object> transTaskForCompletableFuture2(int s) {
Object result=null;
try {
result=messageDao.getUserCode(s);
System.out.println(Thread.currentThread().getName()+" 子线程开始执行...result=" + result);
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
return AsyncResult.forExecutionException(e).completable();
}
return AsyncResult.forValue(result).completable();
}
}
Dao层业务类:


@Repository
public class MessageDao {

public String getMessage(String s){
return s;
}

public String callBackMessage(String s){
return "这是注册回调返回结果s="+s;
}

public String getUserCode(int id){
return "000"+id;
}
public String getUserName(String code){
return "李四";
}
public String getUserDepartment(String code){
return "技术开发部";
}
}
 

线程池ThreadPoolTaskExecutor

SpringBoot中的线程池一般用ThreadPoolTaskExecutor 类。
ThreadPoolTaskExecutor继承关系如下:

ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor

关系结构图为:

2.自定义线程池配置如下:

@Configuration
public class AsyncTaskConfig {
/**
* IO密集型任务 = 一般为2*CPU核心数(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)
* CPU密集型任务 = 一般为CPU核心数+1(常出现于线程中:复杂算法)
* 混合型任务 = 视机器配置和复杂度自测而定
*/
@Bean(name = "asyncTaskExecutor")
public ThreadPoolTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//1: 核心线程数目
executor.setCorePoolSize(4);
//2: 指定最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(10);
//3: 队列中最大的数目
executor.setQueueCapacity(200);
//4: 线程名称前缀
executor.setThreadNamePrefix("LocustTask-");
//5:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy: 会在execute 方法的调用线程中运行被拒绝的任务,如果执行程序已关闭,则会丢弃该任务
// AbortPolicy: 抛出java.util.concurrent.RejectedExecutionException异常
// DiscardOldestPolicy: 抛弃旧的任务
// DiscardPolicy: 抛弃当前的任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//6: 线程空闲后的最大存活时间(默认值 60),当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
//7:线程空闲时间,当线程空闲时间达到keepAliveSeconds(秒)时,线程会退出,直到线程数量等于corePoolSize,如果allowCoreThreadTimeout=true,则会直到线程数量等于0
executor.setAllowCoreThreadTimeOut(false);
executor.initialize();
return executor;
}
}

@EnableAsync开启异步

@EnableAsync表示开启异步,可以放在@Controller层上方,也可以放在Application类的上方,也可以直接放在业务类上例AsyncTaskManager

@Controller
@EnableAsync
public class XXXController {
    @Autowired
    private AsyncTaskManager asyncTaskManager;

    @GetMapping("/user/getList")
    @ResponseBody
    public String getUserData(){
        return asyncTaskManager.getAsyncResult();
    }
}

Junint 4单元测试类如下
AsyncTaskTest:

/**
* 小结CompletableFuture:
* thenAccept()处理正常结果;
* exceptional()处理异常结果;
* thenApplyAsync()用于串行化另一个CompletableFuture;
* anyOf()和allOf()用于并行化多个CompletableFuture
*
* 除了anyOf()可以实现“任意个CompletableFuture只要一个成功”,allOf()可以实现“所有CompletableFuture都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。
* 最后我们注意CompletableFuture的命名规则:
*
* xxx():表示该方法将继续在已有的线程中执行;
* xxxAsync():表示将异步在线程池中执行。
*
*/
public class AsyncTaskTest extends BaseTest {

@Autowired
private AsyncTaskManager asyncTaskManager;

@Autowired
private MessageDao messageDao;

/**
* 主线等待子线完成后,获取返回结果
*
* @throws Exception
*/
@Test
public void testAsyncTaskForFuture() throws Exception {
Future<String> future = asyncTaskManager.transTaskForFuture("AAA---BBB");
while (true) {
if (future.isDone() && !future.isCancelled()) {
System.out.println(Thread.currentThread().getName() + "子线程执行完毕");
break;
} else {
Thread.sleep(2000);
System.out.println("主线程" + Thread.currentThread().getName() + "待子线程执行完毕...");
}
}
}

/**
* 在调用时候,主线不用等待,可以注册回调类和方法进行
*
* @throws Exception
*/
@Test
public void testAsyncTaskForCallback() throws Exception {
ListenableFuture<String> future = asyncTaskManager.transTaskForCallback("AAA---BBB");
future.addCallback(
successCallback -> {
try {
String s=future.get(2L, TimeUnit.SECONDS);
String result=messageDao.callBackMessage(s);
System.out.println("回调结果:"+ result);
} catch (Exception e) {
e.printStackTrace();
}
},
FailureCallback -> {
System.out.println("子线程执行失败....");
}
);
Thread.sleep(20000);
}

/**
* 如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势,CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,
* 例如:第一个CompletableFuture 查询 员工code , 第二个CompletableFuture 可以根据 员工code 查询员工名称
* @throws Exception
*/
@Test
public void testAsyncTaskForCompletableFuture() throws Exception {
CompletableFuture<Object> completableFuture_1 = asyncTaskManager.transTaskForCompletableFuture("AAA---BBB");
// 1: 如果成功
completableFuture_1.thenAccept((result)->{
System.out.println(result);
});
// 2: 如果失败
completableFuture_1.exceptionally((e) ->{
e.printStackTrace();
return null;
});
Thread.sleep(20000);
}

/**
* 如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势,CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,
* 例如:第一个CompletableFuture 查询 员工code , 第二个CompletableFuture 可以根据 员工code 查询员工名称。
* @throws Exception
*/
@Test
public void testAsyncTaskForCompletableFuture2() throws Exception {
// 上面跟这种方式跟 ListenableFuture类似,看下面串行,和 并行执行
CompletableFuture<Object> completableFuture_1 = asyncTaskManager.transTaskForCompletableFuture("AAA---BBB");
// 1: 如果成功 completableFuture_1 去查询用户名称
CompletableFuture<String> fetchNameFuture=completableFuture_1.thenApplyAsync((code)->{
return messageDao.getUserName((String)code);
});
//2: 根据 fetchNameFuture 后打印下
fetchNameFuture.thenApply((reuslt) ->{
System.out.println(reuslt);
return reuslt;
});
Thread.sleep(20000);
}

/**
* 如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势,CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,
* 例如:两个异步查询同时去查询 员工消息和员工code,随时成功后去查询员工的名称和员工部门,随边一个成功打印
* @throws Exception
*/
@Test
public void testAsyncTaskForCompletableFuture3() throws Exception {
//1: 第一个查询查询员工消息,
CompletableFuture<Object> completableFuture_1 = asyncTaskManager.transTaskForCompletableFuture("AAA---BBB");
//2: 第二个 查询员工code
CompletableFuture<Object> completableFuture_2 = asyncTaskManager.transTaskForCompletableFuture2(1);

//3:用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> newCompletableFuture=CompletableFuture.anyOf(completableFuture_1,completableFuture_2);

//4:两个CompletableFuture执行异步查询:
CompletableFuture<String> cfFetchFrom_1 =newCompletableFuture.thenApplyAsync((code)->{
return messageDao.getUserName((String)code);
});
CompletableFuture<String> cfFetchFrom_2 =newCompletableFuture.thenApplyAsync((code)->{
return messageDao.getUserDepartment((String)code);
});
//5:用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFrom_1, cfFetchFrom_2);
// 最终结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(20000);
}

}

posted @ 2020-03-25 20:19 张钊钊 阅读(42) | 评论 (0)编辑 收藏

2018年12月10日

或者按Shift+p公司生产服务上常常出现 CPU 100% 问题,需要快速定位问题出现在那里,以下备注解决方法步骤:

1: 工具:top方法:
执行top -c ,显示进程运行信息列表
键入P (大写p),进程按照CPU使用率排序  (输入大写P,则结果按CPU占用降序排序。输入大写M,结果按内存占用降序排序。(注:大写P可以在capslock状态输入p,或者按Shift+p)
线上服务CPU100%问题快速定位实战
如上图找出最耗CPU 进程 10765
统计信息区

前五行是系统整体的统计信息。第一行是任务队列信息,同 uptime 命令的执行结果。其内容如下:

01:06:48当前时间
up 1:22系统运行时间,格式为时:分
1 user当前登录用户数
load average: 0.06, 0.60, 0.48系统负载,即任务队列的平均长度。
三个数值分别为 1分钟、5分钟、15分钟前到现在的平均值。

步骤二:找出最耗CPU的线程
  1.    top -Hp 10765 显示一个进程的线程运行信息列表
  2. 键入shift +p 线程按照CPU使用率降序排序
 步骤三: 把 10765 转化成16进制(因为堆栈是线程id是十六进制)
     命令: printf '%x' 10765   输出结果:2a0d

步骤四: 使用JVM命令  jstatck 
            jstack 10765 | grep '2a0d' -C5 --color     打印堆栈信息,通过id 过滤到线程的堆栈信息。


以下是top 其它常用命令:

附常用操作:

top   //每隔5秒显式所有进程的资源占用情况
top -d 2 //每隔2秒显式所有进程的资源占用情况
top -c //每隔5秒显式进程的资源占用情况,并显示进程的命令行参数(默认只有进程名)
top -p 1111 -p 6789//每隔5秒显示pid是1111和pid是6789的两个进程的资源占用情况
top -d 2 -c -p 1111//每隔2秒显示pid是1111的进程的资源使用情况,并显式该进程启动的命令行参数        

posted @ 2018-12-10 15:59 张钊钊 阅读(60) | 评论 (0)编辑 收藏

2018年11月30日

     摘要: 1.为什么我们需要多线程消费者模型?假设我们实现了一个通知模块,允许用户订阅来自其他用户,其他应用程序的通知。我们的模块读取将由其他用户,应用程序写入Kafka集群的消息。在这种情况下,我们可以获得写入Kafka主题的其他人的所有通知,我们的模块将创建一个消费者来订阅该主题。一开始似乎一切都很好。但是,如果其他应用程序,用户...产生的通知数量快速增加并超过我们模块可以处理的速率,会发生什么?好吧...  阅读全文

posted @ 2018-11-30 18:26 张钊钊 阅读(607) | 评论 (0)编辑 收藏

以前一直没有接触过kafka 消息中间件,现在公司要用它来做消息服务(sub/pub),安装都不多说了 主要是开发的时候遇到问题和解决方法:

版本: zookeeper-3.4.12.tar.gz  kafka_2.12-2.1.0.tgz  连接工具: kafkatool_64bit.exe   集成: spring boot 

pom.xml:

         <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.0</version>
        </dependency>

程序就集成:

    @Bean
    @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS)
    public KafkaConsumer<String, String> getKafkaConsumer() {
        return new KafkaConsumer<String, String>(consumerConfigs());
    }

 问题就在这里  KafkaConsumer 是让spring IOC来管理,刚刚开始只有@Bean 生成的对象实例就只有一个,但是在启动线程消息的时候只能一个对象一个线程,如果一个对象在启用线程去消费会报  KafkaConsumer is not safe for multi-threaded access

解决办法:
1.线程与KafkaConsumer对象实例的对应关系是1:1
2.要保证线程与KafkaConsumer对象的关系是固定不变的,也就是说,一个线程始终都只能操作同一个KafkaConsumer对象且一个KafkaConsumer对象始终是由同一个线程来操作的 所以在 @Bean 又加了  @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS) 来每一次用生成一个新实例对象 

2:问题 线程与KafkaConsumer对象实例的对应关系是1:1  ,但订阅的对对象 和线程使用poll  KafkaConsumer 对象又会发生变化导致监听消费报错
    Consumer is not subscribed to any topics or assigned any partitions,为什么会报没有定阅呢,明明已经定阅了
 
   解决办法不要让spring IOC 来管理KafkaConsumer 生成实例对象  使用new 方式生成。

看来了解下原理是很重要的以下是比较不错的文章(里边还有多线程消费源码和原理讲解)




posted @ 2018-11-30 16:05 张钊钊 阅读(1315) | 评论 (0)编辑 收藏

2018年1月25日

最近在公司有点时间所以深入研究了下数据库索引btree/b+tree数据结构和原理,由此牵引出了好多问题,请看如下带着问题研究。

1:为什么 btree/b+tree 数据结构适合数据库索引,它到底是怎么样一个原理和结构?

btree/b+tree 数据结构:

在之前的文章中我们介绍过AVL树,红黑树,它们都属于二叉树,即每个节点最多只能拥有2个子节点,而B-tree(B树)的每个节点可以拥有2个以上的子节点,所以我们简单概括一下:B-tree就是一颗多路平衡查找树,它广泛应用于数据库索引和文件系统中。

首先我们介绍一下一颗 m 阶B-tree的特性,那么这个 m 阶是怎么定义的呢?这里我们以一个节点能拥有的最大子节点数来表示这颗树的阶数。举个例子,如果一个节点最多有 n 个key,那么这个节点最多就会有 n+1 个子节点,这棵树就叫做 n+1(m=n+1)阶树。一颗 m 阶B-tree包括以下5条特性:

  1. 每个节点最多有 m 个子节点
  2. 除根节点和叶子节点,其它每个节点至少有 [m/2] (向上取整的意思)个子节点
  3. 若根节点不是叶子节点,则其至少有2个子节点
  4. 所有NULL节点到根节点的高度都一样
  5. 除根节点外,其它节点都包含 n 个key,其中 [m/2] -1 <= n <= m-1

这些特性可能看着不太好理解,下面我们会介绍B-tree的插入,在插入节点的过程中我们就会慢慢理解这些特性了。B-tree的插入比较简单,就是一个节点至下而上的分裂过程。下面我们具体以一颗4阶树来展示B-tree的插入过程。

首先我们 插入 200,300,400,没有什么问题,直接插入就好。

| 200 | 300 | 400 |

现在我们接着插入500,这个时候我们发现有点问题,根据定义及特性1我们知道一颗4阶B-tree的每个节点最多只能有3个key,插入500后这个节点就有4个key了。

| 200 | 300 | 400 | 500 |

这个时候我们就需要分裂,将中间的key上移到父节点,左边的作为左节点,右边的作为右节点,如下图所示:

这个时候我们是不是就明白特性3了,如果根节点不是叶子节点,那么它肯定发生了分裂,所以至少会有2个子节点。同样我们接着插入600,700,800,900插入过程如下图所示:

现在根节点也已经满了,如果我们继续插入910,920,会怎样呢?根节点就会继续分裂,树继续向上生长。看下图:

通过整个的插入过程我们也会发现,B-tree和二叉树的一个显著的区别就是,B-tree是从下往上生长,而二叉树是从上往下生长的。现在我们想想特性2和特性5是为什么?首先我们知道子节点的个数是等于key的数目+1,然后一个节点达到m个key后就会分裂,所以分裂后的节点最少能得到 m/2 - 1个key 。为啥还要减一呢?因为还要拿一个作为父节点。所以这个节点最少回拥有 m/2 - 1 + 1 = m/2 个子节点。同样得到特性5,因为最少有m/2个子节点,所以最少就含有m/2-1个key,m 阶树,每个节点存到了m个key就会分裂,所以最多就有 m-1个key。

根据以上特性我们能推出一棵含有N个总关键字数的m阶的B-tree树的最大高度h的值,

树的高度h: 1, 2, 3 , 4 ,.......... , h

节点个数s: 1, 2, 2*(m/2), 2*(m/2)(m/2), ........ ,2*(m/2)的h-2次方

s = 1 + 2(1 - (m/2)^{h-1} )/(1- (m/2))

N = 1 + s * ((m/2) - 1) = 2 * ((m/2)^{h-1} ) - 1

h = log┌m/2┐((N+1)/2 )+1

2:为什么btree/b+tree 为常用数据库索引结构?

上文说过,红黑树等数据结构也可以用来实现索引,但是文件系统及数据库系统普遍采用B-/+Tree作为索引结构,这一节将结合计算机组成原理相关知识讨论B-/+Tree作为索引的理论基础。

一般来说,索引本身也很大,不可能全部存储在内存中,因此索引往往以索引文件的形式存储的磁盘上。这样的话,索引查找过程中就要产生磁盘I/O消耗,相对于内存存取,I/O存取的消耗要高几个数量级,所以评价一个数据结构作为索引的优劣最重要的指标就是在查找过程中磁盘I/O操作次数的渐进复杂度。换句话说,索引的结构组织要尽量减少查找过程中磁盘I/O的存取次数。下面先介绍内存和磁盘存取原理,然后再结合这些原理分析B-/+Tree作为索引的效率。

主存存取原理

目前计算机使用的主存基本都是随机读写存储器(RAM),现代RAM的结构和存取原理比较复杂,这里本文抛却具体差别,抽象出一个十分简单的存取模型来说明RAM的工作原理。

图5

从抽象角度看,主存是一系列的存储单元组成的矩阵,每个存储单元存储固定大小的数据。每个存储单元有唯一的地址,现代主存的编址规则比较复杂,这里将其简化成一个二维地址:通过一个行地址和一个列地址可以唯一定位到一个存储单元。图5展示了一个4 x 4的主存模型。

主存的存取过程如下:

当系统需要读取主存时,则将地址信号放到地址总线上传给主存,主存读到地址信号后,解析信号并定位到指定存储单元,然后将此存储单元数据放到数据总线上,供其它部件读取。

写主存的过程类似,系统将要写入单元地址和数据分别放在地址总线和数据总线上,主存读取两个总线的内容,做相应的写操作。

这里可以看出,主存存取的时间仅与存取次数呈线性关系,因为不存在机械操作,两次存取的数据的“距离”不会对时间有任何影响,例如,先取A0再取A1和先取A0再取D3的时间消耗是一样的。

磁盘存取原理

上文说过,索引一般以文件形式存储在磁盘上,索引检索需要磁盘I/O操作。与主存不同,磁盘I/O存在机械运动耗费,因此磁盘I/O的时间消耗是巨大的。

图6是磁盘的整体结构示意图。

图6

一个磁盘由大小相同且同轴的圆形盘片组成,磁盘可以转动(各个磁盘必须同步转动)。在磁盘的一侧有磁头支架,磁头支架固定了一组磁头,每个磁头负责存取一个磁盘的内容。磁头不能转动,但是可以沿磁盘半径方向运动(实际是斜切向运动),每个磁头同一时刻也必须是同轴的,即从正上方向下看,所有磁头任何时候都是重叠的(不过目前已经有多磁头独立技术,可不受此限制)。

图7是磁盘结构的示意图。

图7

盘片被划分成一系列同心环,圆心是盘片中心,每个同心环叫做一个磁道,所有半径相同的磁道组成一个柱面。磁道被沿半径线划分成一个个小的段,每个段叫做一个扇区,每个扇区是磁盘的最小存储单元。为了简单起见,我们下面假设磁盘只有一个盘片和一个磁头。

当需要从磁盘读取数据时,系统会将数据逻辑地址传给磁盘,磁盘的控制电路按照寻址逻辑将逻辑地址翻译成物理地址,即确定要读的数据在哪个磁道,哪个扇区。为了读取这个扇区的数据,需要将磁头放到这个扇区上方,为了实现这一点,磁头需要移动对准相应磁道,这个过程叫做寻道,所耗费时间叫做寻道时间,然后磁盘旋转将目标扇区旋转到磁头下,这个过程耗费的时间叫做旋转时间。

局部性原理与磁盘预读

由于存储介质的特性,磁盘本身存取就比主存慢很多,再加上机械运动耗费,磁盘的存取速度往往是主存的几百分分之一,因此为了提高效率,要尽量减少磁盘I/O。为了达到这个目的,磁盘往往不是严格按需读取,而是每次都会预读,即使只需要一个字节,磁盘也会从这个位置开始,顺序向后读取一定长度的数据放入内存。这样做的理论依据是计算机科学中著名的局部性原理:

当一个数据被用到时,其附近的数据也通常会马上被使用。

程序运行期间所需要的数据通常比较集中。

由于磁盘顺序读取的效率很高(不需要寻道时间,只需很少的旋转时间),因此对于具有局部性的程序来说,预读可以提高I/O效率。

预读的长度一般为页(page)的整倍数。页是计算机管理存储器的逻辑块,硬件及操作系统往往将主存和磁盘存储区分割为连续的大小相等的块,每个存储块称为一页(在许多操作系统中,页得大小通常为4k),主存和磁盘以页为单位交换数据。当程序要读取的数据不在主存中时,会触发一个缺页异常,此时系统会向磁盘发出读盘信号,磁盘会找到数据的起始位置并向后连续读取一页或几页载入内存中,然后异常返回,程序继续运行。

B-/+Tree索引的性能分析

到这里终于可以分析B-/+Tree索引的性能了。

上文说过一般使用磁盘I/O次数评价索引结构的优劣。先从B-Tree分析,根据B-Tree的定义,可知检索一次最多需要访问h个节点。数据库系统的设计者巧妙利用了磁盘预读原理,将一个节点的大小设为等于一个页,这样每个节点只需要一次I/O就可以完全载入。为了达到这个目的,在实际实现B- Tree还需要使用如下技巧:

每次新建节点时,直接申请一个页的空间,这样就保证一个节点物理上也存储在一个页里,加之计算机存储分配都是按页对齐的,就实现了一个node只需一次I/O。

B-Tree中一次检索最多需要h-1次I/O(根节点常驻内存),渐进复杂度为O(h)=O(logdN)。一般实际应用中,出度d是非常大的数字,通常超过100,因此h非常小(通常不超过3)。

综上所述,用B-Tree作为索引结构效率是非常高的。

而红黑树这种结构,h明显要深的多。由于逻辑上很近的节点(父子)物理上可能很远,无法利用局部性,所以红黑树的I/O渐进复杂度也为O(h),效率明显比B-Tree差很多。

上文还说过,B+Tree更适合外存索引,原因和内节点出度d有关。从上面分析可以看到,d越大索引的性能越好,而出度的上限取决于节点内key和data的大小:

dmax = floor(pagesize / (keysize + datasize + pointsize))  (pagesize – dmax >= pointsize)

dmax = floor(pagesize / (keysize + datasize + pointsize)) - 1  (pagesize – dmax < pointsize)

floor表示向下取整。由于B+Tree内节点去掉了data域,因此可以拥有更大的出度,拥有更好的性能。

这一章从理论角度讨论了与索引相关的数据结构与算法问题,下一章将讨论B+Tree是如何具体实现为MySQL中索引,同时将结合MyISAM和InnDB存储引擎介绍非聚集索引和聚集索引两种不同的索引实现形式。



posted @ 2018-01-25 13:44 张钊钊 阅读(2770) | 评论 (0)编辑 收藏

2017年8月7日

1:pom.xml 找自己spring boot 对应的版本
  <!-- redis 配置-->
 <dependency>
 <groupId>redis.clients</groupId>
 <artifactId>jedis</artifactId>
 <version>2.8.1</version>
 </dependency>
 <dependency>
 <groupId>org.springframework.data</groupId>
 <artifactId>spring-data-redis</artifactId>
 <version>1.7.2.RELEASE</version>
 </dependency>
<!-- end redis 配置-->

2: 在spring boot   *.properties 配置文件中增加连接配置:
     spring.redis.hostName=127.0.0.1
    spring.redis.port=6379
    spring.redis.password=
    spring.redis.pool.maxActive=8
    spring.redis.pool.maxWait=-1
    spring.redis.pool.maxIdle=8
    spring.redis.pool.minIdle=0
    spring.redis.timeout=0
3: spring boot 中增加redis 配置连接
      /**
 * Created by Administrator on 2017/2/27.
 
*/
@Configuration
@EnableAutoConfiguration
public class RedisConfig {

    private static Logger logger = LoggerFactory.getLogger(RedisConfig.class);

    @Bean
    @ConfigurationProperties(prefix="spring.redis")
    public JedisPoolConfig getRedisConfig(){
        JedisPoolConfig config = new JedisPoolConfig();
        return config;
    }
  @Bean
    @ConfigurationProperties(prefix="spring.redis")
    public RedisConnectionFactory jedisConnectionFactory(){
        return new JedisConnectionFactory(getRedisConfig());
    }
    @Bean
    public RedisTemplate<String, Object> redisTemplate(){
        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
        //这个缓存key的序列化方式
        template.setKeySerializer(new StringRedisSerializer());
        //设置redis 转换 value 通过jdk序列化方法,GenericJackson2JsonRedisSerializer 这个是json形式
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setConnectionFactory(jedisConnectionFactory());
        return template;
    }

    /**
     * 这个是redis 多值操作返回模版对象。比如:map,set ,list 等
     * 
@return
     
*/  @Bean
    public ValueOperations<String, Object> valueOperations(){
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        //这个缓存key的序列化方式
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        //这个缓存value的序列化方式,通过有jdk序列化,这个是json 序列化
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setExposeConnection(true);
        redisTemplate.setConnectionFactory(jedisConnectionFactory());
        redisTemplate.afterPropertiesSet();
        return redisTemplate.opsForValue();
    }
}
4: RedisTemplate 使用
   @Service
public class RedisServiceImpl implements RedisService{
    @Autowired
    private RedisTemplate redisTemplate;
    public boolean expire(final String key, long expire,final TimeUnit unit) {
        return redisTemplate.expire(key, expire, unit);
    }
    @Override
    public void setList(String key, List<String> list, long expire, TimeUnit unit) throws Exception {
        redisTemplate.delete(key);
        redisTemplate.opsForValue().set(key,list,expire,unit);
    }
    public List<String> getList(String key) {
        return (List<String>)redisTemplate.opsForValue().get(key);
    }
    public void remove(String key){
        redisTemplate.delete(key);
    }
    @Override
    public void setKey(String key, String s, long expire, TimeUnit unit) {
        redisTemplate.opsForValue().set(key,s,expire,unit);
    }
    
    @Override
    public String getKey(String key) {
        return (String) redisTemplate.opsForValue().get(key);
    }
    @Override
    public Object getKeyForObject(String key) {
        return redisTemplate.opsForValue().get(key);
    }
    @Override
    public void setMap(String key, Map<String, Object> map, long expire, TimeUnit unit) throws Exception {
        this.expire(key,expire,unit);
        redisTemplate.opsForHash().putAll(key,map);
    }
      @Override
    public void removeKeyForMap(String key, String mapKey) {
        redisTemplate.opsForHash().delete(key,mapKey);
    }
    @Override
    public void putMapKey(String key, String mapKey, String mapValue) {
        redisTemplate.opsForHash().put(key,mapKey,mapValue);
    }
    public Set<String> keys(String o ){
        return redisTemplate.keys(o);
    }
}

5:单元测试类

RunWith(SpringRunner.class)
@SpringBootTest(classes = MyApplication.class)// 指定spring-boot的启动类
//相当于  --spring.profiles.active=dev
@ActiveProfiles(value = "dev")
public class RedisCacheTest {
    protected final Logger logger = Logger.getLogger(this.getClass());
    @Autowired
    private RedisService redisService;
    @Autowired
    private RedisTemplate redisTemplate;
    @Test
    public void CacheTest() throws Exception {
        redisService.setKey("111111","222222",100, TimeUnit.SECONDS);
        redisService.getKey("111111");
    }
    @Test
    public void SaveObjectForCacheTest()throws Exception{
        TargetDB targetDB=new TargetDB();
        targetDB.setDbType("111");
        targetDB.setPort("22");
        targetDB.setUserName("zzzlyr");
        redisTemplate.opsForValue().set("key111",targetDB);
        TargetDB targetDB12= (TargetDB) redisTemplate.opsForValue().get("key111");
        System.out.println(targetDB12.toString());
    }
      @Test
    public void SaveMapForCacheTest()throws Exception {
        TargetDB targetDB = new TargetDB();
        targetDB.setDbType("111");
        targetDB.setPort("22");
        targetDB.setUserName("zzzlyr");
        List<TargetDB> list = new LinkedList<TargetDB>();
        list.add(targetDB);
        Map<String, List<TargetDB>> map = new LinkedHashMap<String, List<TargetDB>>();
        map.put("111111111111", list);
        //向缓存中放入map
        redisTemplate.opsForHash().putAll("zzzzz", map);
        //从缓存中获取map
        Map<String, List<TargetDB>> cacheMap = redisTemplate.opsForHash().entries("zzzzz");
        //Map<String,List<String>> cacheMap= (Map<String, List<String>>) redisTemplate.opsForValue().get("platform-app_AppOperation");
        System.out.println(cacheMap.toString());
    }
}

posted @ 2017-08-07 17:38 张钊钊 阅读(457) | 评论 (0)编辑 收藏

2017年7月10日

前言:JUnit元数据

@Before: 
使用了该元数据的方法在每个测试方法执行之前都要执行一次。 
@After: 
使用了该元数据的方法在每个测试方法执行之后要执行一次。 
注意:@Before和@After标示的方法只能各有一个。这个相当于取代了JUnit以前版本中的setUp和tearDown方法,当然你还可以继续叫这个名字,不过JUnit不会霸道的要求你这么做了。
@Test(expected=*.class) 
在JUnit4.0之前,对错误的测试,我们只能通过fail来产生一个错误,并在try块里面assertTrue(true)来测试。现在,通过@Test元数据中的expected属性。expected属性的值是一个异常的类型
@Test(timeout=xxx): 
该元数据传入了一个时间(毫秒)给测试方法, 
如果测试方法在制定的时间之内没有运行完,则测试也失败。 
@ignore: 
该元数据标记的测试方法在测试中会被忽略。当测试的方法还没有实现,或者测试的方法已经过时,或者在某种条件下才能测试该方法(比如需要一个数据库联接,而在本地测试的时候,数据库并没有连接),那么使用该标签来标示这个方法。同时,你可以为该标签传递一个String的参数,来表明为什么会忽略这个测试方 法。比如:@lgnore(“该方法还没有实现”),在执行的时候,仅会报告该方法没有实现,而不会运行测试方法。、

一、包含必要地Package

最主要地一个 Package就是org.junit.*,把它包含进来之后,绝大部分功能就有了。还有一句话也非常地重要“import static org.junit.Assert.*;”,我们在测试的时候使用的一系列assertEquals方法就来自这个包。大家注意一下,这是一个静态包含 (static),是JDK5中新增添的一个功能。也就是说,assertEquals是Assert类中的一系列的静态方法

二、测试类的声明

测试类是一个独立的类,没有任何父类。测试类的名字也可以任意命名,没有任何局限性。它与普通类的区别在于它内部的方法的声明

三、创建一个待测试的对象

你要测试哪个类,那么你首先就要创建一个该类的对象。

private staticCalculator calculator =newCalculator();

为了测试Calculator类,我们必须创建一个calculator对象。

四、测试方法的声明

在测试类中,并不是每一个方法都是用于测试的,你必须使用“标注”来明确表明哪些是测试方法。“标注”也是JDK5的一个新特性,用在此处非常恰当。我们可以看到,在某些方法的前有@Before、@Test、@Ignore等字样,这些就是标注,以一个“@”作为开头。这些标注都是JUnit4自定义 的,熟练掌握这些标注的含义非常重要。

六、 忽略测试某些尚未完成的方法

七、 Fixture(暂且翻译为“固定代码段”)

Fixture 的含义就是“在某些阶段必然被调用的代码”。“在任何一个测试执行之前必须执行的代码”就是一个Fixture,我们用@Before来标注它

一、 高级Fixture

两个Fixture标注,分别是@Before和@After,是否适合完成如下功能:有一个类是负责对大文件(超过 500兆)进行读写,他的每一个方法都是对文件进行操作。换句话说,在调用每一个方法之前,我们都要打开一个大文件并读入文件内容,这绝对是一个非常耗费时间的操作。如果我们使用@Before和@After,那么每次测试都要读取一次文件,效率及其低下。这里我们所希望的是在所有测试一开始读一次文件, 所有测试结束之后释放文件,而不是每次测试都读文件。JUnit的作者显然也考虑到了这个问题,它给出了@BeforeClass 和 @AfterClass两个Fixture来帮我们实现这个功能。从名字上就可以看出,用这两个Fixture标注的函数,只在测试用例初始化时执行@BeforeClass方法,当所有测试执行完毕之后,执行@AfterClass进行收尾工作。在这里要注意一下,每个测试类只能有一个方法被标注为 @BeforeClass或@AfterClass,并且该方法必须是Public和Static的。

二、 限时测试

那个求平方根的函数有Bug,是个死循环:

  public voidsquareRoot(intn) ...{

         for(; ;) ;//Bug : 死循环

  }

如果测试的时候遇到死循环,对于那些逻辑很复杂,循环嵌套比较深的程序,很有可能出现死循环,因此一定要采取一些预防措施。我们给这些测试函数设定一个执行时间,超过了这个时间,他们就会被系统强行终止,并且系统还会向你汇报该函数结束的原因是因为超时,这样你就可以发现这些Bug了。只需要给@Test标注加一个参数即可,代码如下:

  @Test(timeout = 1000)

  public voidsquareRoot() ...{

         calculator.squareRoot(4);

         assertEquals(2,calculator.getResult());

  }

  Timeout参数表明了你要设定的时间,单位为毫秒,因此1000就代表1秒。

三、 测试异常

经常会编写一些需要抛出异常的函数,如果一个函数应该抛出异常,但是它没抛出,当然是Bug。例如,我们写的计算器类有除法功能,如果除数是一个0,那么必然要抛出“除0异常”。因此,我们很有必要对这些进行测试。代码如下:

  @Test(expected = ArithmeticException.class)

  public void divideByZero() ...{

         calculator.divide(0);

  }

如上述代码所示,我们需要使用@Test标注的expected属性,将我们要检验的异常传递给他,这样JUnit框架就能自动帮我们检测是否抛出了我们指定的异常。

四、 Runner (运行器)

把测试代码提交给JUnit框架后,框架如何来运行代码呢?答案就是——Runner。在JUnit中有很多个 Runner,他们负责调用测试代码,每一个Runner都有各自的特殊功能,要根据需要选择不同的Runner来运行测试代码。JUnit中有一个默认Runner,如果没有指定,那么系统自动使用默认 Runner来运行你的代码。换句话说,下面两段代码含义是完全一样的:

import org.junit.runner.RunWith;

import org.junit.runners.Suite;

 

@RunWith(Suite.class)

@Suite.SuiteClasses({

MyTestCase.class, //测试类

PartSuite.class, //另一个测试套

})

public class AllTestCases {

}

  要想指定一个Runner,需要使用@RunWith标注,并且把你所指定的Runner作为参数传递给它。另外一个要注意的 是,@RunWith是用来修饰类的,而不是用来修饰函数的。只要对一个类指定了Runner,那么这个类中的所有函数都被这个Runner来调用。

五、 参数化测试

一个对考试分数进行评价的函数,返回值分别为“优秀,良好,一般,及格,不及格”,因此你在编写测试的时候,至少要写5个测试,把这5中情况都包含了,这 确实是一件很麻烦的事情。我们还使用我们先前的例子,测试一下“计算一个数的平方”这个函数,暂且分三类:正数、0、负数。测试代码如下:

importorg.junit.AfterClass;

  importorg.junit.Before;

  importorg.junit.BeforeClass;

  importorg.junit.Test;

  importstatic org.junit.Assert.*;

  public classAdvancedTest ...{

         private static Calculator calculator =new Calculator();

         @Before

         public void clearCalculator() ...{

                calculator.clear();

         }

         @Test

         public void square1() ...{

                calculator.square(2);

                assertEquals(4,calculator.getResult());

         }

         @Test

         public void square2() ...{

         calculator.square(0);

         assertEquals(0, calculator.getResult());

         }

         @Test

         public void square3() ...{

                calculator.square(-3);

                assertEquals(9,calculator.getResult());

         }

}

为了简化类似的测试,JUnit4提出了“参数化测试”的概念,只写一个测试函数,把这若干种情况作为参数传递进去,一次性的完成测试。代码如下:

  importstatic org.junit.Assert.assertEquals;

  importorg.junit.Test;

  importorg.junit.runner.RunWith;

  importorg.junit.runners.Parameterized;

  importorg.junit.runners.Parameterized.Parameters;

  importjava.util.Arrays;

  importjava.util.Collection;

  @RunWith(Parameterized.class)

  public classSquareTest{

  private static Calculator calculator = new Calculator();

         private int param;

         private int result;

         @Parameters

         public static Collection data(){

                return Arrays.asList(newObject[][]...{

                {2, 4},

                {0, 0},

                {-3, 9},

         });

  }

//构造函数,对变量进行初始化

六、断言和假设

断言:org.junit.Assert用于测试用例中,如果断言失败,用例即结束。

假设:org.junit.Assume用于在准备环境时判断环境是否符合要求,包括测试套的@BeforeClass,测试类的@BeforeClass,测试类的实例化,测试类的@Before。

如果假设失败,假设所处初始化代码方法立即结束,更深级别的后续工作也被忽略,相关测试用例被忽略,但与假设同级别的收尾工作还要继续执行。 
例如:如果在测试类的@BeforeClass中假设失败,该类的实例化及子级别将被忽略,@AfterClass会继续执行。
七、工程实例
     
 如果不想在单元测试中操作数据库中的数据。可以在测试方法上加 
     @Test

    @Transactional //单元测试  @Transactional 不会进行数据提交事物
    @Rollback(true) // 这个注释可以不用加(单元测试默认值)
    public void testaddUserPrivate()throws Exception{
        UserPrivate userPrivate=new UserPrivate();
        userPrivate.setTenantId("31");
        userPrivate.setMenuCode("7777888");
        userPrivate.setpUid("111222");
        userPrivate.setRoleId("3332277");
        userPrivate.setValue("99999");
        userPrivate.setUpdateDateTime(new Date());
        userPrivate.setCreateDateTime(new Date());
        int s=userPrivateService.addUserPrivate(userPrivate);
        Assert.assertEquals(1, s); // 这是断言的使用
    }
        

posted @ 2017-07-10 10:17 张钊钊 阅读(259) | 评论 (0)编辑 收藏

2017年7月4日

1: spring cloud eureka 如果出现某个应用实例 down(1), 说明 spring admin 健康检测没有通过导致 eureka 注册中心不会把这个实例从列表中删除掉。 这样所有使用这个实例的服务都会现404(前提是在应用中配置过spring admin); 2:spring admin 健康检测会检测*.properties里的所有连能性的配置(mysql,redis,短信服务,邮件服务),如果这些URL中有一个不通,则会导致eureka中出现, 这个实例down(1) 并且不会从列表中删除掉。 例: 应用中不使用reides,但是在pom.xml中引用reides的配置(只限于spring-boot redis配置) 这样spring admin 健康检测发现*.properties没配置redis,但是spring-boot-starter-data-redis 有默认配置(是localhost), 会导致检测不通过,eureka 显示状态为 down(1). 处理这样问题可以使用:http://eureakIP:port/health 如果没有问题会返回:{"description":"Spring Cloud Eureka Discovery Client","status":"UP"} 如果有问题会返回那个实例的检测什么配置项没有通过,只要修改后重启应用实例,这样eureka应用会显示UP(1);

posted @ 2017-07-04 18:30 张钊钊 阅读(3268) | 评论 (0)编辑 收藏

2017年6月20日

spring boot + mybatis 配置; MybatisConfiguration 类 加 @EnableTransactionManagement spring boot 启动类 加 @EnableTransactionManagement 其实都不要配置,spring boot 默认配置。 一般 @Transactional 东西 加在,service 层。 如果在 controller 呢? 官方解释如下: spring-framework-reference.pdf 文档上有这样一段话: only looks for @Transactional on beans in the same application context it is defined in. This means that, if you put in a WebApplicationContext for a DispatcherServlet, it only checks for @Transactional beans in your controllers, and not your services. 意思就是:只会查找和它在相同的应用上下文件中定义的bean上面的@Transactional注解,如果你把它放在Dispatcher的应用上下文中,它只检查控制器(Controller)上的@Transactional注解,而不是你services上的@Transactional注解。 所以,可以确定的是我们是可以在Controller上使用事务注解的,但是我们不推荐这样做(本人也从来没有这样做过)。

posted @ 2017-06-20 17:03 张钊钊 阅读(174) | 评论 (0)编辑 收藏

2017年3月28日

package stacktest;

/**
* @Author: zzz
* @CreateTime: 2017/3/28 10:52
* @Description: 队列特点(先进先出),链表实现的队列 在队头删除元素,在队尾插入元素。
* 这样才能满足队列的特性。
*/
public class MyQueue<T> {
private Node<T> front; //队列头,只能删除元素

    private Node<T> rear; //队列尾,只能用来插入入元素

    private int size;//队列的长度

/**
* 初始化队列
*/
public MyQueue() {
front = new Node<T>();
rear = front;
}

/**
* 链表的数据结构
*/
private class Node<T> {
public T data;
public Node<T> next;

public Node(T data, Node next) {
this.data = data;
this.next = next;
}

public Node(T data) {
this.data = data;
}

public Node() {
}
}

public void add(T data) {
//新插入的节点永远是尾节点,它的next 指向null(即没有后继节点)
        Node newNode = new Node(data, null);
//让尾节点next指向新节点
        rear.next = newNode;
rear = newNode;
size++;
}

public T pop() throws Exception {
if (size < 1) {
throw new Exception("错误,队列为空。");
}
Node<T> nextNode = front.next;

front.next = nextNode.next;
size--;
if (size < 1) {
rear = front;
size = 0;
}
return nextNode.data;

}

//取队首元素

public T peek() throws Exception {
if (size < 1){
throw new Exception("错误,队列为空。");
};
return front.next.data;

}
//返回队列的大小
public int getSize() {
return size;
}

//判断队列是否为空
public boolean isEmpty() {
return size == 0;
}

/**
* 遍历算法,移动front指针,直到front指针追上rear指针
*/
public void traverse(){
for(Node currentNode=front.next; currentNode!=null; currentNode=currentNode.next ){
System.out.println(currentNode.data);
}
}

public static void main(String[] args)throws Exception{
MyQueue<String> queue=new MyQueue<>();
for(int i=0;i<10;i++){
queue.add("88888-"+i);
}

/* for(int i=0;i<10;i++){
String s=queue.pop();
System.out.print(s+";");
}*/
queue.traverse();

}
}

posted @ 2017-03-28 15:48 张钊钊 阅读(236) | 评论 (0)编辑 收藏