news 2026/5/1 8:57:00

【Java线程安全实战】⑧ 阶段同步的艺术:Phaser 与 Condition 的高阶玩法

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【Java线程安全实战】⑧ 阶段同步的艺术:Phaser 与 Condition 的高阶玩法

📖目录

  • 1. 为什么需要Phaser和Condition?
  • 2. Phaser:动态阶段同步的智能调度系统
    • 2.1 Phaser的核心概念
    • 2.2 Phaser与CyclicBarrier的对比
    • 2.3 Phaser的典型应用场景
  • 3. Condition:线程的"个人等待区"
    • 3.1 Condition的核心概念
    • 3.2 Condition与wait/notify的对比
  • 4. 实战示例
    • 示例1:Phaser基础用法(模拟快递分拣流程)
    • 示例2:Phaser动态添加线程(快递中心临时增派人手)
    • 示例3:Condition实现生产者-消费者模型(快递仓库的进销存)
    • 示例4:Phaser与Condition结合使用(快递中心的多阶段运营)
  • 5. Phaser与Condition的深度对比
  • 6. 最佳实践与常见陷阱
    • 6.1 Phaser最佳实践
    • 6.2 Condition最佳实践
    • 6.3 常见陷阱
  • 7. 结语
  • 8. 经典书籍推荐
  • 9. 参考链接
  • 10. 往期博客回顾

1. 为什么需要Phaser和Condition?

在多线程编程中,我们常常需要控制多个线程的执行顺序和同步点。想象一下你是一个快递分拣中心的经理,需要管理多个分拣员(线程)在不同阶段(分拣、打包、装车)的工作。传统的同步工具就像一个固定的计时器,一旦设定就无法更改。而Phaser和Condition则像是一个智能调度系统,可以根据实际情况动态调整。

  • Phaser:就像一个智能分拣调度系统,可以动态添加/移除分拣员,每个分拣员可以在不同阶段等待。
  • Condition:就像一个分拣员的个人等待区,每个分拣员可以等待特定条件(如"包裹数量达到10个")。

2. Phaser:动态阶段同步的智能调度系统

Phaser是JDK 7引入的并发工具,用于管理多个线程在不同阶段的同步。相比CyclicBarrier,Phaser更加灵活,可以动态添加和移除线程,可以设置阶段。

2.1 Phaser的核心概念

  • 阶段(Phase):Phaser将整个任务分为多个阶段,每个阶段有特定的同步点。
  • 注册(Register):线程需要注册到Phaser,才能参与同步。
  • 到达(Arrive):线程到达同步点。
  • 等待(Await):线程等待其他线程到达同步点。
  • 终止(Termination):当所有线程都到达并离开,Phaser进入下一个阶段。

2.2 Phaser与CyclicBarrier的对比

特性CyclicBarrierPhaser
阶段管理固定阶段动态阶段
线程注册无法动态添加可以动态添加/移除
重用性可重用可重用
灵活性

2.3 Phaser的典型应用场景

  • 多阶段任务处理(如数据处理、计算)
  • 动态调整线程池大小
  • 需要多个同步点的复杂流程

3. Condition:线程的"个人等待区"

Condition是Lock接口的一部分,用于替代Object的wait/notify机制。它提供了更细粒度的线程同步控制,允许一个Lock有多个等待队列。

3.1 Condition的核心概念

  • 等待队列(Wait Queue):每个Condition有一个独立的等待队列。
  • await():线程等待特定条件。
  • signal():唤醒一个等待的线程。
  • signalAll():唤醒所有等待的线程。

3.2 Condition与wait/notify的对比

特性wait/notifyCondition
锁关联与对象锁关联与Lock关联
等待队列一个多个
灵活性
适用场景简单同步复杂同步

4. 实战示例

示例1:Phaser基础用法(模拟快递分拣流程)

importjava.util.concurrent.Phaser;publicclassPhaserBasicExample{publicstaticvoidmain(String[]args){intparties=3;Phaserphaser=newPhaser(parties);for(inti=0;i<parties;i++){newThread(()->{System.out.println(Thread.currentThread().getName()+" is arriving at phase 0 (分拣开始)");phaser.arriveAndAwaitAdvance();// 到达并等待其他线程System.out.println(Thread.currentThread().getName()+" is at phase 1 (打包开始)");phaser.arriveAndAwaitAdvance();// 到达并等待其他线程System.out.println(Thread.currentThread().getName()+" is at phase 2 (装车完成)");}).start();}// 模拟主线程等待try{Thread.sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();}}}

执行结果:

Thread-0 is arriving at phase 0 (分拣开始) Thread-1 is arriving at phase 0 (分拣开始) Thread-2 is arriving at phase 0 (分拣开始) Thread-2 is at phase 1 (打包开始) Thread-1 is at phase 1 (打包开始) Thread-0 is at phase 1 (打包开始) Thread-0 is at phase 2 (装车完成) Thread-1 is at phase 2 (装车完成) Thread-2 is at phase 2 (装车完成)

示例2:Phaser动态添加线程(快递中心临时增派人手)

importjava.util.concurrent.Phaser;publicclassPhaserDynamicExample{publicstaticvoidmain(String[]args){Phaserphaser=newPhaser();// 启动第一个分拣员(先注册)newThread(()->{phaser.register();// 注册当前线程System.out.println(Thread.currentThread().getName()+" is arriving at phase 0 (分拣开始)");// 等待500毫秒,确保第二个线程也注册了try{// 这里要等得足够长,故可以适当调大Thread.sleep(5000);}catch(InterruptedExceptione){e.printStackTrace();}phaser.arriveAndAwaitAdvance();// 等待其他线程System.out.println(Thread.currentThread().getName()+" is at phase 1 (打包开始)");phaser.arriveAndAwaitAdvance();// 等待其他线程System.out.println(Thread.currentThread().getName()+" is done (装车完成)");}).start();// 等待500毫秒,让第一个线程等待try{Thread.sleep(500);}catch(InterruptedExceptione){e.printStackTrace();}// 动态添加第二个分拣员(注册并启动)newThread(()->{phaser.register();// 注册当前线程System.out.println(Thread.currentThread().getName()+" is arriving at phase 0 (分拣开始)");phaser.arriveAndAwaitAdvance();// 等待其他线程System.out.println(Thread.currentThread().getName()+" is at phase 1 (打包开始)");phaser.arriveAndAwaitAdvance();// 等待其他线程System.out.println(Thread.currentThread().getName()+" is done (装车完成)");}).start();// 等待所有线程完成phaser.awaitAdvance(0);}}

执行结果:

Thread-0 is arriving at phase 0 (分拣开始) Thread-1 is arriving at phase 0 (分拣开始) Thread-0 is at phase 1 (打包开始) Thread-1 is at phase 1 (打包开始) Thread-0 is done (装车完成) Thread-1 is done (装车完成)

示例3:Condition实现生产者-消费者模型(快递仓库的进销存)

importjava.util.LinkedList;importjava.util.Queue;importjava.util.concurrent.locks.Condition;importjava.util.concurrent.locks.Lock;importjava.util.concurrent.locks.ReentrantLock;publicclassConditionProducerConsumerExample{privatestaticfinalintMAX_CAPACITY=5;privatestaticfinalQueue<Integer>warehouse=newLinkedList<>();privatestaticfinalLocklock=newReentrantLock();privatestaticfinalConditionnotFull=lock.newCondition();privatestaticfinalConditionnotEmpty=lock.newCondition();publicstaticvoidmain(String[]args){newThread(()->{for(inti=0;i<10;i++){produce(i);}},"快递员-生产").start();newThread(()->{for(inti=0;i<10;i++){consume();}},"快递员-配送").start();}privatestaticvoidproduce(intitem){lock.lock();try{while(warehouse.size()==MAX_CAPACITY){System.out.println("仓库已满,等待空间...");notFull.await();// 等待仓库有空位}warehouse.offer(item);System.out.println("【生产】入库: 包裹ID "+item);notEmpty.signal();// 通知配送员有新包裹}catch(InterruptedExceptione){Thread.currentThread().interrupt();}finally{lock.unlock();}}privatestaticvoidconsume(){lock.lock();try{while(warehouse.isEmpty()){System.out.println("仓库为空,等待新包裹...");notEmpty.await();// 等待仓库有包裹}Integeritem=warehouse.poll();System.out.println("【配送】出库: 包裹ID "+item);notFull.signal();// 通知生产者可以继续生产}catch(InterruptedExceptione){thrownewRuntimeException(e);}finally{lock.unlock();}}}

执行结果:

【生产】入库: 包裹ID 0 【配送】出库: 包裹ID 0 仓库为空,等待新包裹... 【生产】入库: 包裹ID 1 【生产】入库: 包裹ID 2 【生产】入库: 包裹ID 3 【生产】入库: 包裹ID 4 【生产】入库: 包裹ID 5 仓库已满,等待空间... 【配送】出库: 包裹ID 1 【配送】出库: 包裹ID 2 【配送】出库: 包裹ID 3 【配送】出库: 包裹ID 4 【配送】出库: 包裹ID 5 仓库为空,等待新包裹... 【生产】入库: 包裹ID 6 【生产】入库: 包裹ID 7 【生产】入库: 包裹ID 8 【生产】入库: 包裹ID 9 【配送】出库: 包裹ID 6 【配送】出库: 包裹ID 7 【配送】出库: 包裹ID 8 【配送】出库: 包裹ID 9

示例4:Phaser与Condition结合使用(快递中心的多阶段运营)

importjava.util.concurrent.Phaser;importjava.util.concurrent.locks.Condition;importjava.util.concurrent.locks.Lock;importjava.util.concurrent.locks.ReentrantLock;publicclassCorrectPhaserConditionExample{privatestaticfinalintSTAGES=3;privatestaticfinalintWORKERS=2;privatestaticfinalPhaserphaser=newPhaser(WORKERS);privatestaticfinalLocklock=newReentrantLock();privatestaticfinalConditioncondition=lock.newCondition();privatestaticvolatilebooleanisApproved=false;publicstaticvoidmain(String[]args){// 启动工作线程for(inti=1;i<=WORKERS;i++){intfinalI=i;newThread(()->workerTask(finalI),"快递员-"+i).start();}// 独立协调线程(关键改进!)newThread(()->{for(intstage=0;stage<STAGES;stage++){System.out.println("\n【协调线程】等待所有快递员完成阶段 "+(stage+1)+"...");phaser.awaitAdvance(stage);// 等待所有工作线程完成Phaser同步System.out.println("【协调线程】阶段 "+(stage+1)+" 完成,质检通过!");lock.lock();try{isApproved=true;condition.signalAll();// 安全唤醒:此时工作线程已进入await()}finally{lock.unlock();}try{Thread.sleep(500);// 模拟质检时间}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}System.out.println("\n【协调线程】全部流程结束!");},"协调线程").start();}privatestaticvoidworkerTask(intid){for(intstage=0;stage<STAGES;stage++){System.out.println("快递员-"+id+" 准备进入阶段 "+(stage+1));// 1. 先完成Phaser同步(等待所有线程到达)phaser.arriveAndAwaitAdvance();System.out.println("快递员-"+id+" 已到达阶段 "+(stage+1)+",开始工作");// 2. 进入Condition等待(关键:确保已进入等待状态)lock.lock();try{// 必须用while循环避免虚假唤醒while(!isApproved){System.out.println("快递员-"+id+" 在阶段 "+(stage+1)+" 等待质检放行...");condition.await();}System.out.println("快递员-"+id+" 获得放行,进入下一阶段!");}catch(InterruptedExceptione){Thread.currentThread().interrupt();}finally{lock.unlock();}}System.out.println("快递员-"+id+" 所有阶段完成!");}}

执行结果:

快递员-1 准备进入阶段 1 快递员-2 准备进入阶段 1 快递员-1 已到达阶段 1,开始工作 快递员-2 已到达阶段 1,开始工作 快递员-1 在阶段 1 等待质检放行... 快递员-2 在阶段 1 等待质检放行... 【协调线程】等待所有快递员完成阶段 1... 【协调线程】阶段 1 完成,质检通过! 快递员-1 获得放行,进入下一阶段! 快递员-1 准备进入阶段 2 快递员-2 获得放行,进入下一阶段! 快递员-2 准备进入阶段 2 快递员-2 已到达阶段 2,开始工作 快递员-2 获得放行,进入下一阶段! 快递员-2 准备进入阶段 3 快递员-1 已到达阶段 2,开始工作 快递员-1 获得放行,进入下一阶段! 快递员-1 准备进入阶段 3 快递员-1 已到达阶段 3,开始工作 快递员-1 获得放行,进入下一阶段! 快递员-1 所有阶段完成! 快递员-2 已到达阶段 3,开始工作 快递员-2 获得放行,进入下一阶段! 快递员-2 所有阶段完成! 【协调线程】等待所有快递员完成阶段 2... 【协调线程】阶段 2 完成,质检通过! 【协调线程】等待所有快递员完成阶段 3... 【协调线程】阶段 3 完成,质检通过! 【协调线程】全部流程结束!

5. Phaser与Condition的深度对比

特性PhaserCondition
核心用途多阶段同步细粒度等待/唤醒
同步方式基于阶段基于条件
线程管理动态添加/移除与Lock关联
适用场景多阶段任务流程生产者-消费者、等待特定条件
代码复杂度中高
性能

6. 最佳实践与常见陷阱

6.1 Phaser最佳实践

  1. 合理设置阶段:不要设置过多阶段,避免过度同步。
  2. 动态注册:在需要时动态注册线程,避免固定数量限制。
  3. 使用arriveAndDeregister:当线程不需要参与后续阶段时,使用arriveAndDeregister来移除。

6.2 Condition最佳实践

  1. 始终使用Lock:Condition必须与Lock一起使用。
  2. 使用while循环:在await()前使用while循环检查条件,避免虚假唤醒。
  3. 信号通知:使用signal()signalAll(),根据需求选择。

6.3 常见陷阱

  1. Phaser阶段计数错误:忘记正确注册线程,导致线程一直等待。
  2. Condition虚假唤醒:没有使用while循环检查条件,导致线程在条件不满足时继续执行。
  3. 死锁:在使用Condition时,没有正确管理Lock的获取和释放。

7. 结语

Phaser和Condition是Java并发编程中非常强大的工具,它们提供了比传统同步机制更灵活、更强大的控制能力。理解并正确使用它们,可以让你在处理复杂的多线程问题时更加得心应手。

通过本文的四个示例,你已经掌握了Phaser和Condition的基本用法和高阶玩法。在实际项目中,根据具体需求选择合适的同步机制,是提升系统性能和可维护性的关键。


8. 经典书籍推荐

《Java并发编程实战》(Java Concurrency in Practice)
作者:Brian Goetz 等
地位:Java并发领域的"圣经"
价值:不仅详细讲解了Phaser、Condition等核心组件,更重要的是传授了一套完整的并发设计思想和最佳实践。尽管出版较早,但其核心原理至今仍是金科玉律,是每一位Java工程师的必读书目。

《Java并发编程的艺术》
作者:方腾飞, 魏鹏, 程晓明
地位:国内优秀并发编程著作
价值:对Phaser、Condition、ReentrantLock等JUC组件的源码剖析极为深入,非常适合希望从源码层面理解Java并发机制的读者。


9. 参考链接

  1. Phaser API文档
  2. Condition API文档

10. 往期博客回顾

  • 【Java线程安全实战】① 从ArrayList并发翻车说起:2025年主流线程安全集合全景图解
  • 【Java线程安全实战】② ConcurrentHashMap 源码深度拆解:如何做到高性能并发?
  • 【Java线程安全实战】③ ThreadLocal 源码深度拆解:如何做到线程隔离?
  • 【Java线程安全实战】④ 可重入锁ReentrantLock深度拆解:如何实现线程安全的同步?
  • 【Java线程安全实战】⑤ 原子类(Atomic)深度解析:无锁编程(Lock-Free)的终极奥义
  • 【Java线程安全实战】⑥ 秒级达百万高并发框架-Disruptor:揭秘LMAX的“快递小哥“如何让系统跑得更快
  • 【Java线程安全实战】⑦ 线程间协作的艺术:CountDownLatch 与 CyclicBarrier 深度剖析
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/1 6:18:02

JLink驱动安装后USB通信超时的完整示例分析

JLink驱动安装后USB通信超时&#xff1f;一文搞懂底层机制与实战排查 你有没有遇到过这样的场景&#xff1a;J-Link插上电脑&#xff0c;设备管理器里“通用串行总线控制器”中赫然显示着“J-Link”&#xff0c;但Keil点下载却弹出“Connection timed out”&#xff1b;或者J-…

作者头像 李华
网站建设 2026/4/12 3:32:03

Matlab实现GNMF测试阶段投影:将新数据映射到低维表示

在实际应用非负矩阵分解(NMF)或图正则化非负矩阵分解(GNMF)时,我们通常会先在训练集上学习基矩阵U,然后面对新来的测试数据时,需要快速得到其在同一低维空间中的表示V。这就是out-of-sample或测试阶段投影问题。 标准的NMF在测试阶段可以通过简单的非负最小二乘求解,但…

作者头像 李华
网站建设 2026/5/1 7:25:06

一文说清Proteus基础操作:适合初学者的通俗解释

当然&#xff0c;请将您希望我润色优化的博文内容发送给我&#xff0c;我会根据上述详细指南对其进行深度重构与提升&#xff0c;确保最终输出为一篇自然流畅、专业深入、毫无AI痕迹的技术佳作。

作者头像 李华
网站建设 2026/5/1 7:17:28

03-MongoDB高级运维

03-MongoDB高级运维 1、MongoDB常见架构 MongoDB 有三种常用架构,分别为单机版、副本集(Replica Set)和分片(Sharding) 2、分片集群机制及原理 2.1 为什么使用分片集群 数据容量日益增大,访问性能日渐降低,怎么破? 新品上线异常火爆,如何支撑更多的并发用户? 单库…

作者头像 李华
网站建设 2026/4/28 8:57:39

AD导出Gerber文件在量产交付中的注意事项(项目应用)

AD导出Gerber文件在量产交付中的实战避坑指南你有没有遇到过这样的情况&#xff1a;PCB设计反复修改、熬夜调线&#xff0c;好不容易通过DRC&#xff0c;信心满满地把Gerber发给工厂&#xff0c;结果一周后收到回复——“阻焊开窗错了”、“钻孔偏了0.1mm”、“NPTH没输出”………

作者头像 李华
网站建设 2026/4/19 0:27:04

电机控制器半桥驱动电路:自举电路完整示例

半桥驱动中的自举电路&#xff1a;从原理到实战的完整解析在设计电机控制器时&#xff0c;工程师常常会遇到一个看似简单却极为关键的问题&#xff1a;如何让高边N沟道MOSFET正常导通&#xff1f;如果你曾调试过H桥或三相逆变器电路&#xff0c;可能经历过这样的场景——低边开…

作者头像 李华