分布式任务调度xxl
337
2022-07-29
目录1.概述1.0 创建 CompletableFuture 的对象的工厂方法1.1 non-async 和 async 区别1.1.1 non-async 示例:注册 action 的时候任务可能已经结束1.1.2 non-async 示例:注册 action 的时候任务未完成1.2 Run 类方法1.3 Accept 类方法1.4 Apply 类方法2 单个任务执行完成后执行一个动作(action)2.0 示例 exceptionally3 两个任务执行编排4 多个任务执行编排5 CompletableFuture 其他方法
1.概述
1.0 创建 CompletableFuture 的对象的工厂方法
staticCompletableFuture
staticCompletableFuture
static CompletableFuture supplyAsync(Supplier supplier)
static CompletableFuture supplyAsync(Supplier
runAsync 的参数是 Runnable, 返回值是 CompletableFuture, 意思是工厂方法创建的 CompletableFuture 对象封装的任务没有返回值。
例如:
CompletableFuture
而 suppyAsync 参数类型是 Supplier,返回值是CompletableFuture , 意思是任务不接受参数,但是会返回结果。
CompletableFuture
System.out.println("Hello");
return "hello world!"";
});
System.out.println(supply.get()); //hello world!"
所以如果任务需要返回结果,那么应该选择 suppyAsync;否则可以选择 runAsync。
1.1 non-async 和 async 区别
public CompletionStage
public CompletionStage
public CompletionStage
CompletableFuture 中有众多类似这样的方法,http://那么 non-async 和 async 和版本的方法究竟有什么区别呢? 参考官方文档的描述:
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
翻译:传递给 non-async 方法作为参数的函数(action)可能会在完成当前的 CompletableFuture 的线程中执行,也可能会在方法的调用者线程中执行。
All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask.
翻译:所有没有Executor 参数的 async 方法都在 ForkJoinPool.commonPool()线程池中执行(除非不支持最小并发度为2,这种情况下,每个任务会创建一个新线程去执行)。为了简化监控、调试和追踪,所有生成的异步任务都是接口 CompletableFuture.AsynchronousCompletionTask的实例。
从上面这两段官方描述看。async 类方法比较容易理解,就是 CompletableFuture 实例的任务执行完成后,会将 action 提交到缺省的异步线程池 ForkJoinPool.commonPool(),或者 async 类方法参数Executor executor 指定的线程池中执行。
而对于 non-async 的描述则有点不明确。action 可能会在完成 CompletableFuture 实例任务的线程中执行,也可能会在调用 thenRun(编排任务完成后执行 action 的系列方法) 方法的线程中执行。这个主要是看调用 thenRun 的时候,CompletableFuture 实例的任务是否已经完成。如果没有完成,那么action 会在完成任务的线程中执行。如果任务已经完成,则 action 会在调用thenAccept 等注册方法的线程中执行。
1.1.1 non-async 示例:注册 action 的时候任务可能已经结束
@Test
void testThenRunWithTaskCompleted() throws Exception{
CompletableFuture
@Override
public Integer get() {
System.out.println("[" + Thread.currentThread().getName() + "] " + " in task" );
return 1;
}
}).thenRun(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] " + " in action" );
});
future.get();
}
运行结果:
[ForkJoinPool.commonPool-worker-1] in task[main] in action
分析: 任务通过 CompletableFuture.supplyAsync 提交后,会以异步的方式在 ForkJoinPool.commonPool() 线程池中运行。这时候有两个线程,一个是[ForkJoinPool.commonPool-worker-1] 执行 Supplier.get 方法;一个是[main] 主线程提交完异步任务后,继续调用 thenRun 编排任务完成后执行 action。由于Supplier.get 非常简单,几乎立刻返回。所以很大概率在主线程调用 thenRun 编排任务完成后执行 action的时候,异步任务已经完成,所以 action 在主线程中执行了。注:在笔者的电脑上几乎100% 是这样的调度方式。
1.1.2 non-async 示例:注册 action 的时候任务未完成
@Test
void testThenRunWithTaskUncompleted() throws Exception{
CompletableFuture
@Override
public Integer get() {
System.out.println("[" + Thread.currentThread().getName() + "] " + " in task" );
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Random().nextInt(10);
}
}).thenRun(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] " + " in action" );
});
future.get();
}
运行结果:
[ForkJoinPool.commonPool-worker-1] in task[ForkJoinPool.commonPool-worker-1] in action
分析:在 Supplier.get 加入 sleep,延迟任务结束。主线程提交完异步任务后,继续调用 thenRun 编排任务完成后执行 action 的时候,任务没有结束。所以这个action 在完成任务的线程中jnotzal执行。
1.2 Run 类方法
Run 类方法指 thenRun、runAfterBoth、runAfterEither 等。Run 类方法的动作参数都是Runable action。例如 thenRun(Runnable action)。这就意味着这个 action 不关心任务的结果,action 本身也没有返回值。只是为了实现在完成任务后执行一个动作的目的。
1.3 Accept 类方法
Accept 类方法指 thenAccept、runAcceptBoth、runAcceptEither 等。Accept 类方法的动作参数都是Consumer super T> action。例如 thenAccept(Consumer super T> action)。如方法名和参数所示,action 是接受并消费任务结果的消费者,action 没有返回值。
1.4 Apply 类方法
Apply 类方法指 thenApply、applyToEither 等。Apply 类方法的动作参数都是Function super T,? extends U> fn。例如 thenApply(Function super T,? extends U> fn)。如方法名和参数所示,action 将任务结果应用函数参数 fn 做转换,返回转换后的结果,类似于 stream 中的 map。
2 单个任务执行完成后执行一个动作(action)
方法说明public CompletableFuture
2.0 示例 exceptionally
@Test
void testCompletableFutureExceptionally(){
CompletableFuture
.supplyAsync(() -> {
if (true) {
throw new RuntimeException("exception in task");
}
return "hello world";
})
.thenApply(data -> {
if (true) {
throw new RuntimeException("exception in action");
}
return 1;
})
.exceptionally(e -> {
System.out.println("[" + Thread.currentThread().getName() + "] " + " print exception stack trace" );
e.printStackTrace(); // 异常捕捉处理,前面两个处理环节的日常都能捕获
return 0;
});
}
3 两个任务执行编排
下面表格列出的每个方法都有对应两个版本的 async 方法,一个有executor 参数,一个没有。为了表格尽量简洁,表格中就不再列出async 方法了
方法说明public CompletableFuture thenCompose(Function super T,? extends CompletionStage> fn)串行组合两个 CompletableFuture 任务,后一个任务依赖前一个任务的结果,后一个任务可以返回与第一个任务不同类型的返回值。执行后一个任务的线程与前面讨论 action 执行的线程类似。public CompletableFuture
4 多个任务执行编排
下面表格列出的每个方法都有对应两个版本的 async 方法,一个有executor 参数,一个没有。为了表格尽量简洁,表格中就不再列出async 方法了
方法说明public static CompletableFuture
5 CompletableFuture 其他方法
方法说明public boolean cancel(boolean mayInterruptIfRunning)如果任务未完成,以 CancellationException 异常结束任务。public boolean isCancelled()判断任务是否取消。public T join()阻塞等待 获取返回值public T get() throws InterruptedException, ExecutionExceptionpublic T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException阻塞等待(有超时重载版本)获取返回值。get 与 join区别,get 会抛出 checked 异常,调用代码需要处理异常。join 没有超时重载版本。public T getNow(T valueIfAbsent)获取返回值,如果任务未完成则返回valueIfAbsent 参数指定valuepublic boolean isDone()任务是否执行完成
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~