在JAVA 1.7引入了一个新的并发API:Phaser(移相器),一个可重用的同步barrier。在此前,JAVA已经有CyclicBarrier、CountDownLatch这两种同步barrier,但是Phaser更加灵活,而且侧重于“重用”。

一、简述

1、注册机制:与其他barrier不同的是,Phaser中的“注册的同步者(parties)”会随时间而变化,Phaser可以通过构造器初始化parties个数,也可以在Phaser运行期间随时加入(register)新的parties,以及在运行期间注销(deregister)parties。运行时可以随时加入、注销parties,只会影响Phaser内部的计数器,它建立任何内部的bookkeeping(账本),因此task不能查询自己是否已经注册了,当然你可以通过实现子类来达成这一设计要求。

Java代码

//伪代码 Phaserphaser=newPhaser(); phaser.register();//partiescount:1 .... phaser.arriveAndDeregister()://count:0; ....

此外,CyclicBarrier、CountDownLatch需要在初始化的构造函数中指定同步者的个数,且运行时无法再次调整。

Java代码

CountDownLatchcountDownLatch=newCountDownLatch(12); //countderegisterpartiesafterall //partiescountis12allthetimes //ifyouwantchangethenumberofparties,youshouldcreateanewinstance. CyclicBarriercyclicBarrier=newCyclicBarrier(12);

2、同步机制:类似于CyclicBarrier,Phaser也可以awaited多次,它的arrivedAndAwaitAdvance()方法的效果类似于CyclicBarrier的await()。Phaser的每个周期(generation)都有一个phase数字,phase 从0开始,当所有的已注册的parties都到达后(arrive)将会导致此phase数字自增(advance),当达到Integer.MAX_VALUE后继续从0开始。这个phase数字用于表示当前parties所处于的“阶段周期”,它既可以标记和控制parties的wait行为、唤醒等待的时机。

1)Arrival:Phaser中的arrive()、arriveAndDeregister()方法,这两个方法不会阻塞(block),但是会返回相应的phase数字,当此phase中最后一个party也arrive以后,phase数字将会增加,即phase进入下一个周期,同时触发(onAdvance)那些阻塞在上一phase的线程。这一点类似于CyclicBarrier的barrier到达机制;更灵活的是,我们可以通过重写onAdvance方法来实现更多的触发行为。

2)Waiting:Phaser中的awaitAdvance()方法,需要指定一个phase数字,表示此Thread阻塞直到phase推进到此周期,arriveAndAwaitAdvance()方法阻塞到下一周期开始(或者当前phase结束)。不像CyclicBarrier,即使等待Thread已经interrupted,awaitAdvance方法会继续等待。Phaser提供了Interruptible和Timeout的阻塞机制,不过当线程Interrupted或者timeout之后将会抛出异常,而不会修改Phaser的内部状态。如果必要的话,你可以在遇到此类异常时,进行相应的恢复操作,通常是在调用forceTermination()方法之后。

Phaser通常在ForkJoinPool中执行tasks,它可以在有task阻塞等待advance时,确保其他tasks的充分并行能力。

3、中断(终止):Phaser可以进入Termination状态,可以通过isTermination()方法判断;当Phaser被终止后,所有的同步方法将会立即返回(解除阻塞),不需要等到advance(即advance也会解除阻塞),且这些阻塞方法将会返回一个负值的phase值(awaitAdvance方法、arriveAndAwaitAdvance方法)。当然,向一个termination状态的Phaser注册party将不会有效;此时onAdvance()方法也将会返回true(默认实现),即所有的parties都会被deregister,即register个数为0。

4、Tiering(分层):Phaser可以“分层”,以tree的方式构建Phaser来降低“竞争”。如果一个Phaser中有大量parties,这会导致严重的同步竞争,所以我们可以将它们分组并共享一个parent Phaser,这样可以提高吞吐能力;Phaser中注册和注销parties都会有Child 和parent Phaser自动管理。当Child Phaser中中注册的parties变为非0时(在构造函数Phaser(Phaser parent,int parties),或者register()方法),Child Phaser将会注册到其Parent上;当Child Phaser中的parties变为0时(比如由arrivedAndDegister()方法),那么此时Child Phaser也将从其parent中注销出去。

5、监控:同步的方法只会被register操作调用,对于当前state的监控方法可以在任何时候调用,比如getRegisteredParties()获取已经注册的parties个数,getPhase()获取当前phase周期数等;因为这些方法并非同步,所以只能反映当时的瞬间状态。

二、常用的Barrier比较

1、CountDownLatch

Java代码

//创建时,就需要指定参与的parties个数 intparties=12; CountDownLatchlatch=newCountDownLatch(parties); //线程池中同步task ExecutorServiceexecutor=Executors.newFixedThreadPool(parties); for(inti=0;i<parties;i ){ executor.execute(newRunnable(){ @Override publicvoidrun(){ try{ //可以在任务执行开始时执行,表示所有的任务都启动后,主线程的await即可解除 //latch.countDown(); //run //.. Thread.sleep(3000); }catch(Exceptione){ } finally{ //任务执行完毕后:到达 //表示所有的任务都结束,主线程才能继续 latch.countDown(); } } }); } latch.await();//主线程阻塞,直到所有的parties到达 //latch上所有的parties都达到后,再次执行await将不会有效, //即barrier是不可重用的 executor.shutdown();

2、CyclicBarrier

Java代码

//创建时,就需要指定参与的parties个数 intparties=12; CyclicBarrierbarrier=newCyclicBarrier(parties); //线程池中同步task ExecutorServiceexecutor=Executors.newFixedThreadPool(parties); for(inti=0;i<parties;i ){ executor.execute(newRunnable(){ @Override publicvoidrun(){ try{ inti=0; while(i<3&&!barrier.isBroken()){ System.out.println("generationbegin:" i ",tid:" Thread.currentThread().getId()); Thread.sleep(3000); //如果所有的parties都到达,则开启新的一次周期(generation) //barrier可以被重用 barrier.await(); i ; } }catch(Exceptione){ e.printStackTrace(); } finally{ } } }); } Thread.sleep(100000);

3、Phaser

Java代码

//创建时,就需要指定参与的parties个数 intparties=12; //可以在创建时不指定parties //而是在运行时,随时注册和注销新的parties Phaserphaser=newPhaser(); //主线程先注册一个 //对应下文中,主线程可以等待所有的parties到达后再解除阻塞(类似与CountDownLatch) phaser.register(); ExecutorServiceexecutor=Executors.newFixedThreadPool(parties); for(inti=0;i<parties;i ){ phaser.register();//每创建一个task,我们就注册一个party executor.execute(newRunnable(){ @Override publicvoidrun(){ try{ inti=0; while(i<3&&!phaser.isTerminated()){ System.out.println("Generation:" phaser.getPhase()); Thread.sleep(3000); //等待同一周期内,其他Task到达 //然后进入新的周期,并继续同步进行 phaser.arriveAndAwaitAdvance(); i ;//我们假定,运行三个周期即可 } }catch(Exceptione){ } finally{ phaser.arriveAndDeregister(); } } }); } //主线程到达,且注销自己 //此后线程池中的线程即可开始按照周期,同步执行。 phaser.arriveAndDeregister();

三、API简述

1、Phaser():构造函数,创建一个Phaser;默认parties个数为0。此后我们可以通过register()、bulkRegister()方法来注册新的parties。每个Phaser实例内部,都持有几个状态数据:termination状态、已经注册的parties个数(registeredParties)、当前phase下已到达的parties个数(arrivedParties)、当前phase周期数,还有2个同步阻塞队列Queue。Queue中保存了所有的waiter,即因为advance而等待的线程信息;这两个Queue分别为evenQ和oddQ,这两个Queue在实现上没有任何区别,Queue的元素为QNode,每个QNode保存一个waiter的信息,比如Thread引用、阻塞的phase、超时的deadline、是否支持interrupted响应等。两个Queue,其中一个保存当前phase中正在使用的waiter,另一个备用,当phase为奇数时使用evenQ、oddQ备用,偶数时相反,即两个Queue轮换使用。当advance事件触发期间,新register的parties将会被放在备用的Queue中,advance只需要响应另一个Queue中的waiters即可,避免出现混乱。

2、Phaser(int parties):构造函数,初始一定数量的parties;相当于直接regsiter此数量的parties。

3、arrive():到达,阻塞,等到当前phase下其他parties到达。如果没有register(即已register数量为0),调用此方法将会抛出异常,此方法返回当前phase周期数,如果Phaser已经终止,则返回负数。

4、arriveAndDeregister():到达,并注销一个parties数量,非阻塞方法。注销,将会导致Phaser内部的parties个数减一(只影响当前phase),即下一个phase需要等待arrive的parties数量将减一。异常机制和返回值,与arrive方法一致。

5、arriveAndAwaitAdvance():到达,且阻塞直到其他parties都到达,且advance。此方法等同于awaitAdvance(arrive())。如果你希望阻塞机制支持timeout、interrupted响应,可以使用类似的其他方法(参见下文)。如果你希望到达后且注销,而且阻塞等到当前phase下其他的parties到达,可以使用awaitAdvance(arriveAndDeregister())方法组合。此方法的异常机制和返回值同arrive()。

6、awaitAdvance(int phase):阻塞方法,等待phase周期数下其他所有的parties都到达。如果指定的phase与Phaser当前的phase不一致,则立即返回。

7、awaitAdvanceInterruptibly(int phase):阻塞方法,同awaitAdvance,只是支持interrupted响应,即waiter线程如果被外部中断,则此方法立即返回,并抛出InterrutedException。

8、awaitAdvanceInterruptibly(int phase,long timeout,TimeUnit unit):阻塞方法,同awaitAdvance,支持timeout类型的interrupted响应,即当前线程阻塞等待约定的时长,超时后以TimeoutException异常方式返回。

9、forceTermination():强制终止,此后Phaser对象将不可用,即register等将不再有效。此方法将会导致Queue中所有的waiter线程被唤醒。

10、register():新注册一个party,导致Phaser内部registerPaties数量加1;如果此时onAdvance方法正在执行,此方法将会等待它执行完毕后才会返回。此方法返回当前的phase周期数,如果Phaser已经中断,将会返回负数。

11、bulkRegister(int parties):批量注册多个parties数组,规则同10、。

12、getArrivedParties():获取已经到达的parties个数。

13、getPhase():获取当前phase周期数。如果Phaser已经中断,则返回负值。

14、getRegisteredParties():获取已经注册的parties个数。

15、getUnarrivedParties():获取尚未到达的parties个数。

16、onAdvance(int phase,int registeredParties):这个方法比较特殊,表示当进入下一个phase时可以进行的事件处理,如果返回true表示此Phaser应该终止(此后将会把Phaser的状态为termination,即isTermination()将返回true。),否则可以继续进行。phase参数表示当前周期数,registeredParties表示当前已经注册的parties个数。

默认实现为:return registeredParties == 0;在很多情况下,开发者可以通过重写此方法,来实现自定义的advance时间处理机制。

protected boolean onAdvance(int phase, int registeredParties) { return registeredParties == 0; }

内部原理,比较简单(简述):

1)两个计数器:分别表示parties个数和当前phase。register和deregister会触发parties变更(CAS),全部parties到达(arrive)会触发phase变更。

2)一个主要的阻塞队列:非AQS实现,对于arriveAndWait的线程,会被添加到队列中并被park阻塞,直到当前phase中最后一个party到达后触发唤醒。

四、父子关系

Phaser通过status字段来实现同步逻辑,status是一个64位的long变量,它有包含了四个维度的语义:

1、第0-15位,当前未到达的parties,调用arriveXXX时,该值-1,调用register时 1;

2、第16-31位,当前总parties,调用register时 1,deRegister时-1;

3、第32-62位,phase,即Phaser的年龄,当未到达的parties减到0(即所有parties已到达)时,phase自动加1,并且把16-31位的parties数复制到0-15位,从而该Phaser可以继续复用;

java编程容器类型(Java并发编程-Phaser类的使用)(1)

当Phaser的parties数比较大的高并发场景下,Phaser的status变量的竞争会非常激烈,register、arrive等操作发起的CAS操作预测将会大概率失败导致大量CAS操作被重复调用,增加CPU开销。可以通过构造Phaser分层树的方式来分离竞争,子Phaser第一次register时,把该子Phaser注册到父Phaser,当子Phaser所有parties都已经arrive时,把它从父Phaser中反注册。

java编程容器类型(Java并发编程-Phaser类的使用)(2)

当根Phaser的所有子Phaser的parties都已经arrive时,整个Phaser树升级phase递增,通过这种方式,所有的arrive、register操作在子Phaser进行就可以,根Phaser只需负责Phaser的升级,这样可以把部分对status的访问修改分离到子Phaser中,通过分散竞争点提高Phaser的吞吐量。

java编程容器类型(Java并发编程-Phaser类的使用)(3)

五、使用案例

1、例子1(基本使用)

使用默认的onAdvance,线程自己注册party和注销party,只要registeredParties为0,phaser将会终止。

private static void test1() { Phaser phaser = new Phaser() { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println(Thread.currentThread().getName() ": onAdvance,registeredParties=" getRegisteredParties() ", phase=" getPhase() ", isTerminated=" isTerminated() ); return super.onAdvance(phase, registeredParties); } }; System.out.println(Thread.currentThread().getName() ": 主线程开始执行异步任务,registeredParties=" phaser.getRegisteredParties() ", phase=" phaser.getPhase() ", isTerminated=" phaser.isTerminated()); phaser.register(); for (int i = 0; i < 5; i ) { phaser.register(); System.out.println(Thread.currentThread().getName() ": 注册一个屏障,registeredParties=" phaser.getRegisteredParties() ", phase=" phaser.getPhase() ", isTerminated=" phaser.isTerminated()); int sleep = i; new Thread(() -> { try { TimeUnit.SECONDS.sleep(sleep); System.out.println(Thread.currentThread().getName() ": 到达屏障,等待其他线程," sleep ", registeredParties=" phaser.getRegisteredParties() ", phase=" phaser.getPhase() ", isTerminated=" phaser.isTerminated()); phaser.arriveAndAwaitAdvance(); TimeUnit.SECONDS.sleep(sleep); System.out.println(Thread.currentThread().getName() ": 屏障打开,开始执行剩下任务," sleep ", registeredParties=" phaser.getRegisteredParties() ", phase=" phaser.getPhase() ", isTerminated=" phaser.isTerminated()); phaser.arriveAndDeregister(); } catch (InterruptedException e) { e.printStackTrace(); } },"thread-" i).start(); } phaser.arriveAndDeregister(); System.out.println(Thread.currentThread().getName() ": 主线程执行完毕,registeredParties=" phaser.getRegisteredParties() ", phase=" phaser.getPhase() ", isTerminated=" phaser.isTerminated() ); }

输出结果:

main: 主线程开始执行异步任务,registeredParties=0, phase=0, isTerminated=false main: 注册一个屏障,registeredParties=2, phase=0, isTerminated=false main: 注册一个屏障,registeredParties=3, phase=0, isTerminated=false main: 注册一个屏障,registeredParties=4, phase=0, isTerminated=false main: 注册一个屏障,registeredParties=5, phase=0, isTerminated=false main: 注册一个屏障,registeredParties=6, phase=0, isTerminated=false main: 主线程执行完毕,registeredParties=5, phase=0, isTerminated=false thread-0: 到达屏障,等待其他线程,0, registeredParties=4, phase=0, isTerminated=false thread-1: 到达屏障,等待其他线程,1, registeredParties=5, phase=0, isTerminated=false thread-2: 到达屏障,等待其他线程,2, registeredParties=5, phase=0, isTerminated=false thread-3: 到达屏障,等待其他线程,3, registeredParties=5, phase=0, isTerminated=false thread-4: 到达屏障,等待其他线程,4, registeredParties=5, phase=0, isTerminated=false thread-4: onAdvance,registeredParties=5, phase=0, isTerminated=false thread-0: 屏障打开,开始执行剩下任务,0, registeredParties=5, phase=1, isTerminated=false thread-1: 屏障打开,开始执行剩下任务,1, registeredParties=4, phase=1, isTerminated=false thread-2: 屏障打开,开始执行剩下任务,2, registeredParties=3, phase=1, isTerminated=false thread-3: 屏障打开,开始执行剩下任务,3, registeredParties=2, phase=1, isTerminated=false thread-4: 屏障打开,开始执行剩下任务,4, registeredParties=1, phase=1, isTerminated=false thread-4: onAdvance,registeredParties=0, phase=1, isTerminated=false

2、例子2(自行控制每个阶段)

Phaser这个类的使用场景为N个线程分阶段并行的问题。有这么一个任务为“做3道题“,每个学生一个进程,5个学生可以并行做,这个就是常规的并发,但是如果加一个额外的 限制条件,必须等所有人都做完了第一题,才能开始做第二题,必须等所有人都做完了第二题,才能做第三题,这个问题就转变成了分阶段并发的问题,最适合用Phaser来解题。

private static void test2() { MyPhaser phaser = new MyPhaser(); StudentTask[] studentTask = new StudentTask[5]; for (int i = 0; i < studentTask.length; i ) { studentTask[i] = new StudentTask(phaser); phaser.register(); //注册一次表示phaser维护的线程个数 } Thread[] threads = new Thread[studentTask.length]; for (int i = 0; i < studentTask.length; i ) { threads[i] = new Thread(studentTask[i], "Student" i); threads[i].start(); } //等待所有线程执行结束 for (int i = 0; i < studentTask.length; i ) { try { threads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Phaser has finished:" phaser.isTerminated()); } static class MyPhaser extends Phaser { @Override protected boolean onAdvance(int phase, int registeredParties) { //在每个阶段执行完成后回调的方法 switch (phase) { case 0: return studentArrived(); case 1: return finishFirstExercise(); case 2: return finishSecondExercise(); case 3: return finishExam(); default: return true; } } private boolean studentArrived(){ System.out.println("学生准备好了,学生人数:" getRegisteredParties()); return false; } private boolean finishFirstExercise(){ System.out.println("第一题所有学生做完"); return false; } private boolean finishSecondExercise(){ System.out.println("第二题所有学生做完"); return false; } private boolean finishExam(){ System.out.println("第三题所有学生做完,结束考试"); return true; } } static class StudentTask implements Runnable { private Phaser phaser; public StudentTask(Phaser phaser) { this.phaser = phaser; } @Override public void run() { System.out.println(Thread.currentThread().getName() " 到达考试"); phaser.arriveAndAwaitAdvance(); System.out.println(Thread.currentThread().getName() " 做第1题开始..."); doExercise1(); System.out.println(Thread.currentThread().getName() " 做第1题完成..."); phaser.arriveAndAwaitAdvance(); System.out.println(Thread.currentThread().getName() " 做第2题开始..."); doExercise2(); System.out.println(Thread.currentThread().getName() " 做第2题完成..."); phaser.arriveAndAwaitAdvance(); System.out.println(Thread.currentThread().getName() " 做第3题开始..."); doExercise3(); System.out.println(Thread.currentThread().getName() " 做第3题完成..."); phaser.arriveAndAwaitAdvance(); } private void doExercise1() { long duration = (long)(Math.random()*10); try { TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } } private void doExercise2() { long duration = (long)(Math.random()*10); try { TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } } private void doExercise3() { long duration = (long)(Math.random()*10); try { TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } } }

输出结果:

Student1 到达考试 Student0 到达考试 Student3 到达考试 Student2 到达考试 Student4 到达考试 学生准备好了,学生人数:5 Student4 做第1题开始... Student3 做第1题开始... Student1 做第1题开始... Student2 做第1题开始... Student0 做第1题开始... Student1 做第1题完成... Student2 做第1题完成... Student4 做第1题完成... Student0 做第1题完成... Student3 做第1题完成... 第一题所有学生做完 Student4 做第2题开始... Student1 做第2题开始... Student2 做第2题开始... Student3 做第2题开始... Student0 做第2题开始... Student1 做第2题完成... Student3 做第2题完成... Student2 做第2题完成... Student0 做第2题完成... Student4 做第2题完成... 第二题所有学生做完 Student4 做第3题开始... Student2 做第3题开始... Student3 做第3题开始... Student1 做第3题开始... Student0 做第3题开始... Student4 做第3题完成... Student3 做第3题完成... Student2 做第3题完成... Student1 做第3题完成... Student0 做第3题完成... 第三题所有学生做完,结束考试 Phaser has finished:true

3、例子3(最多进行到3个phase就会终止)

private static void test3() { int iterations = 2; final Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations || registeredParties == 0; } }; phaser.register(); for (int i=0;i<5;i ) { phaser.register(); new Thread() { public void run() { do { System.out.println(Thread.currentThread().getName() " start."); int phase = phaser.arriveAndAwaitAdvance(); System.out.println(Thread.currentThread().getName() " end. phase:" phase); } while (!phaser.isTerminated()); } }.start(); } phaser.arriveAndDeregister(); // deregister self, don't wait }

输出结果:

Thread-0 start. Thread-1 start. Thread-3 start. Thread-4 start. Thread-2 start. Thread-4 end. phase:1 Thread-0 end. phase:1 Thread-3 end. phase:1 Thread-1 end. phase:1 Thread-1 start. Thread-2 end. phase:1 Thread-3 start. Thread-0 start. Thread-4 start. Thread-2 start. Thread-2 end. phase:2 Thread-2 start. Thread-1 end. phase:2 Thread-1 start. Thread-0 end. phase:2 Thread-0 start. Thread-4 end. phase:2 Thread-4 start. Thread-3 end. phase:2 Thread-3 start. Thread-3 end. phase:3 Thread-4 end. phase:-2147483645 Thread-0 end. phase:-2147483645 Thread-2 end. phase:-2147483645 Thread-1 end. phase:-2147483645

4、例子4(phaser的父子关系)

private static void test4() { final Phaser parent = new Phaser(); int phaserParty1 = 5; int phaserParty2 = 4; // 在Phaser的构造函数中,parent 将会调用doRegister进行注册(phase = parent.doRegister(1)) final Phaser phaser1 = new Phaser(parent,phaserParty1); final Phaser phaser2 = new Phaser(parent,phaserParty2); System.out.println("tasks start"); for(int i = 0;i<phaserParty1;i ){ final int index = i; new Thread("ThreadA-" i){ @Override public void run() { System.out.println(Thread.currentThread().getName() ", index:" index ); phaser1.arriveAndAwaitAdvance(); } }.start(); } for(int i = 0;i<phaserParty2;i ){ final int index = i; new Thread("ThreadB-" i){ @Override public void run() { System.out.println(Thread.currentThread().getName() ", index:" index ); phaser2.arriveAndAwaitAdvance(); } }.start(); } System.out.println("wait for all tasks to be finished. phase:" parent.getPhase() ", RegisteredParties:" parent.getRegisteredParties()); int phase = parent.awaitAdvance(parent.getPhase()); System.out.println("tasks done. phase:" phase); }

输出结果:

tasks start ThreadA-2, index:2 ThreadA-1, index:1 ThreadA-4, index:4 ThreadA-3, index:3 wait for all tasks to be finished. phase:0, RegisteredParties:2 ThreadA-0, index:0 ThreadB-2, index:2 ThreadB-3, index:3 ThreadB-0, index:0 ThreadB-1, index:1 tasks done. phase:1

-----------------------------------------------------------

转载自:

https://www.iteye.com/blog/shift-alt-ctrl-2302923

https://www.cnblogs.com/huangzifu/p/7612547.html

https://baijiahao.baidu.com/s?id=1642041314065933482&wfr=spider&for=pc

,