在实际应用中,有时候需要多个线程同时工作以完成同一件事情,而且在完成过程中,往往会等待其他线程都完成某一阶段后再执行,等所有线程都到达某一个阶段后再统一执行。
JDK:
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。
对于失败的同步尝试,CyclicBarrier 使用了一种快速失败的、要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么其他所有线程(甚至是那些尚未从以前的 await() 中恢复的线程)也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。
JDK地址:
//构造方法摘要 CyclicBarrier(int parties) //创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在每个 barrier 上执行预定义的操作。 CyclicBarrier(int parties, Runnable barrierAction) //创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
Example1:
package tags; import java.util.Random; import java.util.concurrent.CyclicBarrier; /** * CyclicBarrier调用CyclicBarrier.await()进入等待的线程数, * 当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。 * CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。 * CyclicBarrier初始时还可带一个Runnable的参数,此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。 */ public class CyclicBarrierTest { public static class ComponentThread implements Runnable { CyclicBarrier barrier;// 计数器 int ID; // 组件标识 int[] array; // 数据数组 // 构造方法 public ComponentThread(CyclicBarrier barrier, int[] array, int ID) { this.barrier = barrier; this.ID = ID; this.array = array; } public void run() { try { array[ID] = new Random().nextInt(100); System.out.println("Component " + ID + " generates: " + array[ID]); System.out.println("Component " + ID + " sleep"); // 在这里等待Barrier处 barrier.await(); System.out.println("Component " + ID + " awaked"); // 计算数据数组中的当前值和后续值 int result = array[ID] + array[ID + 1]; System.out.println("Component " + ID + " result: " + result); } catch (Exception ex) { } } } /** * 测试CyclicBarrier的用法 */ public static void testCyclicBarrier() { final int[] array = new int[3]; CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() { // 在所有线程都到达Barrier时执行 public void run() { System.out.println("testCyclicBarrier run"); array[2] = array[0] + array[1]; } }); // 启动线程 new Thread(new ComponentThread(barrier, array, 0)).start(); new Thread(new ComponentThread(barrier, array, 1)).start(); } public static void main(String[] args) { CyclicBarrierTest.testCyclicBarrier(); } }
结果:
Component 1 generates: 40
Component 0 generates: 8
Component 1 sleep
Component 0 sleep
testCyclicBarrier run
Component 0 awaked
Component 0 result: 48
Component 1 awaked
Component 1 result: 88
两个线程分别执行,互不影响 ,执行到barrier.await();时该线程进入等待状态,
当两个线程都执行到barrier.await();时,达到CyclicBarrier启动所需的阻塞线程数,进入到new CyclicBarrier(2, new Runnable()...)里面的方法, 执行完里面的方法后,等待的两个线程再次被唤醒,继续各自执行线程后面的语句。
Example2:
比如有几个旅行团需要途经深圳、广州、韶关、长沙最后到达武汉。旅行团中有自驾游的,有徒步的,有乘坐旅游大巴的;这些旅行团同时出发,并且每到一个目的地,都要等待其他旅行团到达此地后再同时出发,直到都到达终点站武汉。
这时候CyclicBarrier就可以派上用场。CyclicBarrier最重要的属性就是参与者个数,另外最要方法是await()。当所有线程都调用了await()后,就表示这些线程都可以继续执行,否则就会等待。
package tags; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestCyclicBarrier { // 徒步需要的时间: Shenzhen, Guangzhou, Shaoguan private static int[] timeWalk = { 5, 8, 15}; // 自驾游 private static int[] timeSelf = { 1, 3, 4}; // 旅游大巴 private static int[] timeBus = { 2, 4, 6}; static String now() { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); return sdf.format(new Date()) + ": "; } static class Tour implements Runnable { private int[] times; private CyclicBarrier barrier; private String tourName; public Tour(CyclicBarrier barrier, String tourName, int[] times) { this.times = times; this.tourName = tourName; this.barrier = barrier; } public void run() { try { Thread.sleep(times[0] * 1000); //使用 times岔开各组时间,使其显示不同的now() System.out.println(now() + tourName + " Reached Shenzhen"); barrier.await(); Thread.sleep(times[1] * 1000); System.out.println(now() + tourName + " Reached Guangzhou"); barrier.await(); Thread.sleep(times[2] * 1000); System.out.println(now() + tourName + " Reached Shaoguan"); } catch (InterruptedException e) { } catch (BrokenBarrierException e) { } } } public static void main(String[] args) { // 三个旅行团 CyclicBarrier barrier = new CyclicBarrier(3); ExecutorService exec = Executors.newFixedThreadPool(3); exec.submit(new Tour(barrier, "WalkTour", timeWalk)); exec.submit(new Tour(barrier, "SelfTour", timeSelf)); exec.submit(new Tour(barrier, "BusTour", timeBus)); exec.shutdown(); } }
结果:
15:13:11: SelfTour Reached Shenzhen
15:13:12: BusTour Reached Shenzhen
15:13:15: WalkTour Reached Shenzhen
15:13:18: SelfTour Reached Guangzhou
15:13:19: BusTour Reached Guangzhou
15:13:23: WalkTour Reached Guangzhou
15:13:27: SelfTour Reached Shaoguan
15:13:29: BusTour Reached Shaoguan
15:13:38: WalkTour Reached Shaoguan
例子来源:http://conkeyn.iteye.com/blog/546280(测试时稍有修改)
源码分析:
private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); /** 达到的数量 */ private final int parties; /* 数量满足后执行的线程 */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); private static class Generation { boolean broken = false; } public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } private int dowait(boolean timed, long nanos) throws InterruptedException,BrokenBarrierException,TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // index为0时,线程数量达到屏障数量,执行barrierCommand,并唤醒所有线程 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) //若未达到屏障数量,每一个线程进入时都阻塞在这 //此时会释放锁,以便下一线程进入 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch(InterruptedException ie) { if (g == generation && !g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
/** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } private void nextGeneration() { // 唤醒所有线程 trip.signalAll(); // set up next generation // 重置屏障数量,使其可循环使用 count = parties; generation = new Generation(); }
其实就是使用一个重入锁,一个Condition实现。
源码:
/* * CyclicBarrier可重复使用reset();可打断breakBarrier() */ public class CyclicBarrier { /** * Each use of the barrier is represented as a generation instance. * The generation changes whenever the barrier is tripped, or * is reset. There can be many generations associated with threads * using the barrier - due to the non-deterministic way the lock * may be allocated to waiting threads - but only one of these * can be active at a time (the one to which <tt>count</tt> applies) * and all the rest are either broken or tripped. * There need not be an active generation if there has been a break * but no subsequent reset. */ private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ private int count; /** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } /** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } /** * Creates a new <tt>CyclicBarrier</tt> that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } /** * Creates a new <tt>CyclicBarrier</tt> that will trip when the * given number of parties (threads) are waiting upon it, and * does not perform a predefined action when the barrier is tripped. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties) { this(parties, null); } /** * Returns the number of parties required to trip this barrier. * * @return the number of parties required to trip this barrier */ public int getParties() { return parties; } /** * Waits until all {@linkplain #getParties parties} have invoked * <tt>await</tt> on this barrier. * * <p>If the current thread is not the last to arrive then it is * disabled for thread scheduling purposes and lies dormant until * one of the following things happens: * <ul> * <li>The last thread arrives; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * one of the other waiting threads; or * <li>Some other thread times out while waiting for barrier; or * <li>Some other thread invokes {@link #reset} on this barrier. * </ul> * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * <p>If the barrier is {@link #reset} while any thread is waiting, * or if the barrier {@linkplain #isBroken is broken} when * <tt>await</tt> is invoked, or while any thread is waiting, then * {@link BrokenBarrierException} is thrown. * * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting, * then all other waiting threads will throw * {@link BrokenBarrierException} and the barrier is placed in the broken * state. * * <p>If the current thread is the last thread to arrive, and a * non-null barrier action was supplied in the constructor, then the * current thread runs the action before allowing the other threads to * continue. * If an exception occurs during the barrier action then that exception * will be propagated in the current thread and the barrier is placed in * the broken state. * * @return the arrival index of the current thread, where index * <tt>{@link #getParties()} - 1</tt> indicates the first * to arrive and zero indicates the last to arrive * @throws InterruptedException if the current thread was interrupted * while waiting * @throws BrokenBarrierException if <em>another</em> thread was * interrupted or timed out while the current thread was * waiting, or the barrier was reset, or the barrier was * broken when {@code await} was called, or the barrier * action (if present) failed due an exception. */ public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } /** * Waits until all {@linkplain #getParties parties} have invoked * <tt>await</tt> on this barrier, or the specified waiting time elapses. * * <p>If the current thread is not the last to arrive then it is * disabled for thread scheduling purposes and lies dormant until * one of the following things happens: * <ul> * <li>The last thread arrives; or * <li>The specified timeout elapses; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * one of the other waiting threads; or * <li>Some other thread times out while waiting for barrier; or * <li>Some other thread invokes {@link #reset} on this barrier. * </ul> * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * <p>If the specified waiting time elapses then {@link TimeoutException} * is thrown. If the time is less than or equal to zero, the * method will not wait at all. * * <p>If the barrier is {@link #reset} while any thread is waiting, * or if the barrier {@linkplain #isBroken is broken} when * <tt>await</tt> is invoked, or while any thread is waiting, then * {@link BrokenBarrierException} is thrown. * * <p>If any thread is {@linkplain Thread#interrupt interrupted} while * waiting, then all other waiting threads will throw {@link * BrokenBarrierException} and the barrier is placed in the broken * state. * * <p>If the current thread is the last thread to arrive, and a * non-null barrier action was supplied in the constructor, then the * current thread runs the action before allowing the other threads to * continue. * If an exception occurs during the barrier action then that exception * will be propagated in the current thread and the barrier is placed in * the broken state. * * @param timeout the time to wait for the barrier * @param unit the time unit of the timeout parameter * @return the arrival index of the current thread, where index * <tt>{@link #getParties()} - 1</tt> indicates the first * to arrive and zero indicates the last to arrive * @throws InterruptedException if the current thread was interrupted * while waiting * @throws TimeoutException if the specified timeout elapses * @throws BrokenBarrierException if <em>another</em> thread was * interrupted or timed out while the current thread was * waiting, or the barrier was reset, or the barrier was broken * when {@code await} was called, or the barrier action (if * present) failed due an exception */ public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } /** * Queries if this barrier is in a broken state. * * @return {@code true} if one or more parties broke out of this * barrier due to interruption or timeout since * construction or the last reset, or a barrier action * failed due to an exception; {@code false} otherwise. */ public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } /** * Resets the barrier to its initial state. If any parties are * currently waiting at the barrier, they will return with a * {@link BrokenBarrierException}. Note that resets <em>after</em> * a breakage has occurred for other reasons can be complicated to * carry out; threads need to re-synchronize in some other way, * and choose one to perform the reset. It may be preferable to * instead create a new barrier for subsequent use. */ public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } /** * Returns the number of parties currently waiting at the barrier. * This method is primarily useful for debugging and assertions. * * @return the number of parties currently blocked in {@link #await} */ public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }
相关推荐
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时CyclicBarrier很有用。因为该barrier在释放等待...
CyclicBarrier的使用以及注意事项
CyclicBarrier可以被看作是一个屏障,当所有线程都到达这个屏障时,所有线程才能继续执行。与CountDownLatch不同的是,CyclicBarrier可以重复使用,即当所有线程都到达屏障后,屏障会自动重置,可以继续使用。 ...
《java并发编程》中CountDownLatch和CyclicBarrier用法实例大全,几乎包含了所有重要的用法
用CyclicBarrier,reentrantlock,condition来完成同时购买,同步购买的功能 JUC系列之模拟抢票(N人同时抢票,票不足系统补仓,N-M人继续抢票) http://blog.csdn.net/crazyzxljing0621/article/details/77891620
NULL 博文链接:https://hubowei1.iteye.com/blog/2312471
CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用,其相当...
1.2 CyclicBarrier概述 CyclicBarrier的基本用法 2.1 创建CyclicBarrier对象 2.2 await()方法 控制线程协作数量 3.1 场景介绍 3.2 使用CyclicBarrier控制线程协作 循环使用CyclicBarrier 4.1 场景介绍 4.2 使用...
主要为大家详细分析了Java并发系列之CyclicBarrier源码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
主要介绍了Java并发实例之CyclicBarrier的使用,涉及CyclicBarrier的介绍,以及相关的并发实例,具有一定参考价值,需要的朋友可以了解下。
CyclicBarrier和CountDownLatch一样,都是关于线程的计数器。用法略有不同,测试代码如下:
CyclicBarrier类似于CountDownLatch也是个计数器,不同的是CyclicBarrier数的是调用了CyclicBarrier.await()进入等待的线程数,当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续...
java并发编程中CountDownLatch和CyclicBarrier的使用.pdf
java并发编程中CountDownLatch和CyclicBarrier的使用借鉴.pdf
主要介绍了java多线程之CyclicBarrier的使用方法的相关资料,希望通过本文能帮助到大家,让大家理解掌握这部分内容,需要的朋友可以参考下
主要介绍了Java并发编程(CyclicBarrier)实例详解的相关资料,JAVA编写并发程序的时候,我们需要仔细去思考一下并发流程的控制,如何让各个线程之间协作完成某项工作。
CountDownLatch 和 CyclicBarrier 为线程同步的辅助工具,通过它可以做到使一条线程一直阻塞等待,直到其他线程完成其所处理的任务。