Java8 自定义CompletableFuture的原理解析

网友投稿 422 2022-09-17


Java8 自定义CompletableFuture的原理解析

目录java8 自定义CompletableFuture原理CompleteFuture简单使用下面简单介绍用法

Java8 自定义CompletableFuture原理

Future 接口 的局限性有很多,其中一个就是需要主动的去询问是否完成,如果等子线程的任务完成以后,通知我,那岂不是更好?

public class FutureInAction3 {

public static void main(String[] args) {

Future future = iUbsCpJMdnvoke(() -> {

try {

Thread.sleep(10000L);

return "I am Finished.";

} catch (InterruptedException e) {

return "I am Error";

}

});

future.setCompletable(new Completable() {

@Override

public void complete(String s) {

System.out.println("complete called ---- " + s);

}

@Override

public void exception(Throwable cause) {

System.out.println("error");

cause.printStackTrace();

}

});

System.out.println("....do something else .....");

System.out.println("try to get result ->" + future.get());

}

private static Future invoke(Callable callable) {

AtomicReference result = new AtomicReference<>();

AtomicBoolean finished = new AtomicBoolean(false);

Future future = new Future() {

private Completable completable;

@Override

public T get() {

return result.get();

}

@Override

public boolean isDone() {

return finished.get();

}

// 设置完成

@Override

public void setCompletable(Completable completable) {

this.completable = completable;

}

// 获取

@Override

public Completable getCompletable() {

return completable;

}

};

Thread t = new Thread(() -> {

try {

T value = callable.action();

result.set(value);

finished.set(true);

if (future.getCompletable() != null)

future.getCompletable().complete(value);

} catch (Throwable caushttp://e) {

if (future.getCompletable() != null)

future.getCompletable().exception(cause);

}

});

t.start();

return future;

}

private interface Future {

T get();

boolean isDone();

// 1

void setCompletable(Completable completable);

// 2

Completable getCompletable();

}

private interface Callable {

T action();

}

// 回调接口

private interface Completable {

void complete(T t);

void exception(Throwable cause);

}

}

CompleteFuture简单使用

Java8 中的 completeFuture 是对 Future 的扩展实现, 主要是为了弥补 Future 没有相应的回调机制的缺陷.

我们先看看 Java8 之前的 Future 的使用

package demos;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

/**

* @author djh on 2019/4/22 10:23

* @E-Mail 1544579459@qq.com

*/

public class Demo {

public static void main(String[] args) throws ExecutionException, InterruptedException {

ExecutorService cachePool = Executors.newCachedThreadhttp://Pool();

Future future = cachePool.submit(() -> {

Thread.sleep(3000);

return "异步任务计算结果!";

});

// 提交完异步任务后, 主线程可以继续干一些其他的事情.

doSomeThingElse();

// 为了获取异步计算结果, 我们可以通过 future.get 和 轮询机制来获取.

String result;

// Get 方式会导致当前线程阻塞, 这显然违背了异步计算的初衷.

// result = future.get();

// 轮询方式虽然不会导致当前线程阻塞, 但是会导致高额的 CPU 负载.

long start = System.currentTimeMillis();

while (true) {

if (future.isDone()) {

break;

}

}

System.out.println("轮询耗时:" + (System.currentTimeMillis() - start));

result = future.get();

System.out.println("获取到异步计算结果啦: " + result);

cachePool.shutdown();

}

private static void doSomeThingElse() {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("我的最重要的事情干完了, 我要获取异步计算结果来执行剩下的事情.");

}

}

输出:

我的最重要的事情干完了, 我要获取异步计算结果来执行剩下的事情.

轮询耗时:2000

获取到异步计算结果啦: 异步任务计算结果!

Process finished with exit code 0

从上面的 Demo 中我们可以看出, future 在执行异步任务时, 对于结果的获取显的不那么优雅, 很多第三方库就针对 Future 提供了回调式的接口以用来获取异步计算结果, 如Google的: ListenableFuture, 而 Java8 所提供的 CompleteFuture 便是官方为了弥补这方面的不足而提供的 API.

下面简单介绍用法

package demos;

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

/**

* @author djh on 2019/5/1 20:26

* @E-Mail 1544579459@qq.com

*/

public class CompleteFutureDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {

CompletableFuture completableFutureOne = new CompletableFuture<>();

ExecutorService cachePool = Executors.newCachedThreadPool();

cachePool.execute(() -> {

try {

Thread.sleep(3000);

completableFutureOne.complete("异步任务执行结果");

System.out.println(Thread.currentThread().getName());

} catch (InterruptedException e) {

e.printStackTrace();

}

});

// WhenComplete 方法返回的 CompletableFuture 仍然是原来的 CompletableFuture 计算结果.

CompletableFuture completableFutureTwo = completableFutureOne.whenComplete((s, throwable) -> {

System.out.println("当异步任务执行完毕时打印异步任务的执行结果: " + s);

});

// ThenApply 方法返回的是一个新的 completeFuture.

CompletableFuture completableFutureThree = completableFutureTwo.thenApply(s -> {

System.out.println("当异步任务执行结束时, 根据上一次的异步任务结果, 继续开始一个新的异步任务!");

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

return s.length();

});

System.out.println("阻塞方式获取执行结果:" + completableFutureThree.get());

cachePool.shutdown();

}

}

从上面的 Demo 中我们主要需要注意 thenApply 和 whenComplete 这两个方法, 这两个方法便是 CompleteFuture 中最具有意义的方法, 他们都会在 completeFuture 调用 complete 方法传入异步计算结果时回调, 从而获取到异步任务的结果.

相比之下 future 的阻塞和轮询方式获取异步任务的计算结果, CompleteFuture 获取结果的方式就显的优雅的多。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:三网合一 中国移动铁通光猫 HG6821M 如何设置宽带自动连接(三网合一光纤)
下一篇:牛逼的网络工程师是怎么学习的?(网络工程师好学么)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~