JUC 之Phaser

2019年12月06日 阅读数:32
这篇文章主要向大家介绍JUC 之Phaser,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

前言

在前面的几篇文章中详述了ForkJoin框架的若干组分,在相应的官方文档中总会不时地提起"Phaser",一样的,也提到Phaser能够用于帮助运行在ForkJoinPool中的ForkJoinTask运行时保持有效的执行并行度(其实特指其余task都在等待一个phase的前进时).node

熟悉JUC的朋友都知道它的大概组成部分包含:Containers(支持并发的容器),Synchronizers(同步器),Executors(线程池),BlockingQueue(阻塞队列),Atomic(原子类),Lock and Condition(锁).而Phaser和CyclicBarrier,Semaphore等同样是一个同步器.算法

本文主要介绍Phaser的内部实现,粗略介绍使用,它的源码相比于线程池较为简单,但最好能对比其余同步器来了解,读者最好拥有juc其余同步器,原子类,部分ForkJoin框架的基础.编程

同时,本文也会再次提到ForkJoinPool::managedBlock(blocker),以前在ForkJoinPool一文提到了实现和接口,而在CompletableFuture中见到了一个blocker的实现.并发

Phaser源码

首先来看一些与Phaser状态有关的简单的常量.框架

//64位整数表示Phaser的状态.
private volatile long state;

private static final int  MAX_PARTIES     = 0xffff;//最大parties,后16位表示.
private static final int  MAX_PHASE       = Integer.MAX_VALUE;//最大phase,最大整数值.
private static final int  PARTIES_SHIFT   = 16;//取parties使用的移位数,16
private static final int  PHASE_SHIFT     = 32;//取phase的移位数,32
private static final int  UNARRIVED_MASK  = 0xffff;      //未到的,取后16位.
private static final long PARTIES_MASK    = 0xffff0000L; //参加者,17-32位.
private static final long COUNTS_MASK     = 0xffffffffL; //数量,后32位.
private static final long TERMINATION_BIT = 1L << 63;//终止态,首位.

// 特殊值.
private static final int  ONE_ARRIVAL     = 1;
private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;//第1位和17位.显然,它表示了一个ONE_ARRIVAL信息和PARTY信息.
private static final int  EMPTY           = 1;

//对一个state s计算unarrived的count,
private static int unarrivedOf(long s) {
    //直接取整数位,若是等于EMPTY(1)则返回0,不然取后16位.
    int counts = (int)s;
    return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}

//对一个state,取出parties信息,直接取state的17至32位.
private static int partiesOf(long s) {
    return (int)s >>> PARTIES_SHIFT;
}
//对于一个state,取出phase信息,直接取前32位.
private static int phaseOf(long s) {
    return (int)(s >>> PHASE_SHIFT);
}
//对于一个state,取出arrived信息
private static int arrivedOf(long s) {
    int counts = (int)s;
    //state的后32位等于1(EMPTY)返回0,不然返回parties(state的17至32位,参考上面的partiesOf方法)和UNARRIVED(state的后16位)的差.
    return (counts == EMPTY) ? 0 :
        (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}

上面都是一些常量,没什么可分析的,简单来个总结.函数

Phaser用一个long型的state保存状态信息.工具

state的前32位表示phase,后16位表示unarrivied,17至32位表示parties,parties减去unarrived即arrived.单元测试

下面咱们看一些成员变量和有关函数.测试

//this的父,能够是null表示none
private final Phaser parent;

//phaser显然是个树的结果,root表明根,若是当前phaser不在树内,则root==this
private final Phaser root;


//偶数队列和奇数队列.它们存放等待线程栈的头,为了减小当添加线程与释放线程的竞态,
//这里使用了两个队列并互相切换,子phaser共享root的队列以加快释放.
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;

//决定某个phase的等待线程队列.
private AtomicReference<QNode> queueFor(int phase) {
    //选择队列的方法,若是参数phase是偶数,使用evenQ,不然oddQ.
    return ((phase & 1) == 0) ? evenQ : oddQ;
}

//出现arrive事件时的边界异常信息.
private String badArrive(long s) {
    return "Attempted arrival of unregistered party for " +
        stateToString(s);
}

//注册时的边界异常信息.
private String badRegister(long s) {
    return "Attempt to register more than " +
        MAX_PARTIES + " parties for " + stateToString(s);
}
//他们都用到的stateToString(s),计算参数s对应的phase,parties,arrived.
private String stateToString(long s) {
    return super.toString() +
        "[phase = " + phaseOf(s) +
        " parties = " + partiesOf(s) +
        " arrived = " + arrivedOf(s) + "]";
}

为了便于理解,先来看队列的实现.ui

//表示等待队列的QNode,实现了ManagedBlocker
static final class QNode implements ForkJoinPool.ManagedBlocker {
    //存放所属phaser
    final Phaser phaser;
    //所属phase
    final int phase;
    //是否可扰动
    final boolean interruptible;
    //是否认时
    final boolean timed;
    //是否已扰动
    boolean wasInterrupted;
    //计时相关
    long nanos;
    final long deadline;
    //关联线程,当是null时,取消等待.
    volatile Thread thread; 
    //下一个QNode
    QNode next;

    QNode(Phaser phaser, int phase, boolean interruptible,
          boolean timed, long nanos) {
        this.phaser = phaser;
        this.phase = phase;
        this.interruptible = interruptible;
        this.nanos = nanos;
        this.timed = timed;
        this.deadline = timed ? System.nanoTime() + nanos : 0L;
        //取当前线程.
        thread = Thread.currentThread();
    }
    //isReleasable方法
    public boolean isReleasable() {
        if (thread == null)
            //1.线程已置空(如2),返回true释放.
            return true;
        if (phaser.getPhase() != phase) {
            //2.发现phaser所处的phase不是构建QNode时的phase了,就置线程为空,返回true.
            thread = null;
            return true;
        }
        if (Thread.interrupted())
            //3.若是当前线程扰动了.
            wasInterrupted = true;
        if (wasInterrupted && interruptible) {
            //4.发现扰动标记,而且QNode配置为可扰动,则置线程null并返回true
            thread = null;
            return true;
        }
        if (timed) {
            //5.定时逻辑,还有nanos,计算新的时长.
            if (nanos > 0L) {
                nanos = deadline - System.nanoTime();
            }
            if (nanos <= 0L) {
                //已经到时间,返回true,线程置空.
                thread = null;
                return true;
            }
        }
        return false;
    }
    //block逻辑
    public boolean block() {
        if (isReleasable())
            return true;
        else if (!timed)
            //不定时的park
            LockSupport.park(this);
        else if (nanos > 0L)
            //定时的状况.
            LockSupport.parkNanos(this, nanos);
        //老规矩
        return isReleasable();
    }
}

前面介绍过CompletableFuture的Singnaller,以及ForkJoinPool中的managedBlock,这一块的逻辑显然得心应手.

很明显,若是咱们在ForkJoinPool中使用它做为blocker,并在相应的ForkJoinTask的exec或CountedCompleter的compute方法中使用ForkJoinPool::managedBlock(blocker),将每一个ForkJoinWorkerThread在阻塞前构建一个QNode进入Phaser的等待队列(虽然尚未讲到相关内容,可是Phaser显然不用咱们直接操做内部类QNode),那么它将依照上述逻辑进行补偿,保障有效的并行度.

前面完成了承前启后,预热到此结束,开始看Phaser的核心方法.

//doArrive方法
//它是arrive和arriveAndDeregister方法的主要实现.手动调用这些方法能够加速经过和最小化竞态窗口期.
//参数表明要从当前state中减去的调整数值,它的单位依托于业务,当为arrive时减去的单位为ONE_ARRIVAL,
//当为arriveAndDeregister时减去的单位为ONE_DEREGISTER.
private int doArrive(int adjust) {
    final Phaser root = this.root;
    for (;;) {
        //1.变量s初始化,取决因而否当前Phaser是root.不是root将试图从root同步滞后的state.
        long s = (root == this) ? state : reconcileState();
        //计算phase,前32位.
        int phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            //2.负数直接返回.说明原来的state首位就是1,前面的TERMINATE_BIT就是64位置1.
            return phase;
        //取count,后32位.
        int counts = (int)s;
        //计算unarrived,和前面同样的逻辑.
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0)//2.1
            //没有unarrived了,说明不该该调用此方法,抛出异常,信息就是前面介绍过的badArrive
            throw new IllegalStateException(badArrive(s));
        //3.尝试将state减去adjust数.
        if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
            //3.1cas成功后,unarrived余1,则前进一个phase
            if (unarrived == 1) {
                //3.1.1取出parties做为下一个state的基础.
                long n = s & PARTIES_MASK;
                //3.1.2 下一个unarrived,数值上等于parties.
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                if (root == this) {
                    //3.1.3当前Phaser是root,onAdvance返回true,则加上终止信号.
                    if (onAdvance(phase, nextUnarrived))
                        n |= TERMINATION_BIT;
                    else if (nextUnarrived == 0)
                        //3.1.4 onAdvance返回false,而计算得出的nextUnarrived是0,即没有parties,n加上一个empty(1)
                        n |= EMPTY;
                    else
                        //3.1.5nextUnArrived不是0,加到n上.
                        n |= nextUnarrived;
                    //3.1.6前面的流程完成了state的后32位(parties和unarrived),接下来处理前32位.
                    //限定在MAX_PHASE以内,对当前phase加1.
                    int nextPhase = (phase + 1) & MAX_PHASE;
                    //将nextPhase的值加到n的前32位.并用n去cas掉原来的state,由于有3处入口的cas,此处必定能成功
                    n |= (long)nextPhase << PHASE_SHIFT;
                    UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                    //更新到新的phase,唤醒等待的waiter.
                    releaseWaiters(phase);
                }
                //3.1.7当前Phaser不是root,当nextUnarrived计算得0时,像父传递解除注册,参数ONE_DEREGISTER
                //会同时减去一个unarrived和一个parties.下轮循环正常应进入3.1.8
                else if (nextUnarrived == 0) { 
                    phase = parent.doArrive(ONE_DEREGISTER);
                    //完成传递后,将本身的state置empty.
                    UNSAFE.compareAndSwapLong(this, stateOffset,
                                              s, s | EMPTY);
                }
                else
                    //3.1.8,当前Phaser不是root,计算的nextUnarrived非0,像父传递一个arrive事件,减去一个unarrived.
                    phase = parent.doArrive(ONE_ARRIVAL);
            }
            //3.2返回当前phase,多是已进入3.1递增的.仅有此处可退出循环.
            return phase;
        }
    }
}

关于该方法的执行流程,咱们结合几个周边方法一并分析,先来看注册方法和onAdvance勾子.

//注册和批量注册.参数表明parties和unarrived字段的增长数,它必须大于0.
private int doRegister(int registrations) {
    // 1.用参数计算一个adjust,同时包含parties和arrive.
    long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
    final Phaser parent = this.parent;
    int phase;
    //循环尝试更改.
    for (;;) {
        //2.存在parent,则用root的phase调整this的state.
        long s = (parent == null) ? state : reconcileState();
        //取出当前state中保存的counts,parties,unarrived信息.
        int counts = (int)s;
        int parties = counts >>> PARTIES_SHIFT;
        int unarrived = counts & UNARRIVED_MASK;
        if (registrations > MAX_PARTIES - parties)
            //要注册的数量大于了余量,抛出异常.
            throw new IllegalStateException(badRegister(s));
        //3.计算出phase
        phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            //phase为负说明state为负,即终止态,终止.
            break;
        //4.当前state表示的参与数非空的逻辑,当前注册非首次注册.
        if (counts != EMPTY) {           
            if (parent == null || reconcileState() == s) {
                //this是root或者从root同步的state不变,继续执行,不然从新循环.
                if (unarrived == 0)  
                    //4.1本轮循环经过原state计算的unarrived为0,说明应等待下一phase,使用root等待      
                    root.internalAwaitAdvance(phase, null);
                else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                   s, s + adjust))
                    //4.2本轮循环未发现应等待下一phase,尝试原子更新,增长adjust到state上.
                    break;
            }
        }
        //5.当前不存在counts,且自身就是root,表明root的首次注册.
        else if (parent == null) { 
            //5.1计算下一个state,由于没有参与数,使用phase初始化前32位,并使用adjust作后32位.        
            long next = ((long)phase << PHASE_SHIFT) | adjust;
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                //5.2 cas成功,退出,不成功,下轮循环.
                break;
        }
        //6.是首次注册,但也不是root的逻辑.表明非root的Phaser的首次注册.
        else {
            //6.1对当前Phaser加锁并double check,避免同时调用.加锁失败的线程将在后续进入2的逻辑.
            synchronized (this) {  
                //double check state未发生改变.   
                if (state == s) {
                    //6.2首先向父Phaser注册1.
                    phase = parent.doRegister(1);
                    if (phase < 0)
                        //发现进入终止态,直接中止.
                        break;
                    
                    //6.3向父Phaser注册成功,循环尝试cas掉老的state,新state的算法同上,phase加adjust.
                    //在整个while循环中,再也不考虑phase进入终止态的状况,由于这些操做处于同一个"事务"中,
                    //且因竞态等缘由,若某次cas时计入了负数的phase,方法返回后也能够及时发现.
                    while (!UNSAFE.compareAndSwapLong
                           (this, stateOffset, s,
                            ((long)phase << PHASE_SHIFT) | adjust)) {
                        //若是cas不成功,则读取s为新的state,计算新的phase并从新循环.
                        s = state;
                        phase = (int)(root.state >>> PHASE_SHIFT);
                        // assert (int)s == EMPTY;
                    }
                    //6.4cas成功后退出循环.
                    break;
                }
                //若是if(state==s)判断失败,说明有别的线程有当前线程进入synchronized块前已经加锁并执行了内部的逻辑且稍后释放了锁,
                //这样当前线程加锁成功,但if判断失败,它会当即释放锁并返回到2.
            }
        }

    }
    return phase;
}


//使用root的phase调整this的state,更新滞后的结果.这通常发生在root前进了phase可是
//子phaser尚未作到这一步,这种状况下,子phaser必须完成这个前进的步骤,这一过程当中,phase将
//被置为root的phase,unarrived则会重置为parties,若parties为0,则置为EMPTY.返回结果state.
private long reconcileState() {
    final Phaser root = this.root;
    long s = state;
    //不是root才进行下一步.
    if (root != this) {
        int phase, p;
        //cas,phase采用root,parties不变,unarrived重置为parties或EMPTY.
        while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
               (int)(s >>> PHASE_SHIFT) &&
                //phase滞后于root
                //尝试cas.
               !UNSAFE.compareAndSwapLong
               (this, stateOffset, s,
                //肯定新state的前32位,使用root的phase.
                s = (((long)phase << PHASE_SHIFT) |
                    //新phase<0,后32位直接取this的state表示的counts.
                     ((phase < 0) ? (s & COUNTS_MASK) :
                    //phase有效,this的state表示的parties为0,则后32位使用empty
                      (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
                        //不然,后32位使用parties.
                       ((s & PARTIES_MASK) | p))))))
            s = state;
    }
    return s;
}


//onAdvance勾子方法,参数为当前phase和注册的parties数.
//默认实现为parties数为0,方法返回true时,调用者会尝试终止Phaser.(参考前面的doArrive).随后调用isTerminated方法将返回true.
//执行此方法时抛出的运行时异常或Error将直接上抛给尝试advance相应的phase的线程,这种状况下不会发生phase的advance.
//方法的入参表示的是Phaser当前的state(未advance前),所以若在onAdvance方法中执行arrive,regist,waiting这三种操做的行为是不肯定的也不可靠的.
//若是当前Phaser是一个级联的成员,那么onAdvance只会由root在每次advance时调用.
//方法的默认实现返回true的场景目前只能是通过数次arriveAndDeregister调用形成parties归零的结果.咱们继承Phaser能够轻易地重写此行为,
//好比简单粗暴地返回false,那么将永远容许新的注册.
protected boolean onAdvance(int phase, int registeredParties) {
    return registeredParties == 0;
}

通过前面的代码分析,已经对Phaser的核心函数doRegister,doArrive有了全面的了解.

二者都会在一开始同步root的phase,且若是出现落后root的状况,同步了新的phase的同时,也会从新初始化unarrived,而且使用parties的值.

doArrive方法会每次调整unarrived数量(也可包含parties数量,若是使用了解除注册),当Phaser调用自身的arrive/arriveAndDeregister时,会作出相应的减小,并根据是否为root而决定向上递归.

Phaser减小自身unarrived信号(也可能同时有parties信号)后,若发现这已是最后一个unarrived信号,则进行接下来的判断:

1.当前Phaser是root,advance并唤醒waiter.(重要的唤醒操做执行点,root一轮完成)

2.当前Phaser不是root,且它已经不具有继续下一轮的条件(计算nextUnarrived为0,即parties已经被arriveAndDeregister置0),则从父Phaser减小一个unarrived和parties.

3.当前Phaser不是root,但它仍具备parties,知足进行下一轮的条件(计算nextUnarrived不是0),则从父Phaser减小一个unarrived,但不减小parties.

显然,子Phaser的最后一个unarrived的消失必定会形成父的unarrived减小,子Phaser不能继续下一phase的register和arrive时,从父Phaser中卸载.

若不是本Phaser的最后一个unarrived信号,则直接结束,至关于只进行了上面的减小信号操做.

doRegister方法的逻辑大体相反,不一样于doArrive,它的参数registrations同时做用于parties和unarrived,即两个位上同时加上registrations参数.它的大体逻辑:

1.当前注册并不是首次注册,且出现unarrived==0,即本轮已经完成了arrive,那么本轮将不能注册,须要等待root更新到下轮.(这也是咱们碰到的第一个阻塞)

2.当前注册并不是首次注册,unarrived也不是0,则在本phase进行注册,增长相应的parties和unarrived.

3.当前注册是root的首次注册,给root的state加上相应的parties和unarrived.

4.当前注册是非root的首次注册,加锁(this),对本身的state加上相应的parties和unarrived(同上,以registrations为单位),而对parent加上一个parties和unarrived单位.

很明显,对于单Phaser的状况很是好理解,每次减小unarrived数量(先不考虑减小parties),则最终致使Phaser自身进入下一个phase,而后从新初始化unarrived到下一轮,unarrived的新值是前一轮剩下的parties数量.

当咱们同时也尝试减小parties数量,即解除parties的注册,最终致使没有parties,那么Phaser将进入终止态.

整个过程当中,只要Phaser没进入终止态,随时能够进行新的注册,并增长parties和unarrived的数量.每一个arrive能够减小unarrived的数量为任何正整数,不必定是1.

对于多Phaser的状况,有两个特殊点:

1.对任意Phaser树中的某一个Phaser调用注册操做,会令自身加上相应参数个parties和unarrived单位,仅会在该Phaser第一次注册时增长父Phaser(极端可能,仅从一个叶子节点第一个注册的状况下可一直递归到root)的parties数和unarrived数各1单位(不论参数是多少).

2.对任意Phaser树中的某一个Phaser调用arrive操做,会令自身减去相应的参数个parties和unarrived单位,同时仅当本Phaser此时是最后一个unarrived时,会减去父Phaser的一个unarrived单位(当前子Phaser仍旧有parties能够构建下一phase),或减去父Phaser一个Parties和unarrived单位.(极端状况下,每一级都是最后一个unarrived时,减小叶子节点的最后一个unarrived会递归到root).

每新增一个子Phaser,父Phaser就会增长一个要完成触发phase的advance前必需要等到arrive的单位;每个子Phaser中全部的arrive完成,父Phaser都将减小一个要等待advance所必需触发的arrive.

目前没有看到await方法,但能够提早说明,等待操做彻底依赖于root是否完成本轮.也就是全部子Phaser都完成了同一轮(arrive打满),才能让父Phaser自己减去一个全部arrive单位,再触发父Phaser本轮的完成,此时对任何已完成的Phaser进入注册,都会进入上述的root.internalAwaitAdvance(phase, null)方法等待root进入下一phase.若是对已经完成全部arrive的Phaser继续进行arrive操做,由于unarrived已是0,则会抛出异常.

因此对于使用子Phaser的场景,若是发生很巧妙的状况,Phaser树上当前子Phaser的arrive结束条件知足了,使得新来的注册只能等待下一轮次,而其余分支的子Phaser又恰恰不能完成本轮次,那么新的phaser.doRegister方法将阻塞在此.

好在咱们使用Phaser可能会相似CyclicBarrier的使用方式,可对每一轮(phase)进行注册并等待(也许只等一轮,那么arrive就要带上deregister),每一轮最后一个线程arrive了,就会中止全部线程的等待,让全部线程继续执行,同时开启了下一轮次,这些线程此时又能够不经注册直接在新的轮次中进行等待,直到最后一个arrive了,再次唤醒全部线程并继续执行,同时Phaser再前进一轮,如此往复.中间使用arrive并deregister的线程会从本轮起减小一个unarrive数量(由于parties也减小了,因此再下一轮初始化unarrive数量时也会减小一次).咱们可让这些线程参与任意的轮次,但要注意的是,若是有线程中途不参加了,必定要解除注册,不然由于每轮初始化时,要等待arrive的数量都是上一轮剩下的parties数量,有线程中止了执行,却不减小parties数,那么下轮全部等待的线程将永远等不到phaser知足唤醒的条件.

上述的过程当中能够明显的看出,目前已介绍的两个重要核心函数:注册和arrive并无直接记录和操做线程的操做,相应的操做在等待方法和唤醒方法中(前面提到过release),咱们稍后介绍.

如今假设一个特殊的使用场景,也能够区别于CyclicBarrie和CountDownLatch的使用.仍是上面的例子,可是咱们准备的线程数与Phaser的parties数/unarrived数不一样(通常前者要多些),会发生什么事?

首先建立了Phaser,不指定最初parties数,并用每一个线程去注册(我甚至能够用一个线程去重复注册,每次的参数registrations还能够不一样,注册的做用并非将当前线程压入队列,而是为本phase设置一个unarrive数量,以控制到达下个phase前必须有多少次arrive的发生),则parties数和unarrived的初值彻底与此有关,是一个依托于咱们随意注册而产生的随意值.那么假定咱们的线程数量大于这个parties数(假定调用注册方法的线程和arrive及等待的线程无关),并令有的线程执行arrive(彻底能够一次arrive减去多个信号量,甚至一个线程屡次arrive),有的线程执行await等待信号advance到下一个phase(一个线程在一个周期只能调用一次),有的线程执行了arrive也等待phase前进(这种状况一个线程一周期也只能一次.其实这些分别对应了还未介绍的arrive,waitAdvance,arriveAndWaitAdvance等方法),单独进行await操做的线程能够是任意数量,执行arrive方法的线程加上执行arrive并wait的操做的线程和必须超过unarrived,这才能唤醒等待线程.

目前这些还比较抽象,等到咱们看过相应的几个方法便了然了.

onAdvance的方法默认实现就是判断本阶段注册的parties数量,若是已是0则说明没有parties了,Phaser应该结束.可是咱们其实能够从新实现,好比参数中同时传入了当前的phase,我能够规定上面的例子中phase最多只有3轮次,那么不论何时arrive,发现了当前phase已进入3轮,Phaser就被终止.固然,这一过程是由root执行的,可是子Phaser的phase会在每次注册和arrive发生时同步root,所以本例中对于phase数的判断能够粗放到全部Phaser,对于parties数则只能做用于root(事实上调用onAdvance的必定是root).

接下来看全量构造方法和若干和上面有关的公有函数.

//初始化一个Phaser,指定parent,指定未到来的参与者数(unarrived parties),但这只是一个初值,
//当咱们在任什么时候候调用注册方法时,还会相应的增长.
public Phaser(Phaser parent, int parties) {
    if (parties >>> PARTIES_SHIFT != 0)
        //太大了,超过了后16位能表示的整数.
        throw new IllegalArgumentException("Illegal number of parties");
    //初始phase为0.
    int phase = 0;
    this.parent = parent;
    if (parent != null) {
        //1.有parent的状况,共享parent的root,队列,并向parent中注册1个parties和unarrived,
        //同时同步一次phase(表面上是同步了parent的,实际上前面已经看过,会同步root).
        final Phaser root = parent.root;
        this.root = root;
        this.evenQ = root.evenQ;
        this.oddQ = root.oddQ;
        if (parties != 0)
            phase = parent.doRegister(1);
    }
    else {
        //2.无parent的状况,root就是this,并初始化奇偶等待队列.它使用原子引用的形式存放一个QNode,而QNode咱们前面已介绍.
        this.root = this;
        this.evenQ = new AtomicReference<QNode>();
        this.oddQ = new AtomicReference<QNode>();
    }
    //统一初始化state,后32位的决定依托于parties,若是parties是0则给予EMPTY,直接无论高32位.
    //不为0则给予phase设置为前32位,parties设置parties位和unarrived位.
    this.state = (parties == 0) ? (long)EMPTY :
        ((long)phase << PHASE_SHIFT) |
        ((long)parties << PARTIES_SHIFT) |
        ((long)parties);
}


//注册方法,就是调用doRegister,参数1.
//它会向this添加一个unarrived的party,若是正巧root正在进行advance,它须要等待下个phase.
//若是this有parent,且它以前没有过注册的parties,则首次注册会触发自身向parent的注册.
//若是this已经终止了,那么尝试注册将会无效并返回负值.若是注册的数量大于了最大支持parties(后16位整数),
//会抛出IllegalStateException
public int register() {
    return doRegister(1);
}


//批量注册指定的信号量,并返回最新的phase.规则基本同上.
public int bulkRegister(int parties) {
    if (parties < 0)
        throw new IllegalArgumentException();
    if (parties == 0)
        //参数0直接查询最新的phase返回
        return getPhase();
    return doRegister(parties);
}


//arrive一个信号,不等待其余arrive事件,返回最新phase(终止态为负).
//当前Phaser的arrive事件已满,则对parent来讲也会触发一个arrive.(若是有parent)
public int arrive() {
    return doArrive(ONE_ARRIVAL);
}


//arrive并解除一个注册parties,也不阻塞等待其余arrive.若是当前Phaser的解除注册操做
//将parties减至0,且this有parent,这将致使parent也减小一个parties(本phaser解除在parent的注册).
public int arriveAndDeregister() {
    return doArrive(ONE_DEREGISTER);
}

接下来要看上面已经作足了铺垫的等待方法了,并结合前面的队列一块看.

//令当前线程"到达"此phaser并等待其余parties,它等效于awaitAdvance(arrive()).
//注意,按照道格的注释,若是你在一个未进行注册(调用register)的线程里调用此方法实际上是一个使用错误,
//可是从本方法和前面以及后面有关的方法来看,全部记录线程的方法均只与arrive和等待有关,与注册无关.
//所以Phaser自己没法规避这种使用错误,咱们彻底可使用另外一个线程去注册,而当前线程去arrive,将两个动做分开.
//方法会返回arrive时最新的phase号.终止时会是负值.
public int arriveAndAwaitAdvance() {
    //记录root,开始循环.
    final Phaser root = this.root;
    for (;;) {
        //1.预计算,首先同步state
        long s = (root == this) ? state : reconcileState();
        //计算phase
        int phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            //已终结直接返回最终phase.
            return phase;
        //计算counts,unarrived
        int counts = (int)s;
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0)
            //已经没有空余的unarrived信号了,不能再调用arrive,抛出异常.
            throw new IllegalStateException(badArrive(s));
        //2.减余arrive的有关逻辑.尝试cas减去一个arrive
        if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                      s -= ONE_ARRIVAL)) {
            if (unarrived > 1)
                //2.1当前要减的信号不是本Phaser的最后一个信号量,调用root的等待方法.参数2是node,传空.
                return root.internalAwaitAdvance(phase, null);
            if (root != this)
                //2.2当前要减的信号量是非root的Phaser的最后一个,递归给parent(虽然用了return,可是parent也可能在进入2.1后阻塞).
                return parent.arriveAndAwaitAdvance();
            //2.3当前要减的信号量是root的最后一个.
            //2.3.1准备计算下一个状态,先取出state的parties信息.
            long n = s & PARTIES_MASK; 
            //计算nextUnarrived,它是如今的parties.
            int nextUnarrived = (int)n >>> PARTIES_SHIFT;
            //2.3.2前进phase逻辑.
            if (onAdvance(phase, nextUnarrived))
                //须要终止,给新state的计算基石n加上终止标记.
                n |= TERMINATION_BIT;
            else if (nextUnarrived == 0)
                //计算的nextUnarrived是0,即没有parties,加上空标记位.
                n |= EMPTY;
            else
                //下一轮能正常进行,加上nextUnarrived位.
                n |= nextUnarrived;
            //2.3.3给n加上下一个phase.
            int nextPhase = (phase + 1) & MAX_PHASE;
            n |= (long)nextPhase << PHASE_SHIFT;
            if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                //用n进行cas不成功,将新的phase返回.
                //说明一下,由于方法执行到此前已经执行过2的入口cas,减去了最后一个unarrived,所以在2到此的过程当中如有新的注册,
                //它内部会读到0个unarrived,就会等待下一个phase(参考前面介绍过的注册方法),所以cas失败不会是由于2以后有新的注册.
                //在整个arrive系列的方法中,最后一次arrive发生后,本Phaser不可能有其余线程再去执行相似2处的减余的状况.
                //故出现这种状况的缘由目前来看有二,一是还未介绍的强制关闭Phaser的方法,此时也会突兀地改掉state形成cas恰巧失败,二是
                //出现一些用户作出的奇葩行为,好比重写了其余公有方法.咱们天然忽略第二种状况,doug大神也是简单注释了一个"terminated".
                return (int)(state >>> PHASE_SHIFT); // terminated
            //cas成功,释放等待队列中的线程,返回下一个phase(由于在此过程当中的register会等到advance,此时的phase已是nextPhase了).
            releaseWaiters(phase);
            return nextPhase;
        }
        //3.减余失败说明出现竞态,直接开启下轮循环从新减余.
    }
}


//等待当前Phaser从给定的phase前进结束,若是当前phase不等于给定的phase,或者Phaser已终止当即返回.
//1.传入phase为负,返回它自己.
//2.传入的phase不是最新的phase,返回最新的.
//3.传入了最新的phase,等待到advance并返回advance后的phase.
public int awaitAdvance(int phase) {
    final Phaser root = this.root;
    long s = (root == this) ? state : reconcileState();
    int p = (int)(s >>> PHASE_SHIFT);
    if (phase < 0)
        return phase;
    if (p == phase)
        //匹配成功,等root前进.参数node为null
        return root.internalAwaitAdvance(phase, null);
    return p;
}


//参考前面的几个方法,区别是可扰动.
public int awaitAdvanceInterruptibly(int phase)
    throws InterruptedException {
    final Phaser root = this.root;
    long s = (root == this) ? state : reconcileState();
    int p = (int)(s >>> PHASE_SHIFT);
    if (phase < 0)
        //1.参数phase小于0直接返回它自己.
        return phase;
    if (p == phase) {
        //2.参数phase匹配,回忆一个前面介绍的QNode,匹配当前Phaser和phase,配置为可扰动且不计时.
        QNode node = new QNode(this, phase, true, false, 0L);
        //3.放入root的等待队列阻塞.
        p = root.internalAwaitAdvance(phase, node);
        if (node.wasInterrupted)
            //4.等待结束,判断是不是扰动形成的结束,前面介绍过QNode的相关逻辑,
            //它实现了ForkJoinPool.ManagedBlocker,所以在managedBlock方法进行时,
            //会循环调用问询是否能release,当咱们配置了可扰动且扰动了,就会标记这个wasInterrupted,释放线程引用并返回.
            //发现此种状况抛出异常.
            //同时,当发现等待成功,也会结束,释放线程引用并返回,但不带有扰动标记.
            throw new InterruptedException();
    }
    //5.返回1处以前读取的phase或3处获得的最新phase值.
    return p;
}

//同上方法,但带有计时.
public int awaitAdvanceInterruptibly(int phase,
                                     long timeout, TimeUnit unit)
    throws InterruptedException, TimeoutException {
    long nanos = unit.toNanos(timeout);
    final Phaser root = this.root;
    long s = (root == this) ? state : reconcileState();
    int p = (int)(s >>> PHASE_SHIFT);
    if (phase < 0)
        return phase;
    if (p == phase) {
        //不一样于上面方法的地方,创建的QNode带有计时和等待时长.
        QNode node = new QNode(this, phase, true, true, nanos);
        p = root.internalAwaitAdvance(phase, node);
        if (node.wasInterrupted)
            //被扰动的状况.
            throw new InterruptedException();
        else if (p == phase)
            //时间到了phase没有前进,超时.
            throw new TimeoutException();
    }
    return p;
}

前面的几个核心方法粗略过完,补充一些重要内容.

首先在前面曾分析过有线程阻塞等待下一个phase的状况,并无加上定时等待的考虑.在超时的状况下,阻塞的线程可能会收到异常并退出.

创建QNode能够限定是否认时和可扰动,这取决于咱们使用哪一个方法去await.

除最后一个线程arrive外,全部线程调用这些方法都会减小一个arrive并加入等待队列,直到(1)配置了定时且超时,(2)当前是可扰动等待且使用了Thread.interrupt(),(3)最后一个线程使用上述方法或arrive方法,使得Phaser前进了一个轮次,internalWaitAdvance结束.其中(1)(2)均会迁成arrive线程抛出异常,只有(3)才是正常的状况.

QNode前面已介绍,它是一个blocker,须要调用ForkJoinPool::managedBlock才会起做用(显然root的internalAwaitAdvance必然与此方法有关联).固然这个做用与任务是否运行在ForkJoinPool无关,若是等待phaser前进的线程是运行在ForkJoinPool中的ForkJoinWorkerThread,显然会在internalAwaitAdvance期间进行补偿.这一块可参考前面的"CompletableFuture与响应式编程"和"ForkJoin框架之ForkJoinPool"两篇文章.

另外,这些代码也再次说明了root的做用: (1)对一切非root的Phaser进行等待都会用root的internalAwaitAdvance;(2)每次注册或arrive必定会同步root的最新phase.

其中(1)也间接说明了为何构建Phaser时只有root建立等待队列,全部子Phaser共享.

上面还保留了一个疑问,提到了"强制关闭Phaser"形成arriveAndAwaitAdvance出现cas失败的问题,doug大神直接注释了一个terminated,咱们立刻来看这一块,以及一些周边的公共函数,加深理解,而后再来解决关于等待队列最后的一些问题.

//强制关闭Phaser,让Phaser进入终止态,可是这个过程不影响它已注册的parties,若是此Phaser是
//一个Phaser树中的成员,那么全部phaser集中的Phaser都会关闭,若是它已经关闭,此方法无效.此方法能够
//用于若干任务出现意料以外异常的状况下的协调恢复.
public void forceTermination() {
    // Only need to change root state
    final Phaser root = this.root;
    long s;
    //已经是终止态直接忽略.
    while ((s = root.state) >= 0) {
        //直接尝试给root的state加上终止位.显然加上了它,子Phaser在注册和arrive等方法同步回新的phase就是个负数,
        //所以更改root的phase为负至关于判了全部Phaser的死刑.惟一须要解决的是已经阻塞在root.internalAwaitAdvandce的线程.
        if (UNSAFE.compareAndSwapLong(root, stateOffset,
                                      s, s | TERMINATION_BIT)) {
            // 加上终止位成功,前后唤醒偶数等待队列和奇数等待队列.
            releaseWaiters(0); // Waiters on evenQ
            releaseWaiters(1); // Waiters on oddQ
            //返回
            return;
        }
    }
}


//返回当前phase,直接用root的state去取.
public final int getPhase() {
    return (int)(root.state >>> PHASE_SHIFT);
}

//查询注册的parties数量.调用前面介绍过的partiesOf
public int getRegisteredParties() {
    return partiesOf(state);
}

//查询已经arrived的parties数量.调用介绍过的arriveOf

public int getArrivedParties() {
    return arrivedOf(reconcileState());
}

//查询未arrive的parties数量,调用前面介绍过的unarrivedOf
public int getUnarrivedParties() {
    return unarrivedOf(reconcileState());
}

//返回parent
public Phaser getParent() {
    return parent;
}

//返回root
public Phaser getRoot() {
    return root;
}

//判断当前Phaser是否终止,直接取root的state是否为负,可见,终止态彻底取决于root.
public boolean isTerminated() {
    return root.state < 0L;
}

这些方法都比较简单,只有forceTermination须要再强调一翻,前面介绍arrayAndAwaitAdvance时曾提过在减去最后一个unarrived信号后去cas到下一个phase失败的状况,doug大神简单注释了一句terminated,直接返回了当前的phase(显然只能是负),在周边方法重重加锁的前提下,那一次cas的失败惟一一处就是强制关闭,由于它只改关闭标记位,至关于动了phase,而没有动unarrived标记位和parties标记位.因此重写Phaser的方法要谨慎,极可能不当心打破了这个封装.

从上面的有关方法能够看出,子Phaser的终止态严重依赖于root,目前能够肯定的是root的phase一旦表现出终止态,全部新来的注册,arrive,arrive并await将会当即返回,惟一须要关注的就是root被设置了终止标记后,正陷入等待的线程怎么办的问题.

咱们下面就来看Phaser的等待机制,这里面又能见到道格大神很是有趣的玩法.

//工具方法,移除某个phase的等待者.
private void releaseWaiters(int phase) {
    QNode q;  //保存队列中的队首
    Thread t;  // 保存线程引用.
    //取队列,用phase的奇偶决定,phase是偶数就取偶数队列,不然取奇数队列.而这个phase其实只用来取队列了,后续的操做与它无关.
    AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
    //循环,找出全部phase不等于root的phase的(其实root是最大的,因此就是找出非最新phase加入进来的waiter QNode)
    while ((q = head.get()) != null &&
           q.phase != (int)(root.state >>> PHASE_SHIFT)) {
        //找出了,利用原子引用将head指向next.
        if (head.compareAndSet(q, q.next) &&
            (t = q.thread) != null) {
            //发现阻塞者,唤醒线程.回忆下前面实现blocker方法中的isReleaseble和block方法都有将线程置空的操做.(三种状况,唤醒扰动超时都会置空)
            //可是那些方法并无将表明该阻塞线程的QNode移除队列,所以可能会发现thread已是null(表明无阻塞者)的状况,只须要移除队列便可.
            q.thread = null;
            LockSupport.unpark(t);
        }
    }
}


//上面releaseWaiters方法的一个变种,但它只会处理遍历过程当中位于头部的元素,出现正常的等待节点就会当即返回.
//此方法在这一块能够有效的减小内存的占用.退出时返回当前的phase.
private int abortWait(int phase) {
    //一样,参数phase只是用来选择要处理的队列.
    AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
    for (;;) {
        Thread t;
        QNode q = head.get();
        //计算最新phase的值p
        int p = (int)(root.state >>> PHASE_SHIFT);
        if (q == null || ((t = q.thread) != null && q.phase == p))
            //1.出现q为null表明整队列元素已出队,直接返回p;
            //或者在出队过程当中head(q)记录的线程引用还在,说明未超时或扰动,且是本phase的等待节点,终止循环并返回最新phase.
            return p;
        if (head.compareAndSet(q, q.next) && t != null) {
            //进入条件,参考1的条件,由于1会直接返回.故进入2的条件实际上是q非空且处于旧的phase.只有这种状况才能够出队.
            //2.将q出队,置空线程引用,释放线程.
            q.thread = null;
            LockSupport.unpark(t);
        }
    }
}

//计算有效cpu,控制自旋.
private static final int NCPU = Runtime.getRuntime().availableProcessors();

//常量,每轮arrive等待的字旋数,取决于NCPU,小于2则取1,不小于2取2的8次幂.
static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;


//珊珊来迟的内部等待方法.它可能会一直阻塞到phase的advance发生(除非取消了等待).
//此方法仅限root调用.参数phase表示当前的phase,参数node表示等待节点,用于追踪节点的扰动或超时.
//若是是null,表示是一次不可扰动的等待.返回值为当前最新的phase.
private int internalAwaitAdvance(int phase, QNode node) {
    // 1.调用releaseWaiters,传入参数phase的前一个phase,显然这只是决定释放哪个队列.参数绝对实时准确的状况下会先将老的队列释放掉.
    releaseWaiters(phase-1); 
    //节点入队标记,入队了就会变为true
    boolean queued = false;   
    //记录每一轮循环的unarrived数量,用于决定是否扩增自旋等待次数.
    int lastUnarrived = 0; 
    //自旋数,参考上面的计算逻辑.
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    //开启循环,直到phase前进为止或内部判断已取消等待.
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
        //2.传入node是null,即非可扰动的模式逻辑.只有非可扰动模式才有自旋.
        if (node == null) {  
            //2.1每轮自读进入都会尝试计算新的unarrived,若是发现出现了变更(变大或者变小),
            //会将它保存到前面的lastUnarrived.        
            int unarrived = (int)s & UNARRIVED_MASK;
            if (unarrived != lastUnarrived &&
                (lastUnarrived = unarrived) < NCPU)
                //发现新变化的unarrived<NCPU,扩增自旋次数,继续自旋.
                //unarrived的变化,若没有大量新的parties注册,会在自旋过程当中变小,反之大量加入注册,大于了NCPU则放弃增长自旋次数.
                spins += SPINS_PER_ARRIVAL;
            //2.2,未发现本轮循环unarrived发生变化,或者增长了大量注册,形成大于NCPU的逻辑,首先记录此时的线程扰动状态.
            boolean interrupted = Thread.interrupted();
            //2.3接2.2,若是发现了线程被扰动了,或者经若干次自旋减小次数,自旋次数并未能在2.1进行增长,直至减为0,进入if.
            if (interrupted || --spins < 0) { // need node to record intr
                //2.4,知足2.3进入if的条件,再也不继续自旋了,由于参数没有提供node,此处初始化一个node,不定时,不可扰动,并保存扰动状态.
                //下轮循环将没法进入2.
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        }
        //3.参数传入了node,或者在2.4进入了node的初始化,每一轮循环到此都先判断是否可释放(若能够,内部会置thread为null).
        else if (node.isReleasable()) 
            //发现node所处的phase已经达到或者取消了,则break掉循环.
            break;
        //4.未能在非扰动模式下自旋解决(2)或提早发现node的扰动且未将node入队的状况下,将node入队.
        else if (!queued) { 
            //选择当前phase表明的队列.
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
            QNode q = node.next = head.get();
            //这一行不起眼的if条件代码真的是一个悄无声息解决了一个大暗坑的地方,后面说.
            if ((q == null || q.phase == phase) &&
                (int)(state >>> PHASE_SHIFT) == phase) 
                //double check避免脏入队,入队条件是(1)无头,(2)或者头元素的phase等于参数phase(由于相邻的两个phase绝对不会入同一个队).
                //知足(1)(2)的同时,还要知足(3),参数phase就是当前的state表示的phase(由于此方法只能root使用,故为root表示的最新phase).
                //条件知足,入队,取代原来的head,原来head表明的node成为node的next.而条件不知足进入下一循环,极可能while条件就不知足了退出循环.
                queued = head.compareAndSet(q, node);
        }
        //5.已经在某一轮循环入队了,使用ForkJoinPool的managedBlock管理block,其间可能会释放线程引用.
        else {
            try {
                //5.1它内部也有循环,且会调用前面看到过的isReleasable和block实现,显然它一旦结束(包含扰动),必定会形成下轮外循环终止于3处.
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                //5.2出现扰动异常catch住,并保存.下轮循环也会终止在3处.
                node.wasInterrupted = true;
            }
        }
    }
    //6.走出上面的while循环,多是root已经advance到下一个phase(2前的循环),也多是传入node的状况下出现了扰动或超时(5)形成(3)知足
    if (node != null) {
        //6.1node存在表明可能已经压入队列,结果要么是已出现扰动或超时(方法结束后会抛出异常),要么是已正常完成.
        //显然,代码执行到此处就要返回了,阻塞的线程会抛出异常结束(超时或扰动)或继续执行(正常advance),
        //没有必要去尝试唤醒能执行出前面while循环到达6立刻要返回的线程.
        if (node.thread != null)
            //6.2取消node中的线程引用,避免外面的线程尝试唤醒.
            node.thread = null;       // avoid need for unpark()
        if (node.wasInterrupted && !node.interruptible)
            //6.3若是node自己设置了不可被扰动,但5.2处判断线程自己抛出了扰动异常,却被catch住了,此处扰动本线程.
            Thread.currentThread().interrupt();
        if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
            //6.4发现phase并未前进.仍是参数传入的pahse,说明必定是扰动或超时的结果,abortWait对本phase使用的队列进行清理,
            //而清理的目标前面已论述过,是本队列头部开始的早于本phase的元素.(发现一个不知足条件的就中止了清理).
            return abortWait(phase); // possibly clean up on abort
    }
    //7.退出上面的while循环必定会到此帮助释放早于最新阶段的waiter.注意,是早于最新phase的,参数phase只是决定了选哪一个队列(奇偶).
    //若是是6.4表明的那种扰动超时状况,此处其实释放的是旧的结果.被唤醒的线程其实通常是执行在5.1处阻塞的.当前线程能运行到此绝对不须要唤醒.
    releaseWaiters(phase);
    return p;
}

到此Phaser的代码解析已完毕,咱们来分析关于队列,等待和唤醒的问题.

1.Phaser维护了两个"队列",不论加入等待队列仍是弹出等待队列,都是从头部进行,新加入的成员会成功队列的新头,原来的头会成为它的next,弹出时next成为新头.因此至关于一个对头部的"后进先出",考虑官方起名和注释,咱们依旧保持队列这个称呼.

2.唤醒时,会从队列的头部依次弹出node 的phase早于root的最新phase的node,.

3.等待时,入队的node成为新的头.

4.当轮次增长时,会使用和本轮不一样的队列增长元素,同时也会唤醒本轮中等待的node.

由于唤醒和等待同时进行,且各自操做各自的队列(不一样的phase),所以彼此之间没有竞态(尽管一个是头入一个是头出),能够说设计巧妙,下面咱们来脑洞大开,思考一个极端状况.

咱们假设一种极端的phase切换场景,奇数phase大量等待入队,偶数phase则迅速完成.假设当前phase对应的队列是奇数对列,轮次提高完成后,它去释放当前的队列元素,结果未等这个释放操做执行完毕,偶数队列的轮次很快执行完,奇数队列中积压了成千上万个node未能释放,轮次却又切回到了奇数队列,会出现什么事?

显然奇数队列若是一直保持这种极端场景,它会愈来愈庞大,逼近撑爆内存的同时,大量线程也会得不到释放,甚至于老一轮的线程须要等待新一轮的线程去释放.为何老一轮的线程会去等待新一轮的线程释放呢?

releaseWaiter的方法咱们已经看出,它只会释放phase早于最新的node,此时最新压入的元素属于当前最新的phase,显然不知足条件,那么会形成奇数队列中两轮前压入的元素不能获得清除,两轮前就在释放当时积压node的线程(那一轮最后一个arrive)发现不符合清理条件,就直接return并终止了,只能等待本轮最后一个arrive出现后继续进行释放.若是本轮最后一个arrive出现很晚,在下一轮依旧保持如此极端,往返数轮,确实会致使奇数队列中积压大量node,且第一轮就在等待该轮次结束的线程早就知足了释放条件(升到了2轮),事实上多是第n轮才获得释放,这还符合Phaser的定义吗?咱们使用它,就是要保证每一轮单独使用,每一轮次达到条件,线程释放并执行,下一轮次是下一轮次.

然而doug的代码就是这个样子,想遍各类极端,以为可能找到了bug,那么就须要仔细思考了.做者来简述一下这个趟坑的分析过程.

这个问题确实已经获得了极大的规避了,毕竟是个极端状况.

1.线程的唤醒真的很快,尽管此处除了唤醒还包含了原子引用的更新(每次出队都要cas).

2.若是没有注册,显然就没有arrive相关的状况,尽管能够单独调用,但必须保证在arrive时传入的数量此时已经注册了,所以每一轮次(phase)中可能积压等待唤醒的线程的操做必定是在注册以后,可是咱们回忆一下,注册方法的第一步就是要等待完成advance,并且传给internalAwaitAdvance的node会是null,即不能扰动和超时,因此当本轮次阻塞了必定数量的线程后,若是不去arrive,也不考虑超时和扰动的状况,那么线程将一直阻塞.咱们不可能在轮次advance前进行注册,也就不可能在advance以前进行新一phase的arrive.

3.当本轮次的最后一个arrive线程触发了轮次的更新后,才能够开启注册以及新轮次的arrive,可是此时使用了另外一个等待队列,而触发了轮次更新的上一轮的arrive线程将会当即进行前一个队列中积压的线程的唤醒操做.只有该唤醒操做足够慢,且新的轮次极快就完成了的状况,才可能形成在原arrive线程未能及时释放奇数队列的状况下,新一轮次再次向其中添加元素.

4.最重要的还在上面的internalAwaitAdvance方法,那一段被做者标上了入队条件的注释处,要想入队,必须if ((q == null || q.phase == phase) &&加上后面的条件,而这两个条件的限定已经很明显,要想入队,必须知足该等待队列没有元素或者队首是本轮的元素,而该方法又是下一轮首次注册时必须等待完成的,下一轮的arrive又必须发生在下一轮的首次注册以后,所以根本不会出现本轮wait的线程还要等下一轮甚至下N轮的线程去释放的极端状况,哪怕真的去作一个极端测试:让奇数轮大量积压线程,让偶数轮快速切换,而后测试第一轮压入的线程究竟是不是本轮释放的.(做者差点就要写单元测试去作这个极端测试了!)

这一段不经意的if,一个小小的条件,若是不注意真的忽略了,小代码大功效,谁能想到,这么深的暗坑就这样被规避了.

总结

前面已经详述了Phaser的源码以及若干趟坑辛路.其实已经没什么好总结的了,就在此顺便对比常见同步器CyclicBarrier,CountDownLatch,Semaphore的特征和实现.

从使用特征上看:

1.CountDownLatch是一次性的,只能初始化决定parties数量,等待者能够是多个,每次释放都会减小一个信号量,直到归0时为止,最后一个释放者将唤醒其余等待的线程.它也不能继续使用.

2.CyclicBarrier是可重用的,分代的,每一代之间彼此独立,可是每一代的初始parties是相同的,不可在运行期内动态调整,每一代最后一个线程会去开启一下代,并能够在此时运行一个用户指定的action,与此同时唤醒其余线程继续执行.它能够在运行完一代后继续被使用.而且它还支持重置.

3.Semaphore是一个资源量的典型,若是说CountDownLatch和CyclicBarrier或者Phaser都是等到"人够了"再放行,Semaphore倒是起到限流的做用,它控制了有限的令牌数,这个数量不能够动态地更改,在不能acquire到足够的令牌数时,线程将阻塞,直到其余线程释放了足量的令牌数并唤醒它为止.每个持有了令牌的线程均可以唤醒阻塞等待获取的线程.

4.Phaser的功能上不一样很明显,首先它的参与者数量几乎时刻可变(除了正在进入下一phase期间),随时能够增长减小parties数量,每一phase等待者能够是多个,每一phase中,每一个能从internalAwaitAdvance方法中走出循环的线程均可以帮助唤醒,固然最终能进入唤醒操做仍是要归功于最后一个arrive的线程(尽管它arrive后其余线程醒来后也会帮助唤醒).Phaser的唤醒者不必定是参与者.

从实现来看:

1.CountDownLatch借助了aqs来实现parties的释放,它使用cas+park的方式,不使用Lock.

2.CyclicBarrier须要借助重入锁和condition,每个await的线程都要全局加锁,阻塞时await在condition上.

3.Semaphore在实现上相似CountDownLatch,也是基于aqs,只不过它容许获取和释放,对state有增有减,总量不变.也是cas+park的方式阻塞,也不使用Lock

4.Phaser由于功能的要求,不基于AQS(它不能有构建时就固定的state,尽管能够初始化一个state,但它必须支持改变),它依托于原子引用实现了一个内部的队列,相应的等待/入队/唤醒等操做经过cas自旋+park的方式,一样不使用Lock.并利用双队列的方式规避了前一轮的释放和后一轮的响醒的阻塞.

此外还有两点结合前面的推理和自测验证的结论:

1.Phaser中的每个phase是保证了可见性的,经做者自测,在任何使用Phaser的代码中await先后,不会出现串phase读出的乱序状况(侧面说明每一个phase不会依赖后一个或几个phase的释放).

2.Phaser须要对await的线程进行阻塞时,是将它打包成一个node(blocker),利用ForkJoinPool来block的.若是使用Phaser同步的任务是运行在ForkJoinPool中的,它将会利用到相应的补偿机制,经做者自测,这将保证Phaser中block的每个任务必然获得执行,每个阻塞的线程必然获得释放.