Go语言实现消息队列:从RabbitMQ到Kafka的完整指南
引言
消息队列是分布式系统中实现解耦、异步处理和削峰填谷的关键组件。Go语言生态提供了丰富的第三方消息队列客户端库,支持RabbitMQ、Kafka等主流消息队列。本文将深入探讨使用Go语言接入消息队列的实践。
一、消息队列基础
1.1 消息队列模式
┌─────────────────────────────────────────────────────────────┐
│                        消息队列架构                         │
├─────────────────────────────────────────────────────────────┤
│  Producer          │      Queue          │     Consumer     │
│  ┌─────────┐       │   ┌───────────┐     │       ┌─────────┐│
│  │发送消息 │───────│──>│ 消息队列  │─────│──────>│处理消息 ││
│  └─────────┘       │   └───────────┘     │       └─────────┘│
│                    │                     │                  │
│  Producer          │                     │     Consumer     │
│  ┌─────────┐       │                     │       ┌─────────┐│
│  │发送消息 │───────│──>                  │──────>│处理消息 ││
│  └─────────┘       │                     │       └─────────┘│
└─────────────────────────────────────────────────────────────┘
1.2 常见消息队列对比
| 特性 | RabbitMQ | Kafka | Redis |
|---|---|---|---|
| 协议 | AMQP | Kafka Protocol | RESP |
| 持久化 | 支持 | 支持 | 可选 |
| 吞吐量 | 中等 | 高 | 高 |
| 延迟 | 低 | 中等 | 低 |
| 消息顺序 | 保证 | 分区内保证 | 不保证 |
| 适用场景 | 任务队列 | 日志收集 | 缓存+队列 |
二、RabbitMQ实战
2.1 安装依赖
go get github.com/streadway/amqp
2.2 生产者实现
// Package rabbitmq provides small producer and consumer helpers built on the
// streadway/amqp client.
package rabbitmq

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

// Producer publishes messages to one durable queue through the default
// exchange.
type Producer struct {
	conn    *amqp.Connection
	channel *amqp.Channel
	queue   amqp.Queue
}

// NewProducer dials the broker at uri, opens a channel and declares queueName
// as a durable, non-exclusive queue.
//
// If any step after the dial fails, the connection is closed before the error
// is returned so that no connection is leaked.
func NewProducer(uri, queueName string) (*Producer, error) {
	conn, err := amqp.Dial(uri)
	if err != nil {
		return nil, fmt.Errorf("dial rabbitmq: %w", err)
	}

	channel, err := conn.Channel()
	if err != nil {
		// The setup error is the one worth reporting; the close error is
		// deliberately dropped.
		_ = conn.Close()
		return nil, fmt.Errorf("open channel: %w", err)
	}

	queue, err := channel.QueueDeclare(
		queueName,
		true,  // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		_ = conn.Close()
		return nil, fmt.Errorf("declare queue %q: %w", queueName, err)
	}

	return &Producer{
		conn:    conn,
		channel: channel,
		queue:   queue,
	}, nil
}

// Publish sends message to the producer's queue via the default exchange,
// marked as persistent and with content type application/json.
func (p *Producer) Publish(message []byte) error {
	return p.channel.Publish(
		"",           // exchange
		p.queue.Name, // routing key
		false,        // mandatory
		false,        // immediate
		amqp.Publishing{
			ContentType:  "application/json",
			Body:         message,
			DeliveryMode: amqp.Persistent,
		},
	)
}

// Close closes the channel and then the connection. Close has no return
// value, so failures are logged instead of being silently discarded.
func (p *Producer) Close() {
	if err := p.channel.Close(); err != nil {
		log.Printf("close channel: %v", err)
	}
	if err := p.conn.Close(); err != nil {
		log.Printf("close connection: %v", err)
	}
}
2.3 消费者实现
type Consumer struct { conn *amqp.Connection channel *amqp.Channel queue amqp.Queue } func NewConsumer(uri, queueName string) (*Consumer, error) { conn, err := amqp.Dial(uri) if err != nil { return nil, err } channel, err := conn.Channel() if err != nil { return nil, err } queue, err := channel.QueueDeclare( queueName, true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err != nil { return nil, err } return &Consumer{ conn: conn, channel: channel, queue: queue, }, nil } func (c *Consumer) Consume(handler func([]byte) error) error { messages, err := c.channel.Consume( c.queue.Name, "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { return err } forever := make(chan bool) go func() { for d := range messages { err := handler(d.Body) if err != nil { log.Printf("Handler error: %v", err) d.Nack(false, true) // 重新入队 } else { d.Ack(false) } } }() <-forever return nil }2.4 发布/订阅模式
func NewPublisher(uri, exchangeName string) (*Producer, error) { conn, err := amqp.Dial(uri) if err != nil { return nil, err } channel, err := conn.Channel() if err != nil { return nil, err } err = channel.ExchangeDeclare( exchangeName, "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) if err != nil { return nil, err } return &Producer{ conn: conn, channel: channel, queue: amqp.Queue{}, }, nil } func (p *Producer) PublishToExchange(exchange, routingKey string, message []byte) error { return p.channel.Publish( exchange, routingKey, false, false, amqp.Publishing{ ContentType: "application/json", Body: message, }, ) }三、Kafka实战
3.1 安装依赖
go get github.com/segmentio/kafka-go
3.2 生产者实现
// Package kafka wraps the segmentio/kafka-go writer and reader in minimal
// producer and consumer types.
package kafka

import (
	"context"

	"github.com/segmentio/kafka-go"
)

// Producer writes messages to a single topic.
type Producer struct {
	writer *kafka.Writer
}

// NewProducer builds a Producer for topic on the given brokers, using the
// LeastBytes balancer.
func NewProducer(brokers []string, topic string) *Producer {
	cfg := kafka.WriterConfig{
		Brokers:  brokers,
		Topic:    topic,
		Balancer: &kafka.LeastBytes{},
	}
	return &Producer{writer: kafka.NewWriter(cfg)}
}

// Produce writes message as the value of one record, using a background
// context.
func (p *Producer) Produce(message []byte) error {
	record := kafka.Message{Value: message}
	return p.writer.WriteMessages(context.Background(), record)
}

// Close closes the underlying writer and returns its error.
func (p *Producer) Close() error {
	return p.writer.Close()
}
3.3 消费者实现
type Consumer struct { reader *kafka.Reader } func NewConsumer(brokers []string, topic, groupID string) *Consumer { reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, Topic: topic, GroupID: groupID, StartOffset: kafka.FirstOffset, }) return &Consumer{reader: reader} } func (c *Consumer) Consume(handler func([]byte) error) error { for { msg, err := c.reader.ReadMessage(context.Background()) if err != nil { return err } err = handler(msg.Value) if err != nil { // 处理失败,消息会被重新消费 continue } } } func (c *Consumer) Close() error { return c.reader.Close() }3.4 分区消费
func NewPartitionConsumer(brokers []string, topic string, partition int) *Consumer { reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, Topic: topic, Partition: partition, StartOffset: kafka.FirstOffset, }) return &Consumer{reader: reader} }四、Redis队列
4.1 生产者实现
// Package redisqueue implements a simple queue and a priority queue on top of
// Redis lists and sorted sets.
package redisqueue

import (
	"context"

	"github.com/go-redis/redis/v8"
)

// Producer pushes messages onto a Redis-backed queue.
type Producer struct {
	client *redis.Client
	queue  string
}

// NewProducer returns a Producer that writes to the list named queue using
// client.
func NewProducer(client *redis.Client, queue string) *Producer {
	p := new(Producer)
	p.client = client
	p.queue = queue
	return p
}

// Enqueue appends message to the tail of the queue list with RPUSH.
func (p *Producer) Enqueue(ctx context.Context, message []byte) error {
	cmd := p.client.RPush(ctx, p.queue, message)
	return cmd.Err()
}

// EnqueuePriority adds message to the sorted set "<queue>:priority" with
// priority as its score, which turns the sorted set into a priority queue.
//
// NOTE(review): sorted-set members are unique, so enqueueing a message whose
// bytes equal an existing member presumably updates that member's score
// instead of adding a second entry — confirm this is acceptable.
func (p *Producer) EnqueuePriority(ctx context.Context, message []byte, priority int) error {
	key := p.queue + ":priority"
	entry := &redis.Z{
		Score:  float64(priority),
		Member: message,
	}
	return p.client.ZAdd(ctx, key, entry).Err()
}
4.2 消费者实现
type Consumer struct { client *redis.Client queue string } func NewConsumer(client *redis.Client, queue string) *Consumer { return &Consumer{ client: client, queue: queue, } } func (c *Consumer) Dequeue(ctx context.Context) ([]byte, error) { result, err := c.client.BLPop(ctx, 0, c.queue).Result() if err != nil { return nil, err } return []byte(result[1]), nil } func (c *Consumer) DequeuePriority(ctx context.Context) ([]byte, error) { result, err := c.client.ZPopMax(ctx, c.queue+":priority").Result() if err != nil { return nil, err } if len(result) == 0 { return nil, nil } return []byte(result[0].Member.(string)), nil }五、消息队列最佳实践
5.1 消息持久化
// RabbitMQ: 消息持久化 err = channel.Publish( "", queue.Name, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, // 持久化消息 Body: message, }, ) // Kafka: 默认持久化 // 消息写入后会持久化到磁盘5.2 消息重试
func (c *Consumer) Consume(handler func([]byte) error) error { messages, err := c.channel.Consume(...) if err != nil { return err } for d := range messages { maxRetries := 3 var lastErr error for i := 0; i < maxRetries; i++ { err := handler(d.Body) if err == nil { d.Ack(false) break } lastErr = err time.Sleep(time.Duration(i+1) * time.Second) } if lastErr != nil { // 死信队列 c.channel.Publish("", "dead-letter-queue", false, false, amqp.Publishing{ Body: d.Body, }) d.Ack(false) } } }5.3 流量控制
type ThrottledProducer struct { producer *Producer limiter *rate.Limiter } func NewThrottledProducer(producer *Producer, rate rate.Limit) *ThrottledProducer { return &ThrottledProducer{ producer: producer, limiter: rate.NewLimiter(rate, 100), // 每秒100条 } } func (p *ThrottledProducer) Publish(message []byte) error { p.limiter.Wait(context.Background()) return p.producer.Publish(message) }六、实战:异步任务处理系统
type TaskQueue struct { producer *kafka.Producer consumer *kafka.Consumer handlers map[string]func([]byte) error } func NewTaskQueue(brokers []string) *TaskQueue { return &TaskQueue{ producer: NewProducer(brokers, "tasks"), consumer: NewConsumer(brokers, "tasks", "task-consumers"), handlers: make(map[string]func([]byte) error), } } func (tq *TaskQueue) RegisterHandler(taskType string, handler func([]byte) error) { tq.handlers[taskType] = handler } func (tq *TaskQueue) Enqueue(taskType string, payload []byte) error { message, _ := json.Marshal(map[string]interface{}{ "type": taskType, "payload": payload, }) return tq.producer.Produce(message) } func (tq *TaskQueue) StartConsuming() error { return tq.consumer.Consume(func(message []byte) error { var msg map[string]interface{} if err := json.Unmarshal(message, &msg); err != nil { return err } taskType := msg["type"].(string) payload := []byte(msg["payload"].(string)) handler, exists := tq.handlers[taskType] if !exists { return fmt.Errorf("no handler for task type: %s", taskType) } return handler(payload) }) }结论
消息队列是构建分布式系统的关键组件,Go语言生态提供了丰富的客户端库来支持各种消息队列。在实际项目中,需要根据业务需求选择合适的消息队列:RabbitMQ适合任务队列场景,Kafka适合高吞吐量日志收集,Redis适合简单的缓存+队列场景。同时,需要注意消息持久化、重试机制和流量控制,以保证系统的可靠性和稳定性。