异步回调 CompletableFuture
CompletableFuture
在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调的方式在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
Future
接口不是通过回调的方式,而是使用 get()
方法阻塞判断业务是否执行完毕,所以本质还是同步
CompletableFuture
接口通过回调的方式,并不会阻塞等待业务执行完毕,而是在业务执行完毕后回调通知主线程运行结果,这才是真正的异步。
CompletableFuture
实现了 Future
, CompletionStage
接口,实现了 Future
接口就可以兼容现在有线程池框架,而 CompletionStage
接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture
类
runAsync()
:调用没有返回值方法,主线程调用 get()
方法时会阻塞(这种方式和 Future
相似,本质还是同步)
supplyAsync()
:调用有返回值方法(回调的方式得到运行结果,程序不会阻塞,真正的异步)
具体案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class CompletableFutureDemo { public static void main(String[] args) throws Exception { CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{ System.out.println(Thread.currentThread().getName()+" : CompletableFuture1"); }); completableFuture1.get();
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+" : CompletableFuture2"); int i = 10/0; return 1024; }); completableFuture2.whenComplete((t,u)->{ System.out.println("------t="+t); System.out.println("------u="+u); }).exceptionally(f -> { System.out.println("------exception="+f.getMessage); return 4444;}) .get(); } }
|
whenComplete()
方法的源码为:
1 2 3 4
| public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(null, action); }
|
其中,t
为返回结果,u
为异常信息(没有异常时为null)
Future 与 CompletableFuture
对比这两种方法,一个为同步一个为异步。
Futrue
在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Futrue
,在 Future
里面有 isDone()
方法来判断任务是否处理结束,还有 get()
方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
与 CompletableFuture
相比, Future
的缺点:
(1)不支持手动完成
我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成
(2)不支持进一步的非阻塞调用
通过 Future
的 get()
方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future
不支持回调函数,所以无法实现这个功能
(3)不支持链式调用
对于 Future
的执行结果,我们想继续传到下一个 Future 处理使用,从而形成一个链式的 pipline
调用,这在 Future
中是没法实现的。
(4)不支持多个 Future 合并
比如我们有 10 个 Future
并行执行,我们想在所有的 Future
运行完毕之后,执行某些函数,是没法通过 Future
实现的。
(5)不支持异常处理
Future
的 API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题是不好定位的
创建 CompletableFuture 对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); }
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); }
public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); }
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
|
以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool()作为线程池执行异步代码。
runAsync
方法用于没有返回值的任务,它以 Runnable
函数式接口类型为参数,所以CompletableFuture
的计算结果为空。
supplyAsync
方法用于有返回值的任务,以Supplier<U>
函数式接口类型为参数,CompletableFuture
的计算结果类型为 U
。
计算结果完成时的处理
当 CompletableFuture
的计算结果完成,或者抛出异常的时候,有如下四个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(null, action); }
public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(asyncPool, action); }
public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor) { return uniWhenCompleteStage(screenExecutor(executor), action); } public CompletableFuture<T> exceptionally( Function<Throwable, ? extends T> fn) { return uniExceptionallyStage(fn); }
|
- 方法不以
Async
结尾,意味着Action
使用相同的线程执行,而Async
可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
exceptionally
方法返回一个新的CompletableFuture
,当原始的CompletableFuture
抛出异常的时候,就会触发这个CompletableFuture
的计算,调用function计算值,否则如果原始的CompletableFuture
正常计算完后,这个新的CompletableFuture
也计算完成,它的值和原始的CompletableFuture
的计算的值相同。
除了上述四个方法之外,一组handle
方法也可用于处理计算结果。当原先的CompletableFuture
的值计算完成或者抛出异常的时候,会触发这个CompletableFuture
对象的计算,结果由BiFunction
参数计算而得。因此这组方法兼有whenComplete
和转换的两个功能。
1 2 3 4 5
| public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor);
|
进行转换
我们可以将操作串联起来,或者将CompletableFuture
组合起来。关键的入参只有一个Function
,它是函数式接口,所以使用Lambda表示起来会更加优雅。它的入参是上一个阶段计算后的结果,返回值是经过转化后结果。
1 2 3 4 5
| public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
|
函数的功能是当原来的CompletableFuture
计算完后,将结果传递给函数fn,将fn的结果作为新的CompletableFuture
计算结果。因此它的功能相当于将CompletableFuture<T>
转换成CompletableFuture<U>
。
需要注意的是,这些转换并不是马上执行的,也不会阻塞,而是在前一个stage完成后继续执行。
消费
上面的方法是当计算完成的时候,会生成新的计算结果 (thenApply, handle),或者返回同样的计算结果whenComplete
。我们可以在每个CompletableFuture
上注册一个操作,该操作会在 CompletableFuture
完成执行后调用它。
1 2 3 4 5
| public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);
|
CompletableFuture
通过 thenAccept
方法提供了这一功能,它接收CompletableFuture
执行完毕后的返回值做参数,只对结果执行Action,而不返回新的计算值,因此计算值为空:
下面一组方法当计算完成的时候会执行一个Runnable
,与thenAccept
不同,Runnable
并不使用CompletableFuture
计算的结果。
1 2 3 4 5
| public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);
|
组合
通常,我们会有多个需要独立运行但又有所依赖的的任务。比如先等用于的订单处理完毕然后才发送邮件通知客户。
thenCompose
方法允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。你可以创建两个CompletableFutures
对象,对第一个 CompletableFuture
对象调用thenCompose
,并向其传递一个函数。当第一个CompletableFuture
执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个 CompletableFuture
的返回做输入计算出的第二个 CompletableFuture
对象。
1 2 3 4 5
| public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor);
|
多任务组合
allOf
:等待所有任务完成
anyOf
:只要有一个任务完成就可以
1 2 3
| CompletableFuture<Void> allOf = CompletableFuture.allOf(future01, future02, future03);
CompletableFuture<Void> anyOf = CompletableFuture.anyOf(future01, future02, future03);
|
异步编排案例
本章介绍使用异步编排的具体案例,具体业务见文章 【Project】云商城
业务介绍
需求分析:通过 skuId
查询出商品的相关信息,图片、标题、价格,属性对应版本等等。在点击商城项目中的详情页后,前端将发出请求查询指定 skuId
的各种商品信息,包括:
- 当前 SKU 基本信息
- 当前 SKU 的图片信息
- 当前 SKU 所属的 SPU 的所有销售属性组合,展示在界面上
- 当前 KUS 所属的 SPU 的介绍信息
- SPU 的规格参数(基本属性)信息
其中,查询 3/4/5 前需要先完成查询 1,因为三者都需要 SKU 的信息,同时三者之间是没有依赖关系的,完全可以并行查询节省时间。查询 2 则和其余的四条查询没有任何依赖关系,也可以并行查询。
首先是没有异步的版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Override public SkuItemVo item(Long skuId) { SkuItemVo skuItemVo = new SkuItemVo();
SkuInfoEntity skuInfoEntity = this.getById(skuId); skuItemVo.setInfo(skuInfoEntity);
Long catalogId = skuInfoEntity.getCatalogId(); Long spuId = skuInfoEntity.getSpuId();
List<SkuImagesEntity> imagesEntities = imagesService.getImagesBySkuId(skuId); skuItemVo.setImages(imagesEntities);
List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.listSaleAttrs(spuId); skuItemVo.setSaleAttr(saleAttrVos);
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(spuId); skuItemVo.setDesc(spuInfoDescEntity);
List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(spuId, catalogId); skuItemVo.setGroupAttrs(attrGroupVos);
return skuItemVo; }
|
该版本的所有任务都是串行的,整条查询的耗时为五个查询的耗时累计和。下面将对该版本进行优化,使用异步编排优化查询效率。
自定义线程池
首先自定义线程池,并注入到 Spring 容器中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
@Configuration public class MyThreadConfig {
@Bean public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) { return new ThreadPoolExecutor(pool.getMaxSize(), pool.getMaxSize(), pool.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); } }
|
配置文件绑定类:
1 2 3 4 5 6 7 8 9
| @Data @Component @ConfigurationProperties(prefix = "yunmall.thread") public class ThreadPoolConfigProperties { private Integer coreSize; private Integer maxSize; private Integer keepAliveTime; }
|
配置文件:
1 2 3 4 5 6
| yunmall: thread: core-size: 50 max-size: 200 keep-alive-time: 10
|
开启配置文件自动提示自定义的前缀:
1 2 3 4 5 6
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
|
异步编排版本
使用 CompletableFuture
进行异步编排,将查询 3/4/5 作为查询 1 的子查询任务,改造后的版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| @Override public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException { SkuItemVo skuItemVo = new SkuItemVo();
CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> { SkuInfoEntity skuInfoEntity = this.getById(skuId); skuItemVo.setInfo(skuInfoEntity); return skuInfoEntity; }, executor);
CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync(info -> { List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.listSaleAttrs(info.getSpuId()); skuItemVo.setSaleAttr(saleAttrVos); }, executor);
CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync(info -> { SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(info.getSpuId()); skuItemVo.setDesc(spuInfoDescEntity); }, executor);
CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync(info -> { List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(info.getSpuId(), info.getCatalogId()); skuItemVo.setGroupAttrs(attrGroupVos); }, executor);
CompletableFuture<Void> imagesFuture = CompletableFuture.runAsync(() -> { List<SkuImagesEntity> imagesEntities = imagesService.getImagesBySkuId(skuId); skuItemVo.setImages(imagesEntities); }, executor);
CompletableFuture.allOf(saleAttrFuture, descFuture, baseAttrFuture, imagesFuture).get();
return skuItemVo; }
|