posts - 22, comments - 32, trackbacks - 0, articles - 71
  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

如何在 Spring 使用@Async,@EnableAsync注释进行异步处理:

异步处理适用那些与业务逻辑(横切关注点)不直接相关或者不作为其他业务逻辑输入的部分,也可在分布式系统中解耦。

*译注:横切关注点(cross-cutting concerns)指一些具有横越多个模块的行为,使用传统的软件开发方法不能够达到有效模块化的一类特殊关注点。*

Spring 中,`@Async`注解可以标记异步操作。然而,使用`@Async`时有一些限制,仅仅把它加在方法上并不能确保方法会在独立的线程中执行。如果你只是偶尔用到 `@Async`,需要格外当心。

1. @Async 的工作机制

首先为方法添加 `Async` 注解。接着,Spring 会基于 `proxyTargetClass` 属性,为包含 `Async` 定义的对象创建代理(JDK Proxy/CGlib)。
最后,Spring 会尝试搜索与当前上下文相关的线程池,把该方法作为独立的执行路径提交。确切地说,Spring 会搜索唯一的 `TaskExecutor` bean 或者名为 `taskExecutor` 的 bean。如果找不到,则使用默认的 `SimpleAsyncTaskExecutor`。

要完成上面的过程,使用中需要注意几个限制,否则会出现 `Async` 不起作用的情况。

2. @Async 的限制

1. 必须在标记 `@ComponentScan` 或 `@configuration` 的类中使用 `@Async`。

未来实现类获取异步处理结果

如果想要获取异步处理的结果,可以通过未来接口的实现类调用得到()方法获得。
未来接口的常见实现类有FutureTask。
在SpringBoot中,一般用AsyncResult作为异步结果。

future 缺点:

使用Future获得初始化执行结果时,可以使用初始化附加方法get(),或者替换看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java 8开始约会了CompletableFuture,它针对Future了改进之处,可以针对某些对象,当初始化任务完成或发生异常时,自动调用对象的替代方法。下面会详细解释:

示例:spring boot工程初步处理业务类
1.AsyncTaskManager
@Service
@EnableAsync
public class AsyncTaskManager {
/**
* 这个业务注入的类
*/
@Autowired
private MessageDao messageDao;

/**
* @Async注解表示异步,后面的参数对应于线程池配置类ExecutorConfig中的方法名asyncServiceExecutor()
* 如果不写后面的参数,直接使用@Async注解,则是使用默认的线程池
* Future<String>为异步返回的结果。可以通过get()方法获取结果。
* @param s
* @throws Exception
*/
@Async(value = "asyncTaskExecutor")
public void transTask(String s) throws Exception {
messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+"--"+s+" ;time="+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
TimeUnit.SECONDS.sleep(6);
}

/**
* 异步调用,有返回值,必须是Future类型,不然报错
* 如果不写后面的参数,直接用@Async,则是使用默认的线程池。
* 使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待
* @param s
* @return
*/
@Async(value = "asyncTaskExecutor")
public Future<String> transTaskForFuture(String s) {
String result=null;
try {
result=messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+" 子线程开始执行...result=" + result);
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
return AsyncResult.forExecutionException(e);
}
return AsyncResult.forValue(result);
}

/**
* 基于回调的listenableFuture比上种子线程直接返回Future优质是,主线程不用等待,任务在完成后会自动执行回调代码。
* 因此在调用时要注册回调代码,包括成功回调和失败回调
* @param s
* @return
*/
@Async(value = "asyncTaskExecutor")
public ListenableFuture<String> transTaskForCallback(String s) {
String result=null;
try {
result=messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+" 子线程开始执行...result=" + result);
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException e) {
return AsyncResult.forExecutionException(e);
}
return AsyncResult.forValue(result);
}

/**
* 从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法
* 最主要是可以提供复杂的
* CompletableFuture可以指定异步处理流程:
* thenAccept()处理正常结果;
* exceptional()处理异常结果;
* thenApplyAsync()用于串行化另一个CompletableFuture;
* anyOf()和allOf()用于并行化多个CompletableFuture。
* 详解请看 https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
* @param s
* @return
*/
@Async(value = "asyncTaskExecutor")
public CompletableFuture<Object> transTaskForCompletableFuture(String s) {
Object result=null;
try {
result=messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+" 子线程开始执行...result=" + result);
TimeUnit.SECONDS.sleep(6);
} catch (Exception e) {
return AsyncResult.forExecutionException(e).completable();
}
return AsyncResult.forValue(result).completable();
}
@Async(value = "asyncTaskExecutor")
public CompletableFuture<Object> transTaskForCompletableFuture2(int s) {
Object result=null;
try {
result=messageDao.getUserCode(s);
System.out.println(Thread.currentThread().getName()+" 子线程开始执行...result=" + result);
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
return AsyncResult.forExecutionException(e).completable();
}
return AsyncResult.forValue(result).completable();
}
}
Dao层业务类:


@Repository
public class MessageDao {

public String getMessage(String s){
return s;
}

public String callBackMessage(String s){
return "这是注册回调返回结果s="+s;
}

public String getUserCode(int id){
return "000"+id;
}
public String getUserName(String code){
return "李四";
}
public String getUserDepartment(String code){
return "技术开发部";
}
}
 

线程池ThreadPoolTask​​Executor

SpringBoot中的线程池一般用ThreadPoolTask​​Executor类
。ThreadPoolTask​​Executor继承关系如下:

ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor

关系结构图为:

2.自定义线程池配置如下:

@Configuration
public class AsyncTaskConfig {
/**
* IO密集型任务 = 一般为2*CPU核心数(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)
* CPU密集型任务 = 一般为CPU核心数+1(常出现于线程中:复杂算法)
* 混合型任务 = 视机器配置和复杂度自测而定
*/
@Bean(name = "asyncTaskExecutor")
public ThreadPoolTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//1: 核心线程数目
executor.setCorePoolSize(4);
//2: 指定最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(10);
//3: 队列中最大的数目
executor.setQueueCapacity(200);
//4: 线程名称前缀
executor.setThreadNamePrefix("LocustTask-");
//5:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy: 会在execute 方法的调用线程中运行被拒绝的任务,如果执行程序已关闭,则会丢弃该任务
// AbortPolicy: 抛出java.util.concurrent.RejectedExecutionException异常
// DiscardOldestPolicy: 抛弃旧的任务
// DiscardPolicy: 抛弃当前的任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//6: 线程空闲后的最大存活时间(默认值 60),当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
//7:线程空闲时间,当线程空闲时间达到keepAliveSeconds(秒)时,线程会退出,直到线程数量等于corePoolSize,如果allowCoreThreadTimeout=true,则会直到线程数量等于0
executor.setAllowCoreThreadTimeOut(false);
executor.initialize();
return executor;
}
}

@EnableAsync开启初步

@EnableAsync表示开启初始,可以放在@Controller层上方,也可以放在Applicationclass的上方,也可以直接放在业务类上例AsyncTaskManager

@Controller
@EnableAsync
public class XXXController {
    @Autowired
    private AsyncTaskManager asyncTaskManager;

    @GetMapping("/user/getList")
    @ResponseBody
    public String getUserData(){
        return asyncTaskManager.getAsyncResult();
    }
}

Junint 4单元测试类如下
AsyncTaskTest:

  1 public class AsyncTaskTest extends BaseTest {
  2 
  3     @Autowired
  4     private AsyncTaskManager asyncTaskManager;
  5 
  6     @Autowired
  7     private MessageDao messageDao;
  8 
  9     /**
 10      * 单无测试方法,没有办法测试多线程池郊果,因为单测试方法运行完后,整个JVM进程会水销毁,所有测试只能启动tomcat进行测试。
 11      *
 12      * @throws Exception
 13      */
 14     @Test
 15     public void testAsyncTask() throws Exception {
 16         for (int i = 1; i <= 10; i++) {
 17             asyncTaskManager.transTask("2222");
 18         }
 19     }
 20 
 21     /**
 22      * 主线等待子线完成后,获取返回结果
 23      *
 24      * @throws Exception
 25      */
 26     @Test
 27     public void testAsyncTaskForFuture() throws Exception {
 28         Future<String> future = asyncTaskManager.transTaskForFuture("AAA---BBB");
 29         while (true) {
 30             if (future.isDone() && !future.isCancelled()) {
 31                 System.out.println(Thread.currentThread().getName() + "子线程执行完毕");
 32                 break;
 33             } else {
 34                 Thread.sleep(2000);
 35                 System.out.println("主线程" + Thread.currentThread().getName() + "待子线程执行完毕");
 36             }
 37         }
 38     }
 39 
 40     /**
 41      * 在调用时候,主线不用等待,可以注册回调类和方法进行
 42      *
 43      * @throws Exception
 44      */
 45     @Test
 46     public void testAsyncTaskForCallback() throws Exception {
 47         // 在主要线程设置 独有上下文变量
 48         ThreadContext.setUserId(222222222222L);
 49         ListenableFuture<String> future = asyncTaskManager.transTaskForCallback("AAA---BBB");
 50         future.addCallback(
 51             successCallback -> {
 52                 try {
 53                     String s = future.get(2L, TimeUnit.SECONDS);
 54                     String result = messageDao.callBackMessage(s);
 55                     //在线程池中子线程获取父线程设置变量
 56                     System.out.println("回调结果:" + result + ";parent thread value:" + ThreadContext.getUserId());
 57                 } catch (Exception e) {
 58                     e.printStackTrace();
 59                 }
 60             },
 61             FailureCallback -> {
 62                 System.out.println("子线程执行失败.");
 63             }
 64         );
 65         Thread.sleep(20000);
 66     }
 67 
 68     /**
 69      * 验证多线程常用的场景比如有: 4个任务需要4个线程去执行,同时成功后才执行相应操作
 70      * A,B,C,D 4 个任务
 71      * CompletableFuture.allOf()方法
 72      * 由于 allOf 聚合了多个 CompletableFuture 实例,所以它是没有返回值的。这也是它的一个缺点
 73      * @throws Exception
 74      */
 75     @Test
 76     public void testAsyncTaskForAllOf() throws Exception {
 77         CompletableFuture<Object> completableFuture_1 = asyncTaskManager.task1("task-1");
 78         CompletableFuture<Object> completableFuture_2 = asyncTaskManager.task2("task-2");
 79         CompletableFuture<Object> completableFuture_3 = asyncTaskManager.task3("task-3");
 80         CompletableFuture<Object> completableFuture_4 = asyncTaskManager.task4("task-4");
 81         // 1: 把4个线程返回 completableFuture_3 组合成一个
 82         CompletableFuture alloff=CompletableFuture.allOf(completableFuture_1,completableFuture_2,completableFuture_3,completableFuture_4);
 83         // 2:如果没有后续的动作,可以直接 join()和get() 执行结果,主线程一直被阻塞,一直等到用户线程返回,如果不使用join 和get 主线程不会被阻塞
 84         // CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一样的,都是阻塞获取值,它们的区别在于 join() 抛出的是 unchecked Exception。
 85         String result= (String)alloff.join();
 86         System.out.println("所有任务同时完成"+result);
 87         Thread.sleep(20000);
 88     }
 89 
 90     /**
 91      * 验证多线程常用的场景比如有: 4个任务需要4个线程去执行,同时成功后才执行相应操作
 92      * A,B,C,D 4 个任务
 93      * CompletableFuture.anyOf()方法 其中有一个执行成功,就算完成
 94      *
 95      * @throws Exception
 96      */
 97     @Test
 98     public void testAsyncTaskForAnyOf() throws Exception {
 99         CompletableFuture<Object> completableFuture_1 = asyncTaskManager.task1("task-1");
100         CompletableFuture<Object> completableFuture_2 = asyncTaskManager.task2("task-2");
101         CompletableFuture<Object> completableFuture_3 = asyncTaskManager.task3("task-3");
102         CompletableFuture<Object> completableFuture_4 = asyncTaskManager.task4("task-4");
103         CompletableFuture anyOf=CompletableFuture.anyOf(completableFuture_1,completableFuture_2,completableFuture_3,completableFuture_4);
104         //这里利用Jdk8函数式接口lambda表达式来实现匿名内部类,?是泛型通配符
105         Object  s=anyOf.get(1500,TimeUnit.MILLISECONDS);
106         System.out.println(" anyof 输出结果 s="+s);
107         Thread.sleep(20000);
108     }
109 
110     /**
111      * 验证多线程常用的场景比如有: 3个任务需要3个线程去执行
112      * 根据 A 方法 异步返回结果,分别去异步执行 查询员工名称和部门,然后返回结果
113      * @throws Exception
114      */
115     @Test
116     public void testAsyncTaskForCompletableFuture2() throws Exception {
117         CompletableFuture<Object> completableFuture_A = asyncTaskManager.task1("task-1");
118         // 1: 如果A成功后返回结果,作为B的入参去执行(thenApply 方法 都是在自己当前线程中执行)
119         CompletableFuture<Object> fetchNameFuture_B = completableFuture_A.thenApplyAsync((result) ->{
120             return messageDao.getUserName((String)result);
121             }
122         );
123         //2:B 执行成功后结果作为入参,执行C,然后返回
124         CompletableFuture<Object> fetchNameFuture_C=fetchNameFuture_B.thenApplyAsync((result)->{
125             return messageDao.getUserDepartment((String)result);
126         });
127         // join()会一直程序会一直block
128         System.out.println(fetchNameFuture_C.join());
129         // 手动完成一个complete,会立即执行,可以看到future调用complete(T t)会立即执行。但是complete(T t)只能调用一次,后续的重复调用会失效
130         //future已经执行完毕能够返回结果,此时再调用complete(T t)则会无效
131         System.out.println(fetchNameFuture_B.complete("complete"));
132         Thread.sleep(90000);
133     }
134 
135     /**
136      * 这个方法验证把两个异步线程的结果聚合起来返回
137      * @throws Exception
138      */
139     @Test
140     public void testAsyncTaskForThenCombine() throws Exception {
141         //1: 第一个查询查询员工消息,
142         CompletableFuture<Object> futureA = asyncTaskManager.task1("task-1");
143         CompletableFuture<Object> futureB = asyncTaskManager.task2("task-2");
144 
145         CompletableFuture<Object> future=futureA.thenCombine(futureB,(resultA,resultB)->{
146             return resultA+";"+resultB;
147         });
148         Object s=future.join();
149         System.out.println(" result:"+ s);
150         // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
151         Thread.sleep(20000);
152     }
153     /**
154      * 这个方法验证thenAcceptBoth接口是指,接受两个异步线程,等待两个完成后,做下一步动作,它的第二个参数是一个消费型的函数接口
155      *  BiConsumer 这就标明它可以对上边传入的异步线程的结果做处理(改变传入线程结果的值),并且没有返回值
156      * @throws Exception
157      */
158     @Test
159     public void testAsyncTaskForThenAcceptBoth() throws Exception {
160         //1: 第一个查询查询员工消息
161         CompletableFuture<Object> futureA = asyncTaskManager.task1("task-1");
162         CompletableFuture<Object> futureB = asyncTaskManager.task2("task-2");
163 
164         CompletableFuture<Void> allResult=futureA.thenAcceptBoth(futureB,(resultA,resultB)->{
165             String result=messageDao.getUserDepartment(resultA+";"+resultB);
166             System.out.println("======result="+result);
167         });
168         // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
169         Thread.sleep(20000);
170     }
171 
172     /**
173      * 验证futureA,futureB 两个异步线程,其中一个返回,就返回。
174      * @throws Exception
175      */
176 
177     @Test
178     public void testAsyncTaskForAcceptEither() throws Exception {
179         //1: 第一个查询查询员工消息
180         CompletableFuture <Object> futureA = asyncTaskManager.task1(“ task-1”);
181          CompletableFuture <Object> futureB = asyncTaskManager.task2(“ task-2”);
182          futureA.acceptEither(futureB,(result)-> {
183              字符串s = messageDao.getUserName(result +“”);
184              System.out.println(“它的一个串行返回返回的结果:” + s);
185          });
186          //  主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:187
         线程。睡眠(20000);
188      }
189  }
190 


只有注册用户登录后才能发表评论。


网站导航: