Java中CompletableFuture 的详细介绍

网友投稿 297 2022-07-29


目录1.概述1.0 创建 CompletableFuture 的对象的工厂方法1.1 non-async 和 async 区别1.1.1 non-async 示例:注册 action 的时候任务可能已经结束1.1.2 non-async 示例:注册 action 的时候任务未完成1.2 Run 类方法1.3 Accept 类方法1.4 Apply 类方法2 单个任务执行完成后执行一个动作(action)2.0 示例 exceptionally3 两个任务执行编排4 多个任务执行编排5 CompletableFuture 其他方法

1.概述

1.0 创建 CompletableFuture 的对象的工厂方法

staticCompletableFuture runAsync(Runnablerunnable)

staticCompletableFuture runAsync(Runnable runnable, Executor executor)

static CompletableFuture supplyAsync(Supplier supplier)

static CompletableFuture supplyAsync(Supplier

runAsync 的参数是 Runnable, 返回值是 CompletableFuture, 意思是工厂方法创建的 CompletableFuture 对象封装的任务没有返回值。

例如:

CompletableFuture run = CompletableFuture.runAsync(()-> System.out.println("hello"));

而 suppyAsync 参数类型是 Supplier,返回值是CompletableFuture , 意思是任务不接受参数,但是会返回结果。

CompletableFuture supply = CompletableFuture.supplyAsync(() -> {

System.out.println("Hello");

return "hello world!"";

});

System.out.println(supply.get()); //hello world!"

所以如果任务需要返回结果,那么应该选择 suppyAsync;否则可以选择 runAsync。

1.1 non-async 和 async 区别

public CompletionStage thenRun(Runnable action);

public CompletionStage thenRunAsync(Runnable action);

public CompletionStage thenRunAsync(Runnable action,Executor executor);

CompletableFuture 中有众多类似这样的方法,http://那么 non-async 和 async 和版本的方法究竟有什么区别呢? 参考官方文档的描述:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

翻译:传递给 non-async 方法作为参数的函数(action)可能会在完成当前的 CompletableFuture 的线程中执行,也可能会在方法的调用者线程中执行。

All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask.

翻译:所有没有Executor 参数的 async 方法都在 ForkJoinPool.commonPool()线程池中执行(除非不支持最小并发度为2,这种情况下,每个任务会创建一个新线程去执行)。为了简化监控、调试和追踪,所有生成的异步任务都是接口 CompletableFuture.AsynchronousCompletionTask的实例。

从上面这两段官方描述看。async 类方法比较容易理解,就是 CompletableFuture 实例的任务执行完成后,会将 action 提交到缺省的异步线程池 ForkJoinPool.commonPool(),或者 async 类方法参数Executor executor 指定的线程池中执行。

而对于 non-async 的描述则有点不明确。action 可能会在完成 CompletableFuture 实例任务的线程中执行,也可能会在调用 thenRun(编排任务完成后执行 action 的系列方法) 方法的线程中执行。这个主要是看调用 thenRun 的时候,CompletableFuture 实例的任务是否已经完成。如果没有完成,那么action 会在完成任务的线程中执行。如果任务已经完成,则 action 会在调用thenAccept 等注册方法的线程中执行。

1.1.1 non-async 示例:注册 action 的时候任务可能已经结束

@Test

void testThenRunWithTaskCompleted() throws Exception{

CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {

@Override

public Integer get() {

System.out.println("[" + Thread.currentThread().getName() + "] " + " in task" );

return 1;

}

}).thenRun(() -> {

System.out.println("[" + Thread.currentThread().getName() + "] " + " in action" );

});

future.get();

}

运行结果:

[ForkJoinPool.commonPool-worker-1]  in task[main]  in action

分析: 任务通过 CompletableFuture.supplyAsync 提交后,会以异步的方式在 ForkJoinPool.commonPool() 线程池中运行。这时候有两个线程,一个是[ForkJoinPool.commonPool-worker-1] 执行 Supplier.get 方法;一个是[main] 主线程提交完异步任务后,继续调用 thenRun 编排任务完成后执行 action。由于Supplier.get 非常简单,几乎立刻返回。所以很大概率在主线程调用 thenRun 编排任务完成后执行 action的时候,异步任务已经完成,所以 action 在主线程中执行了。注:在笔者的电脑上几乎100% 是这样的调度方式。

1.1.2 non-async 示例:注册 action 的时候任务未完成

@Test

void testThenRunWithTaskUncompleted() throws Exception{

CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {

@Override

public Integer get() {

System.out.println("[" + Thread.currentThread().getName() + "] " + " in task" );

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

return new Random().nextInt(10);

}

}).thenRun(() -> {

System.out.println("[" + Thread.currentThread().getName() + "] " + " in action" );

});

future.get();

}

运行结果:

[ForkJoinPool.commonPool-worker-1]  in task[ForkJoinPool.commonPool-worker-1]  in action

分析:在 Supplier.get 加入 sleep,延迟任务结束。主线程提交完异步任务后,继续调用 thenRun 编排任务完成后执行 action 的时候,任务没有结束。所以这个action 在完成任务的线程中jnotzal执行。

1.2 Run 类方法

Run 类方法指 thenRun、runAfterBoth、runAfterEither 等。Run 类方法的动作参数都是Runable action。例如 thenRun(Runnable action)。这就意味着这个 action 不关心任务的结果,action 本身也没有返回值。只是为了实现在完成任务后执行一个动作的目的。

1.3 Accept 类方法

Accept 类方法指 thenAccept、runAcceptBoth、runAcceptEither 等。Accept 类方法的动作参数都是Consumer super T> action。例如 thenAccept(Consumer super T> action)。如方法名和参数所示,action 是接受并消费任务结果的消费者,action 没有返回值。

1.4 Apply 类方法

Apply 类方法指 thenApply、applyToEither 等。Apply 类方法的动作参数都是Function super T,? extends U> fn。例如 thenApply(Function super T,? extends U> fn)。如方法名和参数所示,action 将任务结果应用函数参数 fn 做转换,返回转换后的结果,类似于 stream 中的 map。

2 单个任务执行完成后执行一个动作(action)

方法说明public CompletableFuture thenRun(Runnable action)thenRunAsync(Runnable action)thenRunAsync(Runnable action, Executor executor)任务完成后执行 action,action 中不关心任务的结果,action 也没有返回值public CompletableFuture thenAccept(Consumer super T> action)thenAcceptAsync(Consumer super T> action)thenAcceptAsync(Consumer super T> action)任务完成后执行 action。如方法名和参数所示,action 是接受并消费任务结果的消费者,action 没有返回值public CompletableFuture thenApply(Function super T,? extends U> fn)thenApplyAsync(Function super T,? extends U> fn)thenApplyAsync(Function super T,? extends U> fn, Executor executor)任务完成后执行 action。action 将任务结果应用 action 的函数参数做转换,返回转换后的结果,类似于 stream 中的 mappublic CompletableFuture whenComplete(BiConsumer super T, ? super Throwable> action)whenCompleteAsync(BiConsumer super T, ? super Throwable> action)whenCompleteAsync(BiConsumer super T, ? super Throwable> action, Executor executor)任务完成后或者异常时运行action。action 是接受并消费任务结果的消费者,并且有返回值。可以认为是支持异常处理的 thenAccept 版本。public CompletableFuture handle(BiFunction super T, Throwable, ? extends U> fn)handleAsync(BiFunction super T, Throwable, ? extends U> fn)handleAsync(BiFunction super T, Throwable, ? extends U> fn, Executor executor)任务完成后或者异常时运行action。可以认为是支持异常处理的 thenApply 版本。public CompletableFuture exceptionally(Function fn)如果任务或者 action 发生异常,则会触发exceptionally的调用相当于 try...catch

2.0 示例 exceptionally

@Test

void testCompletableFutureExceptionally(){

CompletableFuture first = CompletableFuture

.supplyAsync(() -> {

if (true) {

throw new RuntimeException("exception in task");

}

return "hello world";

})

.thenApply(data -> {

if (true) {

throw new RuntimeException("exception in action");

}

return 1;

})

.exceptionally(e -> {

System.out.println("[" + Thread.currentThread().getName() + "] " + " print exception stack trace" );

e.printStackTrace(); // 异常捕捉处理,前面两个处理环节的日常都能捕获

return 0;

});

}

3 两个任务执行编排

下面表格列出的每个方法都有对应两个版本的 async 方法,一个有executor 参数,一个没有。为了表格尽量简洁,表格中就不再列出async 方法了

方法说明public CompletableFuture thenCompose(Function super T,? extends CompletionStage> fn)串行组合两个 CompletableFuture 任务,后一个任务依赖前一个任务的结果,后一个任务可以返回与第一个任务不同类型的返回值。执行后一个任务的线程与前面讨论 action 执行的线程类似。public CompletableFuture runAfterBoth(CompletionStage> other, Runnable action)这里有两个并行任务,一个是runAfterBoth 调用本身所属的CompletableFuture 实例A,一个是参数 other 引用的任务B。两个并行任务都完成后执行 action,action 中不关心任务的结果,action 也没有返回值public CompletableFuture thenAcceptBoth(CompletionStage extends U> other, BiConsumer super T, ? super U> action)与runAfterBoth 类似,也有两个并行任务 A 和 B。两个并行任务都完成后执行 action。如方法名和参数所示,action 是接受并消费两个任务结果的消费者,action 没有返回值public CompletableFuture thenCombine(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn)与runAfterBoth 类似,也有两个并行任务 A 和 B。两个并行任务都完成后执行 action,action 依赖两个任务的结果,并对结果做转换,返回一个新值public CompletableFuture runAfterEither(CompletionStage> other, Runnable action)与runAfterBoth 类似,也有两个并行任务 A 和 B。两个并行任务任意一个完成后执行 action,action 中不关心任务的结果,action 也没有返回值public CompletableFuture acceptEither(CompletionStage extends T> other, Consumer super T> action)与runAfterBoth 类似,也有两个并行任务 A 和 B。两个并行任务任意一个完成后执行 action。如方法名和参数所示,action 是接受并消费两个任务结果的消费者,action 没有返回值public CompletableFuture applyToEither(CompletionStage extends T> other, Function super T, U> fn)与runAfterBoth 类似,也有两个并行任务 A 和 B。两个并行任务任意一个完成后执行 action。action 将任务结果应用 action 的函数参数做转换,返回转换后的结果,类似于 stream 中的 map

4 多个任务执行编排

下面表格列出的每个方法都有对应两个版本的 async 方法,一个有executor 参数,一个没有。为了表格尽量简洁,表格中就不再列出async 方法了

方法说明public static CompletableFuture allOf(CompletableFuture>... cfs)所有参数引用的任务都完成后,才触发执行当前的 CompletableFutuhttp://re 实例任务。public static CompletableFuture anyOf(CompletableFuture>... cfs)参数引用的任务中任何一个完成后,就会触发执行当前的 CompletableFuture 实例任务。

5 CompletableFuture 其他方法

方法说明public boolean cancel(boolean mayInterruptIfRunning)如果任务未完成,以 CancellationException 异常结束任务。public boolean isCancelled()判断任务是否取消。public T join()阻塞等待 获取返回值public T get() throws InterruptedException, ExecutionExceptionpublic T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException阻塞等待(有超时重载版本)获取返回值。get 与 join区别,get 会抛出 checked 异常,调用代码需要处理异常。join 没有超时重载版本。public T getNow(T valueIfAbsent)获取返回值,如果任务未完成则返回valueIfAbsent 参数指定valuepublic boolean isDone()任务是否执行完成


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Spring Boot 教程之创建项目的三种方式
下一篇:使用@DS轻松解决动态数据源的问题
相关文章

 发表评论

暂时没有评论,来抢沙发吧~