java 中ThreadPoolExecutor原理分析

网友投稿 216 2023-05-28


java 中ThreadPoolExecutor原理分析

java 中ThreadPoolExecutor原理分析

线程池简介

Java线程池是开发中常用的工具,当我们有异步、并行的任务要处理时,经常会用到线程池,或者在实现一个服务器时,也需要使用线程池来接收连接处理请求。

线程池使用

JDK中提供的线程池实现位于java.util.concurrent.ThreadPoolExecutor。在使用时,通常使用ExecutorService接口,它提供了submit,invokeAll,shutdown等通用的方法。

在线程池配置方面,Executors类中提供了一些静态方法能够提供一些常用场景的线程池,如newFixedThreadPool,newCachedThreadPool,newSingleThreadExecutor等,这些方法最终都是调用到了ThreadPoolExecutor的构造函数。

ThreadPoolExecutor的包含所有参数的构造函数是

/**

* @param corePoolSize the number of threads to keep in the pool, even

* if they are idle, unless {@code allowCoreThreadTimeOut} is set

* @param maximumPoolSize the maximum number of threads to allow in the

* pool

* @param keepAliveTime when the number of threads is greater than

* the core, this is the maximum time that excess idle threads

* will wait for new tasks before terminating.

* @param unit the time unit for the {@code keepAliveTime} argument

* @param workQueue the queue to use for holding tasks before they are

* executed. This queue will hold only the {@code Runnable}

* tasks submitted by the {@code execute} method.

* @param threadFactory the factory to use when the executor

* creates a new thread

* @param handler the handler to use when execution is blocked

* because the thread bounds and queue capacities are reached

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

if (corePoolSize < 0 ||

maximumPoolSize <= 0 ||

maximumPoolSize < corePoolSize ||

keepAliveTime < 0)

throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)

throw new NullPointerException();

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

this.threadFactory = threadFactory;

this.handler = handler;

}

corePoolSize设置线程池的核心线程数,当添加新任务时,如果线程池中的线程数小于corePoolSize,则不管当前是否有线程闲置,都会创建一个新的线程来执行任务。

maximunPoolSize是线程池中允许的最大的线程数

workQueue用于存放排队的任务

keepAliveTime是大于corePoolSize的线程闲置的超时时间

handler用于在任务逸出、线程池关闭时的任务处理 ,线程池的线程增长策略为,当前线程数小于corePoolSize时,新增线程,当线程数=corePoolSize且corePoolSize时,只有在workQueue不能存放新的任务时创建新线程,超出的线程在闲置keepAliveTime后销毁。

实现(基于JDK1.8)

ThreadPoolExecutor中保存的状态有

当前线程池状态, 包括RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。

当前有效的运行线程的数量。

将这两个状态放到一个int变量中,前三位作为线程池状态,后29位作为线程数量。

例如0b11100000000000000000000000000001, 表示RUNNING, 一个线程。

submit、execute

execute的执行方式为,首先检查当前worker数量,如果小于corePoolSize,则尝试add一个core Worker。线程池在维护线程数量以及状态检查上做了大量检测。

public void execute(Runnable command) {

int c = ctl.get();

// 如果当期数量小于corePoolSize

if (workerCountOf(c) < corePoolSize) {

// 尝试增加worker

if (addWorker(command, true))

return;

c = ctl.get();

}

// 如果线程池正在运行并且成功添加到工作队列中

if (isRunning(c) && workQueue.offer(command)) {

// 再次检查状态,如果已经关闭则执行拒绝处理

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

// 如果工作线程都down了

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

else if (!addWorker(command, false))

reject(command);

}

addWorker方法实现

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

for (;;) {

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

workers.add(w);

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

// 如果添加成功,则启动该线程,执行Worker的run方法,Worker的run方法执行外部的runWorker(Worker)

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

}

Worker类继承了AbstractQueuedSynchronizer获得了同步等待这样的功能。

private final class Worker

extends AbstractQueuedSynchronihCTXJazer

implements Runnable

{

/**

* This class will never be serialized, but we provide a

* serialVersionUID to suppress a javac warning.

*/

private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */

final Thread thread;

/** Initial task to run. Possibly null. */

Runnable firstTask;

/** Per-thread task counter */

volatile long completedTasks;

/**

* Creates with given first task and thread from ThreadFactory.

* @param firstTask the first task (null if none)

*/

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

/** Delegates main run loop to outer runWorker */

public void run() {

runWorker(this);

}

// Lock methods

//

// The value 0 represents the unlocked state.

// The value 1 represents the locked state.

protected boolean isHeldExclusively() {

return getState() != 0;

}

protected boolean tryAcquire(int unused) {

if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

return false;

}

protected boolean tryRelease(int unused) {

setExclusiveOwnerThread(null);

setState(0);

return true;

}

public void lock() { acquire(1); }

public boolean tryLock() { return tryAcquire(1); }

public void unlock() { release(1); }

public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {

Thread t;

if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

try {

t.interrupt();

} catch (SecurityException ignore) {

}

}

}

runWorker(Worker)是Worker的轮询执行逻辑,不断地从工作队列中获取任务并执行它们。Worker每次执行任务前需要进行lock,防止在执行任务时被interrupt。

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) {

w.lock();http://

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}

ThreadPoolExecutor的submit方法中将Callable包装成FutureTask后交给execute方法。

FutureTask

FutureTask继承于Runnable和Future,FutureTask定义的几个状态为

NEW, 尚未执行

COMPLETING, 正在执行

NORMAL, 正常执行完成得到结果

EXCEPTIONAL, 执行抛出异常

CANCELLED, 执行被取消

INTERRUPTING,执行正在被中断

INTERRUPTED, 已经中断。

其中关键的get方法

public V get() throws InterruptedException, ExecutionException {

int s = state;

if (s <= COMPLETING)

s = awaitDone(false, 0L);

return report(s);

}

先获取当前状态,如果还未执行完成并且正常,则进入等待结果流程。在awaitDone不断循环获取当前状态,如果没有结果,则将自己通过CAS的方式添加到等待链表的头部,如果设置了超时,则LockSupport.parkNanos到指定的时间。

static final class WaitNode {

volatile Thread thread;

volatile WaitNode next;

WaitNode() { thread = Thread.currentThread(); }

}

private int awaitDone(boolean timed, long nanos)

throws InterruptedException {

final long deadline = timed ? System.nanoTime() + nanos : 0L;

WaitNode q = null;

boolean queued = false;

for (;;) {

if (Thread.interrupted()) {

removeWaiter(q);

throw new InterruptedException();

}

int s = state;

if (s > COMPLETING) {

if (q != null)

q.thread = null;

return s;

}

else if (s == COMPLETING) // cannot time out yet

Thread.yield();

else if (q == null)

q = new WaitNode();

else if (!queued)

queued = UNSAFE.compareAndSwapObject(this, waitersOffset,

q.next = waiters, q);

else if (timed) {

nanos = deadline - System.nanoTime();

if (nanos <= 0L) {

removeWaiter(q);

return state;

}

LockSupport.parkNanos(this, nanos);

}

else

LockSupport.park(this);

}

}

FutureTask的run方法是执行任务并设置结果的位置,首先判断当前状态是否为NEW并且将当前线程设置为执行线程,然后调用Callable的call获取结果后设置结果修改FutureTask状态。

public void run() {

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset,

null, Thread.currentThread()))

return;

http:// try {

Callable c = callable;

if (c != null && state == NEW) {

V result;

boolean ran;

try {

result = c.call();

ran = true;

} catch (Throwable ex) {

result = null;

ran = false;

setException(ex);

}

if (ran)

set(result);

}

} finally {

// runner must be non-null until state is settled to

// prevent concurrent calls to run()

runner = null;

// state must be re-read after nulling runner to prevent

// leaked interrupts

int s = state;

if (s >= INTERRUPTING)

handlePossibleCancellationInterrupt(s);

}

}

感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java JVM虚拟机运行机制
下一篇:详解Spring整合Ehcache管理缓存
相关文章

 发表评论

暂时没有评论,来抢沙发吧~