# JUC 之Phaser

2019年12月06日 阅读数：32

## Phaser源码

``````//64位整数表示Phaser的状态.
private volatile long state;

private static final int  MAX_PARTIES     = 0xffff;//最大parties,后16位表示.
private static final int  MAX_PHASE       = Integer.MAX_VALUE;//最大phase,最大整数值.
private static final int  PARTIES_SHIFT   = 16;//取parties使用的移位数,16
private static final int  PHASE_SHIFT     = 32;//取phase的移位数,32
private static final int  UNARRIVED_MASK  = 0xffff;      //未到的,取后16位.
private static final long PARTIES_MASK    = 0xffff0000L; //参加者,17-32位.
private static final long COUNTS_MASK     = 0xffffffffL; //数量,后32位.
private static final long TERMINATION_BIT = 1L << 63;//终止态,首位.

// 特殊值.
private static final int  ONE_ARRIVAL     = 1;
private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;//第1位和17位.显然,它表示了一个ONE_ARRIVAL信息和PARTY信息.
private static final int  EMPTY           = 1;

//对一个state s计算unarrived的count,
private static int unarrivedOf(long s) {
//直接取整数位,若是等于EMPTY(1)则返回0,不然取后16位.
int counts = (int)s;
return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}

//对一个state,取出parties信息,直接取state的17至32位.
private static int partiesOf(long s) {
return (int)s >>> PARTIES_SHIFT;
}
//对于一个state,取出phase信息,直接取前32位.
private static int phaseOf(long s) {
return (int)(s >>> PHASE_SHIFT);
}
//对于一个state,取出arrived信息
private static int arrivedOf(long s) {
int counts = (int)s;
//state的后32位等于1(EMPTY)返回0,不然返回parties(state的17至32位,参考上面的partiesOf方法)和UNARRIVED(state的后16位)的差.
return (counts == EMPTY) ? 0 :
(counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}
``````

Phaser用一个long型的state保存状态信息.工具

state的前32位表示phase,后16位表示unarrivied,17至32位表示parties,parties减去unarrived即arrived.单元测试

``````//this的父,能够是null表示none
private final Phaser parent;

//phaser显然是个树的结果,root表明根,若是当前phaser不在树内,则root==this
private final Phaser root;

//偶数队列和奇数队列.它们存放等待线程栈的头,为了减小当添加线程与释放线程的竞态,
//这里使用了两个队列并互相切换,子phaser共享root的队列以加快释放.
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;

//决定某个phase的等待线程队列.
private AtomicReference<QNode> queueFor(int phase) {
//选择队列的方法,若是参数phase是偶数,使用evenQ,不然oddQ.
return ((phase & 1) == 0) ? evenQ : oddQ;
}

//出现arrive事件时的边界异常信息.
return "Attempted arrival of unregistered party for " +
stateToString(s);
}

//注册时的边界异常信息.
return "Attempt to register more than " +
MAX_PARTIES + " parties for " + stateToString(s);
}
//他们都用到的stateToString(s),计算参数s对应的phase,parties,arrived.
private String stateToString(long s) {
return super.toString() +
"[phase = " + phaseOf(s) +
" parties = " + partiesOf(s) +
" arrived = " + arrivedOf(s) + "]";
}
``````

``````//表示等待队列的QNode,实现了ManagedBlocker
static final class QNode implements ForkJoinPool.ManagedBlocker {
//存放所属phaser
final Phaser phaser;
//所属phase
final int phase;
//是否可扰动
final boolean interruptible;
//是否认时
final boolean timed;
//是否已扰动
boolean wasInterrupted;
//计时相关
long nanos;
//关联线程,当是null时,取消等待.
//下一个QNode
QNode next;

QNode(Phaser phaser, int phase, boolean interruptible,
boolean timed, long nanos) {
this.phaser = phaser;
this.phase = phase;
this.interruptible = interruptible;
this.nanos = nanos;
this.timed = timed;
this.deadline = timed ? System.nanoTime() + nanos : 0L;
//取当前线程.
}
//isReleasable方法
public boolean isReleasable() {
//1.线程已置空(如2),返回true释放.
return true;
if (phaser.getPhase() != phase) {
//2.发现phaser所处的phase不是构建QNode时的phase了,就置线程为空,返回true.
return true;
}
//3.若是当前线程扰动了.
wasInterrupted = true;
if (wasInterrupted && interruptible) {
//4.发现扰动标记,而且QNode配置为可扰动,则置线程null并返回true
return true;
}
if (timed) {
//5.定时逻辑,还有nanos,计算新的时长.
if (nanos > 0L) {
}
if (nanos <= 0L) {
//已经到时间,返回true,线程置空.
return true;
}
}
return false;
}
//block逻辑
public boolean block() {
if (isReleasable())
return true;
else if (!timed)
//不定时的park
LockSupport.park(this);
else if (nanos > 0L)
//定时的状况.
LockSupport.parkNanos(this, nanos);
//老规矩
return isReleasable();
}
}
``````

``````//doArrive方法
//它是arrive和arriveAndDeregister方法的主要实现.手动调用这些方法能够加速经过和最小化竞态窗口期.
//参数表明要从当前state中减去的调整数值,它的单位依托于业务,当为arrive时减去的单位为ONE_ARRIVAL,
//当为arriveAndDeregister时减去的单位为ONE_DEREGISTER.
final Phaser root = this.root;
for (;;) {
//1.变量s初始化,取决因而否当前Phaser是root.不是root将试图从root同步滞后的state.
long s = (root == this) ? state : reconcileState();
//计算phase,前32位.
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
//2.负数直接返回.说明原来的state首位就是1,前面的TERMINATE_BIT就是64位置1.
return phase;
//取count,后32位.
int counts = (int)s;
//计算unarrived,和前面同样的逻辑.
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)//2.1
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
//3.1cas成功后,unarrived余1,则前进一个phase
if (unarrived == 1) {
//3.1.1取出parties做为下一个state的基础.
long n = s & PARTIES_MASK;
//3.1.2 下一个unarrived,数值上等于parties.
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
//3.1.5nextUnArrived不是0,加到n上.
n |= nextUnarrived;
//3.1.6前面的流程完成了state的后32位(parties和unarrived),接下来处理前32位.
//限定在MAX_PHASE以内,对当前phase加1.
int nextPhase = (phase + 1) & MAX_PHASE;
//将nextPhase的值加到n的前32位.并用n去cas掉原来的state,由于有3处入口的cas,此处必定能成功
n |= (long)nextPhase << PHASE_SHIFT;
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
//更新到新的phase,唤醒等待的waiter.
releaseWaiters(phase);
}
//3.1.7当前Phaser不是root,当nextUnarrived计算得0时,像父传递解除注册,参数ONE_DEREGISTER
//会同时减去一个unarrived和一个parties.下轮循环正常应进入3.1.8
else if (nextUnarrived == 0) {
phase = parent.doArrive(ONE_DEREGISTER);
//完成传递后,将本身的state置empty.
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
else
//3.1.8,当前Phaser不是root,计算的nextUnarrived非0,像父传递一个arrive事件,减去一个unarrived.
phase = parent.doArrive(ONE_ARRIVAL);
}
//3.2返回当前phase,多是已进入3.1递增的.仅有此处可退出循环.
return phase;
}
}
}``````

``````//注册和批量注册.参数表明parties和unarrived字段的增长数,它必须大于0.
private int doRegister(int registrations) {
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
int phase;
//循环尝试更改.
for (;;) {
//2.存在parent,则用root的phase调整this的state.
long s = (parent == null) ? state : reconcileState();
//取出当前state中保存的counts,parties,unarrived信息.
int counts = (int)s;
int parties = counts >>> PARTIES_SHIFT;
int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties)
//要注册的数量大于了余量,抛出异常.
//3.计算出phase
phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
//phase为负说明state为负,即终止态,终止.
break;
//4.当前state表示的参与数非空的逻辑,当前注册非首次注册.
if (counts != EMPTY) {
if (parent == null || reconcileState() == s) {
//this是root或者从root同步的state不变,继续执行,不然从新循环.
if (unarrived == 0)
//4.1本轮循环经过原state计算的unarrived为0,说明应等待下一phase,使用root等待
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
break;
}
}
//5.当前不存在counts,且自身就是root,表明root的首次注册.
else if (parent == null) {
long next = ((long)phase << PHASE_SHIFT) | adjust;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
//5.2 cas成功,退出,不成功,下轮循环.
break;
}
//6.是首次注册,但也不是root的逻辑.表明非root的Phaser的首次注册.
else {
//6.1对当前Phaser加锁并double check,避免同时调用.加锁失败的线程将在后续进入2的逻辑.
synchronized (this) {
//double check state未发生改变.
if (state == s) {
//6.2首先向父Phaser注册1.
phase = parent.doRegister(1);
if (phase < 0)
//发现进入终止态,直接中止.
break;

//在整个while循环中,再也不考虑phase进入终止态的状况,由于这些操做处于同一个"事务"中,
//且因竞态等缘由,若某次cas时计入了负数的phase,方法返回后也能够及时发现.
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase << PHASE_SHIFT) | adjust)) {
//若是cas不成功,则读取s为新的state,计算新的phase并从新循环.
s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
// assert (int)s == EMPTY;
}
//6.4cas成功后退出循环.
break;
}
//若是if(state==s)判断失败,说明有别的线程有当前线程进入synchronized块前已经加锁并执行了内部的逻辑且稍后释放了锁,
//这样当前线程加锁成功,但if判断失败,它会当即释放锁并返回到2.
}
}

}
return phase;
}

//使用root的phase调整this的state,更新滞后的结果.这通常发生在root前进了phase可是
//子phaser尚未作到这一步,这种状况下,子phaser必须完成这个前进的步骤,这一过程当中,phase将
//被置为root的phase,unarrived则会重置为parties,若parties为0,则置为EMPTY.返回结果state.
private long reconcileState() {
final Phaser root = this.root;
long s = state;
//不是root才进行下一步.
if (root != this) {
int phase, p;
//cas,phase采用root,parties不变,unarrived重置为parties或EMPTY.
while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
(int)(s >>> PHASE_SHIFT) &&
//phase滞后于root
//尝试cas.
!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
//肯定新state的前32位,使用root的phase.
s = (((long)phase << PHASE_SHIFT) |
//新phase<0,后32位直接取this的state表示的counts.
((phase < 0) ? (s & COUNTS_MASK) :
//phase有效,this的state表示的parties为0,则后32位使用empty
(((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
//不然,后32位使用parties.
s = state;
}
return s;
}

//默认实现为parties数为0,方法返回true时,调用者会尝试终止Phaser.(参考前面的doArrive).随后调用isTerminated方法将返回true.
//方法的默认实现返回true的场景目前只能是通过数次arriveAndDeregister调用形成parties归零的结果.咱们继承Phaser能够轻易地重写此行为,
//好比简单粗暴地返回false,那么将永远容许新的注册.
protected boolean onAdvance(int phase, int registeredParties) {
return registeredParties == 0;
}
``````

doArrive方法会每次调整unarrived数量(也可包含parties数量,若是使用了解除注册),当Phaser调用自身的arrive/arriveAndDeregister时,会作出相应的减小,并根据是否为root而决定向上递归.

Phaser减小自身unarrived信号(也可能同时有parties信号)后,若发现这已是最后一个unarrived信号,则进行接下来的判断:

2.当前Phaser不是root,且它已经不具有继续下一轮的条件(计算nextUnarrived为0,即parties已经被arriveAndDeregister置0),则从父Phaser减小一个unarrived和parties.

3.当前Phaser不是root,但它仍具备parties,知足进行下一轮的条件(计算nextUnarrived不是0),则从父Phaser减小一个unarrived,但不减小parties.

doRegister方法的逻辑大体相反,不一样于doArrive,它的参数registrations同时做用于parties和unarrived,即两个位上同时加上registrations参数.它的大体逻辑:

1.当前注册并不是首次注册,且出现unarrived==0,即本轮已经完成了arrive,那么本轮将不能注册,须要等待root更新到下轮.(这也是咱们碰到的第一个阻塞)

2.当前注册并不是首次注册,unarrived也不是0,则在本phase进行注册,增长相应的parties和unarrived.

3.当前注册是root的首次注册,给root的state加上相应的parties和unarrived.

4.当前注册是非root的首次注册,加锁(this),对本身的state加上相应的parties和unarrived(同上,以registrations为单位),而对parent加上一个parties和unarrived单位.

1.对任意Phaser树中的某一个Phaser调用注册操做,会令自身加上相应参数个parties和unarrived单位,仅会在该Phaser第一次注册时增长父Phaser(极端可能,仅从一个叶子节点第一个注册的状况下可一直递归到root)的parties数和unarrived数各1单位(不论参数是多少).

2.对任意Phaser树中的某一个Phaser调用arrive操做,会令自身减去相应的参数个parties和unarrived单位,同时仅当本Phaser此时是最后一个unarrived时,会减去父Phaser的一个unarrived单位(当前子Phaser仍旧有parties能够构建下一phase),或减去父Phaser一个Parties和unarrived单位.(极端状况下,每一级都是最后一个unarrived时,减小叶子节点的最后一个unarrived会递归到root).

``````//初始化一个Phaser,指定parent,指定未到来的参与者数(unarrived parties),但这只是一个初值,
//当咱们在任什么时候候调用注册方法时,还会相应的增长.
public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0)
//太大了,超过了后16位能表示的整数.
throw new IllegalArgumentException("Illegal number of parties");
//初始phase为0.
int phase = 0;
this.parent = parent;
if (parent != null) {
//1.有parent的状况,共享parent的root,队列,并向parent中注册1个parties和unarrived,
//同时同步一次phase(表面上是同步了parent的,实际上前面已经看过,会同步root).
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
else {
//2.无parent的状况,root就是this,并初始化奇偶等待队列.它使用原子引用的形式存放一个QNode,而QNode咱们前面已介绍.
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
//统一初始化state,后32位的决定依托于parties,若是parties是0则给予EMPTY,直接无论高32位.
//不为0则给予phase设置为前32位,parties设置parties位和unarrived位.
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}

//注册方法,就是调用doRegister,参数1.
//若是this有parent,且它以前没有过注册的parties,则首次注册会触发自身向parent的注册.
//若是this已经终止了,那么尝试注册将会无效并返回负值.若是注册的数量大于了最大支持parties(后16位整数),
//会抛出IllegalStateException
public int register() {
return doRegister(1);
}

//批量注册指定的信号量,并返回最新的phase.规则基本同上.
public int bulkRegister(int parties) {
if (parties < 0)
throw new IllegalArgumentException();
if (parties == 0)
//参数0直接查询最新的phase返回
return getPhase();
return doRegister(parties);
}

//arrive一个信号,不等待其余arrive事件,返回最新phase(终止态为负).
//当前Phaser的arrive事件已满,则对parent来讲也会触发一个arrive.(若是有parent)
public int arrive() {
return doArrive(ONE_ARRIVAL);
}

//arrive并解除一个注册parties,也不阻塞等待其余arrive.若是当前Phaser的解除注册操做
//将parties减至0,且this有parent,这将致使parent也减小一个parties(本phaser解除在parent的注册).
public int arriveAndDeregister() {
return doArrive(ONE_DEREGISTER);
}
``````

``````//令当前线程"到达"此phaser并等待其余parties,它等效于awaitAdvance(arrive()).
//注意,按照道格的注释,若是你在一个未进行注册(调用register)的线程里调用此方法实际上是一个使用错误,
//可是从本方法和前面以及后面有关的方法来看,全部记录线程的方法均只与arrive和等待有关,与注册无关.
//所以Phaser自己没法规避这种使用错误,咱们彻底可使用另外一个线程去注册,而当前线程去arrive,将两个动做分开.
//方法会返回arrive时最新的phase号.终止时会是负值.
//记录root,开始循环.
final Phaser root = this.root;
for (;;) {
//1.预计算,首先同步state
long s = (root == this) ? state : reconcileState();
//计算phase
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
//已终结直接返回最终phase.
return phase;
//计算counts,unarrived
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
//已经没有空余的unarrived信号了,不能再调用arrive,抛出异常.
//2.减余arrive的有关逻辑.尝试cas减去一个arrive
if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
s -= ONE_ARRIVAL)) {
if (unarrived > 1)
//2.1当前要减的信号不是本Phaser的最后一个信号量,调用root的等待方法.参数2是node,传空.
if (root != this)
//2.2当前要减的信号量是非root的Phaser的最后一个,递归给parent(虽然用了return,可是parent也可能在进入2.1后阻塞).
//2.3当前要减的信号量是root的最后一个.
//2.3.1准备计算下一个状态,先取出state的parties信息.
long n = s & PARTIES_MASK;
//计算nextUnarrived,它是如今的parties.
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
//2.3.2前进phase逻辑.
//须要终止,给新state的计算基石n加上终止标记.
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
//计算的nextUnarrived是0,即没有parties,加上空标记位.
n |= EMPTY;
else
//下一轮能正常进行,加上nextUnarrived位.
n |= nextUnarrived;
//2.3.3给n加上下一个phase.
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
//用n进行cas不成功,将新的phase返回.
//说明一下,由于方法执行到此前已经执行过2的入口cas,减去了最后一个unarrived,所以在2到此的过程当中如有新的注册,
//它内部会读到0个unarrived,就会等待下一个phase(参考前面介绍过的注册方法),所以cas失败不会是由于2以后有新的注册.
//在整个arrive系列的方法中,最后一次arrive发生后,本Phaser不可能有其余线程再去执行相似2处的减余的状况.
//故出现这种状况的缘由目前来看有二,一是还未介绍的强制关闭Phaser的方法,此时也会突兀地改掉state形成cas恰巧失败,二是
//出现一些用户作出的奇葩行为,好比重写了其余公有方法.咱们天然忽略第二种状况,doug大神也是简单注释了一个"terminated".
return (int)(state >>> PHASE_SHIFT); // terminated
releaseWaiters(phase);
return nextPhase;
}
//3.减余失败说明出现竞态,直接开启下轮循环从新减余.
}
}

//等待当前Phaser从给定的phase前进结束,若是当前phase不等于给定的phase,或者Phaser已终止当即返回.
//1.传入phase为负,返回它自己.
//2.传入的phase不是最新的phase,返回最新的.
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
//匹配成功,等root前进.参数node为null
return p;
}

//参考前面的几个方法,区别是可扰动.
throws InterruptedException {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
//1.参数phase小于0直接返回它自己.
return phase;
if (p == phase) {
//2.参数phase匹配,回忆一个前面介绍的QNode,匹配当前Phaser和phase,配置为可扰动且不计时.
QNode node = new QNode(this, phase, true, false, 0L);
//3.放入root的等待队列阻塞.
if (node.wasInterrupted)
//4.等待结束,判断是不是扰动形成的结束,前面介绍过QNode的相关逻辑,
//它实现了ForkJoinPool.ManagedBlocker,所以在managedBlock方法进行时,
//会循环调用问询是否能release,当咱们配置了可扰动且扰动了,就会标记这个wasInterrupted,释放线程引用并返回.
//发现此种状况抛出异常.
//同时,当发现等待成功,也会结束,释放线程引用并返回,但不带有扰动标记.
throw new InterruptedException();
}
//5.返回1处以前读取的phase或3处获得的最新phase值.
return p;
}

//同上方法,但带有计时.
long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
long nanos = unit.toNanos(timeout);
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase) {
//不一样于上面方法的地方,创建的QNode带有计时和等待时长.
QNode node = new QNode(this, phase, true, true, nanos);
if (node.wasInterrupted)
//被扰动的状况.
throw new InterruptedException();
else if (p == phase)
//时间到了phase没有前进,超时.
throw new TimeoutException();
}
return p;
}
``````

``````//强制关闭Phaser,让Phaser进入终止态,可是这个过程不影响它已注册的parties,若是此Phaser是
//一个Phaser树中的成员,那么全部phaser集中的Phaser都会关闭,若是它已经关闭,此方法无效.此方法能够
//用于若干任务出现意料以外异常的状况下的协调恢复.
public void forceTermination() {
// Only need to change root state
final Phaser root = this.root;
long s;
//已经是终止态直接忽略.
while ((s = root.state) >= 0) {
//直接尝试给root的state加上终止位.显然加上了它,子Phaser在注册和arrive等方法同步回新的phase就是个负数,
if (UNSAFE.compareAndSwapLong(root, stateOffset,
s, s | TERMINATION_BIT)) {
// 加上终止位成功,前后唤醒偶数等待队列和奇数等待队列.
releaseWaiters(0); // Waiters on evenQ
releaseWaiters(1); // Waiters on oddQ
//返回
return;
}
}
}

//返回当前phase,直接用root的state去取.
public final int getPhase() {
return (int)(root.state >>> PHASE_SHIFT);
}

//查询注册的parties数量.调用前面介绍过的partiesOf
public int getRegisteredParties() {
return partiesOf(state);
}
``````

//查询已经arrived的parties数量.调用介绍过的arriveOf

``````public int getArrivedParties() {
return arrivedOf(reconcileState());
}

//查询未arrive的parties数量,调用前面介绍过的unarrivedOf
public int getUnarrivedParties() {
return unarrivedOf(reconcileState());
}

//返回parent
public Phaser getParent() {
return parent;
}

//返回root
public Phaser getRoot() {
return root;
}

//判断当前Phaser是否终止,直接取root的state是否为负,可见,终止态彻底取决于root.
public boolean isTerminated() {
return root.state < 0L;
}
``````

``````//工具方法,移除某个phase的等待者.
private void releaseWaiters(int phase) {
QNode q;  //保存队列中的队首
//取队列,用phase的奇偶决定,phase是偶数就取偶数队列,不然取奇数队列.而这个phase其实只用来取队列了,后续的操做与它无关.
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
//循环,找出全部phase不等于root的phase的(其实root是最大的,因此就是找出非最新phase加入进来的waiter QNode)
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT)) {
(t = q.thread) != null) {
//发现阻塞者,唤醒线程.回忆下前面实现blocker方法中的isReleaseble和block方法都有将线程置空的操做.(三种状况,唤醒扰动超时都会置空)
LockSupport.unpark(t);
}
}
}

//上面releaseWaiters方法的一个变种,但它只会处理遍历过程当中位于头部的元素,出现正常的等待节点就会当即返回.
//此方法在这一块能够有效的减小内存的占用.退出时返回当前的phase.
private int abortWait(int phase) {
//一样,参数phase只是用来选择要处理的队列.
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
for (;;) {
//计算最新phase的值p
int p = (int)(root.state >>> PHASE_SHIFT);
if (q == null || ((t = q.thread) != null && q.phase == p))
//1.出现q为null表明整队列元素已出队,直接返回p;
return p;
if (head.compareAndSet(q, q.next) && t != null) {
//进入条件,参考1的条件,由于1会直接返回.故进入2的条件实际上是q非空且处于旧的phase.只有这种状况才能够出队.
//2.将q出队,置空线程引用,释放线程.
LockSupport.unpark(t);
}
}
}

//计算有效cpu,控制自旋.
private static final int NCPU = Runtime.getRuntime().availableProcessors();

//常量,每轮arrive等待的字旋数,取决于NCPU,小于2则取1,不小于2取2的8次幂.
static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;

//此方法仅限root调用.参数phase表示当前的phase,参数node表示等待节点,用于追踪节点的扰动或超时.
//若是是null,表示是一次不可扰动的等待.返回值为当前最新的phase.
private int internalAwaitAdvance(int phase, QNode node) {
// 1.调用releaseWaiters,传入参数phase的前一个phase,显然这只是决定释放哪个队列.参数绝对实时准确的状况下会先将老的队列释放掉.
releaseWaiters(phase-1);
//节点入队标记,入队了就会变为true
boolean queued = false;
//记录每一轮循环的unarrived数量,用于决定是否扩增自旋等待次数.
int lastUnarrived = 0;
//自旋数,参考上面的计算逻辑.
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
//开启循环,直到phase前进为止或内部判断已取消等待.
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
//2.传入node是null,即非可扰动的模式逻辑.只有非可扰动模式才有自旋.
if (node == null) {
//2.1每轮自读进入都会尝试计算新的unarrived,若是发现出现了变更(变大或者变小),
//会将它保存到前面的lastUnarrived.
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
//发现新变化的unarrived<NCPU,扩增自旋次数,继续自旋.
//unarrived的变化,若没有大量新的parties注册,会在自旋过程当中变小,反之大量加入注册,大于了NCPU则放弃增长自旋次数.
spins += SPINS_PER_ARRIVAL;
//2.2,未发现本轮循环unarrived发生变化,或者增长了大量注册,形成大于NCPU的逻辑,首先记录此时的线程扰动状态.
//2.3接2.2,若是发现了线程被扰动了,或者经若干次自旋减小次数,自旋次数并未能在2.1进行增长,直至减为0,进入if.
if (interrupted || --spins < 0) { // need node to record intr
//2.4,知足2.3进入if的条件,再也不继续自旋了,由于参数没有提供node,此处初始化一个node,不定时,不可扰动,并保存扰动状态.
//下轮循环将没法进入2.
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
}
else if (node.isReleasable())
//发现node所处的phase已经达到或者取消了,则break掉循环.
break;
//4.未能在非扰动模式下自旋解决(2)或提早发现node的扰动且未将node入队的状况下,将node入队.
else if (!queued) {
//选择当前phase表明的队列.
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
//这一行不起眼的if条件代码真的是一个悄无声息解决了一个大暗坑的地方,后面说.
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase)
//double check避免脏入队,入队条件是(1)无头,(2)或者头元素的phase等于参数phase(由于相邻的两个phase绝对不会入同一个队).
//知足(1)(2)的同时,还要知足(3),参数phase就是当前的state表示的phase(由于此方法只能root使用,故为root表示的最新phase).
}
//5.已经在某一轮循环入队了,使用ForkJoinPool的managedBlock管理block,其间可能会释放线程引用.
else {
try {
//5.1它内部也有循环,且会调用前面看到过的isReleasable和block实现,显然它一旦结束(包含扰动),必定会形成下轮外循环终止于3处.
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
//5.2出现扰动异常catch住,并保存.下轮循环也会终止在3处.
node.wasInterrupted = true;
}
}
}
if (node != null) {
//6.1node存在表明可能已经压入队列,结果要么是已出现扰动或超时(方法结束后会抛出异常),要么是已正常完成.
//没有必要去尝试唤醒能执行出前面while循环到达6立刻要返回的线程.
//6.2取消node中的线程引用,避免外面的线程尝试唤醒.
node.thread = null;       // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)
//6.3若是node自己设置了不可被扰动,但5.2处判断线程自己抛出了扰动异常,却被catch住了,此处扰动本线程.
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
//6.4发现phase并未前进.仍是参数传入的pahse,说明必定是扰动或超时的结果,abortWait对本phase使用的队列进行清理,
//而清理的目标前面已论述过,是本队列头部开始的早于本phase的元素.(发现一个不知足条件的就中止了清理).
return abortWait(phase); // possibly clean up on abort
}
//7.退出上面的while循环必定会到此帮助释放早于最新阶段的waiter.注意,是早于最新phase的,参数phase只是决定了选哪一个队列(奇偶).
//若是是6.4表明的那种扰动超时状况,此处其实释放的是旧的结果.被唤醒的线程其实通常是执行在5.1处阻塞的.当前线程能运行到此绝对不须要唤醒.
releaseWaiters(phase);
return p;
}
``````

1.Phaser维护了两个"队列",不论加入等待队列仍是弹出等待队列,都是从头部进行,新加入的成员会成功队列的新头,原来的头会成为它的next,弹出时next成为新头.因此至关于一个对头部的"后进先出",考虑官方起名和注释,咱们依旧保持队列这个称呼.

2.唤醒时,会从队列的头部依次弹出node 的phase早于root的最新phase的node,.

3.等待时,入队的node成为新的头.

4.当轮次增长时,会使用和本轮不一样的队列增长元素,同时也会唤醒本轮中等待的node.

releaseWaiter的方法咱们已经看出,它只会释放phase早于最新的node,此时最新压入的元素属于当前最新的phase,显然不知足条件,那么会形成奇数队列中两轮前压入的元素不能获得清除,两轮前就在释放当时积压node的线程(那一轮最后一个arrive)发现不符合清理条件,就直接return并终止了,只能等待本轮最后一个arrive出现后继续进行释放.若是本轮最后一个arrive出现很晚,在下一轮依旧保持如此极端,往返数轮,确实会致使奇数队列中积压大量node,且第一轮就在等待该轮次结束的线程早就知足了释放条件(升到了2轮),事实上多是第n轮才获得释放,这还符合Phaser的定义吗?咱们使用它,就是要保证每一轮单独使用,每一轮次达到条件,线程释放并执行,下一轮次是下一轮次.

1.线程的唤醒真的很快,尽管此处除了唤醒还包含了原子引用的更新(每次出队都要cas).

3.当本轮次的最后一个arrive线程触发了轮次的更新后,才能够开启注册以及新轮次的arrive,可是此时使用了另外一个等待队列,而触发了轮次更新的上一轮的arrive线程将会当即进行前一个队列中积压的线程的唤醒操做.只有该唤醒操做足够慢,且新的轮次极快就完成了的状况,才可能形成在原arrive线程未能及时释放奇数队列的状况下,新一轮次再次向其中添加元素.

4.最重要的还在上面的internalAwaitAdvance方法,那一段被做者标上了入队条件的注释处,要想入队,必须if ((q == null || q.phase == phase) &&加上后面的条件,而这两个条件的限定已经很明显,要想入队,必须知足该等待队列没有元素或者队首是本轮的元素,而该方法又是下一轮首次注册时必须等待完成的,下一轮的arrive又必须发生在下一轮的首次注册以后,所以根本不会出现本轮wait的线程还要等下一轮甚至下N轮的线程去释放的极端状况,哪怕真的去作一个极端测试:让奇数轮大量积压线程,让偶数轮快速切换,而后测试第一轮压入的线程究竟是不是本轮释放的.(做者差点就要写单元测试去作这个极端测试了!)

## 总结

1.CountDownLatch是一次性的,只能初始化决定parties数量,等待者能够是多个,每次释放都会减小一个信号量,直到归0时为止,最后一个释放者将唤醒其余等待的线程.它也不能继续使用.

2.CyclicBarrier是可重用的,分代的,每一代之间彼此独立,可是每一代的初始parties是相同的,不可在运行期内动态调整,每一代最后一个线程会去开启一下代,并能够在此时运行一个用户指定的action,与此同时唤醒其余线程继续执行.它能够在运行完一代后继续被使用.而且它还支持重置.

3.Semaphore是一个资源量的典型,若是说CountDownLatch和CyclicBarrier或者Phaser都是等到"人够了"再放行,Semaphore倒是起到限流的做用,它控制了有限的令牌数,这个数量不能够动态地更改,在不能acquire到足够的令牌数时,线程将阻塞,直到其余线程释放了足量的令牌数并唤醒它为止.每个持有了令牌的线程均可以唤醒阻塞等待获取的线程.

1.CountDownLatch借助了aqs来实现parties的释放,它使用cas+park的方式,不使用Lock.

2.CyclicBarrier须要借助重入锁和condition,每个await的线程都要全局加锁,阻塞时await在condition上.

3.Semaphore在实现上相似CountDownLatch,也是基于aqs,只不过它容许获取和释放,对state有增有减,总量不变.也是cas+park的方式阻塞,也不使用Lock

4.Phaser由于功能的要求,不基于AQS(它不能有构建时就固定的state,尽管能够初始化一个state,但它必须支持改变),它依托于原子引用实现了一个内部的队列,相应的等待/入队/唤醒等操做经过cas自旋+park的方式,一样不使用Lock.并利用双队列的方式规避了前一轮的释放和后一轮的响醒的阻塞.

1.Phaser中的每个phase是保证了可见性的,经做者自测,在任何使用Phaser的代码中await先后,不会出现串phase读出的乱序状况(侧面说明每一个phase不会依赖后一个或几个phase的释放).

2.Phaser须要对await的线程进行阻塞时,是将它打包成一个node(blocker),利用ForkJoinPool来block的.若是使用Phaser同步的任务是运行在ForkJoinPool中的,它将会利用到相应的补偿机制,经做者自测,这将保证Phaser中block的每个任务必然获得执行,每个阻塞的线程必然获得释放.