news 2026/6/4 2:29:39

【RocketMQ】阿里万亿级消息中间件MQ保姆级教程

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【RocketMQ】阿里万亿级消息中间件MQ保姆级教程

>MQ主要解决应用解耦、异步消息、流量削峰、日志处理等问题

目录

一、RocketMQ概述

1.1 RocketMQ是什么

1.2 核心架构和概念

1.3 部署架构模式

二、快速上手

2.1 Docker部署

2.2 三中发送方式

2.3 顺序消息

2.4 事务消息

2.5 延迟消息

2.6 消息过滤

三、源码分析

3.1 Broker 启动流程

3.2 消息发送核心流程

3.3 消息消费核心流程


一、RocketMQ概述

1.1 RocketMQ是什么

Apache RocketMQ 是阿里巴巴开源的一款分布式、队列模型的消息中间件,后捐赠给 Apache 基金会并成为顶级项目(TLP)。

各mq之间的比较:

维度RocketMQKafkaRabbitMQ
定位消息中间件(业务消息)日志流式处理传统消息中间件
协议自定义协议自定义协议AMQP
顺序消息✅ 天然支持⚠️ 仅分区有序⚠️ 较弱
事务消息✅ 完整方案❌ 不支持⚠️ 弱支持
延迟消息✅ 内置 18 个级别❌ 需外部方案⚠️ 插件
消息堆积✅ 亿级✅ 强❌ 弱
吞吐量百万级百万级万级
运维复杂度

选型建议:

  • 业务消息(订单、交易)→RocketMQ(推荐)

  • 大数据日志流 → Kafka

  • 传统企业集成、复杂路由 → RabbitMQ

1.2 核心架构和概念

┌──────────────────────────────────────────────┐ │ RocketMQ 集群 │ │ │ │ ┌────────────┐ ┌──────────────────┐ │ │ │ NameServer │ ←──→ │ NameServer(集群)│ │ │ └────────────┘ └──────────────────┘ │ │ ↑ │ │ │ 注册/发现 │ │ ↓ │ │ ┌────────────┐ 主从同步 ┌────────────┐ │ │ │ Broker-A │ ←───────→ │ Broker-A-S │ │ │ └────────────┘ └────────────┘ │ │ ↑ │ │ │ 发送/消费 │ │ ↓ │ │ ┌────────────┐ ┌────────────┐ │ │ │ Producer │ │ Consumer │ │ │ └────────────┘ └────────────┘ │ └──────────────────────────────────────────────┘
角色说明
NameServer路由注册中心,集群中各角色通过它获取路由信息(无状态)
Broker消息存储与中转,负责消息的存储、投递、查询、高可用
Producer消息生产者,向 Broker 发送消息
Consumer消息消费者,从 Broker 拉取消息进行消费
Topic消息主题,一类消息的集合
Message Queue消息队列,Topic 被切分为多个 Queue 以提高并行度
Tag子主题,用于消息过滤
Group一类 Producer 或 Consumer 的集合
Offset消费位点,记录消费进度

1.3 部署架构模式

模式说明
单 Master不推荐,仅用于本地调试
多 Master集群无 Slave, Broker 集群
多 Master 多 Slave(异步)Master 写成功即返回,Slave 异步复制
多 Master 多 Slave(同步)双写成功才返回,强一致
Dledger 集群基于 Raft 协议的自动选主(4.5+)

二、快速上手

2.1 Docker部署

docker-compose.yml文件如下,注意IP需要修改为自己的公网IP:

version: '3.8' services: # ==================== 1. NameServer ==================== rmqnamesrv: image: apache/rocketmq:4.9.4 container_name: rmqnamesrv ports: - "9876:9876" environment: - JAVA_OPT_EXT=-server -Xms256m -Xmx256m -Xmn128m command: sh mqnamesrv networks: - rocketmq_net # ==================== 2. Broker Master ==================== rmqbroker-master: image: apache/rocketmq:4.9.4 container_name: rmqbroker-master ports: - "10911:10911" # 业务端口 - "10909:10909" # VIP 端口 (10911-2) - "10912:10912" # HA 端口 environment: - NAMESRV_ADDR=rmqnamesrv:9876 - JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m command: > sh -c " echo 'brokerClusterName=DefaultCluster' > /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerName=broker-a' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerId=0' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'deleteWhen=04' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'fileReservedTime=48' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerRole=ASYNC_MASTER' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'flushDiskType=ASYNC_FLUSH' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerIP1=175.24.167.232' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerIP2=rmqbroker-master' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'autoCreateTopicEnable=true' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'listenPort=10911' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'haListenPort=10912' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && sh mqbroker -n rmqnamesrv:9876 -c /home/rocketmq/rocketmq-4.9.4/conf/broker.conf & tail -f /home/rocketmq/logs/rocketmqlogs/broker.log " depends_on: - rmqnamesrv networks: - rocketmq_net # ==================== 3. Broker Slave ==================== rmqbroker-slave: image: apache/rocketmq:4.9.4 container_name: rmqbroker-slave ports: - "10921:10921" # 业务端口(Slave 独立) - "10919:10919" # VIP 端口(10921-2) - "10922:10922" # HA 端口 environment: - NAMESRV_ADDR=rmqnamesrv:9876 - JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m command: > sh -c " echo 'brokerClusterName=DefaultCluster' > /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerName=broker-a' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerId=1' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'deleteWhen=04' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'fileReservedTime=48' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerRole=SLAVE' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'flushDiskType=ASYNC_FLUSH' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerIP1=175.24.167.232' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerIP2=175.24.167.232' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'autoCreateTopicEnable=true' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'listenPort=10921' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'haListenPort=10922' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'haMasterAddress=175.24.167.232:10912' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && sh mqbroker -n rmqnamesrv:9876 -c /home/rocketmq/rocketmq-4.9.4/conf/broker.conf & tail -f /home/rocketmq/logs/rocketmqlogs/broker.log " depends_on: - rmqnamesrv - rmqbroker-master networks: - rocketmq_net # ==================== 4. RocketMQ Dashboard ==================== rmqdashboard: image: apacherocketmq/rocketmq-dashboard:1.0.0 container_name: rmqdashboard ports: - "8180:8080" environment: - JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876 depends_on: - rmqnamesrv - rmqbroker-master - rmqbroker-slave networks: - rocketmq_net networks: rocketmq_net: driver: bridge

浏览器访问:ip:8180,出现下面的页面说明mq集群部署成功

2.2 三中发送方式

添加Maven依赖

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency>

不同的发送方式

方式特点适用场景
同步发送等待 Broker 应答,可靠性高重要通知、订单消息
异步发送回调通知,性能高对响应时间敏感
单向发送不关心结果,性能最高日志收集

发送结果状态:

返回状态含义
SEND_OK发送成功
FLUSH_DISK_TIMEOUT刷盘超时(异步刷盘模式可能仍成功)
FLUSH_SLAVE_TIMEOUT主从同步超时(异步复制可能仍成功)
SLAVE_NOT_AVAILABLE从节点不可用

2.3 顺序消息

类型如下:

类型描述性能
全局顺序整个 Topic 只有一个 Queue
分区顺序同一业务 key 投递到同一 Queue

2.4 事务消息

流程图:

生产者代码:

消费者代码:

回查代码:

关键设计点

  • Half 消息: 对消费者不可见

  • Op 消息: Commit/Rollback 操作消息

  • 反查机制: 解决本地事务执行时间过长的问题

2.5 延迟消息

内置18个延迟级别:

"1s", "5s", "10s", "30s","1m", "2m", "3m", "4m", "5m", "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h"

实现原理:

  1. 消息写入 CommitLog

  2. 如果是延迟消息,替换 Topic 为SCHEDULE_TOPIC_XX

  3. 调度线程按级别将消息从 SCHEDULE_TOPIC 投递回原 Topic

  4. 消费者消费

2.6 消息过滤

三、源码分析

3.1 Broker 启动流程

BrokerStartup.createBrokerController(args)
→ BrokerController.initialize()
→ 加载配置
→ 创建各种管理器(消息存储、消费者偏移、订阅等)
→ BrokerController.start()
→ 启动 Netty Server
→ 注册到 NameServer
→ 启动各种定时任务

关键线程:

线程作用
SendMessageProcessor处理消息发送请求
PullMessageProcessor处理消息拉取请求
FlushRealTimeService异步刷盘
ReputMessageService构建 ConsumeQueue/Index
HAConnection主从同步

3.2 消息发送核心流程

DefaultMQProducerImpl.send()
→ MQFaultStrategy.selectOneMessageQueue() // 选队列
→ SendMessageProcessor.processRequest() // Broker 端
→ MessageStore.putMessage() // 存储
→ CommitLog.asyncPutMessage() // 顺序写
→ Broker 返回 SendResult

3.3消息消费核心流程

PullMessageHoldService
→ PullRequestHoldService 拉取请求
→ PullMessageProcessor 处理
→ PullResult 推送给消费者
→ ConsumeMessageConcurrentlyService 消费
→ 更新 Offset


版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/4 2:28:38

告别每次上电重下载:深入理解Intel FPGA的AS模式与EPCQ配置流程

深入解析Intel FPGA的AS配置模式与EPCQ固件烧录技术在FPGA开发中&#xff0c;最令人沮丧的莫过于每次断电后都需要重新下载程序。这种重复性工作不仅降低开发效率&#xff0c;在产品量产阶段更是不可接受的。本文将带您深入理解Intel FPGA的Active Serial(AS)配置模式与EPCQ系列…

作者头像 李华
网站建设 2026/6/4 2:25:39

效率倍增:用快马平台将吴恩达claude code手册建议自动化

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 请根据以下需求&#xff0c;利用快马平台的AI能力生成一个能提升日常工作效率的Python脚本工具。需求描述&#xff1a;我需要一个脚本&#xff0c;能自动处理多个Markdown格式的日…

作者头像 李华
网站建设 2026/6/4 2:21:23

别再只用ArcGIS了!免费神器GeoDa 1.16版空间自相关分析保姆级教程

空间数据分析新选择&#xff1a;GeoDa 1.16版深度实战指南在数据驱动的时代&#xff0c;空间统计分析已成为城市规划、犯罪研究、公共卫生等领域不可或缺的工具。当大多数分析师习惯性地打开ArcGIS时&#xff0c;一款名为GeoDa的开源软件正在学术圈和专业领域悄然走红。这款由芝…

作者头像 李华