Java多线程之异步Future机制的原理和实现

网友投稿 210 2023-07-08


Java多线程之异步Future机制的原理和实现

项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

public class AddTask implements Callable {

private int a,b;

public AddTask(int a, int b) {

this.a = a;

this.b = b;

}

@Override

public Integer call throws Exception {

Integer result = a + b;

return result;

}

public static void main(String[] args) throws InterruptedException, ExecutionException {

ExecutorService executor = Executors.newSingleThreadExecutor;

//JDK目前为止返回的都是FutureTask的实例

Future future = executor.submit(new AddTask(1, 2));

Integer result = future.get;// 只有当future的状态是已完成时(future.isDone = true),get方法才会返回

}

}

虽然可以实现获取异步执行结果的需求,但是我们发现这个Future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isDone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.Future 的接口方法:

public interface Future {

boolean cancel(boolean mayInterruptIfRunning);

boolean isCancelled;

boolean isDone;

V get throws InterruptedException, ExecutionException;

V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

}

由此可见JDK的Future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个IFuture:

package future;

import java.util.concurrent.CancellationException;

import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

/**

* The result of an asynchronous operation.

*

* @author lixiaohui

* @param 执行结果的类型参数

*/

public interface IFuture extends Future {

boolean isSuccess; // 是否成功

V getNow; //立即返回结果(不管Future是否处于完成状态)

Throwable cause; //若执行失败时的原因

boolean isCancellable; //是否可以取消

IFuture await throws InterruptedException; //等待future的完成

boolean await(long timeoutMillis) throws InterruptedException; // 超时等待future的完成

boolean await(long timeout, TimeUnit timeunit) throws InterruptedException;

IFuture awaitUninterruptibly; //等待future的完成,不响应中断

boolean awaitUninterruptibly(long timeoutMillis);//超时等待future的完成,不响应中断

boolean awaitUninterruptibly(long timeout, TimeUnit timeunit);

IFuture addListener(IFutureListener l); //当future完成时,会通知这些加进来的监听器

IFuture removeListener(IFutureListener l);

}

接下来就一起来实现这个IFuture,在这之前要说明下Object.wait,Object.notifyAll方法,因为整个Future实现的原���的核心就是这两个方法.看看JDK里面的解释:

public class Object {

/**

* Causes the current thread to wait until azUFLmESnnother thread invokes the

* {@link java.lang.Object#notify} method or the

* {@link java.lang.Object#notifyAll} method for this object.

* In other words, this method behaves exactly as if it simply

* performs the call {@code wait(0)}.

* 调用该方法后,当前线程会释放对象监视器锁,并让出CPU使用权。直到别的线程调用notify/notifyAll

*/

public final void wait throws InterruptedException {

wait(0);

}

/**

* Wakes up all threads that are waiting on this object's monitor. A

* thread waits on an object's monitor by calling one of the

* {@code wait} methods.

*

* The awakened threads will not be able to proceed until the current

* thread relinquishes the lock on this object. The awakened threads

* will compete in the usual manner with any other threads that might

* be actively competing to synchronize on this object; for example,

* the awakened threads enjoy no reliable privilege or disadvantage in

* being the next thread to lock this object.

*/

public final native void notifyAll;

}

知道这个后,我们要自己实现Future也就有了思路,当线程调用了IFuture.await等一系列的方法时,如果Future还未完成,那么就调用future.wait 方法使线程进入WAITING状态。而当别的线程设置Future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyAll方法来唤醒之前因为调用过wait方法而处于WAITING状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的Future机制的。有兴趣的可以去看看netty的源码):

package future;

import java.util.Collection;

import java.util.concurrent.CancellationException;

import java.util.concurrent.CopyOnWriteArrayList;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.TimeoutException;

/**

*

* 正常结束时, 若执行的结果不为null, 则result为执行结果; 若执行结果为null, 则result = {@link AbstractFuture#SUCCESS_SIGNAL}

* 异常结束时, result为 {@link CauseHolder} 的实例;若是被取消而导致的异常结束, 则result为 {@link CancellationException} 的实例, 否则为其它异常的实例

* 以下情况会使异步操作由未完成状态转至已完成状态, 也就是在以下情况发生时调用notifyAll方法:

*

*

*

*

*

*

*

* @author lixiaohui

*

* @param

* 异步执行结果的类型

*/

public class AbstractFuture implements IFuture {

protected volatile Object result; // 需要保证其可见性

/**

* 监听器集

*/

protected Collection> listeners = new CopyOnWriteArrayList>;

/**

* 当任务正常执行结果为null时, 即客户端调用{@link AbstractFuture#setSuccess(null)}时,

* result引用该对象

*/

private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal;

@Override

public boolean cancel(boolean mayInterruptIfRunning) {

if (isDone) { // 已完成了不能取消

return false;

}

synchronized (this) {

if (isDone) { // double check

return false;

}

result = new CauseHolder(new CancellationException);

notifyAll; // isDone = true, 通知等待在该对象的wait的线程

}

notifyListeners; // 通知监听器该异步操作已完成

return true;

}

@Override

public boolean isCancellable {

return result == null;

}

@Override

public boolean isCancelled {

return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;

}

@Override

public boolean isDone {

return result != null;

}

@Override

public V get throws InterruptedException, ExecutionException {

await; // 等待执行结果

Throwable cause = cause;

if (cause == null) { // 没有发生异常,异步操作正常结束

return getNow;

}

if (cause instanceof CancellationException) { // 异步操作被取消了

throw (CancellationException) cause;

}

throw new ExecutionException(cause); // 其他异常

}

@Override

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {

if (await(timeout, unit)) {// 超时等待执行结果

Throwable cause = cause;

if (cause == null) {// 没有发生异常,异步操作正常结束

return getNow;

}

if (cause instanceof CancellationException) {// 异步操作被取消了

throw (CancellationException) cause;

}

throw new ExecutionException(cause);// 其他异常

}

// 时间到了异步操作还没有结束, 抛出超时异常

throw new TimeoutException;

}

@Override

public boolean isSuccess {

return result == null ? false : !(result instanceof CauseHolder);

}

@SuppressWarnings("unchecked")

@Override

public V getNow {

return (V) (result == SUCCESS_SIGNAL ? null : result);

}

@Override

public Throwable cause {

if (result != null && result instanceof CauseHolder) {

return ((CauseHolder) result).cause;

}

return null;

}

@Override

public IFuture addListener(IFutureListener listener) {

if (listener == null) {

throw new NullPointerException("listener");

}

if (isDone) { // 若已完成直接通知该监听器

notifyListener(listener);

return this;

}

synchronized (this) {

if (!isDone) {

listeners.add(listener);

return this;

}

}

notifyListener(listener);

return this;

}

@Override

public IFuture removeListener(IFutureListener listener) {

if (listener == null) {

throw new NullPointerException("listener");

}

if (!isDone) {

listeners.remove(listener);

}

return this;

}

@Override

public IFuture await throws InterruptedException {

return await0(true);

}

private IFuture await0(boolean interruptable) throws InterruptedException {

if (!isDone) { // 若已完成就直接返回了

// 若允许终端且被中断了则抛出中断异常

if (interruptable && Thread.interrupted) {

throw new InterruptedException("thread " + Thread.currentThread.getName + " has been interrupted.");

}

boolean interrupted = false;

synchronized (this) {

while (!isDone) {

try {

wait; // 释放锁进入waiting状态,等待其它线程调用本对象的notify/notifyAll方法

} catch (InterruptedException e) {

if (interruptable) {

throw e;

} else {

interrupted = true;

}

}

}

}

if (interrupted) {

// 为什么这里要设中断标志位?因为从wait方法返回后, 中断标志是被clear了的,

// 这里重新设置以便让其它代码知道这里被中断了。

Thread.currentThread.interrupt;

}

}

return this;

}

@Override

public boolean await(long timeoutMillis) throws InterruptedException {

return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true);

}

@Override

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {

return await0(unit.toNanos(timeout), true);

}

private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {

if (isDone) {

return true;

}

if (timeoutNanos <= 0) {

return isDone;

}

if (interruptable && Thread.interrupted) {

throw new InterruptedException(toString);

}

long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime;

long waitTime = timeoutNanos;

boolean interrupted = false;

try {

synchronized (this) {

if (isDone) {

return true;

}

if (waitTime <= 0) {

return isDone;

}

for (;;) {

try {

wait(waitTime / 1000000, (int) (waitTime % 1000000));

} catch (InterruptedException e) {

if (interruptable) {

throw e;

} else {

interrupted = true;

}

}

if (isDone) {

return true;

} else {

waitTime = timeoutNanos - (System.nanoTime - startTime);

if (waitTime <= 0) {

return isDone;

}

}

}

}

} finally {

if (interrupted) {

Thread.currentThread.interrupt;

}

}

}

@Override

public IFuture awaitUninterruptibly {

try {

return await0(false);

} catch (InterruptedException e) { // 这里若抛异常了就无法处理了

throw new java.lang.InternalError;

}

}

@Override

public boolean awaitUninterruptibly(long timeoutMillis) {

try {

return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false);

} catch (InterruptedException e) {

throw new java.lang.InternalError;

}

}

@Override

public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {

try {

return await0(unit.toNanos(timeout), false);

} catch (InterruptedException e) {

throw new java.lang.InternalError;

}

}

protected IFuture setFailure(Throwable cause) {

if (setFailure0(cause)) {

notifyListeners;

return this;

}

throw new IllegalStateException("complete already: " + this);

}

private boolean setFailure0(Throwable cause) {

if (isDone) {

return false;

}

synchronized (this) {

if (isDone) {

return false;

}

result = new CauseHolder(cause);

notifyAll;

}

return true;

}

protected IFuture setSuccess(Object result) {

if (setSuccess0(result)) { // 设置成功后通知监听器

notifyListeners;

return this;

}

throw new IllegalStateException("complete already: " + this);

}

private boolean setSuccess0(Object result) {

if (isDone) {

return false;

}

synchronized (this) {

if (isDone) {

return false;

}

if (result == null) { // 异步操作正常执行完毕的结果是null

this.result = SUCCESS_SIGNAL;

} else {

this.result = result;

}

notifyAll;

}

return true;

}

private void notifyListeners {

for (IFutureListener l : listeners) {

notifyListener(l);

}

}

private void notifyListener(IFutureListener l) {

try {

l.operationCompleted(this);

} catch (Exception e) {

e.printStackTrace;

}

}

private static class SuccessSignal {

}

private static final class CauseHolder {

final Throwable cause;

CauseHolder(Throwable cause) {

this.cause = cause;

}

}

}

那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:

package future.test;

import future.IFuture;

import future.IFutureListener;

/**

* 延时加法

* @author lixiaohui

*

*/

public class DelayAdder {

public static void main(String[] args) {

new DelayAdder.add(3 * 1000, 1, 2).addListener(new IFutureListener {

@Override

public void operationCompleted(IFuture future) throws Exception {

System.out.println(future.getNow);

}

});

}

/**

* 延迟加

* @param delay 延时时长 milliseconds

* @param a 加数

* @param b 加数

* @return 异步结果

*/

public DelayAdditionFuture add(long delay, int a, int b) {

DelayAdditionFuture future = new DelayAdditionFuture;

new Thread(new DelayAdditionTask(delay, a, b, future)).start;

return future;

}

private class DelayAdditionTask implements Runnable {

private long delay;

private int a, b;

private DelayAdditionFuture future;

public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) {

super;

this.delay = delay;

this.a = a;

this.b = b;

this.future = future;

}

@Override

public void run {

try {

Thread.sleep(delay);

Integer i = a + b;

// TODO 这里设置future为完成状态(正常执行完毕)

future.setSuccess(i);

} catch (InterruptedException e) {

// TODO 这里设置future为完成状态(异常执行完毕)

future.setFailure(e.getCause);

}

}

}

} package future.test;

import future.AbstractFuture;

import future.IFuture;

//只是把两个方法对外暴露

public class DelayAdditionFuture extends AbstractFuture {

@Override

public IFuture setSuccess(Object result) {

return super.setSuccess(result);

}

@Override

public IFuture setFailure(Throwable cause) {

return super.setFailure(cause);

}

}

可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Mybatis查不到数据查询返回Null问题
下一篇:java webservice上传下载文件代码分享
相关文章

 发表评论

暂时没有评论,来抢沙发吧~