上一篇【第88篇】日志收集平台的Kafka实战——百亿日志的接入、传输与清洗
下一篇【第90篇】Kafka在微服务中的最佳实践——事件驱动架构设计全攻略
摘要
数据库里一条记录变了,怎么让ES、Redis、数据仓库同步感知?传统的定时全量同步太重了,延迟动辄几分钟到几小时。而CDC(Change Data Capture,变更数据捕获)能把数据库的每一次增删改变成一条实时消息,通过Kafka广播给所有下游——延迟降到秒级,还不影响源库性能。
本文深入MySQL CDC与Kafka的集成实战:从Binlog原理讲起,到Debezium的完整配置与调优,再到Binlog事件→Kafka消息的映射机制,下游消费者的幂等处理方案,大表百万级数据的全量初始化策略,以及让人头疼的DDL变更处理。每个环节都有可落地的配置和代码。
一、CDC原理——从Binlog到Kafka的魔法
1.1 什么是CDC
【传统数据同步 vs CDC实时同步】 传统方案(定时全量同步): ┌────────┐ 每5分钟一次 ┌────────┐ │ MySQL │───────────────►│ ES │ │ 1000万行│ 全表扫描 │ 1000万行│ └────────┘ CPU飙升! └────────┘ 问题: 延迟大、源库压力大、无法感知DELETE CDC方案(实时增量同步): ┌────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ │ MySQL │ │ Binlog流 │ │ Kafka │ │ ES │ │ INSERT │──► 实时读取 │──► 消息队列 │──► 增量更新 │ │ UPDATE │ │ Binlog │ │ │ │ Redis │ │ DELETE │ │ │ │ │ │ Hive │ └────────┘ └──────────┘ └──────────┘ └────────┘ 延迟: <1秒 源库压力: 几乎为零 支持DELETECDC的核心思想很简单:不去扫表,而是监听数据库的变更日志。
1.2 MySQL Binlog的三种格式
| 格式 | 记录方式 | 优点 | 缺点 | CDC适用 |
|---|---|---|---|---|
| STATEMENT | 记录SQL语句 | 日志量小 | 不确定函数(NOW()等)导致不一致 | ❌ |
| ROW | 记录每行数据变更 | 精确、可重放 | 日志量大 | ✅ 必须用ROW |
| MIXED | 混合,默认STATEMENT | 折中 | CDC可能丢数据 | ❌ |
必须将MySQL的binlog_format设置为ROW,这是Debezium工作的前提。
-- 检查当前binlog格式SHOWVARIABLESLIKE'binlog_format';-- 设置为ROW(需要重启或动态修改)SETGLOBALbinlog_format='ROW';-- 同时确保binlog是开启的SHOWVARIABLESLIKE'log_bin';1.3 Binlog事件类型
【Binlog事件 → Kafka消息映射】 MySQL Binlog事件 Kafka消息 ────────────────────────────────────────────────── INSERT INTO users VALUES (...) → Create事件 {op: "c", after: {...}} UPDATE users SET name='Bob' → Update事件 {op: "u", before:{...}, after:{...}} DELETE FROM users WHERE id=1 → Delete事件 {op: "d", before: {...}} ALTER TABLE users ADD COLUMN ... → schema变更事件(需要特殊处理)二、Debezium配置与使用
2.1 Debezium Connector架构
【Debezium + Kafka Connect 架构】 ┌─────────────────────────────────────────────────────┐ │ Kafka Connect Worker │ │ │ │ ┌───────────────────────────────────────────────┐ │ │ │ Debezium MySQL Connector │ │ │ │ │ │ │ │ ┌──────────────┐ ┌──────────────────┐ │ │ │ │ │ Snapshot线程 │ │ Binlog读取线程 │ │ │ │ │ │ (全量初始化) │ │ (增量同步) │ │ │ │ │ └──────┬───────┘ └────────┬─────────┘ │ │ │ │ │ │ │ │ │ │ └───────────┬───────────┘ │ │ │ │ │ │ │ │ │ ┌──────▼──────┐ │ │ │ │ │ Kafka Topic │ │ │ │ │ │ (每条表的 │ │ │ │ │ │ 变更事件) │ │ │ │ │ └─────────────┘ │ │ │ └───────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────┘ Snapshot: 首次启动时全量导出表数据到Kafka Binlog读取: 实时监听Binlog,增量推送变更2.2 完整配置
{"name":"mysql-orders-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector",// ===== MySQL连接配置 ====="database.hostname":"mysql-master","database.port":"3306","database.user":"debezium","database.password":"${DBZ_MYSQL_PASSWORD}","database.server.id":"184054","database.server.name":"prod-mysql",// ===== 要监控的库和表 ====="database.include.list":"ecommerce","table.include.list":"ecommerce.orders,ecommerce.order_items,ecommerce.payments",// ===== Kafka连接配置 ====="database.history.kafka.bootstrap.servers":"broker1:9092,broker2:9092","database.history.kafka.topic":"dbhistory.prod-mysql",// ===== 全量初始化配置 ====="snapshot.mode":"initial","snapshot.locking.mode":"minimal","snapshot.fetch.size":"10000",// ===== 数据类型转换 ====="decimal.handling.mode":"double","time.precision.mode":"connect",// ===== 高级配置 ====="max.queue.size":"8192","max.batch.size":"2048","poll.interval.ms":"500",// ===== Topic命名策略 ====="topic.prefix":"prod-mysql",// ===== DDL处理 ====="include.schema.changes":"true","schema.history.internal.kafka.bootstrap.servers":"broker1:9092","schema.history.internal.kafka.topic":"schema-history.prod-mysql"}}// 部署命令curl-XPOST-H"Content-Type: application/json"\--data @mysql-connector.json \http://connect-host:8083/connectors2.3 MySQL端权限配置
-- 创建Debezium专用用户CREATEUSER'debezium'@'%'IDENTIFIEDBY'strong-password';-- 授予必要权限GRANTSELECT,RELOAD,SHOWDATABASES,REPLICATIONSLAVE,REPLICATIONCLIENTON*.*TO'debezium'@'%';FLUSHPRIVILEGES;三、Binlog事件到Kafka消息的映射
3.1 Kafka消息结构
Debezium发送到Kafka的消息结构:
{"schema":{...},"payload":{"before":null,// UPDATE/DELETE时有值"after":{// INSERT/UPDATE时有值"id":1001,"user_id":"U10086","amount":299.00,"status":"CREATED","created_at":"2026-05-30T10:30:00Z"},"source":{// 元数据"version":"2.5.0.Final","connector":"mysql","name":"prod-mysql","ts_ms":1717059600000,"snapshot":"false",// 是否来自快照"db":"ecommerce","table":"orders","server_id":1,"gtid":"xxx","file":"mysql-bin.000003","pos":12345,"row":0},"op":"c",// c=create, u=update, d=delete, r=read(快照)"ts_ms":1717059600000}}3.2 下游Topic命名规则
Debezium自动生成的Topic:
规则: {topic.prefix}.{database}.{table} 示例: prod-mysql.ecommerce.orders → orders表的变更 prod-mysql.ecommerce.order_items → order_items表的变更 prod-mysql.ecommerce.payments → payments表的变更四、下游消费者幂等处理
4.1 CDC消息的特性
CDC消息有一个重要特性:At-Least-Once语义。即同一条数据库变更可能被发送多次(因为Connector故障恢复、Rebalance等)。消费者必须做幂等处理。
4.2 基于操作类型的幂等写入ES
@ComponentpublicclassOrderCDCToES{privatefinalRestHighLevelClientesClient;@KafkaListener(topics="prod-mysql.ecommerce.orders",groupId="es-sync-service")publicvoidhandleOrderChange(StringjsonMessage){CDCEventevent=parseCDCEvent(jsonMessage);StringorderId=event.getAfter().get("id").toString();StringesId="order_"+orderId;switch(event.getOp()){case"c","r"->{// CREATE 或 快照READ// UPSERT 写入ES(天然幂等)IndexRequestrequest=newIndexRequest("orders").id(esId).source(event.getAfter(),XContentType.JSON);esClient.index(request,RequestOptions.DEFAULT);}case"u"->{// UPDATE// 部分更新(幂等)UpdateRequestrequest=newUpdateRequest("orders",esId).doc(event.getAfter(),XContentType.JSON).upsert(event.getAfter());// 如果不存在就插入esClient.update(request,RequestOptions.DEFAULT);}case"d"->{// DELETE// 删除(幂等,删多次不会报错)DeleteRequestrequest=newDeleteRequest("orders",esId);esClient.delete(request,RequestOptions.DEFAULT);}}}}4.3 写入Redis的幂等处理
@KafkaListener(topics="prod-mysql.ecommerce.orders",groupId="redis-cache-service")publicvoidhandleOrderChangeForRedis(StringjsonMessage){CDCEventevent=parseCDCEvent(jsonMessage);StringorderId=event.getAfter()!=null?event.getAfter().get("id").toString():event.getBefore().get("id").toString();StringredisKey="order:"+orderId;switch(event.getOp()){case"c","r","u"->{// SET操作天然幂等redisTemplate.opsForValue().set(redisKey,JSON.toJSONString(event.getAfter()),Duration.ofHours(24));}case"d"->{// DEL操作天然幂等redisTemplate.delete(redisKey);}}}4.4 利用source信息做精确去重
对于需要严格去重的场景,可以用Debezium提供的binlog位置信息:
publicclassCDCDeduplicator{privatefinalRedisTemplate<String,String>redis;publicbooleanisDuplicate(CDCEventevent){JsonNodesource=event.getSource();StringdedupKey=String.format("%s:%s:%d",source.get("file").asText(),// binlog文件名source.get("pos").asLong(),// binlog位置source.get("row").asInt()// 行号);// SetNX: 返回true说明是新事件,false说明已处理过return!redis.opsForValue().setIfAbsent("cdc_dedup:"+dedupKey,"1",Duration.ofDays(7));}}五、大表全量初始化方案
5.1 问题场景
订单表有5000万行数据,如果一次全量快照,需要几个小时,还可能在快照期间锁表。
5.2 Debezium的快照策略
| snapshot.mode | 行为 | 适用场景 |
|---|---|---|
initial | 首次启动做全量快照,之后增量 | 新部署(默认) |
when_needed | 只在需要时做快照 | 恢复数据后 |
never | 永远不做快照 | 从指定binlog位点开始 |
schema_only | 只同步表结构 | 数据已存在,只同步增量 |
initial_only | 只做快照然后停 | 一次性数据导出 |
5.3 大表全量导出的最佳实践
【大表全量初始化三步骤】 Step 1: 分批快照 (Snapshot) ┌─────────────────────────────────────────┐ │ snapshot.fetch.size = 20000 │ │ snapshot.locking.mode = "minimal" │ │ │ │ 5000万行 / 20000 = 2500批 │ │ 每批约2秒 → 总耗时约1.5小时 │ │ │ │ 关键技术: minimal锁只在获取全局读锁的 │ │ 一瞬间锁表,拿到binlog位点后立即释放 │ └─────────────────────────────────────────┘ Step 2: 增量追赶 (Catch-up) ┌─────────────────────────────────────────┐ │ 快照期间发生的变更都存在Binlog里了 │ │ 快照完成后自动从记录的位点开始回放 │ │ │ │ 最终实现: 快照数据 + 增量变更 = 完整数据 │ └─────────────────────────────────────────┘ Step 3: 并行消费加速 ┌─────────────────────────────────────────┐ │ Topic: prod-mysql.ecommerce.orders │ │ 分区数: 12 │ │ │ │ Consumer Group: es-sync-service │ │ 消费者数: 12 (每个消费一个分区) │ │ │ │ 注意: 快照阶段消息没有Key, 会轮询到 │ │ 所有分区; 增量阶段按表主键Hash分区 │ └─────────────────────────────────────────┘5.4 快照性能调优
{// 每批拉取的行数(增大可以减少查询次数)"snapshot.fetch.size":"20000",// 锁策略(minimal最轻量,不阻塞业务写入)"snapshot.locking.mode":"minimal",// 使用SELECT ... WHERE 分批查询(避免大结果集内存溢出)"snapshot.select.statement.overrides":"ecommerce.orders","snapshot.select.statement.overrides.ecommerce.orders":"SELECT * FROM ecommerce.orders WHERE id > ? ORDER BY id ASC",// 限制快照的并发查询线程"snapshot.max.threads":"4"}六、DDL变更的优雅处理
6.1 DDL变更的问题
表结构变了(比如加了个字段),下游消费者怎么办?
【DDL变更场景】 ALTER TABLE orders ADD COLUMN coupon_id BIGINT; 问题: 1. 旧消息没有coupon_id字段,解析会出错 2. 新消息有coupon_id字段,旧消费者schema不兼容 3. 消费者代码需要更新,但更新需要时间6.2 Debezium的Schema Evolution
Debezium通过Schema History Topic自动管理表结构的变更历史:
【Schema变更的处理流程】 ┌──────────────┐ 执行DDL: │ MySQL │ ALTER TABLE │ 执行DDL │ └──────┬───────┘ │ DDL写入Binlog ▼ ┌──────────────┐ │ Debezium │ │ 读取DDL事件 │ └──────┬───────┘ │ ┌─────────────┼─────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────────┐ ┌──────────┐ ┌───────────┐ │ Schema │ │ 新Schema │ │ 数据Topic │ │ History │ │ 嵌入消息 │ │ 新消息含 │ │ Topic │ │ (可选) │ │ coupin_id │ └─────────────┘ └──────────┘ └───────────┘6.3 消费者的Schema兼容处理
@KafkaListener(topics="prod-mysql.ecommerce.orders")publicvoidhandleOrderChange(StringjsonMessage){JsonNodepayload=objectMapper.readTree(jsonMessage).get("payload");if(payload==null)return;// 获取after节点(可能为null,DELETE的情况)JsonNodeafter=payload.get("after");if(after!=null){Map<String,Object>orderMap=newHashMap<>();// 用迭代器遍历所有字段(自动适配DDL变更)Iterator<String>fieldNames=after.fieldNames();while(fieldNames.hasNext()){Stringfield=fieldNames.next();orderMap.put(field,after.get(field).asText());}// 安全获取新旧字段(兼容DDL变更)StringorderId=orderMap.containsKey("id")?orderMap.get("id").toString():null;StringcouponId=orderMap.getOrDefault("coupon_id",null);// 写入ES(动态字段映射,自动适配)IndexRequestrequest=newIndexRequest("orders").id("order_"+orderId).source(orderMap);esClient.index(request,RequestOptions.DEFAULT);}}6.4 DDL变更最佳实践
| 场景 | 处理方式 |
|---|---|
| 加字段 | 消费者用Map/Object接收消息,不过度依赖固定Pojo |
| 改字段类型 | 先在消费者端兼容新旧类型,再执行DDL |
| 删字段 | 消费者代码先去掉对该字段的引用,再执行DROP |
| 改表名 | 在Debezium中配table.rename.format重映射到新Topic |
| 加表 | 更新Debezium的table.include.list,重启Connector |
七、常见坑与解决方案
| 问题 | 症状 | 原因 | 解决方案 |
|---|---|---|---|
| 快照锁表 | 业务写入超时 | snapshot.locking.mode错误 | 使用minimal模式 |
| Binlog丢失 | Debezium报错找不到binlog | binlog过期被清理 | 增加expire_logs_days或增大Kafka保留时间 |
| 大事务卡住 | 消费Lag突然暴涨 | 一个大事务产生大量binlog | 拆分大事务,设置max.queue.size |
| GTID不一致 | 从库切换后数据重复 | GTID没有启用 | 启用GTID并使用gtid.new.channel.position |
| OOM | Debezium内存溢出 | 快照fetch.size太大 | 减小fetch.size,调大堆内存 |
本篇小结
MySQL CDC + Kafka + Debezium 是实时数据同步的王炸方案:
- CDC核心原理:把数据库Binlog(ROW格式)变成Kafka消息流,变被动轮询为主动推送
- Debezium配置关键:
binlog_format=ROW是前提,snapshot.mode=initial首次全量,snapshot.locking.mode=minimal减少锁表时间 - 消息映射:INSERT→Create(after有值)、UPDATE→Update(before+after)、DELETE→Delete(before有值)
- 幂等处理:ES用upsert、Redis用SET/DEL天然幂等,敏感场景用binlog位置做精确去重
- 大表初始化:分批快照 + 增量追赶 + 并行消费加速
- DDL变更:消费者用动态字段遍历而非固定Pojo,Schema History Topic自动管理结构变更
一句话总结:让数据库的每一次呼吸都变成一条Kafka消息。
上一篇【第88篇】日志收集平台的Kafka实战——百亿日志的接入、传输与清洗
下一篇【第90篇】Kafka在微服务中的最佳实践——事件驱动架构设计全攻略