posts - 22, comments - 32, trackbacks - 0, articles - 74
  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理
springboot 3 web工程 多线程查询任务处理方式:

@Configuration
public class AsyncThreadPoolConfig {
// 获取 CPU 核心数
private static final int CPU_CORES = Runtime.getRuntime().availableProcessors();

/**
* 自定义异步线程池 springboot 推荐返回 ThreadPoolTaskExecutor ,也可以返回JDK原生:Executor,ExcecutorService


*/
@Bean(name = "asyncExecutor") // 线程池名称,用于 @Async 注解指定
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

// 核心线程数(默认线程数):线程池创建时初始化的线程数量
executor.setCorePoolSize(CPU_CORES);
// 最大线程数:线程池最大可创建的线程数量
executor.setMaxPoolSize(CPU_CORES * 2);
// 队列容量:任务队列的最大长度,超过核心线程数的任务会先进入队列等待
executor.setQueueCapacity(10000);
// 线程空闲时间:当线程数超过核心线程数时,多余线程的最大空闲时间(单位:秒)
executor.setKeepAliveSeconds(60);
// 线程名称前缀:方便日志排查
executor.setThreadNamePrefix("async-task-");
// 拒绝策略:当任务队列满且线程数达到最大线程数时,如何处理新任务
// ThreadPoolExecutor.CALLER_RUNS_POLICY:由提交任务的线程执行(避免任务丢失,适合非核心任务)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化线程池
executor.initialize();
return executor;
}


List<CompletableFuture<List<GameUserView>>> futureList = new ArrayList<>();
for (int i = 0; i < userIdList.size(); i += 500) {
int end = Math.min(i + 500, userIdList.size());
List<Long> batchIds = new ArrayList<>(userIdList.subList(i, end));
// 指定自定义线程池
CompletableFuture<List<GameUserView>> future = CompletableFuture.supplyAsync(
() -> gameUserTelegramInfoMapper.getBotNameByUserIds(batchIds,YesOrNoEnum.YES.getCode()), asyncExecutor)//使用自定义线程池
.orTimeout(600, MILLISECONDS)//单次超时600ms
.exceptionally(e -> {
log.error("多线程批量查询用户信息完成!", e);
return new ArrayList<>();
});
futureList.add(future);
}
//2: 等待所有任务完成并汇总结果
CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
//3: 汇总结果
List<GameUserView> userViews =allFuture.thenApply(v ->
futureList.stream()
//单个任务异常兜底(exceptionally
.map(f -> f.exceptionally(e -> new ArrayList<>()).join())
.filter(result -> !result.isEmpty())
.flatMap(List::stream)
.collect(Collectors.toList())
).orTimeout(5, SECONDS)//4:总超时兜底,任务层面超时,替代try-catch捕获TimeoutException
.exceptionally(e ->{
//5:超时/异常时自动取消所有未完成任务,防止资源耗尽。
futureList.forEach(f -> {
if(!f.isDone()){
boolean cancel = f.cancel(true);
log.info("取消子任务:{}", cancel);
}
});
log.error("多线程批量查询用户信息完成!",e);
return new ArrayList<>();
}).join();//6:最终无参join(已通过orTimeout+exceptionally兜底,无阻塞风险)
log.info("多线程批量查询用户信息完成!结果数:{}",userViews.size());
return userViews;


只有注册用户登录后才能发表评论。


网站导航: