IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    JDK源码分析之concurrent包(四) -- CyclicBarrier与CountDownLatch - Yancey.Han

    Yancey.Han发表于 2015-07-07 09:37:00
    love 0
    上一篇我们主要通过ExecutorCompletionService与FutureTask类的源码,对Future模型体系的原理做了了解,本篇开始解读concurrent包中的工具类的源码。首先来看两个非常实用的工具类CyclicBarrier与CountDownLatch是如何实现的。 CyclicBarrierCyclicBarrier直译过来是“循环屏障”,作用是可以使固定数量的线程都达到某个屏障点(调用await方发处)后,才继续向下执行。关于用法和实例本文就不做过多说明,现在直接进入CyclicBarrier的源码。首先,来看下CyclicBarrier的几个标志性的成员变量:1privatestaticclassGeneration {2booleanbroken =false;3}4/**The number of parties*/5privatefinalintparties;6/*The command to run when tripped*/7privatefinalRunnable barrierCommand;8/**The current generation*/9privateGeneration generation =newGeneration();1011/**12* Number of parties still waiting. Counts down from parties to 013* on each generation. It is reset to parties on each new14* generation or when broken.15*/16privateintcount;这几个成员变量有以下说明:说明1:parties是final的,在构造时,传入的固定线程数,不可变;说明2:count是计数器,如果有线程到达了屏障点,count就减1;说明3:直到count=0时,其它线程才可以向下执行;说明4:barrierCommand是Runnable任务,在所有线程到达屏障点是,就执行barrierCommand,barrierCommand是构造时传入的,可以为空;说明5:generation比较复杂,是静态内部类Generation的实例,一个generation对象代表一代的屏障,就是说,如果generation对象不同,就代表进入了下一次的屏障,所以说,这个线程屏障是可循环的(Cyclic)。说明6:另外,generation的唯一的一个名为broken的成员变量代表屏障是否被破坏掉,破坏的原因可能是线程中断、失败或者超时等。如果被破坏,则所有线程都将抛出异常。了解上述成员变量的说明后,基本上就可以知道了CyclicBarrier的实现原理,下面我们来看看代码是如何写的。其实实现很简单,我们只需通过await()方法就可以说明:1publicintawait()throwsInterruptedException, BrokenBarrierException {2try{3returndowait(false, 0L);4}catch(TimeoutException toe) {5thrownewError(toe);//cannot happen;6}7}await()方法调用了真是的执行方法dowait(),这个方法里涵盖了所有乾坤:1/**2* Main barrier code, covering the various policies.3*/4privateintdowait(booleantimed,longnanos)5throwsInterruptedException, BrokenBarrierException,6TimeoutException {7finalReentrantLock lock =this.lock;8lock.lock();9try{10finalGeneration g =generation;1112if(g.broken)13thrownewBrokenBarrierException();1415if(Thread.interrupted()) {16breakBarrier();17thrownewInterruptedException();18}1920intindex = --count;21if(index == 0) {//tripped22booleanranAction =false;23try{24finalRunnable command =barrierCommand;25if(command !=null)26command.run();27ranAction =true;28nextGeneration();29return0;30}finally{31if(!ranAction)32breakBarrier();33}34}3536//loop until tripped, broken, interrupted, or timed out37for(;;) {38try{39if(!timed)40trip.await();41elseif(nanos > 0L)42nanos =trip.awaitNanos(nanos);43}catch(InterruptedException ie) {44if(g == generation && !g.broken) {45breakBarrier();46throwie;47}else{48//We're about to finish waiting even if we had not49//been interrupted, so this interrupt is deemed to50//"belong" to subsequent execution.51Thread.currentThread().interrupt();52}53}5455if(g.broken)56thrownewBrokenBarrierException();5758if(g !=generation)59returnindex;6061if(timed && nanos <= 0L) {62breakBarrier();63thrownewTimeoutException();64}65}66}finally{67lock.unlock();68}69}代码第20行对应“说明2”。代码第21行对应“说明3”。代码第26行对应“说明4”。代码第28行对应“说明5”,nextGeneration()方法中使用generation = new Generation();表示屏障已经换代,并唤醒所有线程。nextGeneration()请自行查看源码。代码第16行、第45行等所有调用breakBarrier()方法处,对应“说明6”,表示屏障被破坏,breakBarrier()方法中将generation.broken = true,唤醒所有线程,抛出异常。最后,代码第40行处trip.await(),表示持有trip的线程进入等待被唤醒状态。另外,实现中还有一个很重要的点,就是第8行的lock和第67行的unlock,保证同步状态下执行这段逻辑,也就保证了count与generation.broken的线程安全。以上就是CyclicBarrier(循环使用的屏障)的源码实现,是不是比较简单。CountDownLatchCountDownLatch直译过来是“倒计数锁”。在线程的countDown()动作将计数减至0时,所有的await()处的线程将可以继续向下执行。CountDownLatch的功能与CyclicBarrier有一点点像,但实现方式却很不同,下面直接来观察CountDownLatch的两个最重要的方法:1publicvoidawait()throwsInterruptedException {2sync.acquireSharedInterruptibly(1);3}45publicvoidcountDown() {6sync.releaseShared(1);7}可以看到,这两个方法实际是由静态内部类Sync来实现的。这个Sync我们在上一篇FutureTask的实现中也见过,那我们就先简单介绍下Sync究竟是用来做什么的:SyncextendsAbstractQueuedSynchronizer这个抽象类AbstractQueuedSynchronizer是一个框架,这个框架使用了“共享”与“独占”两张方式通过一个int值来表示状态的同步器。类中含有一个先进先出的队列用来存储等待的线程。这个类定义了对int值的原子操作的方法,并强制子类定义int的那种状态是获取,哪种状态是释放。子类可以选择“共享”和“独占”的一种或两种来实现。共享方式的实现方式是死循环尝试获取对象状态,类似自旋锁。独占方式的实现方式是通过实现Condition功能的内部的类,保证独占锁。而我们正在解读的CountDownLatch中的内部类Sync是使用的共享方式,对于AbstractQueuedSynchronizer的解读本篇不打算详细说明,因为笔者对“独占”方式还没彻底弄通,如果以后有机会再来补充。接下来就直接观察CountDownLatch.Sync的源码:1/**2* Synchronization control For CountDownLatch.3* Uses AQS state to represent count.4*/5privatestaticfinalclassSyncextendsAbstractQueuedSynchronizer {6privatestaticfinallongserialVersionUID = 4982264981922014374L;78Sync(intcount) {9setState(count);10}1112intgetCount() {13returngetState();14}1516publicinttryAcquireShared(intacquires) {17returngetState() == 0? 1 : -1;18}1920publicbooleantryReleaseShared(intreleases) {21//Decrement count; signal when transition to zero22for(;;) {23intc =getState();24if(c == 0)25returnfalse;26intnextc = c-1;27if(compareAndSetState(c, nextc))28returnnextc == 0;29}30}31}结合最初列出的await()和countDown()方法,通过上述代码第9行可以看到,CountDownLatch将构造时传入的用来倒计数的count作为状态值。通过上述代码第17行可以看到,CountDownLatch定义了当count=0时表示可以共享获取状态(在await()方法中调用的sync.acquireSharedInterruptibly(1)会死循环尝试获取状态)。通过上述代码第26行可以看到,CountDownLatch定义了当count-1表示一次共享释放状态(在countDown()方法中调用的sync.releaseShared(1)会涉及)。以上就是CountDownLatch的源码实现。总结CyclicBarrier与CountDownLatch有一点相似之处,但是有很大区别。它们的异同我个人总结如下:类似功能CyclicBarrier与CountDownLatch都是通过计数到达一定标准后,使得在await()处的线程继续向下执行。不同之处CyclicBarrier的实现是通过线程的等待唤醒;CountDownLatch的实现是通过死循环访问状态的自旋机制CyclicBarrier在线程改变计数后不能向下执行(await()改变计数);CountDownLatch在线程改变计数后继续向下执行(countDown()改变计数)CyclicBarrier的计数可以被重置,循环使用;CountDownLatch的计数只能使用一次本文链接:JDK源码分析之concurrent包(四) -- CyclicBarrier与CountDownLatch,转载请注明。


沪ICP备19023445号-2号
友情链接