news 2026/6/15 4:32:53

消息队列设计:从同步到异步的性能突破

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
消息队列设计:从同步到异步的性能突破

前言

2024年初,我们的订单系统经常出现"超时"问题。用户下单后,系统需要同时调用库存服务、支付服务、通知服务,任何一个服务慢都会导致整个请求超时。

我们决定引入消息队列,将同步调用改为异步处理。这个改造带来了显著的性能提升。


一、问题:同步调用的瓶颈

原始的订单流程是这样的:

python

@app.route('/api/orders', methods=['POST']) def create_order(): # 1. 创建订单 order = Order.create(request.json) # 2. 同步调用库存服务 inventory_response = requests.post( 'http://inventory-service/deduct', json={'product_id': order.product_id, 'quantity': order.quantity} ) if inventory_response.status_code != 200: return {"error": "库存不足"}, 400 # 3. 同步调用支付服务 payment_response = requests.post( 'http://payment-service/pay', json={'order_id': order.id, 'amount': order.amount} ) if payment_response.status_code != 200: return {"error": "支付失败"}, 400 # 4. 同步调用通知服务 notify_response = requests.post( 'http://notify-service/send', json={'order_id': order.id, 'type': 'order_created'} ) return {"order_id": order.id}, 201

问题

  • 任何一个服务慢都会导致整个请求慢;
  • 任何一个服务故障都会导致订单创建失败;
  • 耦合度太高,难以扩展。

性能数据

  • 库存服务:200ms
  • 支付服务:300ms
  • 通知服务:150ms
  • 总耗时:200 + 300 + 150 =650ms

二、解决方案:引入RabbitMQ

我们选择RabbitMQ作为消息队列。改造后的流程:

2.1 发布订单创建事件

python

import pika import json def create_order(): # 1. 创建订单 order = Order.create(request.json) # 2. 发布事件到消息队列 connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() # 声明交换机和队列 channel.exchange_declare(exchange='orders', exchange_type='topic') # 发布消息 message = { 'order_id': order.id, 'product_id': order.product_id, 'quantity': order.quantity, 'amount': order.amount } channel.basic_publish( exchange='orders', routing_key='order.created', body=json.dumps(message) ) connection.close() # 立即返回响应 return {"order_id": order.id}, 201

耗时:仅需10ms(发布到队列)

2.2 消费者:库存服务

python

def inventory_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='inventory_queue', durable=True) queue_name = result.method.queue # 绑定队列到交换机 channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 扣减库存 deduct_inventory( message['product_id'], message['quantity'] ) # 确认消息 ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: # 拒绝消息,重新入队 ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('库存服务已启动,等待消息...') channel.start_consuming() if __name__ == '__main__': inventory_consumer()

2.3 消费者:支付服务

python

def payment_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='payment_queue', durable=True) queue_name = result.method.queue channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 处理支付 process_payment( message['order_id'], message['amount'] ) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('支付服务已启动,等待消息...') channel.start_consuming()

2.4 消费者:通知服务

python

def notify_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='notify_queue', durable=True) queue_name = result.method.queue channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 发送通知 send_notification( message['order_id'], 'order_created' ) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('通知服务已启动,等待消息...') channel.start_consuming()


三、可靠性保证

3.1 消息持久化

python

# 声明持久化队列 channel.queue_declare( queue='payment_queue', durable=True # 队列持久化 ) # 发布持久化消息 channel.basic_publish( exchange='orders', routing_key='order.created', body=json.dumps(message), properties=pika.BasicProperties( delivery_mode=2 # 消息持久化 ) )

3.2 消息确认机制

python

Copy code

# 手动确认消息 def callback(ch, method, properties, body): try: process_message(body) ch.basic_ack(delivery_tag=method.delivery_tag) # 确认 except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) # 拒绝并重新入队 # 禁用自动确认 channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=False # 手动确认 )

3.3 死信队列

python

# 声明死信交换机 channel.exchange_declare(exchange='dlx', exchange_type='direct') channel.queue_declare(queue='dead_letter_queue', durable=True) channel.queue_bind(exchange='dlx', queue='dead_letter_queue') # 声明普通队列,指定死信交换机 channel.queue_declare( queue='payment_queue', durable=True, arguments={ 'x-dead-letter-exchange': 'dlx', 'x-dead-letter-routing-key': 'dead_letter' } )


四、监控和告警

python

import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def callback(ch, method, properties, body): start_time = time.time() try: process_message(body) duration = time.time() - start_time logger.info(f"消息处理成功, 耗时: {duration}ms") ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: logger.error(f"消息处理失败: {str(e)}") ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)


五、国际化团队的挑战

在跨国团队中,消息队列的错误日志和告警需要支持多语言。我们使用同言翻译(Transync AI)来自动翻译消息队列的错误信息和监控告警,确保不同语言背景的团队成员能够快速理解问题并做出响应。


六、性能对比

指标同步调用异步消息队列提升
平均响应时间650ms10ms-98.5%
P99响应时间2000ms50ms-97.5%
系统吞吐量1000 req/s10000 req/s+900%
故障隔离-

七、最佳实践

  1. 幂等性设计:消费者应该能够安全地处理重复消息;
  2. 超时设置:为消息处理设置合理的超时时间;
  3. 监控队列深度:及时发现消费者处理不过来的情况;
  4. 分离关注点:生产者和消费者应该解耦;
  5. 定期审查:定期检查死信队列,找出问题消息。

八、结语

消息队列的引入,从根本上改变了我们的系统架构。从同步的紧耦合,到异步的松耦合,系统的可扩展性和可靠性都得到了显著提升。

如果你的系统也在经历性能瓶颈,消息队列可能是一个很好的解决方案。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/15 15:59:08

探秘AES加密算法:多种Transformation全解析

🧑 博主简介:CSDN博客专家,历代文学网(PC端可以访问:https://literature.sinhy.com/#/literature?__c1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,…

作者头像 李华
网站建设 2026/6/15 14:30:14

Python 期末复习知识点汇总

Python 期末知识点汇总 本文聚焦 Python 期末考核核心知识点,从基础到进阶分层梳理,覆盖语法、数据结构、流程控制、函数、面向对象、文件操作、异常处理等高频考点,适合期末复习冲刺。 一、基础语法(必考) 1. 编码规范…

作者头像 李华
网站建设 2026/6/15 3:16:42

线程、并发与互斥:解锁多任务编程的核心逻辑

线程、并发与互斥:解锁多任务编程的核心逻辑 一、线程:多任务执行的最小单元 线程是操作系统调度的基本单位,它依附于进程存在,共享进程的内存空间(代码段、数据段、堆等),但拥有独立的程序计数…

作者头像 李华
网站建设 2026/6/15 7:59:18

Day35文件的规范拆分和写法

credit_default_prediction/ │ ├── data/ # 数据文件夹 │ ├── raw/ # 原始数据 │ └── processed/ # 处理后的数据 │ ├── src/ # 源代码 │ ├── __init__.py │ ├── data/ …

作者头像 李华
网站建设 2026/6/15 1:28:47

GPT-5.2来了,老金详细给你说说它为什么是王

加我进AI讨论学习群,公众号右下角“联系方式”文末有老金的 开源知识库地址全免费昨晚凌晨2点,OpenAI偷偷摸摸上线了GPT-5.2。没发布会,没预热,甚至连个官方推特都没发。作为老金最喜欢的模型,没有之一的,必…

作者头像 李华
网站建设 2026/6/15 9:22:14

上海“娃哈哈”桶装水将改名 “沪小娃“ !

近日上海娃哈哈推出全新品牌 "沪小娃" 桶装水,说是"娃哈哈"商标授权已到期,且未能获得新的授权,被集团要求停止使用原有品牌,称此举是为了活下去的无奈之举,承认打造新品牌难度很大。普推知商标老…

作者头像 李华