Java8中AbstractExecutorService与FutureTask源码详解

网友投稿 269 2022-08-31


Java8中AbstractExecutorService与FutureTask源码详解

目录前言一、AbstractExecutorService1、定义2、submit3、invokeAll4、invokeAny二、FutureTask1、定义2、构造方法3、get 4、run / runAndReset5、 cancel三、ExecutorCompletionService1、定义2、submit3、take / poll 总结

前言

本篇博客重点讲解ThreadPoolExecutor的三个基础设施类AbstractExecutorService、FutureTask和ExecutorCompletionService的实现细节,AbstractExecutorService实现了ExecutorService的大部分接口,子类只需实现excute方法和shutdown相关方法即可;FutureTask是RunnableFuture接口的主要实现,该接口是Runnable和Future的包装类接口,会执行Runnable对应的run方法,调用方可以通过Future接口获取任务的执行状态和结果;ExecutorCompletionService是帮助获取多个RunnableFuture任务的执行结果的工具类,基于FutureTask执行完成时的回调方法done实现的。

一、AbstractExecutorService

1、定义

ThreadPoolExecutor的类继承关系如下:

其中ExecutorService的子类如下:

右上角带S的表示内部类,我们重点关注ThreadPoolExecutor,ScheduledThreadPoolExecutor和ForkJoinPool三个类的实现,后面两个类会在后面的博客中逐一探讨。

Executor包含的方法如下:

ExecutorService包含的方法如下:

上述接口方法中涉及的Callable接口的定义如下:

该接口也是表示一个执行任务,跟常见的Runnable接口的区别在于call方法有返回值而run方法没有返回值。

Future表示某个任务的执行结果,其定义的方法如下:

其子类比较多,如下:

后面会将涉及的子类逐一探讨的。 AbstractExecutorService基于Executor接口的excute方法实现了大部分的ExecutorService的接口,子类只需要重点实现excute方法和shutdown相关方法即可,下面来分析其具体的实现。

2、submit

//Runnable接口方法没有返回值,但是可以通过Future判断任务是否执行完成

public Future> submit(Runnable task) {

if (task == null) throw new NullPointerException();

RunnableFuture ftask = newTaskFor(task, null);

execute(ftask);

return ftask;

}

//因为Runnable的run方法没有返回值,所以如果run方法正常执行完成,其结果就是result

public Future submit(Runnable task, T result) {

if (task == null) throw new NullPointerException();

RunnableFuture ftask = newTaskFor(task, result);

execute(ftask);

return ftask;

}

public Future submit(Callable task) {

if (task == null) throw new NullPointerException();

RunnableFuture ftask = newTaskFor(task);

execute(ftask);

return ftask;

}

//都是返回FutureTask

protected RunnableFuture newTaskFor(Runnable runnable, T value) {

return new FutureTask(runnable, value);

}

protected RunnableFuture newTaskFor(Callable callable) {

return new FutureTask(callable);

}

3、invokeAll

//执行完成tasks中所有的任务,如果有一个抛出异常,则取消掉剩余的任务

public List> invokeAll(Collection extends Callable> tasks)

throws InterruptedException {

if (tasks == null)

throw new NullPointerException();

ArrayList> futures = new ArrayList>(tasks.size());

boolean done = false;

try {

//遍历tasks中的任务将其转换成RunnableFuture,然后提交到线程池执行

for (Callable t : tasks) {

RunnableFuture f = newTaskFor(t);

futures.add(f);

execute(f);

}

//遍历Future列表

for (int i = 0, size = futures.size(); i < size; i++) {

Future f = futures.get(i);

if (!f.isDone()) { //如果未执行完成

try {

//等待任务执行完成

f.get();

} catch (CancellationException ignore) {

} catch (ExecutionException ignore) {

}

}

}

//所有任务都执行完了

done = true;

return futures;

} finally {

if (!done)

//出现异常,将所有的任务都取消掉

for (int i = 0, size = futures.size(); i < size; i++)

futures.get(i).cancel(true);

}

}

//逻辑同上,不过加了等待时间限制,所有的任务的累计时间不能超过指定值,如果超时直接返回Future列表

public List> invokeAll(Collection extends Callable> tasks,

long timeout, TimeUnit unit)

throws InterruptedException {

if (tasks == null)

throw new NullPointerException();

//转换成纳秒

long nanos = unit.toNanos(timeout);

ArrayList> futures = new ArrayList>(tasks.size());

boolean done = false;

try {

//转换成Future

for (Callable t : tasks)

futures.add(newTaskFor(t));

//计算终止时间

final long deadline = System.nanoTime() + nanos;

final int size = futures.size();

for (int i = 0; i < size; i++) {

execute((Runnable)futures.get(i));

nanos = deadline - System.nanoTime(); //计算剩余时间

if (nanos <= 0L) //如果超时了则直接返回

return futures;

}

for (int i = 0; i < size; i++) {

Future f = futures.get(i);

if (!f.isDone()) { //任务未执行

if (nanos <= 0L)

return futures; //等待超时

try {

//等待任务执行完成

f.get(nanos, TimeUnit.NANOSECONDS);

} catch (CancellationException ignore) {

} catch (ExecutionException ignore) {

} catch (TimeoutException toe) {

return futures;

}

nanos = deadline - System.nanoTime();

}

}

done = true;

return futures;

} finally {

if (!done) //出现异常,取消掉剩余未执行的任务

for (int i = 0, size = futures.size(); i < size; i++)

futures.get(i).cancel(true);

}

}

4、invokeAny

//多个任务只要有一个执行成功就返回,并把剩余的已提交未执行的任务给取消掉

public T invokeAny(Collection extends Callable> tasks)

throws InterruptedException, ExecutionException {

try {

return doInvokeAny(tasks, false, 0);

} catch (TimeoutException cannotHappen) {

assert false;

return null;

}

}

//多个任务只要有一个执行成功就返回,并把剩余的已提交未执行的任务给取消掉

//如果指定时间内没有执行成功的,则抛出TimeoutException 异常

public T invokeAny(Collection extends Callable> tasks,

long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException {

return doInvokeAny(tasks, true, unit.toNanos(timeout));

}

private T doInvokeAny(Collection extends Callable> tasks,

boolean timed, long nanos)

throws InterruptedException, ExecutionException, TimeoutException {

//参数校验

if (tasks == null)

throw new NullPointerException();

int ntasks = tasks.size();

if (ntasks == 0)

throw new IllegalArgumentException();

ArrayList> futures = new ArrayList>(ntasks);

ExecutorCompletionService ecs =

new ExecutorCompletionService(this);

try {

ExecutionException ee = null;

final long deadline = timed ? System.nanoTime() + nanos : 0L;

Iterator extends Callable> it = tasks.iterator();

//提交一个任务

futures.add(ecs.submit(it.next()));

--ntasks;

int active = 1;

for (;;) {

//获取最新的已完成任务

Future f = ecs.poll();

if (f == null) {

//没有执行完的

if (ntasks > 0) {

--ntasks;

//继续添加下一个任务

futures.add(ecs.submit(it.next()));

++active;

}

else if (active == 0) //所有任务都执行失败了,没有执行成功的

break;

else if (timed) { //等待超时

f = ecs.poll(nanos, TimeUnit.NANOSECONDS);

if (f == null)

throw new TimeoutException();

//计算剩余等待时间

nanos = deadline - System.nanoTime();

}

else

//所有任务都提交了,阻塞等待某个任务执行完成

f = ecs.take();

}

if (f != null) {

--active;

try {

//某个任务已执行完成,如果抛出异常则执行下一个任务

return f.get();

} catch (ExecutionException eex) {

ee = eex;

} catch (RuntimeException rex) {

ee = new ExecutionException(rex);

}

}

} //for循环终止

//所有任务都执行失败了

if (ee == null)

ee = new ExecutionException();

throw ee;

} finally {

//返回前,将未执行完成的任务都取消掉

for (int i = 0, size = futures.size(); i < size; i++)

futures.get(i).cancel(true);

}

}

二、FutureTask

1、定义

FutureTask的类继承关系如下:

RunnableFuture接口没有新增方法,将Runnable的run方法由public改成包级访问了,如下:

该类包含的实例属性如下:

/** 执行的任务*/

private Callable callable;

/** 任务执行的结果或者执行过程中抛出的异常 */

private Object outcome; // non-volatile, protected by state reads/writes

/** 执行任务的线程 */

private volatile Thread runner;

/** 等待线程的链表*/

private volatile WaitNode waiters;

//状态

private volatile int state;

其中WaitNode是一个简单的内部类,其定义如下:

该类包含的静态属性都是字段偏移量,通过static代码块初始化,如下:

FutureTask定义了多个表示状态的常量,如下:

//初始状态

private static final int NEW = 0;

//是一个很短暂的中间状态,表示任务已执行完成,保存完执行结果后就流转成NORMAL或者EXCEPTIONAL

private static final int COMPLETING = 1;

//正常执行完成

private static final int NORMAL = 2;

//异常终止

private static final int EXCEPTIONAL = 3;

//任务被取消了

private static final int CANCELLED = 4;

//是一个很短暂的中间状态,调用interrupt方法后,会将状态流转成INTERRUPTED

private static final int INTERRUPTING = 5;

//任务执行已中断

private static final inthttp:// INTERRUPTED = 6;

可能的状态流转如下图:

2、构造方法

public FutureTask(Callable callable) {

if (callable == null)

throw new NullPointerException();

this.callable = callable;

this.state = NEW; //初始状态是NEW

}

public FutureTask(Runnable runnable, V result) {

//将Runnable适配成Callable

this.callable = Executors.callable(runnable, result);

this.state = NEW; // ensure visibility of callable

}

//Executors方法

public static Callable callable(Runnable task, T result) {

if (task == null)

throw new NullPointerException();

return new RunnableAdapter(task, result);

}

其中RunnableAdapter是Executors的一个静态内部类,其实现如下:

3、get

get方法用于阻塞当前线程直到任务执行完成,如果阻塞的过程中被中断则抛出异常InterruptedException,可以限制阻塞的时间,如果等待超时还是未完成则抛出异常TimeoutException。

//阻塞当前线程等待任务执行完成

public V get() throws InterruptedException, ExecutionException {

int s = state;

if (s <= COMPLETING) //如果未完成

s = awaitDone(false, 0L);

return report(s);

}

//同上,可以限制等待的时间

public V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException {

if (unit == null)

throw new NullPointerException();

int s = state;

if (s <= COMPLETING &&

//阻塞当前线程,如果返回值还是未完成说明是等待超时了,则抛出异常

(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)

throw new TimeoutException();

return report(s);

}

//timed为true表示等待指定的时间,否则是无期限等待

//该方法返回退出此方法时的状态

private int awaitDone(boolean timed, long nanos)

throws InterruptedException {

//计算等待的终止时间

final long deadline = timed ? System.nanoTime() + nanos : 0L;

WaitNode q = null;

boolean queued = false;

for (;;) {

if (Thread.interrupted()) { //如果当前线程被中断了,则从等待链表中移除,并抛出异常

removeWaiter(q);

throw new InterruptedException();

}

int s = state;

if (s > COMPLETING) { //如果任务已执行完

if (q != null)

q.thread = null;

return s;

}

else if (s == COMPLETING) //正在状态流转的过程中,让出当前CPU时间片

Thread.yield();

//未开始执行

else if (q == null)

q = new WaitNode();

else if (!queued)

//修改waiters属性,插入到链表头

queued = UNSAFE.compareAndSwapObject(this, waitersOffset,

q.next = waiters, q);

//已插入到链表中

else if (timed) {

nanos = deadline - System.nanoTime();

if (nanos <= 0L) { //等待超时,从链表中移除

removeWaiter(q);

return state;

}

//让当前线程休眠

LockSupport.parkNanos(this, nanos);

}

else

//让当前线程休眠

LockSupport.park(this);

}

}

private void removeWaiter(WaitNode node) {

if (node != null) {

node.thread = null;//将thread置为null

retry:

for (;;) { // restart on removeWaiter race

for (WaitNode pred = null, q = waiters, s; q != null; q = s) {

s = q.next;

if (q.thread != null)

pred = q;

//q.thread为null,需要被移除

else if (pred != null) {

pred.next = s; //将q从链表移除

if (pred.thread == null) //如果为null,则从头开始遍历

continue retry;

}

//q.thread为null,pred为null,之前没有有效节点,修改waiters,修改失败重试

else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,

q, s))

continue retry;

}

break;

}

}

}

//awaitDone正常返回后调用此方法,此时状态应该是COMPLETING之后了

private V report(int s) throws ExecutionException {

Object x = outcome;

if (s == NORMAL) //如果是正常结束

return (V)x;

if (s >= CANCELLED) //如果被取消了

throw new CancellationException();

throw new ExecutionException((Throwable)x); //如果出现异常了

}

4、run / runAndReset

run方法是有线程池调用的,会执行Callable任务,保存执行的结果,如果出现异常则保存异常对象,并完成状态流转,最后将等待任务完成的阻塞中的线程唤醒。runAndReset和run类似,区别在于runAndReset正常执行完成后不会保存执行的结果,不会改变状态,状态还是NEW,如果是正常执行则返回true,该方法是子类使用的,其调用链如下:

这两方法的实现如下:

//由线程池中的某个线程调用此方法

public void run() {

//如果不等于NEW,说明其他某个线程正在执行任务

//如果等于NEW,则cas修改runner属性,修改失败说明其他某个线程也准备执行这个任务

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset,

null, Thread.currentThread()))

return;

//cas成功表示这个任务由当前线程抢占成功

try {

Callable c = callable;

if (c != null && state == NEW) {

V result;

boolean ran;

try {

//执行任务

result = c.call();

ran = true;

} catch (Throwable ex) {

//出现异常

result = null;

ran = false;

setException(ex); //保存异常对象

}

if (ran)

//执行成功保存结果

set(result);

}

} finally {

//如果任务被cancel了,则上述setException和set方法因为状态不是NEW了会直接返回

runner = null;

int s = state;

if (s >= INTERRUPTING) //如果被中断,自旋等待中断完成

handlePossibleCancellationInterrupt(s);

}

}

//跟run方法相比区别就是正常执行完成不会保存结果,不会流转状态

protected boolean runAndReset() {

//如果state不是NEW或者cas修改runner失败

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset,

null, Thread.currentThread()))

return false;

boolean ran = false;

int s = state;

try {

Callable c = callable;

if (c != null && s == NEW) {

try {

//执行任务,但是不保存结果,状态就不会从NEW流转成NORMAL

YcjKCK c.call(); // don't set result

ran = true;

} catch (Throwable ex) {

setException(ex); //保存异常实例

}

}

} finally {

runner = null;

s = state;

if (s >= INTERRUPTING) //任务被中断了,自旋等待中断完成

handlePossibleCancellationInterrupt(s);

}

//返回任务是否正常完成

return ran && s == NEW;

}

//保存异常对象并修改状态

protected void setException(Throwable t) {

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

//只有原来的状态是NEW才进入下面的逻辑

outcome = t;

UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state

//任务执行完成,唤醒阻塞的线程

finishCompletion();

}

}

//保存执行结果并修改状态

protected void set(V v) {

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

//只有原来的状态是NEW才进入下面的逻辑

outcome = v;

UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

//任务执行完成,唤醒阻塞的线程

finishCompletion();

}

}

private void handlePossibleCancellationInterrupt(int s) {

//正在中断的过程中

if (s == INTERRUPTING)

while (state == INTERRUPTING)

Thread.yield(); //自旋等待中断完成

}

private void finishCompletion() {

// assert state > COMPLETING;

for (WaitNode q; (q = waiters) != null;) {

if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

//cas将waiters置为null

for (;;) {

Thread t = q.thread;

if (t != null) {

//唤醒阻塞的新线程

q.thread = null;

LockSupport.unpark(t);

}

//遍历下一个节点

WaitNode next = q.next;

if (next == null) //遍历结束,终止循环

break;

q.next = null; // unlink to help gc

q = next;

}

break; //终止外层循环

}

}

//执行完成的回调方法,默认是空实现,子类可改写此方法

done();

callable = null; // to reduce footprint

}

5、 cancel

cancel方法的参数为true,则会将当前状态由NEW改成INTERRUPTING,如果此任务已经开始执行了,则将正在执行任务的线程标记为已中断,如果该线程响应中断则可能抛出异常,如果不响应中断则继续执行,最后再将状态改成INTERRUPTED;如果方法的参数为false,则将当前状态由NEW改成CANCELLED,如果此任务已经开始执行了则会继续执行。上述两种情形下,状态流转完成后都会唤醒还在阻塞中的等待线程,如果任务已经开始执行并且继续执行,因为状态已经不是NEW了,直接结果不会保存下来。

//如果mayInterruptIfRunning为true,则会将正在执行任务的线程标记为已中断,线程有可能继续执行,也有可能响应中断抛出异常

//如果为false,则标记为CANCELLED,如果任务已经开始执行了则会继续执行

//如果未执行,则标记为CANCELLED或者INTERRUPTING都会让这任务不会被执行了

public boolean cancel(boolean mayInterruptIfRunning) {

//如果state不是NEW 或者cas修改失败,则返回false

if (!(state == NEW &&

UNSAFE.compareAndSwapInt(this, stateOffset, NEW,

mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))

return false;

try { // in case call to interrupt throws exception

if (mayInterruptIfRunning) {

try {

Thread t = runner;

if (t != null)

t.interrupt(); //将正在执行任务的线程标记为已中断

} finally {

//修改状态为已中断

UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);

}

}

} finally {

//唤醒等待的线程

finishCompletion();

}

return true;

}

三、ExecutorCompletionService

1、定义

ExecutorCompletionService是一个帮助获取多个Future执行结果的工具类,其类继承关系如下:

CompletionService包含的方法如下:

后面会讲解各方法的用途,该类包含的属性如下:

//执行任务的线程池实现

private final Executor executor;

//调用其newTaskFor方法

private final AbstractExecutorService aes;

//已执行完成的Future阻塞队列

private final BlockingQueue> completionQueue;

其构造方法实现如下:

public ExecutorCompletionService(Executor executor) {

if (executor == null)

throw new NullPointerException();

this.executor = executor;

//如果executor继承自AbstractExecutorService,则aes为executor,否则为null

this.aes = (executor instanceof AbstractExecutorService) ?

(AbstractExecutorService) executor : null;

//没有指定队列,默认使用基于链表的无固定容量的LinkedBlockingQueue

this.completionQueue = new LinkedBlockingQueue>();

}

public ExecutorCompletionService(Executor executor,

BlockingQueue> completionQueue) {

if (executor == null || completionQueue == null)

throw new NullPointerException();

this.executor = executor;

this.aes = (executor instanceof AbstractExecutorService) ?

(AbstractExecutorService) executor : null;

this.completionQueue = completionQueue;

}

2、submit

submit方法将Callable或者Runnable任务包装成一个RunnableFuture,然后提交到线程池中,返回RunnableFuture实例。

public Future submit(Callable task) {

if (task == null) throw new NullPointerException();

//将其包装成RunnableFuture实现类

RunnableFuture f = newTaskFor(task);

//提交任务到线程池

executor.execute(new QueueingFuture(f));

return f;

}

public Future submit(Runnable task, V result) {

if (task == null) throw new NullPointerException();

RunnableFuture f = newTaskFor(task, result);

executor.execute(new QueueingFuture(f));

return f;

}

private RunnableFuture newTaskFor(Callable task) {

if (aes == null)

return new FutureTask(task); //默认使用FutureTask作为RunnableFuture的实现

else

return aes.newTaskFor(task);//如果aes不为null,则使用该类的特定实现

}

private RunnableFuture newTaskFor(Runnable task, V result) {

if (aes == null)

return new FutureTask(task, result);

else

return aes.newTaskFor(task, result);

}

其中QueueingFuture是一个内部类,继承自FutureTask,其实现如下:

重点改写了done方法的实现,如果任务已经执行完成,则会将该Future实例添加到阻塞队列中。

3、take / poll

这三方法就是从已完成的Future阻塞队列中获取并移除Future实例,如果队列为空,take方法会无期限阻塞阻塞,不带时间参数的poll方法不会阻塞返回null,带时间参数的poll方法会阻塞指定的时间,如果超时则返回null,其实现都是直接调用阻塞队列的方法,如下:

总结


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:基于python搭建FTP服务(python操作ftp文件)
下一篇:接口调用是什么意思?常用的接口调用方式分享
相关文章

 发表评论

暂时没有评论,来抢沙发吧~