Java 多线程并发AbstractQueuedSynchronizer详情

网友投稿 238 2022-07-22


目录AbstractQueuedSynchronizer核心思想为什么需要 AQS用法用法示例AQS 底层原理父类 AbstractOwnableSynchronizerCLH 队列Condition用于等待的方法用于唤醒的方法ConditionObjectSignalling methodsWaiting methodsenableWaitcanReacquireunlinkCancelledWaiters对外提供的等待方法awaitUninterruptiblyawaitawaitNanosawaitUntilawait(long, TimeUnit)acquire 方法

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer 简称 AQS ,抽象队列同步器,用来实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器的框架。这个类旨在为大多数依赖单个原子 int 值来表示同步状态的同步器提供基础的能力封装。 例如 ReentrantLock、Semaphore 和 FutureTask 等等都是基于 AQS 实现的,我们也可以继承 AQS 实现自定义同步器。

核心思想

网络上常见的解释是:

如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

个人理解,可以把 AQS 当成一把锁,它内部通过一个队列记录了所有要使用锁的请求线程,并且管理锁自己当前的状态(锁定、空闲等状态)。相当于 AQS 就是共享资源本身,当有线程请求这个资源是,AQS 将请求资源的线程记录当前工作线程,并将自身设置为锁定状态。后续其他线程请求这个 AQS 时,将请求线程记录到等待队列中,其他线程此时未获取到锁,进入阻塞等待状态。

为什么需要 AQS

在深入 AQS 前,我们应该持有一个疑问是为什么需要 AQS ?synchronized 关键字和 CAS 原子类都提供了丰富的同步方案了。

但在实际的需求中,对同步的需求是各式各样的,比如,我们需要对一个锁加上超时时间,那么光凭 synchronized 关键字或是 CAShttp:// 就无法实现了,需要对其进行二次封装。而 JDK 中提供了丰富的同步方案,比如 ReentrantLock ,而 ReentrantLock 是就是基于 AQS 实现的。

用法

这部分内容来自 JDK 的注释

要将此类用作同步器的基础,请在适用时重新定义以下方法,方法是使用 getState、setState 和/或 compareAndSetState 检查和/或修改同步状态:

tryAcquiretryReleasetryAcquireSharedtryReleaseSharedisHeldExclusively

默认情况下,这些方法中的每一个都会引发 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该是短暂的而不是阻塞的。 定义这些方法是使用此类的唯一受支持的方法。 所有其他方法都被声明为最终方法,因为它们不能独立变化。

您可能还会发现从 AbstractOwnableSynchronizer 继承的方法对于跟踪拥有独占同步器的线程很有用。 鼓励您使用它们——这使监视和诊断工具能够帮助用户确定哪些线程持有锁。

即使此类基于内部 FIFO 队列,它也不会自动执行 FIFO 采集策略。

独占同步的核心形式为:

Acquire:

while (!tryAcquire(arg)) {

enqueue thread if it is not already queued;

possibly block current thread;

}

Release:

if (tryRelease(arg))

unblock the first queued thread;

(共享模式类似,但可能涉及级联信号。)

因为在入队之前调用了获取中的检查,所以新获取的线程可能会抢在其他被阻塞和排队的线程之前。 但是,如果需要,您可以定义 tryAcquire 和/或 tryAcquireShared 以通过内部调用一个或多个检查方法来禁用插入,从而提供公平的 FIFO 获取顺序。 特别是,如果 hasQueuedPredecessors(一种专门为公平同步器使用的方法)返回 true,大多数公平同步器可以定义 tryAcquire 返回 false。 其他变化是可能的。

默认插入(也称为贪婪、放弃和避免护送)策略的吞吐量和可扩展性通常最高。 虽然这不能保证公平或无饥饿,但允许较早排队的线程在较晚的排队线程之前重新竞争,并且每次重新竞争都有无偏见的机会成功对抗传入线程。 此外,虽然获取不是通常意义上的“旋转”,但它们可能会在阻塞之前执行多次调用 tryAcquire 并穿插其他计算。 当独占同步只是短暂地保持时,这提供了自旋的大部分好处,而没有大部分责任。 如果需要,您可以通过预先调用获取具有“快速路径”检查的方法来增加这一点,可能会预先检查 hasContended 和/或 hasQueuedThreads 以仅在同步器可能不会被争用时才这样做。

此类通过将其使用范围专门用于可以依赖 int 状态、获取和释放参数以及内部 FIFO 等待队列的同步器,部分地为同步提供了高效且可扩展的基础。 如果这还不够,您可以使用原子类、您自己的自定义 java.util.Queue 类和 LockSupport 阻塞支持从较低级别构建同步器。

用法示例

这是一个不可重入互斥锁类,它使用值 0 表示未锁定状态,使用值 1 表示锁定状态。 虽然不可重入锁并不严格要求记录当前所有者线程,但无论如何,此类都会这样做以使使用情况更易于监控。

它还支持条件并公开一些检测方法:

class Mutex implements Lock, java.io.Serializable {

// Our internal helper class

private static class Sync extends AbstractQueuedSynchronizer {

// Acquires the lock if state is zero

public boolean tryAcquire(int acquires) {

assert acquires == 1; // Otherwise unused

if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

return false;

}

// Releases the lock by setting state to zero

protected boolean tryRelease(int releases) {

assert releases == 1; // Otherwise unused

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

setExclusiveOwnerThread(null);

setState(0);

return true;

}​

// Reports whether in locked state

public boolean isLocked() {

return getState() != 0;

}

public boolean isHeldExclusively() {

// a data race, but safe due to out-of-thin-air guarantees

return getExclusiveOwnerThread() == Thread.currentThread();

}​

// Provides a Condition

public Condition newCondition() {

return new ConditionObject();

}

// Deserializes properly

private void readObject(ObjectInputStream s)

throws IOException, ClassNotFoundException {

s.defaultReadObject();

setState(0); // reset to unlocked state

}

}

// The sync object does all the hard work. We just forward to it.

private final Sync sync = new Sync();

public void lock() { sync.acquire(1); }

public boolean tryLock() { return sync.tryAcquire(1); }

public void unlock() { sync.release(1); }

public Condition newCondition() { return sync.newCondition(); }

public boolean isLocked() { return sync.isLocked(); }

public boolean isHeldByCurrentThread() {

return sync.isHeldExclusively();

}

public boolean hasQueuedThreads() {

return sync.hasQueuedThreads();

}

public void lockInterruptibly() throws InterruptedException {

sync.acquireInterruptibly(1);

}

public boolean tryLock(long timeout, TimeUnit unit)

throws InterruptedException {

return sync.tryAcquireNanos(1, unit.toNanos(timeout));

}

}

这是一个类似于 CountDownLatch 的锁存器类,只是它只需要一个信号即可触发。 因为锁存器是非独占的,所以它使用共享的获取和释放方法。

class BooleanLatch {​

private static class Sync extends AbstractQueuedSynchronizer {

boolean isSignalled() { return getState() != 0; }

protected int tryAcquireShared(int ignore) {

return isSignalled() ? 1 : -1;

}

protected boolean tryReleaseShared(int ignore) {

setState(1);

return true;

}

}

private final Sync sync = new Sync();

public boolean isSignalled() { return sync.isSignalled(); }

public void signal() { sync.releaseShared(1); }

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

}

AQS 底层原理

父类 AbstractOwnableSynchronizer

AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronizer ,后者逻辑十分简单:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {​

private static final long serialVersionUID = 3737899427754241961L;​

protected AbstractOwnableSynchronizer() { }

private transient Thread exclusiveOwnerThread;

// 设置当前持有锁的线程

protected final void setExclusiveOwnerThread(Thread thread) {

exclusiveOwnerThread = thread;

}

protected final Thread getExclusiveOwnerThread() {

return exclusiveOwnerThread;

}

}

AbstractOwnableSynchronizer 只是定义了设置持有锁的线程的能力。

CLH 队列

AQS 的等待队列是 CLH (Craig , Landin , and Hagersten) 锁定队列的变体,CLH 锁通常用于自旋锁。AQS 将每个请求共享资源的线程封装程一个 CLH 节点来实现的,这个节点的定义是:

/** CLH Nodes */

abstract static class Node {

volatile Node prev; // initially attached via casTail

volatile Node next; // visibly nonnull when signallable

Thread waiter; // visibly nonnull when enqueued

volatile int status; // written by owner, atomic bit ops by others

// methods for atomic operations

final boolean casPrev(Node c, Node v) { // for cleanQueue

return U.weakCompareAndSetReference(this, PREV, c, v); // 通过 CAS 确保同步设置 prev 的值

}

final boolean casNext(Node c, Node v) { // for cleanQueue

return U.weakCompareAndSetReference(this, NEXT, c, v);

}

final int getAndUnsetStatus(int v) { // for signalling

return U.getAndBitwiseAndInt(this, STATUS, ~v);

}

final void setPrevRelaxed(Node p) { // for off-queue assignment

U.putReference(this, PREV, p);

}

final void setStatusRelaxed(int s) { // for off-queue assignment

U.putInt(this, STATUS, s);

}

final void clearStatus() { // for reducing unneeded signals

U.putIntOpaque(this, STATUS, 0);

}

private static final long STATUS = U.objectFieldOffset(Node.class, "status");

private static final long NEXT = U.objectFieldOffset(Node.class, "next");

private static final long PREV = U.objectFieldOffset(Node.class, "prev");

}

CLH 的节点的数据结构是一个双向链表的节点,只不过每个操作都是经过 CAS 确保线程安全的。要加入 CLH 锁队列,您可以将其自动拼接为新的尾部;要出队,需要设置 head 字段,以便下一个符合条件的等待节点成为新的头节点:

+------+ prev +-------+ prev +------+

| | <---- | | <---- | |

| head | next | first | next | tail |

| | ----> | | ----> | |

+------+ +-------+ +------+

Node 中的 status 字段表示当前节点代表的线程的状态。

status 存在三种状态:

static final int WAITING = 1; // must be 1

static final int CANCELLED = 0x80000000; // must be negative

static final int COND = 2; // in a condition wait

WAITING:表示等待状态,值为 1。CANCELLED:表示当前线程被取消,为 0x80000000。COND:表示当前节点在等待条件,也就是在条件等待队列中,值为 2。

在上面的 COND 中,提到了一个条件等待队列的概念。

首先,Node 是一个静态抽象类,它在 AQS 中存在三种实现类:

ExclusiveNodeSharedNodeConditionNode

前两者都是空实现:

static final class ExclusiveNode extends Node { }

static final class SharedNode extends Node { }

而最后的 ConditionNode 多了些内容:

static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {

ConditionNode nextWaiter;

// 检查线程是否中断或当前线程的状态已取消等待。

public final boolean isReleasable() {

return status <= 1 || Thread.currentThread().isInterrupted();

}

public final boolean block() {

while (!isReleasable()) LockSupport.park();

return true;

}

}

ConditionNode 拓展了两个方法:

检查线程状态是否处于等待。阻塞当前线程:当前线程正在等待执行,通过 LockSupport.park() 阻塞当前线程。这里通过 while 循环持续重试,尝试阻塞线程。

而到这一步,所有的信息都指向了一个相关的类 Condition 。

Condition

AQS 中的 Condition 的实现是内部类 ConditionObject :

public class ConditionObject implements Condition, java.io.Serializable

ConditionObject 实现了 Condition 接口和序列化接口,后者说明了该类型的对象可以进行序列化。而前者 Condition 接口,定义了一些行为能力:

public interface Condition {

void await() throws InterruptedException;​

void awaitUninterruptibly();​

long awaitNanos(long nanosTimeout) throws InterruptedException;​

boolean await(long time, TimeUnit unit) throws InterruptedException;​

boolean awaitUntil(Date deadline) throws InterruptedException;​

void signal();

void signalAll();

}

Condition 中定义的能力与 Java 的 Object 类中提供的同步相关方法(wait、notify 和 notifyAll) 代表的能力极为相似。前者提供了更丰富的等待方法。类比的角度来看,如果 Object 是配合 synchronized 关键字使用的,那么 Condition 就是用来配合基于 AQS 实现的锁来使用的接口。

可以将 Condition 的方法分为两组:等待和唤醒。

用于等待的方法

// 等待,当前线程在接到信号或被中断之前一直处于等待状态

void await() throws InterruptedException;

// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断

void awaitUninterruptibly();

//等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态

long awaitNanos(long nanosTimeout) throws InterruptedException;

// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。

// 此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0

boolean await(long time, TimeUnit unit) throws InterruptedException;

// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态

boolean awaitUntil(Date deadline) throws InterruptedException;

用于唤醒的方法

// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。

void signal();

// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。

void signalAll();

ConditionObject

分析完 Condition ,继续来理解 ConditionObject。 ConditionObject 是 Condition 在 AQS 中的实现:

public class ConditionObject implements Condition, java.io.Serializable {

/** condition 队列头节点 */

private transient ConditionNode firstWaiter;

/** condition 队列尾节点 */

private transient ConditionNode lastWaiter;

// ---- Signalling methods ----

// 移除一个或所有等待者并将其转移到同步队列。

private void doSignal(ConditionNode first, boolean all)

public final void signal()

public final void signalAll()​

// ---- Waiting methods ----

// 将节点添加到条件列表并释放锁定。

private int enableWait(ConditionNode node)

// 如果最初放置在条件队列中的节点现在准备好重新获取同步队列,则返回 true。

private boolean canReacquire(ConditionNode node) ​

// 从条件队列中取消链接给定节点和其他非等待节点,除非已经取消链接。

private void unlinkCancelledWaiters(ConditionNode node)

// 实现不可中断的条件等待

public final void awaitUninterruptibly()​

public final void await()​

public final long awaitNanos(long nanosTimeout)​

public final boolean awaitUntil(Date deadline)​

public final boolean await(long time, TimeUnit unit)​

// ---- support for instrumentation ----​

// 如果此条件是由给定的同步对象创建的,则返回 true。

final boolean isOwnedBy(AbstractQueuedSynchronizer sync)​

// 查询是否有线程在此条件下等待。

protected final boolean hasWaiters()​

// 返回在此条件下等待的线程数的估计值。

protected final int getWaitQueueLength()

// 返回一个集合,其中包含可能正在等待此 Condition 的那些线程。

protected final Collection getWaitingThreads()

}

ConditionObject 实现了 Condition 能力的基础上,拓展了对 ConditionNode 相关的操作,方法通过其用途可以划分为三组:

SignallingWaiting其他方法

Signalling methods

public final void signal() {

ConditionNode first = firstWaiter;

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

if (first != null)

doSignal(first, false);

}

public final void signalAll() {

ConditionNode first = firstWaiter;

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

if (first != null)

doSignal(first, true);

}

唤醒方法主要逻辑是通过 doSignal(ConditionNode first, boolean all) 实现的。doSignal 方法根据参数,进行一个 while 循环,

两个方法传递进来的都是头节点,也就是从 ConditionNode 双向链表的头节点开始遍历,如果第二个参数 all 设置为 false ,只执行一次遍历中逻辑。循环中的逻辑是:

// 最终都调用了这个方法

private void doSignal(ConditionNode first, boolean all) {

while (first != null) {

// 取出 first 的下一个节点,设置为 next

ConditionNode next = first.nextWaiter;

// 如果 first 是链表中唯一的一个节点,设置 lastWaiter 为 null

if ((firstWaiter = next) == null) //

lastWaiter = null;

// 读取 first 的 status ,检查是否是 COND

if ((first.getAndUnsetStatus(COND) & COND) != 0) {

// first 处于 COND 状态,出队

enqueue(first);

// 通过 all 来判断是否将等待的线程都进行唤醒逻辑。

if (!all)

break;

}

first = next; // 循环指向下一个

}

}

关键方法 enqueue(ConditionNode) 是 AQS 中的方法:

final void enqueue(Node node) {

if (node != null) {

for (;;) {

// 获取尾节点

Node t = tail;

// 避免不必要的内存屏障

node.setPrevRelaxed(t);

if (t == null)

// 空队列首先初始化一个头节点

tryInitializeHead();

else if (casTail(t, node)) { // 更新 tail 指针为 node (这里不是将 t = node)

t.next = node; // 为节点 t 的 next 指针指向 node

if (t.status < 0) // t 的状态 < 0 一般代表后续节点需要运行了

LockSupport.unpark(node.waiter);

break;

}

}

}

}

可以看出 enqueue(ConditionNode) 中本质上是通过调用 LockSupport.unpark(node.waiter); 来唤醒线程的。

Waiting methods

对外提供的等待能力的方法包括:

// 实现不可中断的条件等待

public final void awaitUninterruptibly()

public final void await()​

public final long awaitNanos(long nanosTimeout)​

public final boolean awaitUntil(Date deadline)

public final boolean await(long time, TimeUnit unit)

它们内部都用到了公共的逻辑:

// 添加节点到 condition 列表并释放锁

private int enableWait(ConditionNode node)

private boolean canReacquire(ConditionNode node)

private void unlinkCancelledWaiters(ConditionNode node)

enableWait

private int enableWait(ConditionNode node) {

if (isHeldExclusively()) { // 如果是当前线程持有锁资源

node.waiter = Thread.currentThread(); // 将节点的绑定的线程设置为当前线程

node.setStatusRelaxed(COND | WAITING); // 设置节点状态

ConditionNode last = lastWaiter; // 获取 尾节点

if (last == null)

firstWaiter = node; // 如果列表为空, node 就是头节点

else

last.nextWaiter = node; // 否则,将尾节点的下一个节点设置为 node

lastWaiter = node; // 更新 lastWaiter 指针

int savedState = getState(); // 获取当前线程的同步状态

if (release(savedState)) // 在当前持有锁资源的线程尝试释放锁

return savedState;

}

node.status = CANCELLED; // 当前线程未持有锁资源,更新 node 的状态为 CANCELLED

throw new IllegalMonitorStateException(); // 并抛出 IllegalMonitorStateException

}

这个方法对传入的节点插入到等待队列的队尾,并根据当前线程的状态进行了检查。关键方法的 release(int) :

public final boolean release(int arg) {

if (tryRelease(arg)) { // 尝试释放锁资源

signalNext(head); // 释放成功,唤醒下一个等待中的线程

return true;

}

return false;

}

唤醒给定节点的下一个节点(如果存在),通过调用 LockSupport.unpark(s.waiter) 唤醒节点对应的线程。

private static void signalNext(Node h) {

Node s;

if (h != null && (s = h.next) != null && s.status != 0) {

s.getAndUnsetStatus(WAITING);

LockSupport.unpark(s.waiter);

}

}

canReacquire

检查传入的 node 是否在链表中,且不为头节点:

// 如果最初放置在条件队列中的节点现在准备好重新获取同步队列,则返回 true。

private boolean canReacquire(ConditionNode node) {

// 检查传入的 node 是否在链表中,且不为头节点

return node != null && node.prev != null && isEnqueued(node);

}

// in AQS

final boolean isEnqueued(Node node) {

// 从 Node 双向链表尾部开始遍历,是否存在 node

for (Node t = tail; t != null; t = t.prev)

if (t == node)

return true;

return false;

}

unlinkCancelledWaiters

private void unlinkCancelledWaiters(ConditionNode node) {

// node 为空 / node 不是队尾 / node 是最后一个节点

if (node == null || node.nextWaiter != null || node == lastWaiter) {

ConditionNode w = firstWaiter, trail = null; // w = first , trail = null

// /从链表头节点开始遍历

while (w != null) {

ConditionNode next = w.nextWaiter; // 取出下一个节点

if ((w.status & COND) == 0) { // 当前节点的状态包含 COND

w.nextWaiter = null; // 当前节点的 next 设置为 null

if (trail == null) // 如果 trail 指针为空

firstWaiter = next; // firstWaiter 指向 next

else

trail.nextWaiter = next; // trail 指针不为空,尾指针的 next 指向当前节点的下一个节点

if (next == null)

lastWaiter = trail; // 最后将 lastWaiter 设置为 trail (过滤后的 trail 链表插入到队尾)

} else

trail = w; // 头节点状态不是 COND,当前节点设置为 trail 指针。

w = next; // 下一个循环

}

}

}

这个方法遍历 ConditionNode 队列,过滤掉状态不包含 COND 的节点。

对外提供的等待方法

上面三个方法是内部处理逻辑。而对外暴露的是以下五个方法:

public final void awaitUninterruptibly()​

public final void await()​

public final long awaitNanos(long nanosTimeout)​

public final boolean awaitUntil(Date deadline)​

public final boolean await(long time, TimeUnit unit)

除了awaitUninterruptibly() ,其他方法所代表的能力和 Condition 接口中定义的所代表的能力基本一致。

awaitUninterruptibly

awaitUninterruptibly() 是用于实现不可中断的条件等待:

public final void awaitUninterruptibly() {

ConditionNode node = new ConditionNode(); // 创建一个新的 node

int savedState = enableWait(node); // 将这个新 node 插入,并返回 node 的状态

LockSupport.setCurrentBlocker(this); // 设置 blocker

boolean interrupted = false, rejected = false; // flag:中断和拒绝

while (!canReacquire(node)) { // 当前线程关联的 node 不再等待队列

if (Thread.interrupted()) // 尝试中断线程

interrupted = true;

else if ((node.status & COND) != 0) { // 中断线程不成功的情况下,如果 node 状态包含 COND

// 尝试阻塞线程

try {

if (rejected)

node.block(); // 实际上也是 LockSupport.park

else

ForkJoinPool.managedBlock(node);

} catch (RejectedExecutionException ex) {

rejected = true; // 拒绝执行

} catch (InterruptedException ie) {

interrupted = true; // 中断

}

} else

Thread.onSpinWait(); // 当前线程无法继续执行

}

// 不是队列中的唯一节点时执行下面逻辑

LockSupport.setCurrentBlocker(null);

node.clearStatus(); // 清除 node 的 status

acquire(node, savedState, false, false, false, 0L); // 【*】重点方法

if (interrupted)

Thread.currentThread().interrupt();

}

在这个方法中,首先讲解两个方法:

Thread.onSpinWait() 表示调用者暂时无法继续,直到其他活动发生一个或多个动作。 通过在自旋等待循环构造的每次迭代中调用此方法,调用线程向运行时指示它正忙于等待。 运行时可能会采取措施来提高调用自旋等待循环构造的性能。ForkJoinPool.managedBlock(node) 则是通过 Blocker 来检查线程的运行状态,然后尝试阻塞线程。

最后是最关键的方法 acquire ,它的详细逻辑放到最后讲解, 这个方法的作用就是,当前线程进入等待后,需要将关联的线程开启一个自旋,挂起后能够持续去尝试获取锁资源。

await

public final void await() throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

ConditionNode node = new ConditionNode();

int savedState = enableWait(node);

LockSupport.setCurrentBlocker(this); // for back-compatibility

boolean interrupted = false, cancelled = false, rejected = false;

while (!canReacquire(node)) {

if (interrupted |= Thread.interrupted()) {

if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)

break; // else interrupted after signal

} else if ((node.status & COND) != 0) {

try {

if (rejected)

node.block();

else

ForkJoinPool.managedBlock(node);

} catch (RejectedExecutionException ex) {

rejected = true;

} catch (InterruptedException ie) {

interrupted = true;

}

} else

Thread.onSpinWait(); // awoke while enqueuing

}

LockSupport.setCurrentBlocker(null);

node.clearStatus();

acquire(node, savedState, false, false, false, 0L);

if (interrupted) {

if (cancelled) {

unlinkCancelledWaiters(node);

throw new InterruptedException();

}

Thread.currentThread().interrupt();

}

}

await() 方法相较于 awaitUninterruptibly(),while 逻辑基本一致,最后多了一步 cancelled 状态检查,如果 cancelled = true ,调用 unlinkCancelledWaiters(node),去清理等待队列。

awaitNanos

awaitNanos(long) 在 await() 之上多了对超时时间的计算和处理逻辑:

public final long awaitNanos(long nanosTimeout)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

ConditionNode node = new ConditionNode();

int savedState = enableWait(node);

long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;

long deadline = System.nanoTime() + nanos;

boolean cancelled = false, interrupted = false;

while (!canReacquire(node)) {

if ((interrupted |= Thread.interrupted()) ||

(nanos = deadline - System.nanoTime()) <= 0L) { // 多了一个超时条件

if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)

break;

} else

LockSupport.parkNanos(this, nanos);

}

node.clearStatus();

acquire(node, savedState, false, false, false, 0L);

if (cancelled) {

unlinkCancelledWaiters(node);

if (interrupted)

throw new InterruptedException();

} else if (interrupted)

Thread.currentThread().interrupt();

long remaining = deadline - System.nanoTime(); // avoid overflow

return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;

}

awaitUntil

awaitUntil(Date) 和 awaitNanos(long) 同理,只是将超时计算改成了日期计算:

long abstime = deadline.getTime();

// ...

boolean cancelled = false, interrupted = false;

while http://(!canReacquire(node)) {

if ((interrupted |= Thread.interrupted()) ||

System.currentTimeMillis() >= abstime) { // 时间检查

if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)

break;

} else

LockSupport.parkUntil(this, abstime);

}

await(long, TimeUnit)

await(long, TimeUnit) 则是逻辑更加与 awaitNanos(long) 相似了, 只是多了一步计算 awaitNanos(long nanosTimeout) 中的参数 nanosTimeout 的操作:

long nanosTimeout = unit.toNanos(time);

acquire 方法

在 wait 方法组中,最终都会调用到这个逻辑:

final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {

Thread current = Thread.currentThread();

byte spins = 0, postSpins = 0; // 在取消第一个线程时重试

boolean interrupted = false, first = false;

Node pred = null; // 入队时节点的前一个指针

/*

* 反复执行:

* 检查当前节点是否是 first

* 若是, 确保 head 稳定,否则确保有效的 prev

* 如果节点是第一个或尚未入队,尝试获取

* 否则,如果节点尚未创建,则创建这个它

* 否则,如果节点尚未入队,尝试入队一次

* 否则,如果通过 park 唤醒,重试,最多 postSpins 次

* 否则,如果 WAITING 状态未设置,设置并重试

* 否则,park 并且清除 WAITING 状态, 检查取消逻辑

*/

for (;;) {

if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) {

if (pred.status < 0) {

cleanQueue(); // predecessor cancelled

continue;

} else if (pred.prev == null) {

Thread.onSpinWait(); // ensure serialization

continue;

}

}

if (first || pred == null) {

boolean acquired;

try {

if (shared)

acquired = (tryAcquireShared(arg) >= 0);

else

acquired = tryAcquire(arg);

} catch (Throwable ex) {

cancelAcquire(node, interrupted, false);

throw ex;

}

if (acquired) {

if (first) {

node.prev = null;

head = node;

pred.next = null;

node.waiter = null;

if (shared)

signalNextIfShared(node);

if (interrupted)

current.interrupt();

}

return 1;

}

}

if (node == null) { // allocate; retry before enqueue

if (shared)

node = new SharedNode();

else

node = new ExclusiveNode();

} else if (pred == null) { // try to enqueue

node.waiter = current;

Node t = tail;

node.setPrevRelaxed(t); // avoid unnecessary fence

if (t == null)

tryInitializeHead();

else if (!casTail(t, node))

node.setPrevRelaxed(null); // back out

else

t.next = node;

} else if (first && spins != 0) {

--spins; // reduce unfairness on rewaits

Thread.onSpinWait();

} else if (node.status == 0) {

node.status = WAITING; // enable signal and recheck

} else {

long nanos;

spins = postSpins = (byte)((postSpins << 1) | 1);

if (!timed)

LockSupport.park(this);

else if ((nanos = time - System.nanoTime()) > 0L)

LockSupport.parkNanos(this, nanos);

else

break;

node.clearStatus();

if ((interrupted |= Thread.interrupted()) && interruptible)

break;

}

}

return cancelAcquire(node, interrupted, interruptible);

}

这个方法会在 Node 关联的线程让出锁资源后,开启一个死循环尝试通过 tryAcquire 尝试获取锁资源,最后如果超时或尝试次数超出限制,会通过 LockSupport.park 阻塞自身。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:JavaFx实现登录成功跳转到程序主页面
下一篇:JavaFX实现界面跳转
相关文章

 发表评论

暂时没有评论,来抢沙发吧~