文章目录
- 并发系列(一):深入理解信号量(含 Redis 分布式信号量)
- 一、信号量是什么?
- 二、信号量的典型使用场景
- 1. 控制并发访问数量
- 2. 限制资源(连接、对象)的最大使用数量
- 3. 实现简单对象池 / 连接池控制
- 4. 实现“并发维度”的限流
- 5. 模拟现实世界资源:车位 / 座位 / 号牌
- 三、本地信号量工具与代码示例(Java)
- 3.1 `Semaphore`:标准计数信号量
- 示例:限制任务的同时执行数量
- 3.2 `CountDownLatch`:一次性“倒计时信号量”
- 示例:主线程等待多个子任务执行完毕
- 3.3 `CyclicBarrier`:可循环使用的同步栅栏
- 示例:多个线程分阶段同步执行
- 四、Redis 信号量:分布式场景下的并发控制
- 4.1 Redis 信号量是什么?
- 4.2 Redis 信号量的特别之处
- 4.3 Redis 信号量的典型使用场景
- 五、基于 Redisson 的 Redis 分布式信号量代码示例
- 5.1 Redisson 配置示例
- 5.2 使用 `RSemaphore` 实现接口级并发限流
- 六、总结
往期技术资料分享速通车
并发系列(一):深入理解信号量(含 Redis 分布式信号量)
一、信号量是什么?
在并发编程中,信号量(Semaphore)是一种非常经典的同步原语,用于控制同时访问某个共享资源的线程数量。
可以把信号量想象成一个“带计数器的锁”:
- 内部维护一个计数器(许可数,permits);
- 每个线程在访问受保护资源前,必须先从信号量中“申请一个许可”;
- 如果还有剩余许可,计数器减 1,线程可以继续执行;
- 如果许可已经耗尽,后续线程就需要等待,直到有线程释放许可(计数器加 1)。
在 Java 中,最典型的实现就是java.util.concurrent.Semaphore。在分布式场景中,我们常用Redis 实现分布式信号量来做跨进程、跨机器的并发控制。
二、信号量的典型使用场景
1. 控制并发访问数量
- 某个接口、某段逻辑最多只能允许 N 个线程同时执行,防止压垮下游服务;
- 超过 N 的请求要么排队等待,要么快速失败返回“稍后重试”。
2. 限制资源(连接、对象)的最大使用数量
- 只有 10 条数据库连接、20 个对象实例等,希望同时占用数量不能超过上限;
- 使用信号量来“发放名额”,获得许可才能从池子里借资源,用完后释放。
3. 实现简单对象池 / 连接池控制
- 借出前
acquire(),归还时release(),确保池中“借出中的数量”不超过初始化设置; - 与阻塞队列一起使用时,可以同时控制“任务排队长度”和“并发执行数量”。
4. 实现“并发维度”的限流
- 常见限流维度有 QPS(每秒请求数)、并发数(同时处理中的请求数);
- 信号量天然适合做“最大并发数 = N”的限流,避免服务被瞬时高并发拖垮。
5. 模拟现实世界资源:车位 / 座位 / 号牌
- 地下停车场有 100 个车位,每进来一辆车就占用一个车位,没有车位就必须等待;
- 影院有 200 个座位,只能卖 200 张票,多了就要拒绝;
- 这些场景都可以用信号量来抽象和建模。
三、本地信号量工具与代码示例(Java)
本小节先看“单机(单 JVM)”场景下的信号量工具,主要是 Java 并发包中的几个常用类:
Semaphore:标准的计数信号量;CountDownLatch:一次性“倒计时信号量”;CyclicBarrier:可循环使用的“阶段性同步栅栏”。
3.1Semaphore:标准计数信号量
Semaphore是 Java 并发包java.util.concurrent提供的计数信号量实现,用于控制同时访问某资源的线程数量。
典型构造方式:
// permits: 允许同时访问的线程数Semaphoresemaphore=newSemaphore(intpermits);// fair: 是否公平(先来先得),默认为非公平SemaphorefairSemaphore=newSemaphore(3,true);示例:限制任务的同时执行数量
importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.Semaphore;publicclassSemaphoreDemo{// 最多允许 3 个线程并发执行privatestaticfinalSemaphoreSEMAPHORE=newSemaphore(3);publicstaticvoidmain(String[]args){ExecutorServiceexecutorService=Executors.newFixedThreadPool(10);for(inti=0;i<10;i++){inttaskId=i;executorService.submit(()->{try{// 获取许可,如果没有可用许可,会阻塞等待SEMAPHORE.acquire();System.out.println("任务 "+taskId+" 获取许可,开始执行,当前线程:"+Thread.currentThread().getName());Thread.sleep(2000);// 模拟业务处理System.out.println("任务 "+taskId+" 执行完成,准备释放许可");}catch(InterruptedExceptione){Thread.currentThread().interrupt();}finally{// 无论如何都要释放SEMAPHORE.release();}});}executorService.shutdown();}}要点:
acquire():阻塞式获取许可,没有许可时会挂起当前线程;tryAcquire()/tryAcquire(timeout, unit):非阻塞或带超时获取许可,适合做“快速失败”或“有限等待”;release():释放许可,通常放在finally中,避免异常导致“占坑不还”。
3.2CountDownLatch:一次性“倒计时信号量”
CountDownLatch通过一个初始计数值来控制线程之间的等待关系:
- 初始化时设置一个正整数 N;
- 每当某个任务完成时调用
countDown(),计数减 1; - 当计数减到 0 时,所有在
await()处等待的线程会被同时唤醒。
它不是传统意义上的“可重复使用的信号量”,但同样基于计数机制来实现线程协作。
示例:主线程等待多个子任务执行完毕
importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;publicclassCountDownLatchDemo{privatestaticfinalintTASK_COUNT=3;publicstaticvoidmain(String[]args)throwsInterruptedException{CountDownLatchlatch=newCountDownLatch(TASK_COUNT);ExecutorServiceexecutorService=Executors.newFixedThreadPool(TASK_COUNT);for(inti=0;i<TASK_COUNT;i++){inttaskId=i;executorService.submit(()->{try{System.out.println("子任务 "+taskId+" 开始执行...");Thread.sleep(1000+taskId*500L);System.out.println("子任务 "+taskId+" 执行完成");}catch(InterruptedExceptione){Thread.currentThread().interrupt();}finally{// 完成一个任务,计数减一latch.countDown();}});}System.out.println("主线程等待所有子任务完成...");// 当计数归零时,await 解除阻塞latch.await();System.out.println("所有子任务完成,主线程继续执行");executorService.shutdown();}}特点:
- 适合“一次性”的协作,例如:系统启动时等待多个模块初始化完成;
- 计数归零后就不能重置,因此不能循环使用。
3.3CyclicBarrier:可循环使用的同步栅栏
CyclicBarrier更像是“可重复使用的栅栏”,适合多线程在某个阶段全部到齐后再一起进入下一阶段。
- 构造时指定参与线程数 N;
- 每个线程在某个阶段完成后调用
await()等待其他线程; - 当第 N 个线程调用
await()时,所有线程同时被唤醒,继续执行; - 栅栏可重用,可以进入下一轮阶段同步。
示例:多个线程分阶段同步执行
importjava.util.concurrent.BrokenBarrierException;importjava.util.concurrent.CyclicBarrier;publicclassCyclicBarrierDemo{privatestaticfinalintTHREAD_COUNT=3;publicstaticvoidmain(String[]args){// 所有线程都到达屏障点后,会执行 barrierActionCyclicBarrierbarrier=newCyclicBarrier(THREAD_COUNT,()->System.out.println("所有线程到达栅栏点,开始下一阶段..."));for(inti=0;i<THREAD_COUNT;i++){intworkerId=i;Threadworker=newThread(()->{try{System.out.println("线程 "+workerId+" 执行第一阶段任务");Thread.sleep(1000+workerId*500L);System.out.println("线程 "+workerId+" 第一阶段完成,等待其他线程...");barrier.await();// 等待所有线程到达这里System.out.println("线程 "+workerId+" 开始第二阶段任务");Thread.sleep(1000+workerId*500L);System.out.println("线程 "+workerId+" 第二阶段完成");}catch(InterruptedExceptione){Thread.currentThread().interrupt();}catch(BrokenBarrierExceptione){System.out.println("栅栏被打破:"+e.getMessage());}});worker.start();}}}特点:
- 适合“多线程分阶段协同”的场景,比如并行计算中的分步汇总;
- 与
CountDownLatch相比,CyclicBarrier可以多次复用。
四、Redis 信号量:分布式场景下的并发控制
上面的信号量工具都属于“本地(单 JVM)”工具,只能控制当前进程内部线程的并发访问。在微服务、分布式系统中,我们更常遇到这样的需求:
- 多个应用实例部署在不同机器上;
- 所有实例访问同一个第三方接口 / 下游服务;
- 希望对整个集群设置“最大并发数”的上限,比如全局最多 50 个请求同时打到第三方接口。
这时,单机Semaphore已经不够用了,需要一个分布式信号量,而 Redis 就是一个非常合适的“共享状态存储”。
4.1 Redis 信号量是什么?
Redis 本身并没有内置Semaphore类型,但我们可以基于 Redis 的:
- 原子自增 / 自减(
INCR/DECR); - Lua 脚本;
- 有序集合 / 列表等数据结构;
来实现一个跨进程、跨机器共享的计数器,从而达到“分布式信号量”的效果。
很多 Redis 客户端框架(例如 Redisson)已经封装好了这套逻辑,提供了类似RSemaphore的抽象,对使用者来说与本地Semaphore非常相似。
4.2 Redis 信号量的特别之处
与本地Semaphore相比,Redis 信号量有几个非常重要的特点:
作用范围不同:
- 本地
Semaphore只在当前 JVM 内部有效; - Redis 信号量基于 Redis 存储,天然支持多实例、多机器之间共享,适合作为全局并发配额控制工具。
- 本地
阻塞模型不同:
- 本地
Semaphore.acquire()可以直接阻塞当前线程,等待许可释放; - Redis 是网络服务,本身不会“挂起”调用方线程,通常通过“轮询 + 睡眠”或封装在客户端(如 Redisson)内部来实现等待逻辑。
- 本地
需要考虑“宕机 / 超时”场景:
- 本地环境下,如果线程异常终止,一般很快能感知;
- 分布式环境中,某个实例拿到许可后如果宕机或者网络异常,可能永远不会主动释放许可;
- 因此 Redis 信号量通常要配合过期时间(TTL)或租约(lease)机制,防止“占坑不还”导致整体可用许可数越来越少,最终被锁死。
一致性与容错更复杂:
- 需要考虑主从复制延迟、Redis 宕机、主从切换、客户端重试等情况;
- 一般使用 Lua 脚本将“检查 + 修改”操作打包成原子操作,保证一致性。
总体来说:
本地信号量控制的是“一个进程内的线程并发”;Redis 信号量控制的是“多个进程 / 多台机器之间的全局并发”。
4.3 Redis 信号量的典型使用场景
分布式接口限流(按并发数):
- 整个集群对某个接口、某类操作,最多允许 N 个请求同时在执行;
- 超出的请求要么排队等待,要么直接返回“稍后重试”。
分布式任务调度的全局“工作线程”上限:
- 多个 worker 实例从队列中消费任务;
- 希望集群内同时执行的任务数不能超过 M。
跨语言 / 跨技术栈共享资源配额:
- Java、Go、Python 等不同语言的服务,共同访问同一个下游 API;
- 通过 Redis 的分布式信号量,共享同一套“名额池”。
五、基于 Redisson 的 Redis 分布式信号量代码示例
下面以 Spring Boot + Redisson 为例,演示如何使用 Redis 实现全局并发限制。
5.1 Redisson 配置示例
首先定义一个 Redisson 客户端配置,用于连接 Redis:
importorg.redisson.Redisson;importorg.redisson.api.RedissonClient;importorg.redisson.config.Config;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRedissonConfig{@BeanpublicRedissonClientredissonClient(){Configconfig=newConfig();// 单节点示例,生产环境可以使用哨兵 / 集群配置config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);returnRedisson.create(config);}}5.2 使用RSemaphore实现接口级并发限流
假设我们有一个接口/redisSemaphore/doWork,要求:
- 整个集群层面,最多只能有 5 个请求同时在执行;
- 超出并发限制的请求,等待 1 秒还拿不到名额则快速失败,防止长时间阻塞。
importjava.util.concurrent.TimeUnit;importorg.redisson.api.RSemaphore;importorg.redisson.api.RedissonClient;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassRedisSemaphoreController{privatefinalRedissonClientredissonClient;publicRedisSemaphoreController(RedissonClientredissonClient){this.redissonClient=redissonClient;}@GetMapping("/redisSemaphore/doWork")publicStringdoWork()throwsInterruptedException{// 获取一个分布式信号量对象,key 可以根据业务命名RSemaphoresemaphore=redissonClient.getSemaphore("demo:semaphore:doWork");// 初始化许可数(只在第一次时需要,通常可以在应用启动阶段设置好)semaphore.trySetPermits(5);// 尝试在 1 秒内获取一个许可,获取不到则快速失败booleanacquired=semaphore.tryAcquire(1,TimeUnit.SECONDS);if(!acquired){return"当前请求过多,请稍后重试";}try{// 模拟业务逻辑Thread.sleep(2000L);return"处理成功,线程:"+Thread.currentThread().getName();}finally{// 处理完成后归还许可semaphore.release();}}}示例说明:
getSemaphore("demo:semaphore:doWork"):- 所有应用实例只要使用同一个 key,就共享同一批“并发名额”;
trySetPermits(5):- 将最大并发许可数设置为 5,一般在应用初始化时设置一次即可;
tryAcquire(1, TimeUnit.SECONDS):- 等待最多 1 秒尝试获取许可;
- 获取成功则进入业务处理,获取失败则直接返回友好提示;
release():- 放在
finally中,确保无论业务是否异常结束,名额都能被归还。
- 放在
与本地Semaphore相比,差异在于:
- 信号量的状态存储在 Redis 中,而不是单个 JVM 内存中;
- 所有服务实例共享同一“名额池”,实现真正的全局并发限制。
六、总结
- **信号量(Semaphore)**是控制并发访问数量的核心工具,可以看作“带计数器的锁”;
- 在 Java 中,
Semaphore、CountDownLatch、CyclicBarrier等工具可以很好地解决单机多线程场景下的同步与并发控制问题; - 在分布式系统中,单机信号量已经不够,需要依赖 Redis 这类中间件实现分布式信号量,典型场景包括:
- 分布式接口并发限流;
- 分布式任务调度的全局并发数控制;
- 跨语言 / 跨应用共享同一资源配额;
- 基于 Redisson 的
RSemaphore,我们可以以几乎与本地Semaphore相同的编码方式,在 Spring Boot 中轻松落地 Redis 分布式信号量。
在实际工程中,推荐将本地信号量 + Redis 信号量 + 线程池 + 阻塞队列结合使用:
- 本地信号量限制单机并发,保护本机资源(CPU、内存、线程数);
- Redis 信号量限制集群整体并发,保护下游服务;
- 线程池和队列负责排队与调度,从而构建一套既安全又高效的并发治理方案。