一、真实业务场景 & 背景
- 项目背景
基于社区电商/校园二手交易平台,后端架构采用:SpringBoot + MySQL + Redis + RabbitMQ
核心业务:商品信息查询、用户信息查询、订单状态查询,核心特征如下:
- 90% 请求为读请求(查询商品、查询用户),读多写少特征明显;
- Redis 用于缓存高频读数据,降低MySQL查询压力,要求读请求响应时间 ≤ 100ms;
- 写请求(新增/修改/删除商品)频率低,但要求数据绝对准确,不允许长期脏数据。
- 核心业务痛点(企业开发常见坑)
- 同步双写风险高:初期采用“MySQL更新后同步更新Redis”,网络抖动、Redis超时、服务重启时,会出现“MySQL更新成功、Redis未更新”(旧数据残留)或“Redis更新成功、MySQL失败”(脏数据);
- 强一致性成本高:分布式事务(2PC/3PC/TCC)实现复杂、性能损耗大,不适合普通业务,性价比极低;
- 需保证最终一致性:业务允许短时间(1秒内)数据不一致,但必须确保经过一段时间后,MySQL与Redis数据完全同步。
解决方案
采用 RabbitMQ 异步补偿 + 最终一致性 架构,核心优先级:先保证MySQL数据正确,再异步同步Redis,具体流程:
写请求优先更新MySQL,用事务保证更新成功;
MySQL事务提交后,发送消息到RabbitMQ;
RabbitMQ消费者监听消息,异步更新/删除Redis;
添加消息可靠投递、重试机制,避免同步失败;
定时任务兜底校对,确保数据最终一致。
✅ 优势:性能高、不阻塞主流程、架构轻量、易落地,适配99%互联网读多写少业务。
二、整体架构图
- 流程说明(清晰易懂)
- 前端发起写请求(新增/修改/删除商品);
- 服务端开启事务,更新MySQL(事务回滚机制保证更新成功);
- MySQL事务提交后,发送消息到RabbitMQ(仅MySQL成功才发消息);
- MQ消费者监听消息,异步查询MySQL最新数据;
- 根据数据是否存在,更新/删除Redis缓存;
- 更新成功:手动ACK确认消息,MQ删除该消息;
- 更新失败:MQ自动重试(最多3次),重试失败进入死信队列;
- 定时任务兜底:每日校对MySQL与Redis数据,不一致则强制同步。
三、环境依赖
<!-- MySQL --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!-- MyBatis-Plus(简化CRUD,可选) --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3</version></dependency><!-- Redis --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Lombok(简化实体类,可选) --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- FastJSON(JSON序列化,必加) --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency>四、RabbitMQ 核心配置
- application.yml 配置
spring:# MySQL 配置datasource:url:jdbc:mysql://localhost:3306/campus_store?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTCusername:rootpassword:rootdriver-class-name:com.mysql.cj.jdbc.Driver# Redis 配置redis:host:localhostport:6379database:0timeout:3000ms# RabbitMQ 配置rabbitmq:host:localhostport:5672username:guestpassword:guestvirtual-host:/publisher-confirm-type:correlated# 生产者确认publisher-returns:true# 消息返回listener:simple:acknowledge-mode:manual# 手动ACKretry:enabled:truemax-attempts:3# 最大重试3次initial-interval:1000# 重试间隔1秒MyBatis-Plus 配置(可选)
mybatis-plus:mapper-locations:classpath:mapper/*.xmltype-aliases-package:com.campus.store.entityconfiguration:map-underscore-to-camel-case:true- MQ 配置类(可直接复制运行)
importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/** * RabbitMQ 配置:MySQL与Redis数据同步 * 架构:正常交换机→正常队列→消费者更新Redis;失败→死信队列→人工排查 */@ConfigurationpublicclassRabbitMqConfig{// 正常交换机/队列/路由keypublicstaticfinalStringDATA_SYNC_EXCHANGE="data.sync.exchange";publicstaticfinalStringDATA_SYNC_QUEUE="data.sync.queue";publicstaticfinalStringDATA_SYNC_ROUTING_KEY="data.sync.routing.key";// 死信交换机/队列/路由keypublicstaticfinalStringDEAD_EXCHANGE="dead.exchange";publicstaticfinalStringDEAD_QUEUE="dead.queue";publicstaticfinalStringDEAD_ROUTING_KEY="dead.routing.key";// 正常队列(绑定死信)@BeanpublicQueuedataSyncQueue(){returnQueueBuilder.durable(DATA_SYNC_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build();}@BeanpublicDirectExchangedataSyncExchange(){returnnewDirectExchange(DATA_SYNC_EXCHANGE,true,false);}@BeanpublicBindingdataSyncBinding(){returnBindingBuilder.bind(dataSyncQueue()).to(dataSyncExchange()).with(DATA_SYNC_ROUTING_KEY);}// 死信队列@BeanpublicQueuedeadQueue(){returnnewQueue(DEAD_QUEUE,true);}@BeanpublicDirectExchangedeadExchange(){returnnewDirectExchange(DEAD_EXCHANGE,true,false);}@BeanpublicBindingdeadBinding(){returnBindingBuilder.bind(deadQueue()).to(deadExchange()).with(DEAD_ROUTING_KEY);}}五、核心业务代码
- 商品实体类
importcom.baomidou.mybatisplus.annotation.IdType;importcom.baomidou.mybatisplus.annotation.TableId;importcom.baomidou.mybatisplus.annotation.TableName;importlombok.Data;importjava.io.Serializable;importjava.math.BigDecimal;@Data@TableName("t_product")publicclassProductimplementsSerializable{privatestaticfinallongserialVersionUID=1L;@TableId(type=IdType.AUTO)privateLongid;// 商品IDprivateStringproductName;// 商品名称privateBigDecimalprice;// 商品价格privateIntegerstock;// 商品库存privateIntegerstatus;// 商品状态(0-下架,1-上架)}- Mapper 接口
importcom.baomidou.mybatisplus.core.mapper.BaseMapper;importcom.campus.store.entity.Product;importorg.apache.ibatis.annotations.Mapper;@MapperpublicinterfaceProductMapperextendsBaseMapper<Product>{}- Service 层(事务+发消息)
importcom.baomidou.mybatisplus.extension.service.impl.ServiceImpl;importcom.campus.store.config.RabbitMqConfig;importcom.campus.store.entity.Product;importcom.campus.store.mapper.ProductMapper;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;importjavax.annotation.Resource;@ServicepublicclassProductServiceImplextendsServiceImpl<ProductMapper,Product>implementsProductService{@ResourceprivateRabbitTemplaterabbitTemplate;// 更新商品(核心方法)@Override@Transactional(rollbackFor=Exception.class)publicbooleanupdateProduct(Productproduct){// 1. 更新MySQL(事务保证)booleanupdateSuccess=this.updateById(product);if(!updateSuccess)returnfalse;// 2. 发送MQ消息,异步更新RedisrabbitTemplate.convertAndSend(RabbitMqConfig.DATA_SYNC_EXCHANGE,RabbitMqConfig.DATA_SYNC_ROUTING_KEY,product.getId());returntrue;}// 删除商品@Override@Transactional(rollbackFor=Exception.class)publicbooleandeleteProduct(LongproductId){booleandeleteSuccess=this.removeById(productId);if(!deleteSuccess)returnfalse;// 发送消息,删除Redis缓存rabbitTemplate.convertAndSend(RabbitMqConfig.DATA_SYNC_EXCHANGE,RabbitMqConfig.DATA_SYNC_ROUTING_KEY,productId);returntrue;}}六、消费者(异步更新Redis)
importcom.alibaba.fastjson.JSON;importcom.campus.store.config.RabbitMqConfig;importcom.campus.store.entity.Product;importcom.campus.store.service.ProductService;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.data.redis.core.StringRedisTemplate;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;importjava.io.IOException;@ComponentpublicclassRedisSyncConsumer{@ResourceprivateProductServiceproductService;@ResourceprivateStringRedisTemplatestringRedisTemplate;privatestaticfinalStringREDIS_KEY_PREFIX="product:info:";// 监听数据同步队列@RabbitListener(queues=RabbitMqConfig.DATA_SYNC_QUEUE)publicvoidsyncDataToRedis(LongproductId,Messagemessage,Channelchannel)throwsIOException{try{// 1. 查询MySQL最新数据Productproduct=productService.getById(productId);// 2. 更新/删除Redisif(product==null){stringRedisTemplate.delete(REDIS_KEY_PREFIX+productId);}else{stringRedisTemplate.opsForValue().set(REDIS_KEY_PREFIX+productId,JSON.toJSONString(product));}// 3. 手动ACK确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exceptione){// 异常:拒绝消息,重回队列重试channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);e.printStackTrace();}}}七、方案稳定性保障
MySQL事务保证:仅MySQL更新成功才发消息,杜绝虚消息,从源头避免不一致;
MQ异步解耦:主流程不等待Redis更新,性能极高,不阻塞用户请求;
手动ACK+重试:临时故障自动重试,避免同步失败;
死信队列兜底:重试失败消息不丢失,可人工排查修复;
定时校对:每日对比数据,确保最终一致。
八、兜底定时任务(可选)
importcom.alibaba.fastjson.JSON;importcom.campus.store.entity.Product;importcom.campus.store.service.ProductService;importorg.springframework.data.redis.core.StringRedisTemplate;importorg.springframework.scheduling.annotation.EnableScheduling;importorg.springframework.scheduling.annotation.Scheduled;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;importjava.util.List;@Component@EnableSchedulingpublicclassDataCheckTask{@ResourceprivateProductServiceproductService;@ResourceprivateStringRedisTemplatestringRedisTemplate;privatestaticfinalStringREDIS_KEY_PREFIX="product:info:";// 每日凌晨2点执行,校对数据@Scheduled(cron="0 0 2 * * ?")publicvoidcheckAndSyncData(){List<Product>productList=productService.list();for(Productproduct:productList){StringredisKey=REDIS_KEY_PREFIX+product.getId();StringredisValue=stringRedisTemplate.opsForValue().get(redisKey);// 数据不一致,强制更新Redisif(redisValue==null||!redisValue.equals(JSON.toJSONString(product))){stringRedisTemplate.opsForValue().set(redisKey,JSON.toJSONString(product));}}}}十、总结
本方案是企业级最常用的MySQL与Redis双写一致性解决方案,基于RabbitMQ实现异步补偿,达到最终一致性,核心优势:
- 适配读多写少业务,性能高、不阻塞主流程;
- 架构简单、易落地,代码可直接复制运行;
- 三重兜底保障,可直接用于生产环境;
- 解耦MySQL与Redis更新逻辑,维护成本低。