news 2026/5/1 1:58:46

MQ生产者确认机制捕获到消息投递失败后如何重试?

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
MQ生产者确认机制捕获到消息投递失败后如何重试?

要实现生产者确认机制失败后自动重试重新投递,核心思路是:将发送失败的消息暂存→按策略重试→跟踪重试状态→失败兜底。以下是具体实现思路和关键步骤,结合代码示例说明。

一、核心思路框架

当生产者通过 ConfirmCallback 收到 ack=false(Broker 未确认接收)或超时未收到确认时,说明消息发送失败。此时需将消息暂存到可靠存储(避免内存丢失),并按重试策略(次数、间隔)重新投递,直至成功或超过阈值后转入死信队列。

二、关键实现步骤

1. 定义“失败消息”存储结构

需持久化存储失败消息的核心信息,确保重启后不丢失。推荐用 数据库(MySQL/PostgreSQL)或 Redis(缓存+持久化),字段包括:

字段名 说明 示例值

msg_id 消息唯一ID(CorrelationData的ID) MSG-1690000000000-0.123456

exchange 目标交换机名称 order.exchange

routing_key 目标路由键 order.routingKey

message_body 消息体(JSON序列化) {"id":1001,"amount":99.9}

retry_count 当前重试次数 0(初始值)

max_retry 最大重试次数(如3次) 3

next_retry_time 下次重试时间(时间戳) 1690000060000(当前时间+10秒)

status 状态(待重试/重试中/失败) PENDING

2. 实现“失败消息”存储层

以 MySQL 为例,创建表存储失败消息:

CREATE TABLE mq_failed_message (

id BIGINT PRIMARY KEY AUTO_INCREMENT,

msg_id VARCHAR(64) NOT NULL UNIQUE COMMENT '消息唯一ID',

exchange VARCHAR(128) NOT NULL COMMENT '交换机',

routing_key VARCHAR(128) NOT NULL COMMENT '路由键',

message_body TEXT NOT NULL COMMENT '消息体(JSON)',

retry_count INT DEFAULT 0 COMMENT '当前重试次数',

max_retry INT DEFAULT 3 COMMENT '最大重试次数',

next_retry_time BIGINT NOT NULL COMMENT '下次重试时间戳(ms)',

status VARCHAR(20) DEFAULT 'PENDING' COMMENT '状态:PENDING/RETRYING/FAILED',

created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

INDEX idx_next_retry_time (next_retry_time) COMMENT '按下次重试时间索引'

);

3. 发送消息时关联“失败存储”

生产者发送消息时,生成唯一 msg_id(如 UUID 或雪花算法),并在 ConfirmCallback 中处理失败逻辑:

@Service

public class RetryProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

private FailedMessageStorage failedMessageStorage; // 失败消息存储接口

public void sendWithRetry(Object message, String exchange, String routingKey) {

// 1. 生成唯一消息ID

String msgId = "MSG-" + System.currentTimeMillis() + "-" + UUID.randomUUID().toString().substring(0, 8);

CorrelationData correlationData = new CorrelationData(msgId);

// 2. 发送消息(携带CorrelationData)

rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);

// 3. 注册确认回调(处理成功/失败)

rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {

String id = correlationData1 != null ? correlationData1.getId() : null;

if (id == null) return;

if (ack) {

// 确认成功:删除存储中的失败记录(若有)

failedMessageStorage.deleteById(id);

System.out.println("消息确认成功,ID: " + id);

} else {

// 确认失败:存入失败消息表,等待重试

FailedMessage failedMsg = new FailedMessage();

failedMsg.setMsgId(id);

failedMsg.setExchange(exchange);

failedMsg.setRoutingKey(routingKey);

failedMsg.setMessageBody(JSON.toJSONString(message)); // 序列化为JSON

failedMsg.setRetryCount(0);

failedMsg.setMaxRetry(3);

failedMsg.setNextRetryTime(System.currentTimeMillis() + 10 * 1000); // 10秒后重试

failedMsg.setStatus("PENDING");

failedMessageStorage.save(failedMsg);

System.err.println("消息确认失败,存入重试队列,ID: " + id + ",原因: " + cause);

}

});

}

}

4. 实现重试调度器(核心)

通过 定时任务(如 Spring Scheduler)或 线程池 定期检查失败消息表,对 next_retry_time ≤ 当前时间 的消息执行重试:

@Component

@EnableScheduling

public class RetryScheduler {

@Autowired

private FailedMessageStorage failedMessageStorage;

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

private DeadLetterQueueHandler deadLetterQueueHandler; // 死信队列处理器

// 每5秒扫描一次待重试消息

@Scheduled(fixedRate = 5000)

public void retryFailedMessages() {

List<FailedMessage> pendingMsgs = failedMessageStorage.queryPendingMessages(System.currentTimeMillis());

for (FailedMessage msg : pendingMsgs) {

try {

// 1. 标记为“重试中”(避免并发重复处理)

failedMessageStorage.updateStatus(msg.getMsgId(), "RETRYING");

// 2. 反序列化消息体

Object message = JSON.parseObject(msg.getMessageBody(), Object.class); // 根据实际类型强转

// 3. 重新发送消息(携带原msgId)

CorrelationData correlationData = new CorrelationData(msg.getMsgId());

rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRoutingKey(), message, correlationData);

// 4. 更新重试次数和下次重试时间(指数退避:10s→20s→40s)

int newRetryCount = msg.getRetryCount() + 1;

long nextRetryInterval = (long) (10 * Math.pow(2, newRetryCount - 1)) * 1000; // 指数退避

long nextRetryTime = System.currentTimeMillis() + nextRetryInterval;

if (newRetryCount >= msg.getMaxRetry()) {

// 超过最大重试次数:转入死信队列

deadLetterQueueHandler.sendToDlx(msg);

failedMessageStorage.updateStatus(msg.getMsgId(), "FAILED");

System.err.println("消息重试次数耗尽,转入死信队列,ID: " + msg.getMsgId());

} else {

// 更新重试状态(次数+1,下次重试时间)

failedMessageStorage.updateRetryInfo(msg.getMsgId(), newRetryCount, nextRetryTime, "PENDING");

System.out.println("消息重试中,ID: " + msg.getMsgId() + ",第" + newRetryCount + "次");

}

} catch (Exception e) {

// 重试过程中异常:恢复状态为PENDING,等待下次扫描

failedMessageStorage.updateStatus(msg.getMsgId(), "PENDING");

System.err.println("重试发送失败,ID: " + msg.getMsgId() + ",错误: " + e.getMessage());

}

}

}

}

5. 死信队列兜底(重试失败的最终处理)

当消息超过最大重试次数仍未成功,将其转入死信队列(DLX),人工介入处理:

@Component

public class DeadLetterQueueHandler {

@Autowired

private RabbitTemplate rabbitTemplate;

public void sendToDlx(FailedMessage msg) {

// 发送到死信交换机(需提前配置死信队列和交换机)

rabbitTemplate.convertAndSend(

"dlx.exchange",

"dlx.routingKey",

msg.getMessageBody(),

message -> {

MessageProperties props = message.getMessageProperties();

props.setHeader("failed_reason", "max_retry_exceeded");

props.setHeader("original_msg_id", msg.getMsgId());

return message;

}

);

}

}

三、关键技术点说明

1. 重试策略

指数退避:重试间隔随次数递增(如 10s→20s→40s),避免集中重试压垮 Broker;

固定间隔:简单场景用固定间隔(如每 30 秒重试一次);

随机间隔:避免多个生产者同时重试导致“惊群效应”。

2. 幂等性保障

重试可能导致消息重复投递,消费者需通过 唯一ID去重(如 Redis 记录已处理 msg_id,有效期 24 小时):

@Component

public class IdempotentConsumer {

@Autowired

private RedisTemplate<String, Object> redisTemplate;

@RabbitListener(queues = "order.queue")

public void consume(String message, @Header("msg_id") String msgId) {

String key = "processed_msg:" + msgId;

if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {

// 已处理,直接确认

channel.basicAck(deliveryTag, false);

return;

}

// 业务逻辑处理...

redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS); // 记录已处理

channel.basicAck(deliveryTag, false);

}

}

3. 存储选型对比

存储方式 优点 缺点 适用场景

MySQL 持久化可靠,支持复杂查询 性能较低,需维护数据库连接 生产环境(消息重要性高)

Redis 高性能,支持过期时间 数据易失(需开启RDB/AOF持久化) 中小规模、对性能要求高的场景

内存队列 速度快 重启丢失消息 测试环境或临时重试

四、总结

生产者确认失败后的重试重新投递,本质是“存储-调度-重试-兜底”的闭环:

存储:用数据库/Redis 持久化失败消息;

调度:定时任务扫描待重试消息;

重试:按指数退避策略重新发送,更新重试状态;

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/1 5:09:47

5分钟用AI创建一个RGBA调色板应用

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 快速生成一个RGBA调色板应用&#xff0c;功能包括&#xff1a;1) 颜色选择器 2) 调色板保存 3) 颜色代码复制 4) 分享功能 5) 历史记录。要求响应式设计&#xff0c;支持PWA安装。使…

作者头像 李华
网站建设 2026/5/1 5:10:04

Visual Studio 十月更新 —— 新模型、记忆功能、计划功能及更多内容

2025年10月的 Visual Studio 2022&#xff08;v17.14&#xff09;更新现已发布。本月&#xff0c;我们为您带来了模型选择和智能体流程方面的改进。1新模型我们的聊天窗口中现已提供 Claude Sonnet 4.5 和 Claude Haiku 4.5。这意味着&#xff0c;推动您的智能体工作流的最新创…

作者头像 李华
网站建设 2026/4/29 20:58:31

怎么给图纸文件加密?2025 年 5 款轻量图纸加密软件分享

图纸文件承载核心设计成果&#xff0c;泄露或篡改可能造成重大损失。2025 年&#xff0c;轻量型加密工具成为技术从业者首选 —— 无需复杂部署&#xff0c;就能实现精准防护。本文精选 5 款实用软件&#xff0c;兼顾安全性与易用性&#xff0c;帮你快速找到适配的图纸加密方案…

作者头像 李华
网站建设 2026/4/28 5:22:51

快速搭建智能体----agno

在最近的工作学习中接触到智能体的搭建&#xff0c;基于当下ai的快速发展&#xff0c;像豆包、gpt等一些智能问答工具每天都有大量的免费token可以使用&#xff0c;我们个人如何利用这些模型创建属于自己个人的智能系统&#xff1f; agno--就是一旦开放的可以快速搭建属于自己…

作者头像 李华
网站建设 2026/4/17 22:48:58

电商网站中no-referrer-when-downgrade的实际应用案例

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 开发一个电商网站demo&#xff0c;展示no-referrer-when-downgrade策略在支付页面跳转、第三方服务集成等场景的应用。要求包含从HTTPS到HTTP支付网关的跳转示例&#xff0c;以及相…

作者头像 李华
网站建设 2026/4/23 17:30:05

ABAP 三种类型的内表读取性能测试

在做 SAP 项目时,性能问题往往不是出在数据库,也不是出在 CDS View 或者 OData 协议本身,而是出在最不起眼的一行代码:你选了哪一种 ABAP 内表。 很多人习惯性把结果集塞进一个 STANDARD TABLE,随后在循环里 READ TABLE ... WITH KEY 做查找。开发机上几千条数据跑得飞起…

作者头像 李华