java多线程之并发工具类CountDownLatch,CyclicBarrier和Semaphore

网友投稿 222 2022-09-07


java多线程之并发工具类CountDownLatch,CyclicBarrier和Semaphore

目录CountDownLatchSemaphoreCyclicBarrier总结

CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。

假设一个Excel文件有多个sheet,我们需要去记录每个sheet有多少行数据,

这时我们就可以使用CountDownLatch实现主线程等待所有sheet线程完成sheet的解析操作后,再继续执行自己的任务。

public class CountDownLatchTest {

private static class WorkThread extends Thread {

private CountDownLatch cdl;

public WorkThread(String name, CountDownLatch cdl) {

super(name);

this.cdl = cdl;

}

public void run() {

System.out.println(this.getName() + "启动了,时间为" + System.currentTimeMillis());

System.out.println(this.getName() + "我要统计每个sheet的行数");

try {

cdl.await();

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(this.getName() + "执行完了,时间为" + System.currentTimeMillis());

}

}

private static class sheetThread extends Thread {

private CountDownLatch cdl;

public sheetThread(String name, CountDownLatch cdl) {

super(name);

this.cdl = cdl;

}

public void run() {

try {

System.out.println(this.getName() + "启动了,时间为" + System.currentTimeMillis());

Thread.sleep(1000); //模拟任务执行耗时

cdl.countDown();

System.out.println(this.getName() + "执行完了,时间为" + System.currentTimeMillis() + " sheet的行数为:" + (int) (Math.random()*100));

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

public static void main(String[] args) throws Exception {

CountDownLatch cdl = new CountDownLatch(2);

WorkThread wt0 = new WorkThread("WorkThread", cdl );

wt0.start();

sheetThread dt0 = new sheetThread("sheetThread1", cdl);

sheetThread dt1 = new sheetThread("sheetThread2", cdl);

dt0.start();

dt1.start();

}

}

执行结果:

WorkThread启动了,时间为1640054503027

WorkThread我要统计每个sheet的行数

sheetThread1启动了,时间为1640054503028

sheetThread2启动了,时间为1640054503029

sheetThread2执行完了,时间为1640054504031 sheet的行数为:6

sheetThread1执行完了,时间为1640054504031 sheet的行数为:44

WorkThread执行完了,时间为1640054505036

可以看到,首先WorkThread执行await后开始等待,WorkThread在等待sheetThread1和sheetThread2都执行完自己的任务后,WorkThread立刻继续执行后面的代码。

CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。

当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。

由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。

用在多个线程时,只需要把这个CountDownLatch的引用传递到线程里即可。

我们继续根据上面的测试案例流程,一步一步的分析CountDownLatch 源码。

第一步看CountDownLatch的构造方法,传入一个不能小于0的int类型的参数作为计数器

public CountDownLatch(int count) {

if (count < 0) throw new IllegalArgumentException("count < 0");

this.sync = new Sync(count);

}

/**

* Synchronization control For CountDownLatch.

* Uses AQS state to represent count.

*/

private static final class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {

setState(count);

}

int getCount() {

return getState();

}

protected int tryAcquireShared(int acquires) {

return (getState() == 0) ? 1 : -1;

}

protected boolean tryReleaseShared(int releases) {

// Decrement count; signal when transition to zero

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

if (compareAndSetState(c, nextc))

return nextc == 0;

}

}

}

看它的注释,说的非常清楚,Sync就是CountDownLatch的同步控制器了,而它也是继承了AQS,并且第3行注释说到使用了AQS的state去代表count值。

第二步就是工作线程调用await()方法

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

public final void acquireSharedInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

if (tryAcquireShared(arg) < 0)

doAcquireSharedInterruptibly(arg);

}

如果线程中断,抛出异常,否则开始调用tryAcquireShared(1),其内部类Sync的实现也非常简单,就是判断state也就是CountDownLatch的计数是否等于0,

如果等于0,则该方法返回1,第5行的if判断不成立,否则该方法返回-1,第5行的if判断成立,继续执行doAcquireSharedInterruptibly(1)。

/**

* Acquires in shared interruptible mode.

* @param arg the acquire argument

*/

private void doAcquireSharedInterruptibly(int arg)

throws InterruptedException {

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return;

}

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

这个方法其实就是去获取共享模式下的锁,获取失败就park住。正如我们测试案例中的WorkThread线程应该次数就被park住了,那么它又是何时被唤醒的呢?

下面就到countDown()方法了

public void countDown() {

sync.releaseShared(1);

}

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();

return true;

}

return false;

}

tryReleaseShared(1)方法尝试去释放共享锁

protected boolean tryReleaseShared(int releases) {

// Decrement count; signal when transition to zero

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

if (compareAndSetState(c, nextc))

return nextc == 0;

}

}

在for循环中,先获取CountDownLatch的计数也就是当前state,如果等于0返回false,否则将state更新为state-1,并返回最新的state是否等于0。

因此在我们的测试案例中,我们需要调用两次countDown方法,才会将全局的state更新为0,然后继续执行doReleaseShared()方法。

/**

* Release action for shared mode -- signals successor and ensures

* propagation. (Note: For exclusive mode, release just amounts

* to calling unparkSuccessor of head if it needs signal.)

*/

private void doReleaseShared() {

/*

* Ensure that a release propagates, even if there are other

* in-progress acquires/releases. This proceeds in the usual

* way of trying to unparkSuccessor of head if it needs

* signal. But if it does not, status is set to PROPAGATE to

* ensure that upon release, propagation continues.

* Additionally, we must loop in case a new node is added

* while we are doing this. Also, unlike other uses of

* unparkSuccessor, we need to know if CAS to reset status

* fails, if so rechecking.

*/

for (;;) {

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

if (ws == Node.SIGNAL) {

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue; // loop to recheck cases

unparkSuccessor(h);

}

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue; // loop on failed CAS

}

if (h == head) // loop if head changed

break;

}

}

/**

* Wakes up node's successor, if one exists.

*

* @param node the node

*/

private void unparkSuccessor(Node node) {

/*

* If status is negative (i.e., possibly needing signal) try

* to clear in anticipation of signalling. It is OK if this

* fails or if status is changed by waiting thread.

*/

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

/*

* Thread to unpark is held in successor, which is normally

* just the next node. But if cancelled or apparently null,

* traverse backwards from tail to find the actual

* non-cancelled successor.

*/

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

LockSupport.unpark(s.thread),唤醒线程的方法被调用后,WorkThread线程就可以继续执行了。

至此我们简单分析了整个测试案例中CountDownLatch的代码流程。

Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,相当于一个并发控制器,构造的时候传入可供管理的信号量的数值,这个数值就是用来控制并发数量的,

每个线程执行前先通过acquire方法获取信号,执行后通过release归还信号 。每次acquire返回成功后,Semaphore可用的信号量就会减少一个,如果没有可用的信号,

acquire调用就会阻塞,等待有release调用释放信号后,acquire才会得到信号并返回。

下面我们看个测试案例

public class SemaphoreTest {

public static void main(String[] args) {

final Semaphore semaphore = new Semaphore(5);

Runnable runnable = () -> {

try {

semaphore.acquire();

System.out.println(Thread.currentThread().getName() + "获得了信号量>>>>>,时间为" + System.currentTimeMillis());

Thread.sleep(1000);

System.out.println(Thread.currentThread().getName() + "释放了信号量<<<<<,时间为" + System.currentTimeMillis());

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

semaphore.release();

}

};

Thread[] threads = new Thread[10];

for (int i = 0; i < threads.length; i++)

threads[i] = new Thread(runnable);

for (int i = 0; i < threads.length; i++)

threads[i].start();

}

}

执行结果:

Thread-0获得了信号量>>>>>,时间为1640058647604

Thread-1获得了信号量>>>>>,时间为1640058647604

Thread-2获得了信号量>>>>>,时间为1640058647604

Thread-3获得了信号量>>>>>,时间为1640058647605

Thread-4获得了信号量>>>>>,时间为1640058647605

Thread-0释放了信号量<<<<<,时间为1640058648606

Thread-1释放了信号量<<<<<,时间为1640058648606

Thread-5获得了信号量>>>>>,时间为1640058648607

Thread-4释放了信号量<<<<<,时间为1640058648607

Thread-3释放了信号量<<<<<,时间为1640058648607

Thread-7获得了信号量>>>>>,时间为1640058648607

Thread-8获得了信号量>>>>>,时间为1640058648607

Thread-2释放了信号量<<<<<,时间为1640058648606

Thread-6获得了信号量>>>>>,时间为1640058648607

Thread-9获得了信号量>>>>>,时间为164005864860http://7

Thread-7释放了信号量<<<<<,时间为1640058649607

Thread-6释放了信号量<<<<<,时间为1640058649607

Thread-8释放了信号量<<<<<,时间为1640058649607

Thread-9释放了信号量<<<<<,时间为1640058649608

Thread-5释放了信号量<<<<<,时间为1640058649607

我们使用for循环同时创建10个线程,首先是线程 0 1 2 3 4获得了信号量,再后面的10行打印结果中,线程1到5分别释放信号量,相同线程间隔也是1000毫秒,然后线程5 6 7 8 9才能继续获得信号量,而且保持最大获取信号量的线程数小于等于5。

看下Semaphore的构造方法

public Semaphore(int permits) {

sync = new NonfairSync(permits);

}

public Semaphore(int permits, boolean fair) {

sync = fair ? new FairSync(permits) : new NonfairSync(permits);

}

它支持传入一个int类型的permits,一个布尔类型的fair,因此Semaphore也有公平模式与非公平模式。

/**

* Synchronization implementation for semaphore. Uses AQS state

* to represent permits. Subclassed into fair and nonfair

* versions.

*/

abstract static class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 1192457210091910933L;

Sync(int permits) {

setState(permits);

}

final int getPermits() {

return getState();

}

final int nonfairTryAcquireShared(int acquires) {

for (;;) {

int available = getState();

int remaining = available - acquires;

if (remaining < 0 ||

compareAndSetState(available, remaining))

return remaining;

}

}

protected final boolean tryReleaseShared(int releases) {

for (;;) {

int current = getState();

int next = current + releases;

if (next < current) // overflow

throw new Error("Maximum permit count exceeded");

if (compareAndSetState(current, next))

return true;

}

}

final void reducePermits(int reductions) {

for (;;) {

int current = getState();

int next = current - reductions;

if (next > current) // underflow

throw new Error("Permit count underflow");

if (compareAndSetState(current, next))

return;

}

}

final int drainPermits() {

for (;;) {

int current = getState();

if (current == 0 || compareAndSetState(current, 0))

return current;

}

}

}

第9行代码可见Semaphore也是通过AQS的state来作为信号量的计数的

第12行 getPermits() 方法获取当前的可用的信号量,即还有多少线程可以同时获得信号量

第15行nonfairTryAcquireShared方法尝试获取共享锁,逻辑就是直接将可用信号量减去该方法请求获取的数量,更新state并返回该值。

第24行tryReleaseShared 方法尝试释放共享锁,逻辑就是直接将可用信号量加上该方法请求释放的数量,更新state并返回。

再看下Semaphore的公平锁

/**

* Fair version

*/

static final class FairSync extends Sync {

private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {

super(permits);

}

protected int tryAcquireShared(int acquires) {

for (;;) {

if (hasQueuedPredecessors())

return -1;

int available = getState();

int remaining = available - acquires;

TvKXXt if (remaining < 0 ||

compareAndSetState(available, remaining))

return remaining;

}

}

}

看尝试获取共享锁的方法中,多了个 if (hasQueuedPredecessors) 的判断,在java多线程6:ReentrantLock,

分析过hasQueuedPredecessors其实就是判断当前等待队列中是否存在等待线程,并判断第一个等待的线程(head.next)是否是当前线程。

CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

一组线程同时被唤醒,让我们想到了ReentrantLock的Condition,它的signalAll方法可以唤醒await在同一个condition的所有线程。

下面我们还是从一个简单的测试案例先了解下CyclicBarrier的用法

public class CyclicBarrierTest extends Thread {

private CyclicBarrier cb;

private int sleepSecond;

public CyclicBarrierTest(CyclicBarrier cb, int sleepSecond) {

this.cb = cb;

this.sleepSecond = sleepSecond;

}

public void run() {

try {

System.out.println(this.getName() + "开始, 时间为" + System.currentTimeMillis());

Thread.sleep(sleepSecond * 1000);

cb.await();

System.out.println(this.getName() + "结束, 时间为" + System.currentTimeMillis());

} catch (Exception e) {

e.printStackTrace();

}

}

public static void main(String[] args) {

Runnable runnable = new Runnable() {

public void run() {

System.out.println("CyclicBarrier的barrierAction开始运行, 时间为" + System.currentTimeMillis());

}

};

CyclicBarrier cb = new CyclicBarrier(2, runnable);

CyclicBarrierTest cbt0 = new CyclicBarrierTest(cb, 3);

CyclicBarrierTest cbt1 = new CyclicBarrierTest(cb, 6);

cbt0.start();

cbt1.start();

}

}

执行结果:

Thread-1开始, 时间为1640069673534

Thread-0开始, 时间为1640069673534

CyclicBarrier的barrierAction开始运行, 时间为1640069679536

Thread-1结束, 时间为1640069679536

Thread-0结束, 时间为1640069679536

可以看到Thread-0和Thread-1同时运行,而自定义的线程barrierAction是在6000毫秒后开始执行,说明Thread-0在await之后,等待了3000毫秒,和Thread-1一起继续执行的。

看下CyclicBarrier 的一个更高级的构造函数

public CyclicBarrier(int parties, Runnable barrierAction) {

if (parties <= 0) throw new IllegalArgumentException();

this.parties = parties;

this.count = parties;

this.barrierCommand = barrierAction;

}

parties就是设定需要多少线程在屏障前等待,只有调用await方法的线程数达到才能唤醒所有的线程,还有注意因为使用CyclicBarrier的线程都会阻塞在await方法上,所以在线程池中使用CyclicBarrier时要特别小心,如果线程池的线程过少,那么就会发生死锁。

Runnable barrierAction用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

/**

* Main barrier code, covering the various policies.

*/

private int dowait(boolean timed, long nanos)

throws InterruptedException, BrokenBarrierException,

TimeoutException {

final ReentrantLock lock = this.lock;

lock.lock();

try {

final Generation g = generation;

if (g.broken)

throw new BrokenBarrierException();

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

int index = --count;

if (index == 0) { // tripped

boolean ranAction = false;

try {

final Runnable command = barrierCommand;

if (command != null)

command.run();

ranAction = true;

nextGeneration();

return 0;

} finally {

if (!ranAction)

breakBarrier();

}

}

// loop until tripped, broken, interrupted, or timed out

for (;;) {

try {

if (!timed)

trip.await();

else if (nanos > 0L)

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

if (g == generation && ! g.broken) {

breakBarrier();

throw ie;

} else {

// We're about to finish waiting even if we had not

// been interrupted, so this interrupt is deemed to

http:// // "belong" to subsequent execution.

Thread.currentThread().interrupt();

}

}

if (g.broken)

throw new BrokenBarrierException();

if (g != generation)

return index;

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

lock.unlock();

}

}

首先是ReentrantLock加锁,全局的count值-1,然后判断count是否等于0,如果不等于0,则循环,condition执行await等待,直到触发、中断、中断或超时,如果count值等于0,先执行barrierAction线程,然后condition开始唤醒所有等待的线程。

简单是使用之后,有人会觉得CyclicBarrier和CountDownLatch有点像,其实它们两者有些细微的差别:

1:CountDownLatch是在多个线程都进行了latch.countDown()后才会触发事件,唤醒await()在latch上的线程,而执行countDown()的线程,是不会阻塞的;

CyclicBarrier是一个栅栏,用于同步所有调用await()方法的线程,线程执行了await()方法之后并不会执行之后的代码,而只有当执行await()方法的线程数等于指定的parties之后,这些执行了await()方法的线程才会同时运行。

2:CountDownLatch不能循环使用,计数器减为0就减为0了,不能被重置;CyclicBarrier本是就是支持循环使用parties,而且提供了reset()方法,可以重置计数器。

总结

本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注我们的更多内容!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Python 可视化文本的情绪库:PyPlutchik(python怎么读)
下一篇:python入门之后须掌握的知识点(模块化编程、时间模块)【一】python入门之后须掌握的知识点(excel文件处理+邮件发送+实战:批量化发工资条)【2】
相关文章

 发表评论

暂时没有评论,来抢沙发吧~