news 2026/6/15 13:27:11

Pulsar 消息重试与死信机制

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Pulsar 消息重试与死信机制

消息队列消费场景下,经常会发生预期外的异常,比如:消息处理时业务报错(比如数据格式异常或下游服务短暂不可用)、业务处理消息耗时超过ack最大等待时间等。为应对这些场景,Pulsar 提供了消息重试和死信机制,通过消费者客户端不同的配置,在处理消息出现异常时,可以实现有限重试和无限重试两种效果。

有限重试

有限重试是利用 Pulsar 的重试队列和死信队列机制,保证业务的最终一致性。当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试 Topic 中,当重试达到一定次数后,停止重试,投递到死信 Topic 中。当消息进入到死信队列中,一般这时就需要人为介入来处理这批消息。可以通过编写专门的客户端来订阅死信 Topic,处理这批之前处理失败的消息。

实现原理

客户端处理消息失败后,调用consumer.reconsumeLater接口开始走重试策略。首先,客户端检查消息对应的重试次数,如果达到指定的最大重试次数,消息被投递到死信队列(投递到死信队列的消息不会自动消费,如果需要,用户自己创建额外的消费者进行消费);如果没有达到最大重试次数,消费被投递到重试队列。重试间隔是通过延迟消息实现的,投递到重试队列的实际上是一个延迟消息,延迟时间就是用户在reconsumeLater中指定的时间。

使用重试队列实现自动重试的关键点总结

  1. 发送到 Retry Topic:消息被发送到 Retry Topic,并设置 deliverAfter(delayTime, unit) 延迟投递

  2. 自动 ACK:发送成功后,通过 doAcknowledge() 自动确认原始消息

  3. 原消息状态:原始 Topic 中的消息变为已确认(Acknowledged)状态

  4. 延迟重试:延迟时间到达后,消费者会从 Retry Topic 收到该消息

注意事项:当使用 Token 访问重试/死信队列时,需要为消费者所使用角色赋予生产消息权限。

代码示例

自动重试的代码示例,消费过程中出现某些异常,进入重试 Topic 重试,最后进入死信Topic中。

注:如果消费的消息 ack 超时,会触发重新投递(Redelivery),消息会从原 Topic 重新发送给消费者

消费者参数设置

注意要重试队列和死信队列的topic需要在hulk云平台创建好,并给token配置读写权限。

PulsarClient pulsarClient = Constant.getPulsarClient(); Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) .topic("persistent://my-property/my-ns/test_retry_p2") .subscriptionName("sub1") .subscriptionType(SubscriptionType.Key_Shared) .enableRetry(true)//开启重试消费 .ackTimeout(30, TimeUnit.SECONDS) // 设置为最大处理时间的 2-3 倍 .ackTimeoutTickTime(5, TimeUnit.SECONDS) // 检查粒度,默认 1 秒 .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(3)//可以指定最大重试次数 .retryLetterTopic("persistent://my-property/my-ns/sub1-retry") //指定重试队列 .deadLetterTopic("persistent://my-property/my-ns/sub1-dlq") //指定死信队列 .build()) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); while (true) { Message<String> message = null; try { message = consumer.receive(); log.info("Received message: {}, Properties:{}", message.getValue(), message.getProperties()); doBusinessMaybeThrowException(message); consumer.acknowledge(message); log.info("Ack: {},{}", message.getTopicName(), message.getMessageId()); } catch (Exception e) { log.error("Consumer exception:{}", e.getMessage(), e); consumer.reconsumeLater(message, 2, TimeUnit.SECONDS);//延迟重试 } }

Message对象中有个字段 property,包含了重试相关的属性

{ REAL_TOPIC=persistent://my-property/my-ns/real-topic, #原 Topic REAL_SUBSCRIPTION=sub_topic1, ORIGIN_MESSAGE_ID=8143097:5:0, #最初生产的消息 ID ORIGIN_MESSAGE_IDY_TIME=8143097:5:0, DELAY_TIME=1000, RECONSUMETIMES=2 #消息重试的次数 }

可以使用属性中的重试的次数,实现指数级回退重试。

无限重试

无限重试是指客户端在处理消息失败后,主动发一条否定应答,让服务端重新推送。如果一直发送否定应答,服务端会一直重推,因此实现无限重试的效果。仅需2步即可开启无限重试

1、初始化consumer时,指定重试间隔(negativeAckRedeliveryDelay,默认1min)

2、捕获业务异常,对处理失败的消息,发送否定应答(consumer.negativeAcknowledge)

以下为主动重试的代码示例:

Consumer<byte[]> consumer = client.newConsumer() .topic("persistent://my-property/my-ns/real-topic") .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Key_Shared) .ackTimeout(30, TimeUnit.SECONDS) // 设置为最大处理时间的 2-3 倍 .ackTimeoutTickTime(5, TimeUnit.SECONDS) // 检查粒度,默认 1 秒 .negativeAckRedeliveryDelay(1, TimeUnit.MINUTES) // 默认1min .subscribe(); ​while (true) { Message<String> message = null; try { message = consumer.receive(); log.info("Received message: {}, Redelivery count:{}", message.getValue(), message.getRedeliveryCount()); doBusinessMaybeThrowException(message); consumer.acknowledge(message); log.info("ack: {},{}", message.getTopicName(), message.getMessageId()); } catch (Exception e) { log.error("consumer exception", e); //否定应答 consumer.negativeAcknowledge(message); } }

值得注意的是,当消费者 unack 的消息过多时,Broker 会停止向消费者发送增量消息,而是一直推送 unack 的消息。直到 unack 的消息消息数量低于阈值,才会继续推送新消息。默认单个消费者unack消息数量上限是5万,订阅的 unack 消息数量上限是 20万

参考文档

[1]. Pulsar 消费者客户端官方文档 https://pulsar.apache.org/docs/4.0.x/client-libraries-consumers/

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/21 1:02:06

JMeter 接口自动化测试的最佳实践

JMeter 是一个开源的负载测试工具&#xff0c;它可以模拟多种协议和应用程序的负载&#xff0c;包括 HTTP、FTP、SMTP、JMS、SOAP 和 JDBC 等。在进行接口自动化测试时&#xff0c;使用 JMeter 可以帮助我们快速地构建测试用例&#xff0c;模拟多种场景&#xff0c;发现接口的性…

作者头像 李华
网站建设 2026/6/10 15:56:03

吐血推荐9个AI论文网站,MBA轻松搞定毕业论文!

吐血推荐9个AI论文网站&#xff0c;MBA轻松搞定毕业论文&#xff01; AI 工具助力论文写作&#xff0c;轻松应对学术挑战 在当前的学术环境中&#xff0c;MBA 学生面临着越来越高的论文要求&#xff0c;从选题到撰写、修改&#xff0c;每一个环节都充满挑战。而 AI 技术的快速发…

作者头像 李华
网站建设 2026/6/10 15:15:24

深度解析:智能体记忆模式全景分类,收藏这份完整技术指南

本文系统梳理了AI智能体记忆的结构与逻辑&#xff0c;从技术实现和拓扑结构两个核心维度进行分类&#xff0c;并构建了"形式类型操作"三维框架。详细分析了词元级、参数化和潜在三种记忆形式&#xff0c;区分了短期记忆与长期记忆类型&#xff0c;阐述了记忆内化、提…

作者头像 李华
网站建设 2026/6/15 11:22:57

基于主从博弈的共享储能与综合能源微网优化运行探秘

基于主从博弈的共享储能与综合能源微网优化运行研究 综合能源微网与共享储能的结合具有一定的创新性&#xff0c;在共享储能的背景下考虑微网运营商与用户聚合商之间的博弈关系&#xff0c;微网的收益和用户的收益之间达到均衡。 采用主从博弈的方法&#xff0c;微网运营商作为…

作者头像 李华
网站建设 2026/6/15 11:23:29

网络进阶教程:节点小宝中心节点策略的反向使用方法!

最近有小伙伴咨询到小白&#xff1a;如果家里局域网里有一台24h开机状态的NAS&#xff0c;且在这台NAS上已经部署了节点小宝&#xff0c;现在有一台机器是在异地状态下也部署了节点小宝。问&#xff1a;家里局域网下的其他设备不安装节点小宝能否通过中心节点访问到异地的那台设…

作者头像 李华