前言
上一篇我们整理了点赞系统的第一层:分片位图和 Lua 原子判重。
位图解决的是:
用户是否点赞过? 本次点赞是否真的改变了状态?但是前端展示时还需要点赞数:
这篇知文有多少人点赞?如果每次点赞都直接更新一个计数 Key,比如:
INCR like_count:10001虽然简单,但高并发下会造成写热点。
所以项目采用了“异步写 + 写聚合”的方式:
点赞状态变化 ↓ 产生 CounterEvent ↓ Kafka 异步写入 counter-events ↓ 消费者把增量写入 Redis Hash 聚合桶 ↓ 定时任务每 1 秒把聚合增量折叠到 SDS这一篇就专门整理这条链路。
一、为什么需要异步写聚合?
点赞属于高频写。
如果每次状态变化都直接更新最终计数:
SETBIT 位图 INCR 计数 更新缓存 更新用户获赞数同步链路会越来越长,用户点击点赞时延也会变高。
项目里的做法是:
同步链路只保证位图事实更新 计数汇总异步完成这样点赞接口可以很快返回。
计数允许秒级最终一致,所以没有必要强行同步更新所有读模型。
二、计数事件模型
状态变化后,会产生一个CounterEvent。
// src/main/java/com/tongji/counter/event/CounterEvent.java@DatapublicclassCounterEvent{privateStringentityType;privateStringentityId;privateStringmetric;privateintidx;privatelonguserId;privateintdelta;}字段说明:
| 字段 | 含义 |
|---|---|
entityType | 实体类型,比如knowpost |
entityId | 实体 ID |
metric | 指标,比如like |
idx | SDS 中的字段下标 |
userId | 触发动作的用户 |
delta | 计数增量,点赞+1,取消点赞-1 |
例如一次点赞事件可以表示为:
{"entityType":"knowpost","entityId":"10001","metric":"like","idx":1,"userId":888,"delta":1}取消点赞则是:
{"entityType":"knowpost","entityId":"10001","metric":"like","idx":1,"userId":888,"delta":-1}三、事件生产者
// src/main/java/com/tongji/counter/event/CounterEventProducer.java@ServicepublicclassCounterEventProducer{privatefinalKafkaTemplate<String,String>kafka;privatefinalObjectMapperobjectMapper;publicvoidpublish(CounterEventevent){try{Stringpayload=objectMapper.writeValueAsString(event);kafka.send(CounterTopics.EVENTS,payload);}catch(JsonProcessingExceptione){// 生产异常不抛出影响主流程,可接入告警}}}Kafka 主题定义如下:
// src/main/java/com/tongji/counter/event/CounterTopics.javapublicfinalclassCounterTopics{publicstaticfinalStringEVENTS="counter-events";}点赞主流程里只负责把事件发出去,不等待计数完成。
四、消费事件写入聚合桶
Kafka 消费者在CounterAggregationConsumer中。
// src/main/java/com/tongji/counter/event/CounterAggregationConsumer.java@KafkaListener(topics=CounterTopics.EVENTS,groupId="counter-agg")publicvoidonMessage(Stringmessage,Acknowledgmentack)throwsException{CounterEventevt=objectMapper.readValue(message,CounterEvent.class);StringaggKey=CounterKeys.aggKey(evt.getEntityType(),evt.getEntityId());Stringfield=String.valueOf(evt.getIdx());try{redis.opsForHash().increment(aggKey,field,evt.getDelta());ack.acknowledge();}catch(Exceptionex){// 不提交位点以便重试}}聚合桶 Key:
publicstaticStringaggKey(StringentityType,StringentityId){returnString.format("agg:%s:%s:%s",CounterSchema.SCHEMA_ID,entityType,entityId);}实际 Key:
agg:v1:knowpost:10001Redis Hash 中保存:
field = idx value = delta 累计值例如:
HINCRBY agg:v1:knowpost:10001 1 20表示这篇知文的点赞数还有+20增量没刷到 SDS。
五、SDS 计数快照结构
计数最终会折叠到 SDS。
// src/main/java/com/tongji/counter/schema/CounterSchema.javapublicfinalclassCounterSchema{publicstaticfinalStringSCHEMA_ID="v1";publicstaticfinalintFIELD_SIZE=4;publicstaticfinalintSCHEMA_LEN=5;publicstaticfinalintIDX_LIKE=1;publicstaticfinalintIDX_FAV=2;}SDS 总长度是:
5 * 4 = 20 字节字段布局:
0: read 预留 1: like 点赞数 2: fav 收藏数 3: comment 预留 4: repost 预留计数 Key:
publicstaticStringsdsKey(StringentityType,StringentityId){returnString.format("cnt:%s:%s:%s",CounterSchema.SCHEMA_ID,entityType,entityId);}示例:
cnt:v1:knowpost:10001SDS 是一个 Redis String,但里面保存的是二进制定长结构。
六、定时刷写聚合桶
项目通过定时任务每 1 秒刷写一次聚合桶。
@Scheduled(fixedDelay=1000L)publicvoidflush(){Set<String>keys=redis.keys("agg:"+CounterSchema.SCHEMA_ID+":*");if(keys.isEmpty()){return;}for(StringaggKey:keys){Map<Object,Object>entries=redis.opsForHash().entries(aggKey);if(entries.isEmpty()){continue;}String[]parts=aggKey.split(":",4);StringcntKey=CounterKeys.sdsKey(parts[2],parts[3]);for(Map.Entry<Object,Object>e:entries.entrySet()){Stringfield=String.valueOf(e.getKey());longdelta=Long.parseLong(String.valueOf(e.getValue()));intidx=Integer.parseInt(field);redis.execute(incrScript,List.of(cntKey),String.valueOf(CounterSchema.SCHEMA_LEN),String.valueOf(CounterSchema.FIELD_SIZE),String.valueOf(idx),String.valueOf(delta));redis.execute(decrScript,List.of(aggKey),field,String.valueOf(delta));}if(redis.opsForHash().size(aggKey)==0L){redis.delete(aggKey);}}}刷写逻辑可以拆成:
扫描 agg:v1:* 聚合桶 ↓ 读取每个 Hash field 的 delta ↓ 根据 aggKey 解析 entityType 和 entityId ↓ 定位 cnt:v1:{entityType}:{entityId} ↓ Lua 原子更新 SDS 中对应字段 ↓ 从聚合桶扣减已刷写 delta ↓ Hash 为空则删除聚合桶七、Lua 原子折叠到 SDS
localcntKey=KEYS[1]localschemaLen=tonumber(ARGV[1])localfieldSize=tonumber(ARGV[2])localidx=tonumber(ARGV[3])localdelta=tonumber(ARGV[4])localcnt=redis.call('GET',cntKey)ifnotcntthencnt=string.rep(string.char(0),schemaLen*fieldSize)endlocaloff=idx*fieldSizelocalv=read32be(cnt,off)+deltaifv<0thenv=0endlocalseg=write32be(v)cnt=string.sub(cnt,1,off)..seg..string.sub(cnt,off+fieldSize+1)redis.call('SET',cntKey,cnt)return1这里有几个细节:
1. SDS 不存在时自动初始化
cnt=string.rep(string.char(0),schemaLen*fieldSize)也就是初始化一个 20 字节的全 0 字符串。
2. 按 idx 定位字段
localoff=idx*fieldSize点赞字段idx=1,偏移就是:
1 * 4 = 43. 小于 0 时归 0
ifv<0thenv=0end这样可以避免异常情况下出现负数计数。
八、为什么不用 Redis Hash 直接做最终计数?
Redis Hash 版本可能是这样:
HINCRBY cnt:knowpost:10001 like 1 HINCRBY cnt:knowpost:10001 fav 1它确实简单,但每个 field/value 都有结构开销。
而 SDS 定长结构是:
20 字节保存 5 个指标对于大量内容实体来说,SDS 的空间更紧凑。
同时固定偏移读取也很快:
like 偏移 = 1 * 4 fav 偏移 = 2 * 4这也是项目选择定制化 SDS 的原因。
九、读取计数接口
// src/main/java/com/tongji/counter/api/CounterController.java@GetMapping("/{etype}/{eid}")publicResponseEntity<CountsResponse>getCounts(@PathVariable("etype")StringentityType,@PathVariable("eid")StringentityId,@RequestParam(value="metrics",required=false)StringmetricsStr){List<String>metrics;if(metricsStr==null||metricsStr.isBlank()){metrics=newArrayList<>(CounterSchema.SUPPORTED_METRICS);}else{metrics=Arrays.stream(metricsStr.split(",")).map(String::trim).filter(CounterSchema.SUPPORTED_METRICS::contains).toList();}Map<String,Long>counts=counterService.getCounts(entityType,entityId,metrics);returnResponseEntity.ok(newCountsResponse(entityType,entityId,counts));}请求示例:
GET /api/v1/counter/knowpost/10001?metrics=like,fav响应:
{"entityType":"knowpost","entityId":"10001","counts":{"like":128,"fav":36}}十、正常读取 SDS
byte[]raw=getRaw(sdsKey);booleanneedRebuild=raw==null||raw.length!=expectedLen;if(!needRebuild){for(Stringm:metrics){Integeridx=CounterSchema.NAME_TO_IDX.get(m);intoff=idx*CounterSchema.FIELD_SIZE;longval=readInt32BE(raw,off);result.put(m,val);}}正常读取只需要:
Redis GET 一次 本地按偏移解析字节所以读路径非常轻。
十一、Feed 中如何使用计数?
Feed 里读取计数时,也会调用CounterService:
Map<String,Long>counts=counterService.getCounts("knowpost",String.valueOf(base.id()),List.of("like","fav"));LonglikeCount=counts.getOrDefault("like",0L);LongfavoriteCount=counts.getOrDefault("fav",0L);用户态状态则实时读位图:
booleanliked=uid!=null&&counterService.isLiked("knowpost",base.id(),uid);booleanfaved=uid!=null&&counterService.isFaved("knowpost",base.id(),uid);这样设计可以避免公共 Feed 缓存被用户态字段污染。
十二、知识点总结
1. 为什么点赞要异步聚合?
因为点赞是高频写,直接更新最终计数会产生写热点。
Kafka + Redis 聚合桶可以削峰,把多次增量合并后再刷写。
2. Redis 聚合桶有什么作用?
聚合桶保存短时间内的计数增量。
比如 1 秒内同一篇文章新增 100 个点赞,最终可以聚合成一个字段增量,而不是每次都写最终 SDS。
3. SDS 适合什么场景?
适合指标数量固定、读频率高、希望压缩内存开销的计数场景。
4. 为什么计数是最终一致?
因为点赞状态先写位图,计数增量再异步刷 SDS。
在 1 秒刷写窗口内,计数可能略滞后,但用户状态是实时准确的。
总结
这一篇主要整理了点赞系统的异步写和写聚合链路。
用户点赞时,同步链路只负责位图状态变化;如果状态真的变化,就产生CounterEvent。事件进入 Kafka 后,消费者先写 Redis Hash 聚合桶,后台定时任务再把增量折叠到 SDS 二进制计数快照。
这套方案牺牲了秒级强一致,但换来了更高的写入吞吐和更低的读成本,非常适合点赞这种高频、可最终一致的业务场景。