Java多线程?用CompletableFuture就够了
前言
在项目开发中,经常会遇到一个问题:在一个后端接口里,往往会进行多项耗时任务(相互之间独立,没有依赖)的操作,如:
- 需要从不同的外部接口获取不同的数据,做融合;
- 请求外部接口数据的同时,还需要读取数据库;
- 等等
如果在一个请求的主线程里,串行做这些任务操作,会导致响应时间的线性叠加,极有可能导致不符合要求,如图1:
图1
那么,对这些耗时任务进行并行操作,从而使得:响应时间 约等于 耗时最大的任务处理时间,这样可以大大降低系统的响应时间,如图2:
图2
Future
Future类型,其实就是一个未来任务的返回对象,或者说是子线程的返回对象(通过线程池方式分配子线程)
ExecutorService executor = Executors.newFixedThreadPool(4);
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞
复制代码
可以看到,通过线程池的方式创建子线程后,executor.submit()返回的是一个Future对象,通过future.get()方法来获得该子任务的运行结果。需要注意的是,这个操作是阻塞的,也就是说,如果这个子任务没有运行结束,主线程会一直block在改行,直到子任务完成。
一个Future<V>接口表示一个未来可能会返回的结果,它定义的方法有:
- get():获取结果(可能会等待)
- get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
- cancel(boolean mayInterruptIfRunning):取消当前任务;
- isDone():判断任务是否已完成。
CompletableFuture
当需要判断图2中的所有task是否完成时,如果采用Future,则需要:
- 调用future.get()获取运行结果,
- 或者轮询future.isDone()方法直到返回true。
无论哪种方法,都是在主线程里调用,且会阻塞主线程。
以上痛点,从Java 8开始引入了CompletableFuture方法。主要新增的功能有:
- thenAccept(): 当task正常完成后,回调调用.thenAccept()方法
- exceptionally(): 当task出现异常时,回调调用.exceptionally()方法
- anyOf(): 当所有的task中,只要有一个task完成,则主线程继续往下走,可以使用.anyOf()方法
- allOf(): 所有的task均完成后,则主线程继续往下走
- supplyAsync(): 异步执行,有返回值
- runAsync(): 异步执行,无返回值
针对图2,需要所有task都完成后,再执行后续操作,就可以用allOf()方法:
CompletableFuture.allOf(task1, task2, ..., taskn).join();
注意:CompletableFuture的命名规则:
- xxx():表示该方法将继续在已有的线程中执行;
- xxxAsync():表示将异步在线程池中执行,即可以异步执行。
基于CompletableFuture+线程池的实现
线程池配置类
@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {
@Bean
public Executor asyncExecutor() {
log.info("start async executor");
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 配置核心线程数
threadPoolTaskExecutor.setCorePoolSize(ThreadPoolConstant.CORE_POOL_SIZE);
// 配置最大线程数
threadPoolTaskExecutor.setMaxPoolSize(ThreadPoolConstant.MAX_POOL_SIZE);
// 配置队列大小
threadPoolTaskExecutor.setQueueCapacity(ThreadPoolConstant.QUEUE_CAPACITY);
// 配置线程池中线程的名称前缀
threadPoolTaskExecutor.setThreadNamePrefix(ThreadPoolConstant.THREAD_NAME_PREFIX);
// HelloWorldServiceImpl rejection-policy: 当pool已经达到max size时,如何处理新任务:
// CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执行;
// AbortPolicy: 拒绝执行新任务,并抛出RejectedExecutionException异常;
// DiscardPolicy:丢弃当前将要加入队列的任务;
// DiscardOldestPolicy:丢弃任务队列中最旧的任务;
threadPoolTaskExecutor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
异步服务与服务实现
public interface AsyncService {
@Async("asyncExecutor")
CompletableFuture<String> getResponseFromCp(QueryTrainInfoDetailReqDTOWithType queryTrainInfoDetailReqDTOWithType, int queryType);
}
@Service
public class AsyncServiceImpl implements AsyncService {
@Autowired
CustomProps customProps;
@Autowired
RestTemplate restTemplate;
@Override
public CompletableFuture<String> getResponseFromCp(QueryTrainInfoDetailReqDTOWithType queryTrainInfoDetailReqDTOWithType, int queryType) {
return CompletableFuture
.completedFuture(
FactoryUtil
.createFactory(customProps, null, restTemplate)
.obtainData(queryTrainInfoDetailReqDTOWithType.setQueryType(queryType), String.class)
);
}
}
业务代码中调用异步服务接口
...
@Autowired
AsyncService asyncService;
@Override
public ReturnData qTrainInfoDetail(QueryTrainInfoDetailReqDTO queryTrainInfoDetailReqDTO) {
QueryTrainInfoDetailReqDTOWithType queryTrainInfoDetailReqDTOWithType = new QueryTrainInfoDetailReqDTOWithType().setQueryTrainInfoDetailReqDTO(queryTrainInfoDetailReqDTO);
CompletableFuture<String> fromCpFirstReq = asyncService.getResponseFromCp(queryTrainInfoDetailReqDTOWithType, 1);
CompletableFuture<String> fromCpSecondReq = asyncService.getResponseFromCp(queryTrainInfoDetailReqDTOWithType, 2);
CompletableFuture.allOf(fromCpFirstReq, fromCpSecondReq).join(); //阻塞直到当第一次请求和第二次请求都完成
}
...