news 2026/5/31 9:47:07

【架构实战】Canal数据同步:MySQL数据变更实时捕获

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【架构实战】Canal数据同步:MySQL数据变更实时捕获

一、一次数据不一致让我通宵排查

2019年,我们的订单系统和财务系统数据不一致,差了2000多条记录。

财务那边说订单金额和财务对账对不上,让我们查。我花了整整一个通宵,逐条对比两个系统的数据库,发现是同步脚本漏跑了一批数据。

当时的同步方案是每小时跑一个定时任务,从订单数据库导出数据,然后导入到财务数据库。但是凌晨3点的时候,定时任务因为数据库连接超时失败了,而这批数据再也没有被补上。

从那以后,我们引入了Canal做实时数据同步,再也没有出现过数据不一致的问题。


二、Canal原理

2.1 MySQL主从复制原理

MySQL主从复制: 1. Master将变更写入Binlog 2. Slave的IO线程连接Master,请求Binlog 3. Master的Dump线程发送Binlog给Slave 4. Slave的IO线程将Binlog写入Relay Log 5. Slave的SQL线程读取Relay Log,执行SQL Canal伪装成Slave: - Canal连接MySQL,假装自己是Slave - 接收Binlog事件 - 解析Binlog,提取数据变更 - 将变更推送给下游消费者

2.2 Canal架构

┌─────────────────────────────────────────────────────────────────┐ │ Canal架构 │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │ │ │ MySQL │───▶│ Canal │───▶│ Canal Client │ │ │ │ (Master) │ │ Server │ │ (应用层) │ │ │ └──────────┘ └──────────┘ └──────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────┐ │ │ │ MQ │ │ │ │(Kafka/ │ │ │ │ RocketMQ)│ │ │ └──────────┘ │ │ │ │ │ ▼ │ │ ┌──────────┐ │ │ │ 下游系统 │ │ │ └──────────┘ │ │ │ └──────────────────────────────────────────────────────────────────┘

三、Canal部署与配置

3.1 MySQL配置

# my.cnf - 开启Binlog [mysqld] # Binlog格式:ROW(推荐,数据最完整) binlog-format=ROW # Server ID(主从环境中必须唯一) server-id=1 # Binlog文件名 log-bin=mysql-bin # 需要同步的数据库(可选) binlog-do-db=order_db,product_db # 不需要同步的数据库 binlog-ignore-db=mysql,information_schema # Binlog保留天数 expire-logs-days=7 # 每次事务提交都刷盘 sync-binlog=1

3.2 Canal Server配置

# canal.propertiescanal:server:mode:tcp# tcp/kafka/rocketMQport:11111# ZooKeeper配置(HA模式)zk:servers:127.0.0.1:2181# 实例配置instances:-name:orderdestination:order_sync
# instance.properties - 订单库同步canal:instance:master:address:127.0.0.1:3306# 用户名密码(需要REPLICATION权限)dbUsername:canaldbPassword:canal123# 过滤规则filter:order_db\\..*# Binlog位置(首次启动时)journalName:mysql-bin.000001position:0

3.3 Canal Client开发

/** * Canal客户端 */@Component@Slf4jpublicclassCanalClientimplementsInitializingBean,DisposableBean{privateCanalConnectorconnector;@Value("${canal.server.host:127.0.0.1}")privateStringcanalHost;@Value("${canal.server.port:11111}")privateintcanalPort;@Value("${canal.destination:order_sync}")privateStringdestination;privatevolatilebooleanrunning=false;@OverridepublicvoidafterPropertiesSet(){// 创建连接connector=CanalConnectors.newSingleConnector(newInetSocketAddress(canalHost,canalPort),destination,"","");running=true;// 启动消费线程newThread(this::consume,"canal-client").start();}/** * 消费Binlog */privatevoidconsume(){while(running){try{connector.connect();connector.subscribe("order_db\\..*");connector.rollback();while(running){// 获取一批数据Messagemessage=connector.getWithoutAck(1000);longbatchId=message.getId();intsize=message.getEntries().size();if(batchId==-1||size==0){Thread.sleep(1000);continue;}// 处理数据变更processEntries(message.getEntries());// 确认connector.ack(batchId);}}catch(Exceptione){log.error("Canal消费异常,5秒后重连",e);try{Thread.sleep(5000);}catch(InterruptedExceptionie){Thread.currentThread().interrupt();}}finally{connector.disconnect();}}}/** * 处理数据变更 */privatevoidprocessEntries(List<CanalEntry.Entry>entries){for(CanalEntry.Entryentry:entries){if(entry.getEntryType()!=CanalEntry.EntryType.ROWDATA){continue;}try{CanalEntry.RowChangerowChange=CanalEntry.RowChange.parseFrom(entry.getStoreValue());StringtableName=entry.getHeader().getTableName();CanalEntry.EventTypeeventType=rowChange.getEventType();for(CanalEntry.RowDatarowData:rowChange.getRowDatasList()){DataChangeEventevent=DataChangeEvent.builder().table(tableName).eventType(eventType).before(rowData.getBeforeColumnsList()).after(rowData.getAfterColumnsList()).timestamp(entry.getHeader().getTimestamp()).build();// 发布事件handleEvent(event);}}catch(Exceptione){log.error("处理Entry异常: {}",entry,e);}}}/** * 处理事件 */privatevoidhandleEvent(DataChangeEventevent){switch(event.getEventType()){caseINSERT:log.info("INSERT: table={}, data={}",event.getTable(),event.getAfter());handleInsert(event);break;caseUPDATE:log.info("UPDATE: table={}, before={}, after={}",event.getTable(),event.getBefore(),event.getAfter());handleUpdate(event);break;caseDELETE:log.info("DELETE: table={}, data={}",event.getTable(),event.getBefore());handleDelete(event);break;}}@Overridepublicvoiddestroy(){running=false;if(connector!=null){connector.disconnect();}}}

四、Canal+MQ方案

4.1 Canal投递到Kafka

# canal.properties - Kafka模式canal:server:mode:kafkakafka:bootstrap:servers:127.0.0.1:9092topic:canal-data-syncpartition:1replication:1acks:allretries:3batch:size:16384linger:ms:1buffer:memory:33554432

4.2 消费Kafka消息

/** * Kafka消费者 - 处理Canal消息 */@Component@KafkaListener(topics="canal-data-sync",groupId="data-sync-group")@Slf4jpublicclassCanalKafkaConsumer{@AutowiredprivateDataSyncHandlerFactoryhandlerFactory;/** * 消费Canal消息 */@KafkaListener(topics="canal-data-sync")publicvoidconsume(ConsumerRecord<String,String>record){Stringvalue=record.value();try{// 解析Canal消息CanalMessagemessage=JSON.parseObject(value,CanalMessage.class);// 获取处理器DataSyncHandlerhandler=handlerFactory.getHandler(message.getTable());if(handler!=null){// 处理数据同步handler.handle(message);}else{log.warn("没有找到处理器: table={}",message.getTable());}}catch(Exceptione){log.error("消费Canal消息失败: offset={}",record.offset(),e);}}}/** * 数据同步处理器工厂 */@ComponentpublicclassDataSyncHandlerFactory{privateMap<String,DataSyncHandler>handlers=newHashMap<>();@AutowiredpublicvoidsetHandlers(List<DataSyncHandler>handlerList){for(DataSyncHandlerhandler:handlerList){handlers.put(handler.getTable(),handler);}}publicDataSyncHandlergetHandler(Stringtable){returnhandlers.get(table);}}/** * 订单同步处理器 */@Component@Slf4jpublicclassOrderSyncHandlerimplementsDataSyncHandler{@AutowiredprivateFinanceServiceClientfinanceClient;@OverridepublicStringgetTable(){return"t_order";}@Overridepublicvoidhandle(CanalMessagemessage){switch(message.getEventType()){caseINSERT:caseUPDATE:// 同步到财务系统syncToFinance(message.getAfter());break;caseDELETE:// 通知财务系统financeClient.notifyDelete(message.getBefore().get("id").toString());break;}}privatevoidsyncToFinance(Map<String,String>data){FinanceOrderDTOdto=FinanceOrderDTO.builder().orderId(data.get("id")).amount(newBigDecimal(data.get("amount"))).status(data.get("status")).createTime(data.get("create_time")).build();financeClient.syncOrder(dto);log.info("同步订单到财务系统: orderId={}",dto.getOrderId());}}

五、踩坑实录

坑1:Binlog格式不对

用了STATEMENT格式的Binlog,导致Canal解析出来的SQL和实际数据不一致。

解决:必须使用ROW格式,数据最完整。

坑2:Canal位点丢失

Canal Server宕机后,消费位点丢失,重复消费或漏消费。

解决:定期保存位点,使用ZooKeeper做HA。

坑3:大事务导致延迟

一次性删除100万条记录,Binlog太大,Canal消费延迟严重。

解决:分批操作,每批1000条,减少单次Binlog大小。

坑4:字段类型变更

ALTER TABLE修改了字段类型,Canal解析失败。

解决:Canal配置支持DDL变更,或事先通知Canal刷新元数据。

坑5:循环同步

A系统同步到B系统,B系统又同步回A系统,形成循环。

解决:标记数据来源,避免处理自己发出的变更。


六、总结

Canal是MySQL数据实时同步的最佳方案:

  • 原理:伪装MySQL Slave,解析Binlog
  • 架构:Canal Server + Client/MQ
  • 应用:数据同步、缓存更新、搜索引擎索引

最佳实践:

  1. 使用ROW格式的Binlog
  2. 做好位点管理和HA
  3. 避免大事务
  4. 防止循环同步
  5. 监控消费延迟

血的教训:

数据同步看似简单,但一旦出问题就是数据不一致,修复成本极高。Canal是好的工具,但用好它需要对MySQL和业务有深刻理解。

思考题:你的系统用了什么数据同步方案?有没有遇到过数据不一致的问题?


个人观点,仅供参考

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/31 9:46:54

如何快速备份QQ空间:GetQzonehistory终极数据保护指南

如何快速备份QQ空间&#xff1a;GetQzonehistory终极数据保护指南 【免费下载链接】GetQzonehistory 获取QQ空间发布的历史说说 项目地址: https://gitcode.com/GitHub_Trending/ge/GetQzonehistory 还在担心那些承载着青春回忆的QQ空间说说不小心丢失吗&#xff1f;Get…

作者头像 李华
网站建设 2026/5/31 9:31:18

实战指南:用阿里云ECS+Nginx+TailScale搭建安全的个人开发隧道

基于云服务器与Nginx构建安全开发隧道的全流程指南对于开发者而言&#xff0c;能够随时随地安全访问内网开发环境是提升工作效率的关键。本文将详细介绍如何利用云服务器ECS、Nginx反向代理与TailScale组网技术&#xff0c;构建一个既安全又便捷的远程开发通道。1. 技术方案概述…

作者头像 李华
网站建设 2026/5/31 9:30:24

从编辑器到游戏:揭秘Godot拖放API的“潜规则”与实战避坑指南

从编辑器到游戏&#xff1a;揭秘Godot拖放API的“潜规则”与实战避坑指南在Godot引擎的UI交互设计中&#xff0c;拖放功能是实现复杂用户界面的关键组件。许多开发者在初次接触Godot拖放系统时&#xff0c;往往会被其表面上的简单性所迷惑——直到他们在实际项目中遇到那些难以…

作者头像 李华