Java并发编程之ConcurrentLinkedQueue队列详情

网友投稿 445 2022-08-13


Java并发编程之ConcurrentLinkedQueue队列详情

ConcurrentLinkedQueue

JDK中提供了一系列场景的并发安全队列。总的来说,按照实现方式的不同可分为阻塞队列和非阻塞队列,前者使用锁实现,而后则使用CAS非阻塞算法实现。

ConcurrentLinkedQueue 内部的队列使用单向链表方式实现,其中有两个volatile 类型的 Node 节点分别用来存放队列的首、尾节点。从下面的无参构造函数可知,默认头、尾节点都是指向 item 为null 的哨兵节点。新元素会被插入队列末尾,出队时从队列头部获取一个元素。

public ConcurrentLinkedQueue() {

head = tail = new Node(null);

}

在 Node 节点内部则维护一个使用volatile 修饰的变量 item,用来存放节点的值;next用来存放链表的下一个节点,从而链接为一个单向无界链表。其内部则使用 UNSafe 工具类提供的CAS 算法来保证出入队时操作链表的原子性。

下面通过介绍ConcurrentLinkedQueue的几个方法来介绍其实现原理。

offer操作: offer操作是在队列末尾添加一个元素,如果传递的参数是null则抛出NPE异常,否则由于ConcurrentLinkedQueue是无界队列,该方法一直会返回true。另外,由于使用CAS无阻塞算法,因此该方法不会阻塞挂起调用线程。下面具体看下实现原理。

public boolean offer(E e) {

//(1)e为null这抛出空指针异常

checkNotNull(e);

//(2)构造Node节点,在构造函数内部调用unsafe.putObject

final Node newNode = new Node(e);

//(3) 从尾节点插入

for (Node t = tail, p = t;;) {

Node q = p.next;

//(4) 如果q==null说明p是尾节点,则执行插入

if (q == null) {

// p is last node

//(5)使用CAS设置p节点的next节点

if (p.casNext(null, newNode)) {

// Successful CAS is the linearization point

// for e to become an element of this queue,

// and for newNode to become "live".

//(6)CAS成功,则说明新增节点已经放入链表,然后设置当前尾巴节点

if (p != t) // hop two nodes at a time

casTail(t, newNode); // Failure is OK.

return true;

}

// Lost CAS race to another thread; re-read next

}

else if (p == q)

// We have fallen off list. If tail is unchanged, it

// will also be off-list, in which case we need to

// jump to head, from which all live nodes are always

// reachable. Else the new tail is a better bet.

p = (t != (t = tail)) ? t : head;

else

// Check for tail updates after two hops.

p = (p != t && t != (t = tail)) ? t : q;

}

}

首先看当一个线程调用offer(item)时的情况。首先代码(1)对传参进行空检查, 由于使用 如果为null 则抛出NPE 异常,否则执行代码(2)并使用item作为构造函数参数创建一 个新的节点,然后代码(3)从队列尾部节点开始循环,打算从队列尾部添加元素。这时候节点p、t、head、tail同时指向了item为null的哨兵节点,由于哨兵节点的next 节点为null,所以这里q也指向null。代码(4)发现q->null则执行代码(5),通过CAS 原子操作判断p节点的next节点是否为null,如果为null 则使用节点newNode替换p的next节点,然后执行代码(6),这里由于p=t所以没有设置尾部节点,然后退出 offer方法。上面是一个线程调用offer方法的情况,如果多个线程同时调用,就会存在多个线程同时执行到代码(5)的情况。假设线程A调用offer(item1),线程B调用 ofer(item2),同时执行到代码(5)p.casNext(null, newNode)。由于CAS的比较设置操作是原子性的,所以这里假设线程A先执行了比较设置操作,发现当前p的 next 节点确实是null,则会原子性地更新next节点为iteml,这时候线程B也会判断p的next节点是否为null,结果发现不是null(因为线程A已经设置了p的next节点为iteml),则会跳到代码(3),然后执行到代码(4)。

可见,offer 操作中的关键步骤是代码(5),通过原子CAS 操作来控制某时只有一个线程可以追加元素到队列末尾。进行CAS 竞争失败的线程会通过循环一次次尝试进行 CAS操作,直到CAS 成功才会返回,也就是通过使用无限循环不断进行 CAS 尝试方式来替代阻塞算法挂起调用线程。相比阻塞算法,这是使用CPU资源换取阻塞所带来的开销。

add操作:

add操作是在链表尾部添加一个元素,其实在内部调用的还是offer操作。

public boolean add(E e) {

return offer(e);

}

poll操作:

poll操作是在队列头部获取并移除一个元素,如果队列为空则返回null。

public E poll() {

restartFromHead:

for (;;) {

for (Node h = head, p = h, q;;) {

E item = p.item;

if (item != null && p.casItem(item, null)) {

// Successful CAS is the linearization point

// for item to be removed from this queue.

if (p != h) // hop two nodes at a time

updateHead(h, ((q = p.next) != null) ? q : p);

return item;

}

else if ((q = p.next) == null) {

updateHead(h, p);

return null;

}

else if (p == q)

continue restartFromHead;

else

p = q;

}

}

}

poll方法在移除一个元素时,只是简单地使用aumbjqDdp CAS操作把当前节点的item值设置为null,然后通过重新设置头节点将该元素从队列里面移除,被移除的节点就成了孤立节点,这个节点会在垃圾回收时被回收掉。另外,如果在执行分支中发现头节点被修改了,要跳到外层循环重新获取新的头节点。

peak操作:

peak操作是获取队列头部获一个元素,如果队列为空则返回null。

public E peek() {

restartFromHead:

for (;;) {

for (Node h = head, p = h, q;;) {

E item = p.item;

//注释

if (item != null || (q = p.next) == null) {

updateHead(h, p);

return item;

}

else if (p == q)

continue restartFromHead;

else

p = q;

}

}

}

Peek操作的代码结构与poll操作类似,不同之处在于我们在代码中标记注释的地方中少了castItem操作。其实这很正常,因为peek只是获取队列头元素值,并不清空其值。根据前面的介绍我们知道第一次执行offer后head指向的是哨兵节点(也就是item为null的节点),那么第一次执行peek时在注释处会发现item==null,然后执行q=p.next,这时候q节点指向的才是队列里面第一个真正的元素,或者如果队列为 null 则 q 指向 null。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:深入理解Java并发编程之LinkedBlockingQueue队列
下一篇:Java工具之ja
相关文章

 发表评论

暂时没有评论,来抢沙发吧~