news 2026/5/25 17:43:04

【知识获取与分享社区项目 | 项目日记第 11 天】Kafka 异步写与写聚合:从点赞事件到 Redis SDS 计数快照

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【知识获取与分享社区项目 | 项目日记第 11 天】Kafka 异步写与写聚合:从点赞事件到 Redis SDS 计数快照

前言

上一篇我们整理了点赞系统的第一层:分片位图和 Lua 原子判重。

位图解决的是:

用户是否点赞过? 本次点赞是否真的改变了状态?

但是前端展示时还需要点赞数:

这篇知文有多少人点赞?

如果每次点赞都直接更新一个计数 Key,比如:

INCR like_count:10001

虽然简单,但高并发下会造成写热点。

所以项目采用了“异步写 + 写聚合”的方式:

点赞状态变化 ↓ 产生 CounterEvent ↓ Kafka 异步写入 counter-events ↓ 消费者把增量写入 Redis Hash 聚合桶 ↓ 定时任务每 1 秒把聚合增量折叠到 SDS

这一篇就专门整理这条链路。


一、为什么需要异步写聚合?

点赞属于高频写。

如果每次状态变化都直接更新最终计数:

SETBIT 位图 INCR 计数 更新缓存 更新用户获赞数

同步链路会越来越长,用户点击点赞时延也会变高。

项目里的做法是:

同步链路只保证位图事实更新 计数汇总异步完成

这样点赞接口可以很快返回。

计数允许秒级最终一致,所以没有必要强行同步更新所有读模型。


二、计数事件模型

状态变化后,会产生一个CounterEvent

// src/main/java/com/tongji/counter/event/CounterEvent.java@DatapublicclassCounterEvent{privateStringentityType;privateStringentityId;privateStringmetric;privateintidx;privatelonguserId;privateintdelta;}

字段说明:

字段含义
entityType实体类型,比如knowpost
entityId实体 ID
metric指标,比如like
idxSDS 中的字段下标
userId触发动作的用户
delta计数增量,点赞+1,取消点赞-1

例如一次点赞事件可以表示为:

{"entityType":"knowpost","entityId":"10001","metric":"like","idx":1,"userId":888,"delta":1}

取消点赞则是:

{"entityType":"knowpost","entityId":"10001","metric":"like","idx":1,"userId":888,"delta":-1}

三、事件生产者

// src/main/java/com/tongji/counter/event/CounterEventProducer.java@ServicepublicclassCounterEventProducer{privatefinalKafkaTemplate<String,String>kafka;privatefinalObjectMapperobjectMapper;publicvoidpublish(CounterEventevent){try{Stringpayload=objectMapper.writeValueAsString(event);kafka.send(CounterTopics.EVENTS,payload);}catch(JsonProcessingExceptione){// 生产异常不抛出影响主流程,可接入告警}}}

Kafka 主题定义如下:

// src/main/java/com/tongji/counter/event/CounterTopics.javapublicfinalclassCounterTopics{publicstaticfinalStringEVENTS="counter-events";}

点赞主流程里只负责把事件发出去,不等待计数完成。


四、消费事件写入聚合桶

Kafka 消费者在CounterAggregationConsumer中。

// src/main/java/com/tongji/counter/event/CounterAggregationConsumer.java@KafkaListener(topics=CounterTopics.EVENTS,groupId="counter-agg")publicvoidonMessage(Stringmessage,Acknowledgmentack)throwsException{CounterEventevt=objectMapper.readValue(message,CounterEvent.class);StringaggKey=CounterKeys.aggKey(evt.getEntityType(),evt.getEntityId());Stringfield=String.valueOf(evt.getIdx());try{redis.opsForHash().increment(aggKey,field,evt.getDelta());ack.acknowledge();}catch(Exceptionex){// 不提交位点以便重试}}

聚合桶 Key:

publicstaticStringaggKey(StringentityType,StringentityId){returnString.format("agg:%s:%s:%s",CounterSchema.SCHEMA_ID,entityType,entityId);}

实际 Key:

agg:v1:knowpost:10001

Redis Hash 中保存:

field = idx value = delta 累计值

例如:

HINCRBY agg:v1:knowpost:10001 1 20

表示这篇知文的点赞数还有+20增量没刷到 SDS。


五、SDS 计数快照结构

计数最终会折叠到 SDS。

// src/main/java/com/tongji/counter/schema/CounterSchema.javapublicfinalclassCounterSchema{publicstaticfinalStringSCHEMA_ID="v1";publicstaticfinalintFIELD_SIZE=4;publicstaticfinalintSCHEMA_LEN=5;publicstaticfinalintIDX_LIKE=1;publicstaticfinalintIDX_FAV=2;}

SDS 总长度是:

5 * 4 = 20 字节

字段布局:

0: read 预留 1: like 点赞数 2: fav 收藏数 3: comment 预留 4: repost 预留

计数 Key:

publicstaticStringsdsKey(StringentityType,StringentityId){returnString.format("cnt:%s:%s:%s",CounterSchema.SCHEMA_ID,entityType,entityId);}

示例:

cnt:v1:knowpost:10001

SDS 是一个 Redis String,但里面保存的是二进制定长结构。


六、定时刷写聚合桶

项目通过定时任务每 1 秒刷写一次聚合桶。

@Scheduled(fixedDelay=1000L)publicvoidflush(){Set<String>keys=redis.keys("agg:"+CounterSchema.SCHEMA_ID+":*");if(keys.isEmpty()){return;}for(StringaggKey:keys){Map<Object,Object>entries=redis.opsForHash().entries(aggKey);if(entries.isEmpty()){continue;}String[]parts=aggKey.split(":",4);StringcntKey=CounterKeys.sdsKey(parts[2],parts[3]);for(Map.Entry<Object,Object>e:entries.entrySet()){Stringfield=String.valueOf(e.getKey());longdelta=Long.parseLong(String.valueOf(e.getValue()));intidx=Integer.parseInt(field);redis.execute(incrScript,List.of(cntKey),String.valueOf(CounterSchema.SCHEMA_LEN),String.valueOf(CounterSchema.FIELD_SIZE),String.valueOf(idx),String.valueOf(delta));redis.execute(decrScript,List.of(aggKey),field,String.valueOf(delta));}if(redis.opsForHash().size(aggKey)==0L){redis.delete(aggKey);}}}

刷写逻辑可以拆成:

扫描 agg:v1:* 聚合桶 ↓ 读取每个 Hash field 的 delta ↓ 根据 aggKey 解析 entityType 和 entityId ↓ 定位 cnt:v1:{entityType}:{entityId} ↓ Lua 原子更新 SDS 中对应字段 ↓ 从聚合桶扣减已刷写 delta ↓ Hash 为空则删除聚合桶

七、Lua 原子折叠到 SDS

localcntKey=KEYS[1]localschemaLen=tonumber(ARGV[1])localfieldSize=tonumber(ARGV[2])localidx=tonumber(ARGV[3])localdelta=tonumber(ARGV[4])localcnt=redis.call('GET',cntKey)ifnotcntthencnt=string.rep(string.char(0),schemaLen*fieldSize)endlocaloff=idx*fieldSizelocalv=read32be(cnt,off)+deltaifv<0thenv=0endlocalseg=write32be(v)cnt=string.sub(cnt,1,off)..seg..string.sub(cnt,off+fieldSize+1)redis.call('SET',cntKey,cnt)return1

这里有几个细节:

1. SDS 不存在时自动初始化

cnt=string.rep(string.char(0),schemaLen*fieldSize)

也就是初始化一个 20 字节的全 0 字符串。

2. 按 idx 定位字段

localoff=idx*fieldSize

点赞字段idx=1,偏移就是:

1 * 4 = 4

3. 小于 0 时归 0

ifv<0thenv=0end

这样可以避免异常情况下出现负数计数。


八、为什么不用 Redis Hash 直接做最终计数?

Redis Hash 版本可能是这样:

HINCRBY cnt:knowpost:10001 like 1 HINCRBY cnt:knowpost:10001 fav 1

它确实简单,但每个 field/value 都有结构开销。

而 SDS 定长结构是:

20 字节保存 5 个指标

对于大量内容实体来说,SDS 的空间更紧凑。

同时固定偏移读取也很快:

like 偏移 = 1 * 4 fav 偏移 = 2 * 4

这也是项目选择定制化 SDS 的原因。


九、读取计数接口

// src/main/java/com/tongji/counter/api/CounterController.java@GetMapping("/{etype}/{eid}")publicResponseEntity<CountsResponse>getCounts(@PathVariable("etype")StringentityType,@PathVariable("eid")StringentityId,@RequestParam(value="metrics",required=false)StringmetricsStr){List<String>metrics;if(metricsStr==null||metricsStr.isBlank()){metrics=newArrayList<>(CounterSchema.SUPPORTED_METRICS);}else{metrics=Arrays.stream(metricsStr.split(",")).map(String::trim).filter(CounterSchema.SUPPORTED_METRICS::contains).toList();}Map<String,Long>counts=counterService.getCounts(entityType,entityId,metrics);returnResponseEntity.ok(newCountsResponse(entityType,entityId,counts));}

请求示例:

GET /api/v1/counter/knowpost/10001?metrics=like,fav

响应:

{"entityType":"knowpost","entityId":"10001","counts":{"like":128,"fav":36}}

十、正常读取 SDS

byte[]raw=getRaw(sdsKey);booleanneedRebuild=raw==null||raw.length!=expectedLen;if(!needRebuild){for(Stringm:metrics){Integeridx=CounterSchema.NAME_TO_IDX.get(m);intoff=idx*CounterSchema.FIELD_SIZE;longval=readInt32BE(raw,off);result.put(m,val);}}

正常读取只需要:

Redis GET 一次 本地按偏移解析字节

所以读路径非常轻。


十一、Feed 中如何使用计数?

Feed 里读取计数时,也会调用CounterService

Map<String,Long>counts=counterService.getCounts("knowpost",String.valueOf(base.id()),List.of("like","fav"));LonglikeCount=counts.getOrDefault("like",0L);LongfavoriteCount=counts.getOrDefault("fav",0L);

用户态状态则实时读位图:

booleanliked=uid!=null&&counterService.isLiked("knowpost",base.id(),uid);booleanfaved=uid!=null&&counterService.isFaved("knowpost",base.id(),uid);

这样设计可以避免公共 Feed 缓存被用户态字段污染。


十二、知识点总结

1. 为什么点赞要异步聚合?

因为点赞是高频写,直接更新最终计数会产生写热点。

Kafka + Redis 聚合桶可以削峰,把多次增量合并后再刷写。

2. Redis 聚合桶有什么作用?

聚合桶保存短时间内的计数增量。

比如 1 秒内同一篇文章新增 100 个点赞,最终可以聚合成一个字段增量,而不是每次都写最终 SDS。

3. SDS 适合什么场景?

适合指标数量固定、读频率高、希望压缩内存开销的计数场景。

4. 为什么计数是最终一致?

因为点赞状态先写位图,计数增量再异步刷 SDS。

在 1 秒刷写窗口内,计数可能略滞后,但用户状态是实时准确的。


总结

这一篇主要整理了点赞系统的异步写和写聚合链路。

用户点赞时,同步链路只负责位图状态变化;如果状态真的变化,就产生CounterEvent。事件进入 Kafka 后,消费者先写 Redis Hash 聚合桶,后台定时任务再把增量折叠到 SDS 二进制计数快照。

这套方案牺牲了秒级强一致,但换来了更高的写入吞吐和更低的读成本,非常适合点赞这种高频、可最终一致的业务场景。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/25 17:42:52

3分钟极速部署:Windows免费PDF处理工具终极指南

3分钟极速部署&#xff1a;Windows免费PDF处理工具终极指南 【免费下载链接】poppler-windows Download Poppler binaries packaged for Windows with dependencies 项目地址: https://gitcode.com/gh_mirrors/po/poppler-windows 还在为Windows系统上处理PDF文档而烦恼…

作者头像 李华
网站建设 2026/5/25 17:42:50

TaotokenAPIKey的精细权限管理与审计日志功能使用体验分享

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 Taotoken API Key 的精细权限管理与审计日志功能使用体验分享 1. 引言 在团队协作中引入大模型能力时&#xff0c;一个核心的工程…

作者头像 李华
网站建设 2026/5/25 17:42:38

多传感器融合模型后处理C++工程师面试参考回答

智驾多传感器融合|模型后处理C工程师面试参考回答 前言&#xff1a;本文为上一篇《面试重点盘点》逐点配套详细标准答案。全文采用社招3年工程师量产口吻&#xff0c;语言通俗、不堆砌论文公式、面试可直接口述&#xff1b;所有C、Linux、工程问题通用适配所有后端、嵌入式、服…

作者头像 李华
网站建设 2026/5/25 17:42:11

基于ESP32/ESP8266与LAMP栈构建低成本分布式物联网传感系统

1. 项目概述&#xff1a;用ESP构建你自己的分布式传感云几年前&#xff0c;当我们需要在厂区部署一套环境监测系统时&#xff0c;面对动辄上万的工业网关和复杂的组网协议&#xff0c;我就在想&#xff0c;有没有一种更轻量、更灵活且成本极低的方式&#xff1f;直到我开始深度…

作者头像 李华
网站建设 2026/5/25 17:41:06

微信红包助手终极指南:无需ROOT的智能抢红包解决方案

微信红包助手终极指南&#xff1a;无需ROOT的智能抢红包解决方案 【免费下载链接】WeChatLuckyMoney :money_with_wings: WeChats lucky money helper (微信抢红包插件) by Zhongyi Tong. An Android app that helps you snatch red packets in WeChat groups. 项目地址: ht…

作者头像 李华