JUC并发工具类全解析:CountDownLatch、CyclicBarrier、Semaphore 与 Phaser 实战指南
前言:为什么你需要掌握 JUC 并发工具类?
在 Java 多线程编程中,synchronized和ReentrantLock虽然能解决基本的线程同步问题,但在面对协调多个线程执行顺序、控制并发数量、实现阶段性任务同步等复杂场景时,往往显得力不从心。此时,Java 并发包(java.util.concurrent,简称 JUC)提供的高级同步工具类便成为开发者的“利器”。
CountDownLatch、CyclicBarrier、Semaphore和Phaser是 JUC 中最常用且功能强大的四大并发协调工具。它们各自针对不同协作模型设计,灵活运用可极大简化多线程逻辑、提升系统可靠性与可维护性。
然而,许多开发者对这些工具的理解仅停留在“能用”的层面,不清楚其内部原理、适用边界及潜在陷阱。本文将从核心概念、工作原理、源码简析、典型应用场景到实战代码示例,系统性地解析这四类工具,并提供可落地的调优建议与避坑指南。即使你是刚接触并发编程的实习生,也能快速掌握并应用于实际项目。
💡提示:本文基于 JDK 17 编写,所有示例均可在 JDK 8+ 环境运行。
一、JUC 并发工具类概览
| 工具类 | 核心功能 | 是否可重用 | 典型场景 |
|---|---|---|---|
CountDownLatch | 等待 N 个事件完成后再继续 | ❌ 一次性 | 启动/关闭协调、多任务结果聚合 |
CyclicBarrier | 多个线程互相等待到达屏障点 | ✅ 可重复使用 | 多阶段计算、循环任务同步 |
Semaphore | 控制同时访问特定资源的线程数量 | ✅ 可重用 | 限流、连接池、资源配额管理 |
Phaser | 更灵活的阶段式同步(支持动态注册/注销) | ✅ 可重用 | 复杂多阶段任务、动态参与者 |
📌关键区别:
CountDownLatch是一个或多个线程等待其他线程完成;CyclicBarrier是一组线程互相等待彼此完成;Semaphore不关注“谁完成”,只限制“多少能进”;Phaser是前两者的超集,支持动态调整参与线程。
二、CountDownLatch:倒计时门闩
2.1 核心概念与工作原理
CountDownLatch内部维护一个不可重置的计数器(count)。线程调用await()会阻塞,直到计数器减至 0;其他线程通过countDown()方法递减计数器。
- 构造函数:
CountDownLatch(int count)—— 初始化计数值 - 核心方法:
void await():阻塞当前线程,直到count == 0boolean await(long timeout, TimeUnit unit):带超时的等待void countDown():将计数器减 1
⚠️注意:计数器一旦归零,后续
await()将立即返回,且无法重置(不可重用)。
2.2 源码简析(JDK 17)
CountDownLatch基于AbstractQueuedSynchronizer(AQS)实现:
// 内部静态类 Sync 继承 AQSprivatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{privatevolatileintstate;// state 即 count 值Sync(intcount){setState(count);}intgetCount(){returngetState();}// tryReleaseShared:countDown() 调用protectedbooleantryReleaseShared(intreleases){for(;;){intc=getState();if(c==0)returnfalse;intnextc=c-1;if(compareAndSetState(c,nextc))returnnextc==0;// 归零时唤醒所有等待线程}}// tryAcquireShared:await() 调用protectedlongtryAcquireShared(intacquires){return(getState()==0)?1:-1;}}countDown()→ 调用releaseShared(1)→ 触发tryReleaseSharedawait()→ 调用acquireSharedInterruptibly(1)→ 触发tryAcquireShared
🔍技术细节:使用 CAS + 自旋保证线程安全,归零时通过
doReleaseShared()唤醒所有等待线程。
2.3 实战案例:多任务结果聚合
场景:启动 5 个线程分别计算数据分片,主线程等待所有结果返回后汇总。
importjava.util.concurrent.*;importjava.util.stream.IntStream;publicclassCountDownLatchExample{publicstaticvoidmain(String[]args)throwsInterruptedException{inttaskCount=5;CountDownLatchlatch=newCountDownLatch(taskCount);ExecutorServiceexecutor=Executors.newFixedThreadPool(taskCount);ConcurrentMap<Integer,Integer>results=newConcurrentHashMap<>();IntStream.range(0,taskCount).forEach(i->{executor.submit(()->{try{// 模拟耗时计算intresult=i*i;results.put(i,result);System.out.println("Task "+i+" completed with result: "+result);}finally{latch.countDown();// 必须放在 finally 中!}});});latch.await();// 主线程阻塞等待System.out.println("All tasks completed. Results: "+results);executor.shutdown();}}✅最佳实践:
countDown()务必放在finally块中,防止异常导致计数器未减,主线程永久阻塞。- 若需超时控制,使用
await(timeout, unit)避免无限等待。
2.4 常见误区
- 误用为 CyclicBarrier:试图在归零后重用
CountDownLatch(会失败) - 计数器初始值错误:如应为 5 却设为 4,导致提前唤醒
- 未处理中断:
await()可被中断,需捕获InterruptedException
三、CyclicBarrier:循环屏障
3.1 核心概念与工作原理
CyclicBarrier允许一组线程互相等待,直到所有线程都到达某个屏障点(barrier point),然后一起继续执行。与CountDownLatch不同,它是可重用的。
- 构造函数:
CyclicBarrier(int parties)CyclicBarrier(int parties, Runnable barrierAction)—— 所有线程到达后、释放前执行的动作
- 核心方法:
int await():等待其他线程,返回当前线程的到达序号(0 ~ parties-1)int await(long timeout, TimeUnit unit):带超时版本
🔄可重用性:一轮完成后,自动重置计数器,可进行下一轮同步。
3.2 源码简析(简化版)
CyclicBarrier内部使用ReentrantLock+Condition实现:
// 关键字段privatefinalReentrantLocklock=newReentrantLock();privatefinalConditiontrip=lock.newCondition();privateintparties;// 总参与线程数privateintcount;// 当前未到达线程数privatefinalRunnablebarrierCommand;// 屏障动作// await() 核心逻辑privateintdowait(booleantimed,longnanos)throws...{finalReentrantLocklock=this.lock;lock.lock();try{// 检查是否已破损(broken)if(generation.broken)thrownewBrokenBarrierException();intindex=--count;if(index==0){// 最后一个线程到达booleanranAction=false;try{finalRunnablecommand=barrierCommand;if(command!=null)command.run();// 执行屏障动作ranAction=true;nextGeneration();// 重置,开启新周期trip.signalAll();// 唤醒所有等待线程return0;}finally{if(!ranAction)breakBarrier();// 异常则标记破损}}else{// 非最后一个线程,等待for(;;){try{if(!timed)trip.await();elseif(nanos>0L)nanos=trip.awaitNanos(nanos);}catch(InterruptedExceptionie){if(generation==g)breakBarrier();// 中断则破损throwie;}if(g.broken)thrownewBrokenBarrierException();if(g!=generation)returnindex;// 新周期,正常返回if(timed&&nanos<=0L){breakBarrier();// 超时则破损thrownewTimeoutException();}}}}finally{lock.unlock();}}🔍关键机制:
- 使用
generation对象标识当前周期,避免“假唤醒”干扰- 任一线程中断/超时/异常,屏障将破损(broken),后续
await()抛出BrokenBarrierException
3.3 实战案例:多阶段模拟计算
场景:5 个线程模拟多轮游戏,每轮必须等所有玩家行动完毕才能进入下一轮。
importjava.util.concurrent.*;publicclassCyclicBarrierExample{publicstaticvoidmain(String[]args){intplayerCount=5;CyclicBarrierbarrier=newCyclicBarrier(playerCount,()->System.out.println(">>> All players finished round "+(round.get())+", starting next round..."));AtomicIntegerround=newAtomicInteger(1);ExecutorServiceexecutor=Executors.newFixedThreadPool(playerCount);for(inti=0;i<playerCount;i++){finalintplayerId=i;executor.submit(()->{try{for(intr=1;r<=3;r++){System.out.println("Player "+playerId+" is playing round "+r);Thread.sleep((long)(Math.random()*1000));// 模拟随机耗时barrier.await();// 等待其他玩家round.set(r+1);}}catch(InterruptedException|BrokenBarrierExceptione){Thread.currentThread().interrupt();System.err.println("Player "+playerId+" interrupted or barrier broken");}});}executor.shutdown();}}✅优势:天然支持多轮同步,无需重新创建对象。
3.4 与 CountDownLatch 的对比
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 方向 | 单向(等待完成) | 双向(互相等待) |
| 重用性 | ❌ 一次性 | ✅ 可循环 |
| 屏障动作 | 不支持 | ✅ 支持(最后一个线程执行) |
| 异常处理 | 无特殊机制 | 破损机制(BrokenBarrierException) |
四、Semaphore:信号量
4.1 核心概念与工作原理
Semaphore用于控制同时访问某一资源的线程数量,本质是一个计数信号量。
- 许可(Permit):代表一个“通行证”,初始许可数由构造函数指定
- 核心方法:
void acquire():获取一个许可(若无可用则阻塞)void release():释放一个许可boolean tryAcquire():尝试获取,不阻塞int availablePermits():查询当前可用许可数
🔒公平性:可通过
Semaphore(int permits, boolean fair)指定是否公平(FIFO)
4.2 源码简析
同样基于 AQS 实现,但使用共享模式:
// Sync 继承 AQSprotectedinttryAcquireShared(intacquires){for(;;){intavailable=getState();intremaining=available-acquires;if(remaining<0||compareAndSetState(available,remaining))returnremaining;}}protectedbooleantryReleaseShared(intreleases){for(;;){intcurrent=getState();intnext=current+releases;if(next<current)thrownewError("Maximum permit count exceeded");if(compareAndSetState(current,next))returntrue;}}acquire()→doAcquireSharedInterruptibly()→ 自旋 + 阻塞队列release()→doReleaseShared()→ 唤醒等待线程
4.3 实战案例:数据库连接池限流
场景:模拟一个最多支持 3 个并发连接的数据库连接池。
importjava.util.concurrent.*;publicclassSemaphoreExample{privatestaticfinalSemaphoresemaphore=newSemaphore(3,true);// 公平信号量publicstaticvoidmain(String[]args){ExecutorServiceexecutor=Executors.newFixedThreadPool(10);for(inti=0;i<10;i++){finalinttaskId=i;executor.submit(()->{try{semaphore.acquire();// 获取连接System.out.println("Task "+taskId+" acquired DB connection. Available: "+semaphore.availablePermits());Thread.sleep(2000);// 模拟数据库操作}catch(InterruptedExceptione){Thread.currentThread().interrupt();}finally{semaphore.release();// 释放连接System.out.println("Task "+taskId+" released DB connection. Available: "+semaphore.availablePermits());}});}executor.shutdown();}}✅扩展应用:
- 限流网关:控制每秒请求数
- 资源配额:限制 CPU/内存密集型任务并发数
- 生产者-消费者:替代
BlockingQueue实现缓冲区满/空控制
五、Phaser:灵活的阶段同步器
5.1 核心概念与优势
Phaser是CountDownLatch和CyclicBarrier的超集,支持:
动态注册/注销参与者
分层树形结构(适用于大规模并发)
每个阶段可自定义行为
核心方法:
register()/bulkRegister(int parties):动态增加参与者arrive()/arriveAndAwaitAdvance():到达并等待下一阶段onAdvance(int phase, int registeredParties):重写以自定义阶段结束逻辑
5.2 实战案例:动态任务阶段同步
场景:主任务启动若干子任务,中途可能新增任务,所有任务必须完成当前阶段才能进入下一阶段。
importjava.util.concurrent.Phaser;publicclassPhaserExample{staticclassTaskimplementsRunnable{privatefinalPhaserphaser;privatefinalStringname;Task(Phaserphaser,Stringname){this.phaser=phaser.register();// 注册并获取引用this.name=name;}@Overridepublicvoidrun(){for(intphase=0;phase<3;phase++){System.out.println(name+" working on phase "+phase);try{Thread.sleep((long)(Math.random()*1000));}catch(InterruptedExceptione){Thread.currentThread().interrupt();}phaser.arriveAndAwaitAdvance();// 等待本阶段所有任务完成}phaser.arriveAndDeregister();// 完成后注销}}publicstaticvoidmain(String[]args){Phaserphaser=newPhaser(){@OverrideprotectedbooleanonAdvance(intphase,intregisteredParties){System.out.println(">>> Phase "+phase+" completed. Parties: "+registeredParties);returnphase>=2;// 第2阶段后终止}};// 启动初始任务newThread(newTask(phaser,"Worker-1")).start();newThread(newTask(phaser,"Worker-2")).start();// 模拟中途新增任务phaser.bulkRegister(1);newThread(newTask(phaser,"Worker-3")).start();// 等待所有任务完成while(!phaser.isTerminated()){Thread.yield();}System.out.println("All phases completed.");}}✅适用场景:
- 大规模并行计算(如 MapReduce)
- 动态工作流引擎
- 游戏帧同步(每帧为一个阶段)
六、选型指南与避坑总结
6.1 如何选择合适的工具?
| 需求 | 推荐工具 |
|---|---|
| 等待 N 个独立任务完成 | CountDownLatch |
| 多线程循环同步(固定参与者) | CyclicBarrier |
| 控制并发访问数量 | Semaphore |
| 动态参与者 + 多阶段同步 | Phaser |
6.2 常见陷阱与最佳实践
| 工具 | 陷阱 | 建议 |
|---|---|---|
CountDownLatch | 忘记countDown()导致死锁 | 放在finally块 |
CyclicBarrier | 未处理BrokenBarrierException | 捕获异常并优雅降级 |
Semaphore | 忘记release()导致许可泄漏 | 使用 try-with-resources 封装 |
Phaser | 未正确注销导致阶段无法结束 | 任务结束调用deregister() |
⚠️通用原则:
- 所有阻塞方法(
await,acquire)都可能被中断,需处理InterruptedException- 避免在屏障动作(barrier action)中执行耗时操作,否则阻塞所有线程
- 生产环境务必设置超时,防止永久阻塞
七、FAQ 与扩展阅读
Q1:CountDownLatch 能替代 CyclicBarrier 吗?
A:不能。前者是单向等待,后者是互相等待且可重用。
Q2:Semaphore 的许可数可以超过初始值吗?
A:可以。release()无限制,但通常应与acquire()成对使用。
Q3:Phaser 的性能如何?
A:对于小规模任务(<100线程),性能接近 CyclicBarrier;大规模时因树形结构更优。
扩展阅读:
- 《Java并发编程实战》(Brian Goetz)— 第5章
- JDK 官方文档:java.util.concurrent
- Doug Lea 论文:The java.util.concurrent Synchronizer Framework
结语:掌握并发工具,构建高可靠系统
CountDownLatch、CyclicBarrier、Semaphore和Phaser是 Java 并发编程的“瑞士军刀”。理解其设计思想与适用场景,能让你在面对复杂同步需求时游刃有余。记住:工具本身不难,难的是在正确的时间使用正确的工具。
行动建议:
- 在你的项目中识别是否存在“手动 wait/notify”可被替换的场景
- 用
Semaphore重构现有连接池或限流逻辑- 尝试用
Phaser实现一个多阶段数据处理 pipeline
欢迎点赞、收藏、评论交流!
关注我,获取更多 Java 并发与性能优化深度内容!