news 2026/5/30 16:53:06

RabbitMQ

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
RabbitMQ

RabbitMQ

  • 快速入门
  • Spring AMQP
    • 快速入门
    • 一个队列绑定多个消费者
  • 交换机
    • 作用
    • Direct 交换机
    • Topic 交换机
    • 代码声明队列和交换机
    • 消息转换器
  • 消息的可靠性
    • 生产者的可靠性
      • 生产者重连
      • 生产者确认(尽量不要用)
    • MQ 的可靠性
      • 数据持久化
      • Lazy Queue(惰性队列)
    • 消费者可靠性
      • 消费者确认

快速入门

创建用户(每个项目创建一个用户)

创建虚拟主机

创建队列

发送消息


Spring AMQP

快速入门

# 配置类spring:rabbitmq:host:192.168.16.128port:5672username:lwypassword:123456virtual-host:/lwy
<!-- 导入依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
// 发送消息StringqueueName="mqtest1";// 队列名字Stringmessage="hello world";rabbitTemplate.convertAndSend(queueName,message);
// 接收消息并处理@RabbitListener(queues="mqtest1")// 可对应多个队列publicvoidlisten(Stringmessage){System.out.println("mqtest1 收到消息:"+message);}

一个队列绑定多个消费者

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积(当消费者1 一秒处理 50 条消息,消费者 2 一秒处理 5 条消息,有 50 条消息的时候 mq 仍然会让消费者 1 处理 25 条,消费者 2 处理 25 条,就会导致消费者 1 处理完了不工作了,消费者 2 还在工作)
因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

spring:rabbitmq:host:192.168.16.128port:5672username:lwypassword:123456virtual-host:/lwylistener:simple:prefetch:1

交换机

作用

  1. 接收 publisher 发送的消息
  2. 将消息按照规则路由到与之绑定的队列
    Fanout 交换机
    FanoutExchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
// 发送消息到交换机StringexchangeName="lwy.fanout";Stringmessage="hello world";rabbitTemplate.convertAndSend(exchangeName,"",message);

Direct 交换机

DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的Routingkey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

StringexchangeName="lwy.direct";Stringmessage="blue";// 第二个参数是路由关键字rabbitTemplate.convertAndSend(exchangeName,"yellow",message);

Topic 交换机

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以“ . ”分割
Queue与Exchange指定BindingKey时可以使用通配符:

  • #:代指0个或多个单词。例:china.#可以匹配 china.news、china.weather
  • *:代指一个单词。china.*可以匹配 china.news 不可匹配 china.news.sports

代码声明队列和交换机

// 如果没有指定的队列和交换机会自动创建@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct1"),exchange=@Exchange(name="lwy.direct",type=ExchangeTypes.DIRECT),key={"red","blue"}// 集合))publicvoiddirect1(Stringmessage)throwsInterruptedException{System.out.println("direct1 收到消息:"+message);}

消息转换器

rabbitMQ 本身只接受和传输字节流,传输对象的时候会乱码,通常需要消息转换器自动序列化和反序列化

<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}

消息的可靠性

生产者的可靠性

生产者重连

有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

spring:rabbitmq:connection-timeout:1s# 连接超时时间template:retry:enabled:true# 开启超时重试initial-interval:1000ms# 初始重试间隔时间multiplier:1# 重试间隔时间倍数max-attempts:3# 最大重试次数

注意:

  • 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
  • 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

生产者确认(尽量不要用)

RabbitMQ 有 PublisherConfirm和PublisherReturn两种确认机制。开启确认机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功(路由失败一般是代码有问题)
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

MQ 的可靠性

数据持久化

非持久的消息会存储到内存中,如果 mq 重启消息就会消失,持久化的会存到磁盘中
注意:如果开启了持久化和生产者确认,只有在消息 持久化完成后才会给生产者返回 ACK 回执

Lazy Queue(惰性队列)

特性:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储
@RabbitListener(queuesToDeclare=@Queue(name="lazy.queue",durable="true",// 持久化队列arguments=@Argument(name="x-queue-mode",value="lazy")// lazy队列))publicvoidlazy(Stringmessage){System.out.println("lazy.queue 收到消息:"+message);}

消费者可靠性

消费者确认

当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

可通过配置文件选择 ACK 处理方式

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack,当业务出现异常时:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回 reject
spring:rabbitmq:listener:simple:prefetch:1acknowledge-mode:none
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/29 23:33:00

比手动调试快10倍:AI自动化解决NTP问题

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 开发一个NTP问题自动修复工具&#xff0c;功能包括&#xff1a;1. 一键诊断 2. 自动测试多个公共NTP服务器 3. 智能选择最佳服务器 4. 自动修改系统配置 5. 验证修复结果。要求提供…

作者头像 李华
网站建设 2026/5/29 14:55:56

5分钟用AI搭建时间戳API服务

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 开发一个轻量级时间戳API服务&#xff0c;要求&#xff1a;1. 提供时间戳生成和转换端点&#xff1b;2. 支持JWT鉴权&#xff1b;3. 包含Swagger文档&#xff1b;4. 基础速率限制&a…

作者头像 李华
网站建设 2026/5/12 4:21:28

回答准确率测试

day26 回答准确率测试 回答准确率测试 1️⃣ 定义&#xff08;通俗版&#xff09; 回答准确率测试&#xff0c;就是&#xff1a; 给模型一批「有标准答案的问题」&#xff0c;看它给出的回答有多少是“对的”本质是一个 评测&#xff08;Evaluation&#xff09;问题。2️⃣ 数学…

作者头像 李华
网站建设 2026/5/30 6:39:05

零基础理解VS Code的launch.json文件

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 请创建一个面向完全新手的launch.json教程&#xff0c;从VS Code的调试面板开始讲解&#xff0c;逐步解释&#xff1a;1) 如何创建文件 2) 最基本的配置结构 3) 如何添加第一个调试…

作者头像 李华
网站建设 2026/5/22 22:48:58

零基础学Pigx:AI带你完成第一个微服务项目

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 为编程新手创建一个Pigx框架学习项目&#xff0c;要求&#xff1a;1. 构建一个最简单的用户管理微服务 2. 每个代码文件添加详细中文注释 3. 包含图文并茂的部署指南 4. 提供常见错…

作者头像 李华
网站建设 2026/5/28 8:37:02

零基础30分钟搭建个人zlib镜像站

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个最简单的个人用zlib镜像网站&#xff0c;要求&#xff1a;1. 单页面应用设计 2. 内置20本示例电子书 3. 基础搜索框 4. 无需用户系统 5. 一键导出静态网站文件。使用纯HTML…

作者头像 李华