详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

网友投稿 354 2022-12-06


详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

前言

CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用。但两者就其内部实现和使用场景而言是各有所侧重的。

内部实现差异

前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性。

public class {

//Synchronization control For CountDownLatch.

//Uses AQS state to represent count.

private static final class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {

setState(count);

}

int getCount() {

return getState();

}

protected int tryAcquireShared(int acquires) {

return (getState() == 0) ? 1 : -1;

}

protected boolean tryReleaseShared(int releases) {

// Decrement count; signal when transition to zero

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

if (compareAndSetState(c, nextc))

return nextc == 0;

}

}

}

private final Sync sync;

... ...//

}

public class CyclicBarrier {

/**

* Each use of the barrier is represented as a generation instance.

* The generation changes whenever the barrier is tripped, or

* is reset. There can be many generations associated with threads

* using the barrier - due to the non-deterministic way the WPQxQaklock

* may be allocated to waiting threads - but only one of these

* can be active at a time (the one to which {@code count} applies)

* and all the rest are either broken or tripped.

* There need not be an active generation if there has been a break

* but no subsequent reset.

*/

private static class Generation {

boolean broken = false;

}

/** The lock for guarding barrier entry */

private final ReentrantLock lock = new ReentrantLock();

/** Condition to wait on until tripped */

private final Condition trip = lock.newCondition();

/** The number of parties */

private final int parties;

/* The command to run when tripped */

private final Runnable barrierCommand;

/** The current generation */

private Generation generation = new Generation();

/**

* Number of parties still waiting. Counts down from parties to 0

* on each generation. It is reset to parties on each new

* generation or when broken.

*/

private int count;

/**

* Updates state on barrier trip and wakes up everyone.

* Called only while holding lock.

*/

private void nextGeneration() {

// signal completion of last generation

trip.signalAll();

// set up next generation

count = parties;

generation = new Generation();

}

/**

* Sets current barrier generatiWPQxQakon as broken and wakes up everyone.

* Called only while holding lock.

*/

private void breakBarrier() {

generation.broken = true;

count = parties;

trip.signalAll();

}

/**

* Main barrier code, covering the various policies.

*/

private int dowait(boolean timed, long nanos)

throws InterruptedException, BrokenBarrierException,

TimeoutException {

final ReentrantLock lock = this.lock;

lock.lock();

try {

final Generation g = generation;

if (g.broken)

throw new BrokenBarrierException();

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

int index = --count;

if (index == 0) { // tripped

boolean ranAction = false;

try {

final Runnable command = barrierCommand;

if (command != null)

command.run();

ranAction = true;

nextGeneration();

return 0;

} finally {

if (!ranAction)

breakBarrier();

}

}

// loop until tripped, broken, interrupted, or timed out

for (;;) {

try {

if (!timed)

trip.await();

else if (nanos > 0L)

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

if (g == generation && ! g.broken) {

breakBarrier();

throw ie;

} else {

// We're about to finish waiting even if we had not

// been interrupted, so this interrupt is deemed to

// "belong" to subsequent execution.

Thread.currentThread().interrupt();

}

}

if (g.broken)

throw new BrokenBarrierException();

if (g != generation)

return index;

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

lock.unlock();

}

}

... ... //

}

实战 - 展示各自的使用场景

/**

*类说明:共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行

*/

public class UseCountDownLatch {

static CountDownLatch latch = new CountDownLatch(6);

/*初始化线程*/

private static class InitThread implements Runnable{

public void run() {

System.out.println("Thread_"+Thread.currentThread().getId()

+" ready init work......");

latch.countDown();

for(int i =0;i<2;i++) {

System.out.println("Thread_"+Thread.currentThread().getId()

+" ........continue do its work");

}

}

}

/*业务线程等待latch的计数器为0完成*/

private static class BusiThread implements Runnable{

public void run() {

try {

latch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

for(int i =0;i<3;i++) {

System.out.println("BusiThread_"+Thread.currentThread().getId()

+" do business-----");

}

}

}

public static void main(String[] args) throws InterruptedException {

new Thread(new Runnable() {

public void run() {

SleepTools.ms(1);

System.out.println("Thread_"+Thread.currentThread().getId()

+" ready init work step 1st......");

latch.countDown();

System.out.println("begin step 2nd.......");

SleepTools.ms(1);

System.out.println("Thread_"+Thread.currentThread().getId()

+" ready init work step 2nd......");

latch.countDown();

}

}).start();

new Thread(new BusiThread()).start();

for(int i=0;i<=3;i++){

Thread thread = new Thread(new InitThread());

thread.start();

}

latch.await();

System.out.println("Main do ites work........");

}

}

/**

*类说明:共4个子线程,他们全部完成工作后,交出自己结果,

*再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串

*/

class UseCyclicBarrier {

private static CyclicBarrier barrier

= new CyclicBarrier(4,new CollectThread());

//存放子线程工作结果的容器

private static ConcurrentHashMap resultMap

= new ConcurrentHashMap();

public static void main(String[] args) {

for(int i=0;i<4;i++){

Thread thread = new Thread(new SubThread());

thread.start();

}

}

/*汇总的任务*/

private static class CollectThread implements Runnable{

@Override

public void run() {

StringBuilder result = new StringBuilder();

for(Map.Entry workResult:resultMap.entrySet()){

result.append("["+workResult.getValue()+"]");

}

System.out.println(" the result = "+ result);

System.out.println("do other business........");

}

}

/*相互等待的子线程*/

private static class SubThread implements Runnable{

@Override

public void run() {

long id = Thread.currentThread().getId();WPQxQak

resultMap.put(Thread.currentThread().getId()+"",id);

try {

Thread.sleep(1000+id);

System.out.println("Thread_"+id+" ....do something ");

barrier.await();

Thread.sleep(1000+id);

System.out.println("Thread_"+id+" ....do its business ");

barrier.await();

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

两者总结

1. Cyclicbarrier结果汇总的Runable线程可以重复被执行,通过多次触发await()方法,countdownlatch可以调用await()方法多次;cyclicbarrier若没有结果汇总,则调用一次await()就够了;

2. New cyclicbarrier(threadCount)的线程数必须与实际的用户线程数一致;

3. 协调线程同时运行:countDownLatch协调工作线程执行,是由外面线程协调;cyclicbarrier是由工作线程之间相互协调运行;

4. 从构造函数上看出:countDownlatch控制运行的计数器数量和线程数没有关系;cyclicbarrier构造中传入的线程数等于实际执行线程数;

5. countDownLatch在不能基于执行子线程的运行结果做处理,而cyclicbarrier可以;

6. 就使用场景而言,countdownlatch 更适用于框架加载前的一系列初始化工作等场景; cyclicbarrier更适用于需要多个用户线程执行后,将运行结果汇总再计算等典型场景;


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Spring Security 实现“记住我”功能及原理解析
下一篇:Spring Security 实现短信验证码登录功能
相关文章

 发表评论

暂时没有评论,来抢沙发吧~