Rabbitmq 消费者的推模式与拉模式,go语言版本

RabbitMQ的消费者有两种模式

实现RabbitMQ的消费者有两种模式,推模式(Push)拉模式(Pull)

  • 推模式:消息中间件主动将消息推送给消费者
  • 拉模式:消费者主动从消息中间件拉取消息

推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息。好处很明显,消费者总是有一堆在内存中待处理的消息,所以效率高。缺点是缓冲区可能会溢出。

拉模式在消费者需要时才去消息中间件拉取消息,这段网络开销会明显增加消息延迟,降低系统吞吐量。

选择推模式还是拉模式需要考虑使用场景。

推模式是最常用的,但是有些情况下推模式并不适用的,比如说:由于某些限制,消费者在某个条件成立时才能消费消息

接受消息 – 推模式

RMQ Server主动把消息推给消费者

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
  • queue:队列名称。
  • consumer:消费者标签,用于区分不同的消费者。
  • autoAck:是否自动回复ACK,true为是,回复ACK表示高速服务器我收到消息了。建议为false,手动回复,这样可控性强。
  • exclusive:设置是否排他,排他表示当前队列只能给一个消费者使用。
  • noLocal:如果为true,表示生产者和消费者不能是同一个connect。
  • nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
  • args:直接写nil,没研究过,不解释。

注意下返回值:返回一个<- chan Delivery类型,遍历返回值,有消息则往下走, 没有则阻塞。

接受消息 – 拉模式

消费者主动从RMQ Server拉消息

func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)
  • queue:队列名称 。
  • autoAck:是否开启自动回复。

消费者消息确认机制(关于Ack机制)

如果其中一个消费者开始一项漫长的任务而仅部分完成就挂掉,会发生什么情况?

在这种情况下,消费者挂掉,我们将丢失正在处理的消息。我们还将丢失所有发送给该特定消费者但尚未处理的消息。

但是我们不想丢失任何消息。如果一个消费者挂掉,我们希望将消息交付给另一个消费者。

为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发送回一个确认(ACK),告知RabbitMQ特定的消息已被接收处理,并且RabbitMQ才可以删除此条消息。

在消费者从队列接收消息时,可以设置 autoAck 来指定是否自动确认。

  • autoAck = true,当消费者收到消息后,会自动发送ACK,RabbitMQ然后从内存或磁盘中删除消息。
  • autoAck = false,RabbitMQ 会一直等到持有消息的消费者显式地调用 Ack 命令为止。而且 RabbitMQ 也不会为消息设置过期时间,判断消息是否需要重新投递的依据是消费该消息的消费者已断开连接。这时候,队列中的消息分为两个部分:一部分是等待投递给消费者的消息;一部分是已经投递,但还没收到消费者确认信号的消息。如果一直没有受到确认信号,并且消费消息的消费者断开连接,RabbitMQ 会重新安排该消息进入队列,等待投递给下一个消费者(有可能还是原来的那个消费者)。
  • 推荐手动回复,尽量不要使用autoACK,因autoACK不可控。

如果设置为false后,主要使用两个函数进行手动回复:

func (ch *Channel) Ack(tag uint64, multiple bool) error
func (me Delivery) Ack(multiple bool) error {
    if me.Acknowledger == nil {
    return errDeliveryNotInitialized
  }
  return me.Acknowledger.Ack(me.DeliveryTag, multiple)
}

简单看一眼,函数2调用了函数1,本质上两个函数没区别。

这里推荐使用第二个,因为方便。

另外说一下multiple参数。true表示回复当前信道所有未回复的ack,用于批量确认。false表示回复当前条目。

手动回复的例子

msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  false,  // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    dot_count := bytes.Count(d.Body, []byte("."))
    t := time.Duration(dot_count)
    time.Sleep(t * time.Second)
    log.Printf("Done")
    d.Ack(false)
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

ACK机制的开发注意事项

如果你设置为手动回复,而且在程序中忘记了ACK,那么后果很严重。当消费者退出时候,Message会一直重新分发。然后RabbitMQ会占用越来越多的内容,由于RabbitMQ会长时间运行,因此这个"内存泄漏"是致命的。

消费者拒绝消费

func (d Delivery) Reject(requeue bool) error

拒绝本条消息。如果requeue为true,则RMQ会把这条消息重新加入队列,如果requeue为false,则RMQ会丢弃本条消息。

RMQ官网提供的教程:https://www.rabbitmq.com/getstarted.html

go-amqp库函数手册:https://godoc.org/github.com/streadway/amqp