详解Java回环屏障CyclicBarrier

网友投稿 258 2022-11-22


详解Java回环屏障CyclicBarrier

上一篇说的CountDownLatch是一个计数器,类似线程的join方法,但是有一个缺陷,就是当计数器的值到达0之后,再调用CountDownLatch的await和countDown方法就会立刻返回,就没有作用了,那么反正是一个计数器,为什么不能重复使用呢?于是就出现了这篇说的CyclicBarrier,它的状态可以被重用;

一.简单例子

用法其实和CountDownLatch差不多,也就是一个计数器,当计数器的值变为0之后,就会把阻塞的线程唤醒:

package com.example.demo.study;

import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class Study0216 {

// 注意这里的构造器,第一个参数表示计数器初始值

// 第二个参数表示当计数器的值变为0的时候就触发的任务

static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {

System.out.println("cyclicBarrier task ");

ZqfyD });

public static void main(String[] args) {

// 新建两个线程的线程池

ExecutorService pool = Executors.newFixedThreadPool(2);

// 线程1放入线程池中

pool.submit(() -> {

try {

System.out.println("Thread1----await-begin");

cyclicBarrier.await();

System.out.println("Thread1----await-end");

} catch (Exception e) {

e.printStackTrace();

}

});

// 线程2放到线程池中

pool.submit(() -> {

try {

System.out.println("Thttp://hread2----await-begin");

cyclicBarrier.await();

System.out.println("Thread2----await-end");

} catch (Exception e) {

e.printStackTrace();

}

});

// 关闭线程池,此时还在执行的任务会继续执行

pool.shutdown();

}

}

我们再看看CyclicBarrier的复用性,这里比如有一个任务,有三部分组成,分别是A,B,C,然后创建两个线程去执行这个任务,必须要等到两个线程都执行完成A部分,然后才能开始执行B,只有两个线程都执行完成B部分,才能执行C:

package com.example.demo.study;

import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class Study0216 {

// 这里的构造器,只有一个参数,表示计数器初始值

static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

public static void main(String[] args) {

// 新建两个线程的线程池

ExecutorService pool = Executors.newFixedThreadPool(2);

// 线程1放入线程池中

pool.submit(() -> {

try {

System.out.println("Thread1----stepA-start");

cyclicBarrier.await();

System.out.println("Thread1----stepB-start");

cyclicBarrier.await();

System.out.println("Thread1----stepC-start");

} catch (Exception e) {

e.printStackTrace();

}

});

// 线程2放到线程池中

pool.submit(() -> {

try {

System.out.println("Thread2----stepA-start");

cyclicBarrier.await();

System.out.println("Thread2----stepB-start");

cyclicBarrier.await();

System.out.println("Thread2----stepC-start");

} catch (Exception e) {

e.printStackTrace();

}

});

// 关闭线程池,此时还在执行的任务会继续执行

pool.shutdown();

}

}

二.基本原理

我们看看一些重要属性:

public class CyclicBarrier {

//这个内部类只有一个boolean值

private static class Generation {

boolean broken = false;

}

//独占锁

private final ReentrantLock lock = new ReentrantLock();

//条件变量

private final Condition trip = lock.newCondition();

//保存线程的总数

private final int parties;

//这是一个任务,通过构造器传递一个任务,当计数器变为0之后,就可以执行这个任务

private final Runnable barrierCommand;

//这类内部之后一个boolean的值,表示屏障是否被打破

private Generation generation = new Generation();

//计数器

private int count;

}

构造器:

//我们的构造器初始值设置的是parties

public CyclicBarrier(int parties) {

this(parties, null);

}

//注意,这里开始的时候是count等于parties

//为什么要有两个变量呢?我们每次调用await方法的时候count减一,当count的值变为0之后,怎么又还原成初始值呢?

//直接就把parties的值赋值给count就行了呀,简单吧!

public CyclicBarrier(int parties, Runnable barrierAction) {

if (parties <= 0) throw nhttp://ew IllegalArgumentException();

this.parties = parties;

this.count = parties;

this.barrierCommand = barrierAction;

}

然后再看看await方法:

public int await() throws InterruptedException, BrokenBarrierException {

try {

//调用的是dowait方法

return dowait(false, 0L);

} catch (TimeoutException toe) {

throw new Error(toe); // cannot happen

}

}

//假设count等于3,有三个线程都在调用这个方法,默认超时时间为0,那么首每次都只有一个线程可以获取锁,将count减一,不为0

//就会到下面的for循环中扔到条件队列中挂起;直到第三个线程调用这个dowait方法,count减一等于0,那么当前线程执行任务之后,

//就会唤醒条件变量中阻塞的线程,并重置count为初始值3

private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException, TimeoutException {

//获取锁

final ReentrantLock lock = this.lock;

lock.lock();

try {

//g中只有一个boolean值

final Generation g = generation;

//如果g中的值为true的时候,抛错

if (g.broken)

throw new BrokenBarrierException();

//如果当前线程中断,就抛错

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

//count减一,再赋值给index

int index = --count;

//如果index等于0的时候,说明所有的线程已经到屏障点了,就可以

if (index == 0) { // tripped

boolean ranAction = false;

try {

//执行当前线程的任务

final Runnable command = barrierCommand;

if (command != null)

command.run();

ranAction = true;

//唤醒其他因为调用了await方法阻塞的线程

nextGeneration();

return 0;

} finally {

if (!ranAction)

breakBarrier();

}

}

//能到这里来,说明是count不等于0,也就是还有的线程没有到屏障点

for (;;) {

try {

//wait方法有两种情况,一种是设置超时时间,一种是不设置超时时间

//这里就是对超时时间进行的一个判断,如果设置的超时时间为0,则会在条件队列中无限的等待下去,直到被唤醒

//设置了超时时间,那就等待该时间

if (!timed)

trip.await();

else if (nanos > 0L)

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

if (g == generation && ! g.broken) {

breakBarrier();

throw ie;

} else {

Thread.currentThread().interrupt();

}

}

if (g.broken)

throw new BrokenBarrierException();

if (g != generation)

return index;

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

//释放锁

lock.unlock();

}

}

//唤醒其他因为调用了await方法阻塞的线程

private void nextGeneration() {

//唤醒条件变量中所有线程

trip.signalAll();

//重置count的值

count = parties;

generation = nehttp://w Generation();

}

private void breakBarrier() {

generation.broken = true;

//重置count为初始值parties

count = parties;

//唤醒条件队列中的所有线程

trip.signalAll();

}

以上就是详解Java回环屏障CyclicBarrier的详细内容,更多关于Java CyclicBarrier的资料请关注我们其它相关文章!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java lambda 表达式中的双冒号的用法说明 ::
下一篇:详解Java线程同步器CountDownLatch
相关文章

 发表评论

暂时没有评论,来抢沙发吧~