简介
本文主要围绕以下关键字进行:
- Executors
- ThreadPoolExecutor
- ThreadLocal
- Springboot Async
- CompletableFuture
场景
在我们使用Springboot编程的时候,假如我们有一个这样的场景:
- http接口,传参是一个很大的文件
- 我们需要调用第三方接口处理这个文件的每一条数据,最后将数据写入到一个excel中
- 尽可能加快速度
首先想到的是Springboot的异步机制,在controller调用一个异步的service方法,在这个方法中进行文件处理和写入。问题是:文件处理想要快,需要让多线程处理,但是想要拿到每一个线程的结果,在最后综合数据写入excel又很麻烦,于是采用CompletableFuture天然支持分步处理。
最后的结论是:
- 使用Springboot的异步注解在分线程中执行,保证http迅速返回“正在处理”
- 在分线程中使用CompletableFuture多线程处理文件,处理后将结果写入excel
其中还有一些知识点如:
- ForkJoinPool的使用
- stream和parallelStream的区别
- forEach和forEachOrdered的区别
- threadlocal在单例中的特殊性
这些知识点我将在之后的文章中继续学习
Executors
关于线程池,通过Executors创建线程池是最常见的方法:
- newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
- newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
- newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
创建线程池的时候我们通常的代码是这样:
1 | ExecutorService executorService = Executors.newFixedThreadPool(10); |
如果使用阿里巴巴的Java规范检查工具的话,我们会发现工具不推荐我们这样使用线程池。因为这几种线程池有一些弊端:
**1)newFixedThreadPool和newSingleThreadExecutor:
**主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
通过源代码我们可以看出来这两种线程池创建了最大值为Integer.MAX_VALUE的LinkedBlockingQueue线程阻塞队列,当添加任务的速度大于处理的速度的时候,任务就会堆积到阻塞队列里面等待处理,这样会消耗很大的内存,引发OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
1 | public static ExecutorService newCachedThreadPool() { |
这两种线程池,通过源码我们可以看到很明显最大线程数量为Integer.MAX_VALUE,空闲线程存活时间为0,当添加任务的速度大于线程池处理的任务速度时,就会创建大量的线程,消耗大量资源,从而导致OOM。
虽然这些创建方式都不好,但是我们很轻易可以看出,这些线程池只不过是对ThreadPoolExecutor进行了一些参数调整,那么我们就应该根据自己的使用场景创建自定义的线程池。
ThreadPoolExecutor
1 | public ThreadPoolExecutor(int corePoolSize, |
这些参数的意义如下:
corePoolSize 指定了线程池里的线程数量,核心线程池大小
maximumPoolSize 指定了线程池里的最大线程数量
keepAliveTime 当线程池线程数量大于corePoolSize时候,多出来的空闲线程,多长时间会被销毁。
unit 时间单位。TimeUnit
workQueue 任务队列,用于存放提交但是尚未被执行的任务。我们可以选择如下几种:
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
- LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。
- SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作,反之亦然。
- PriorityBlockingQueue:具有优先级别的阻塞队列。
threadFactory 线程工厂,用于创建线程,一般可以用默认的,但是阿里巴巴的编码规范要求我们最好自定义一个,指定线程名称,方便日志查询。
handler 拒绝策略,所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。
- AbortPolicy:直接抛出异常,默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
这也是使用Executors创建线程池无法实现的一点,我们通过手动创建线程池,就可以指定这些策略,如果我们关注每一个任务的执行情况,不允许丢失,我们就可以采用CallerRunsPolicy的策略,如果调用者是主线程,就是由主线程来进行执行。

模板代码为:
1 | ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(); |
SpringBoot中Async异步与线程池
异步调用对应的是同步调用,同步调用可以理解为按照定义的顺序依次执行,有序性;异步调用在执行的时候不需要等待上一个指令调用结束就可以继续执行。比如这样一个场景:我们通过一个Http请求,希望将一个文件中的几百万条数据处理并导入数据库,如果同步执行,可能就会页面卡住甚至超时,我们希望业务逻辑正常执行的同时,返回SUCCESS,后序我们可以通过如轮询进度的方式来获取处理的最新状态。这时候我们就需要进行异步操作。
在Springboot中使用异步操作,我们可以使用上述的线程池进行处理,但是如果我们多个类中想使用同一个线程池,达到复用,亦或者我们想统一定义线程池,而不是用的时候临时创建一个,达到更加优美的编码方式,我们就需要使用Springboot的一些注解来解决。
全局启用异步机制我们需要在启动类上进行注释@EnableAsync
,如果我们并不关注线程池的具体内容以及名字,我们可以直接对方法使用@Async
启用异步机制,如果我们希望使用自定义的线程池,就需要写一个配置类:
1 |
|
根据上面的代码,我们就为Springboot容器中新增了一个名字为taskFacade
的bean
,当我们想用的时候只需要在方法头添加@Async(value = "taskFacade")
的注释,这样调用这个方法的调用方在执行到这个方法的时候就会开启异步。如果你对这个异步操作的返回值感兴趣,也可以将返回值通过Future
包装起来,再进行读取,但是这样的话,方法虽然在多线程中执行,但是会阻塞等待返回结果。原因是Future
的get
方法会阻塞调用方。
1 | // Service 伪代码 |
CompletableFuture
关于这个的详细使用,可以参考:https://www.jianshu.com/p/6bac52527ca4,为了防止链接失效,我copy到我这边。
1 runAsync 和 supplyAsync方法
CompletableFuture 提供了四个静态方法来创建一个异步操作。
1 | public static CompletableFuture<Void> runAsync(Runnable runnable) |
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
- runAsync方法不支持返回值。
- supplyAsync可以支持返回值。
示例
1 | //无返回值 |
2 计算结果完成时的回调方法
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
1 | public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) |
可以看到Action的类型是BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。
whenComplete 和 whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
示例
1 | public static void whenComplete() throws Exception { |
3 thenApply 方法
当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
1 | public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) |
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
示例
1 | private static void thenApply() throws Exception { |
第二个任务依赖第一个任务的结果。
4 handle 方法
handle 是执行任务完成时对结果的处理。
handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
1 | public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); |
示例
1 | public static void handle() throws Exception{ |
从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply 方法,如果上个任务出现错误,则不会执行 thenApply 方法。
5 thenAccept 消费处理结果
接收任务的处理结果,并消费处理,无返回结果。
1 | public CompletionStage<Void> thenAccept(Consumer<? super T> action); |
示例
1 | public static void thenAccept() throws Exception{ |
从示例代码中可以看出,该方法只是消费执行完成的任务,并可以根据上面的任务返回的结果进行处理。并没有后续的输错操作。
6 thenRun 方法
跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept 。
1 | public CompletionStage<Void> thenRun(Runnable action); |
示例
1 | public static void thenRun() throws Exception{ |
该方法同 thenAccept 方法类似。不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun 方法。只是处理玩任务后,执行 thenAccept 的后续操作。
7 thenCombine 合并任务
thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
1 | public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); |
示例
1 | private static void thenCombine() throws Exception { |
8 thenAcceptBoth
当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗
1 | public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); |
示例
1 | private static void thenAcceptBoth() throws Exception { |
9 applyToEither 方法
两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。
1 | public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); |
示例
1 | private static void applyToEither() throws Exception { |
10 acceptEither 方法
两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。
1 | public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); |
示例
1 | private static void acceptEither() throws Exception { |
11 runAfterEither 方法
两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)
1 | public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action); |
示例
1 | private static void runAfterEither() throws Exception { |
12 runAfterBoth
两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)
1 | public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); |
示例
1 | private static void runAfterBoth() throws Exception { |
13 thenCompose 方法
thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。
1 | public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); |
示例
1 | private static void thenCompose() throws Exception { |
14 allOf
allOf方法将对你多个操作判断是否结束,比如我们创建了一个CompletableFuture数组,有多个任务,我们需要在所有任务结束的时候执行任务,就可以使用这个
示例
伪代码
1 | List<CompletableFuture[]> futures = new ArrayList<>(); |
- 本文作者: October
- 本文链接: http://www.octber.xyz/2020/10/12/从阿里不推荐使用Executors到CompletableFuture的使用/
- 版权声明: 本博客所有文章除特别声明外,均采用 MIT 许可协议。转载请注明出处!