企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 28 人齐了,一起行动—CyclicBarrier详解 > 青年是学习智慧的时期,中年是付诸实践的时期。 > —— 卢梭 上一节我们讲解了 CountDownLatch,它的作用是让多个线程完成后,再促使主线程继续向下执行。不过它有一定的局限性,无法被重复使用。本节我们学习的 CyclicBarrier 不会有这个问题。CyclicBarrier 从字面上理解为循环栅栏。栅栏自然起到的就是屏障的作用,阻止线程通过,而循环则是指其可以反复使用。下面我们就先看看如何使用 CyclicBarrier。 ## 1、CyclicBarrier 的使用 几年前北京的黑车盛行,西二旗地铁口,大量黑车司机在出口招揽生意:“软件园、软件园!5 块一位!还差最后一位!” 。等你上车,发现其实不是还差一位,而是只有你一位。而司机此时绝对不会发车,而是会等车上坐够 4 个人后才出发,然后下一辆黑车再次坐满 4 人后发车。下面我们就使用 CyclicBarrier 来模拟这个场景。 ~~~java public class Client { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () -> System.out.println("人满了发车") ); IntStream.range(1, 11).forEach(number -> { try { Thread.currentThread().sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { try { System.out.println("第 " + number + " 乘客上车了!"); cyclicBarrier.await(); System.out.println("第 " + number + " 乘客出发了!"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); }); } } ~~~ 代码中首先声明 cyclicBarrier 对象,构造方法有两个参数,第一个参数是计数器初始值,每有一个线程达成则会减 1 。减到 0 时,触发执行第二个参数传入的 Runnable 实现的 run 方法。我这里使用 lambda 的方式简化代码。如果你不需要这个 Runnable 的任务,那么只需要传入第一个参数即可。 接下来的代码中,模拟 10 位乘客上车,每次上车后调用 cyclicBarrier.await() 。这里就是屏障点,此时当前线程会阻塞在此处,并且计数器被减 1 。为了输出的效果便于观看,每次新线程启动前先 sleep 一会。 每当四个乘客完成上车操作,cyclicBarrier 就会触发 “人满了发车” 的操作。而最后两位乘客上车后,由于没有新的乘客上车,计数器不会被减到 0,导致无法越过屏障,所以永远不会发车。 cyclicBarrier运行的示意图如下: ![图片描述](https://img.mukewang.com/5dd5fb100001250a16000799.jpg) 代码运行输出如下: ~~~ 第 1 乘客上车了! 第 2 乘客上车了! 第 3 乘客上车了! 第 4 乘客上车了! 人满了发车 第 4 乘客出发了! 第 1 乘客出发了! 第 2 乘客出发了! 第 3 乘客出发了! 第 5 乘客上车了! 第 6 乘客上车了! 第 7 乘客上车了! 第 8 乘客上车了! 人满了发车 第 8 乘客出发了! 第 5 乘客出发了! 第 7 乘客出发了! 第 6 乘客出发了! 第 9 乘客上车了! 第 10 乘客上车了! ~~~ 可以看到每上车 4 人,才会触发发车,同时每个人的线程才会继续 cyclicBarrier.await() 后面的代码,输出 “第 n 乘客出发了!” 这个例子也验证了 CyclicBarrier 可以重复使用,每次满 4 人上车,都会触发发车。然后重新开始计数。 通过这个例子我们了解了 CyclicBarrier 的使用。在这里我们总结下 CyclicBarrier 涉及的几个概念: 1、计数器。初始值为构造 CyclicBarrier 传入的第一个参数,每当一个线程到达屏障点,计数器减1; 2、屏障点,线程中调用 cyclicBarrier.await() 后,该线程到达屏障点,等待 CyclicBarrier 打开,也就是计数器到 0 ; 3、冲出屏障后的任务。首先这个任务可选。不需要的话,在构造 CyclicBarrier 时只需要传入计数器初始值即可。这个任务在计数器到 0时被触发。 ## 2、CyclicBarrier 原理解析 ### 2.1、 CyclicBarrier 中的属性 ~~~java /** CyclicBarrier使用的拍他锁*/ private final ReentrantLock lock = new ReentrantLock(); /** barrier被冲破前,线程等待的condition*/ private final Condition trip = lock.newCondition(); /** barrier被冲破时,需要满足的参与线程数。*/ private final int parties; /* barrier被冲破后执行的方法。*/ private final Runnable barrierCommand; /** 当其轮次 */ private Generation generation = new Generation(); /** *目前等待剩余的参与者数量。从 parties倒数到0。每个轮次该值会被重置回parties */ private int count; ~~~ 可以看到 CyclicBarrier 内部通过 ReentrantLock 来实现的,而 ReentrantLock 的底层实现还是 AQS。 parties 在构造函数中被赋值,它的值永远不会变,因为 CyclicBarrier 会被重置复用。而每个轮次真正用来计数的变量是 count。每个轮次结束,count 会被重置为 parties 的值。 ### 2.2、 await() 方法解析 await 方法的调用,代表调用线程到达了屏障点,这个方法其实调用了 dowait 方法,我们直接分析 dowait 方法,它实现了 CyclicBarrier 的核心功能。 ~~~java /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //对共享资源count,generation操作前,需要先上锁保证线程安全 lock.lock(); try { //拿到当前轮次对象的引用 final Generation g = generation; //如果已经broken,那么抛出异常 if (g.broken) throw new BrokenBarrierException(); //如果被打断,通过breakBarrier方法设置当前轮次为broken状态,通知当前轮次所有等待的线程线程 //并且抛出InterruptedException if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //count减1 int index = --count; //如果index为0,那么冲破屏障点 if (index == 0) { // tripped boolean ranAction = false; //冲破屏障点后,如果CyclicBarrier构造时传入Runnable,则被调用。 try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //这个方法中会进行重置,并且通知所有在屏障点阻塞的线程继续执行。 nextGeneration(); return 0; } finally { //正常情况由于运行了command后ranAction被置为true,并不会执行如下逻辑 //在command执行期间出了异常才会进入下面的逻辑,认为当前轮次被破坏了 if (!ranAction) breakBarrier(); } } //开始自旋,直到屏障被冲破,或者interrupted或者超时 for (;;) { try { if (!timed) //阻塞,此时会释放锁,以让其他线程进入await方法中。等待屏障被冲破后,向后执行 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //如果当前线程阻塞被interrupt了,并且本轮次还没有被break,那么修改本轮次状态为broken if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } //如果本轮次被破坏,那么抛出异常 if (g.broken) throw new BrokenBarrierException(); //如果已经成功进入下一轮次,那么返回index if (g != generation) return index; //如果已经超时,那么本轮次被打破 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //释放锁 lock.unlock(); } } ~~~ 以上代码分为两大段逻辑,分别是自旋前,和自旋。 **A、自旋前的逻辑,核心逻辑如下:** 1. 计数器 -1; 2. 判断是否计数器到 0; 3. 如果到了,则冲破屏障点,执行传入的 Runnable; 4. 调用 nextGeneration() 来更新 Generation,重置计数器,并且通知本轮次等待的线程。 **B、如果计数器没有到 0,则进入自旋的逻辑:** 1. 开始等待,此时会释放锁,以让其它线程进入 lock 的代码块执行以上逻辑; 2. 当被唤醒时,可能因为当前 generation 被 break 了,或者计数器到 0,屏障被冲破; 3. 对比边刚进入 dowait 方法时获取的 generation 对象和最新 generation 是否一致。不一致说明已经换代了,也就是屏障被冲破,可以 return 了; 4. 如果等待超时或者 generation 被 break,分别抛出异常。 不同线程在 A 部分的逻辑会影响已经进入 B 部分逻辑的线程中止自旋。这些自旋的线程或者冲破屏障点,继续向下执行,也可能抛出异常。 我们再看下用于更新轮次的方法 nextGeneration(): ~~~java private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } ~~~ 三行代码做了三件事: 1、通知所有被阻塞在本轮次屏障点的线程。屏障点被冲破,可以继续向下执行了; 2、重置计数器为初始值; 3、更新轮次对象。这样自旋中的线程才会跳出自旋。 ## 3、总结 CyclicBarrier 和 CountDownLatch 相比,更为灵活,可以被重复使用。前者可以用来分段任务,假如有个任务需要分三个阶段来完成,每个阶段可以多线程并发执行,但是进入下一个阶段的时候,必须所有线程都完成了第一阶段的执行。那么通过 CyclicBarrier,在每个线程的每个阶段开始前都设置屏障点,可以很轻松地实现。 CyclicBarrier 的实现是通过 ReentrantLock 控制计数器的原子更新,通过条件变量来实现线程同步。