RabbitMQ 入门 ,Go - 2. 发布和接收消息

本文我将使用 Go 语言在 RabbitMQ 上发布和接收消息。

Go 的标准库本身并没有 RabbitMQ 的原生绑定,但是有一个第三方库确能够支持 RabbitMQ,它的源码在 https://github.com/streadway/amqp ,其文档在 https://pkg.go.dev/github.com/streadway/amqp

建立一个 Go 的项目,并使用 go mod init 进行初始化:

使用 go get -u github.com/streadway/amqp 命令来安装这个库:

获取 Queue

代码如下:

  1. 首先导入需要的包,主要是 streadway/amqp

  2. 第 12 行,编写处理错误的函数 failOnError

  3. 第 19 行,编写可以获得 AMQP Connection、Channel、Queue 的帮助函数 getQueue()

我们知道我们需要将消息发布到 Exchange 上面,但是如果使用默认 Exchange 的话,就可以使用一个捷径:我们可以将消息直接发送到 Queue 的名称上(但实际并不是直接发送到 Queue 上面)。

  1. getQueue() 函数不用任何参数,它返回三个对象:

    1. *amqp.Connection 表示应用和 RabbitMQ 之间的网络连接

    2. *amqp.Channel 位于 Connection 之上,它提供了用于双方通信的通道。通过把 Connection 和 Channel 分开,客户应用中就可以在同一个 Connection 上拥有多个 Channel 用来通信,这样就减少了对资源的需求。

    3. *amqp.Queue 也就是队列

  2. 第 20 行,使用 amqp 的 Dial 函数可以返回一个 Connection,Dial 函数的参数是 RabbitMQ 的 URL,URL 里面需要包含用户凭证。

  3. 第 22 行,通过调用 Connection 对象上的 Channel 方法,创建一个 Channel

  4. 第 24 行,通过调用 Channel 对象上的 QueueDeclare 方法,返回一个 Queue。注意:这个 Queue 不一定是被创建的,如果不存在指定名称的 Queue,那么 RabbitMQ 就会创建一个;如果存在指定名称的 Queue,但是和指定的配置不同,那么 RabbitMQ 就会拒绝这个请求,并抛出错误。

  5. QueueDeclare 方法参数:

    1. 第一个参数是 Queue 的名称:我们就写死一个 hello

    2. 第二个参数是 durable bool,表示是否将添加到 Queue 的消息存储在硬盘上。如果这个参数值为 true,那么 RabbitMQ 服务器重启之后消息依然会存在。但是它会导致处理消息的能力明显下降。这里我把它设为 false。

    3. 第三个参数 autoDelete bool,它会告诉 RabbitMQ 如果消息没有消费者应该怎么做:

      1. true:消息就会从 Queue 中删除

      2. false:将消息保留直到某个消费者前来获取该消息。这里我把它设为 false

    4. 第四个参数 exclusive bool,它允许我们把这个 Queue 设置为只能从请求它的那个 Connection 上进行访问。

      1. 如果它为 true,但想创建一个来自其它 Connection 的同名 Queue,那么就会报错

      2. 如果它是 false,那么想创建一个来自其它 Connection 的同名 Queue 的结果就是:两个 Connection 都连接到同一个 Queue,两个 Connection 会共享它。这里我把它设为 false

    5. 第五个参数 noWait bool,

      1. 如果为 true,这个 Queue 就被认为已经在服务器上存在了,将它返回即可,如果它不存在,那么就会报错

      2. 所以这里设置为 false,因为我要创建的 Queue 在服务器上不存在。

    6. 第六个参数 args amqp.Table,这个参数用于某些特定场景,例如声明一些要被这个 Queue 匹配的 Headers,如果这个 Queue 被绑定到 Header Exchange 的话。这里我传的是 nil。

  6. 如果第 24 行的 QueueDeclare 方法调用成功,那么就会得到一个绑定到 Default Exchange 的 Queue。

  7. 注意:Default Exchange 的类型是 Direct,也就是说任何没有路由 Key(和 Queue 的名称相同,在这里就是 hello)的消息传进来,将会被直接通过 exchange 送往输出的 Queue。

  8. 第 31 行,将 3 个对象返回即可,注意 q 我们返回的是指针

发布消息

  1. 16 行,编写了一个 server 函数

  2. 17 行,通过 getQueue 来获得 Connection,Channel 和 Queue

  3. 18,19 行,按顺序 defer 关闭 Connection 和 Channel

  4. 21 行,创建一个消息 amqp.Publishing 结构体,很多参数都是可选的,这里我设置两个:

    1. ContentType 会指明消息的类型。RabbitMQ 会把数据变成字节流来传输,所以它其实并不关心消息的类型。但是如果你往同一个 Queue 发送不同类型的消息,那么还是设置一下这个字段比较好,便于区分消息的类型。

    2. Body 可能是该结构体中最重要的字段:它的类型是 Byte Slice,里面包含着要传送的数据。

  5. 第 26 行,将消息发布到消息代理上。这里我们使用 Channel 上的 Publish 方法,其参数有:

    1. 第一个参数 Exchange:“”表示使用 Default Exchange,它没有名称

    2. 第二个参数是路由 Key:本例中,需要把它设置为 Queue 的名称

    3. 第三、四各参数 mandatory bool,immediate bool:用于发生者需要确认消息是否被传递成功,以及什么时候传递成功的。

    4. 第五个参数就是消息本身:也就是 msg

  6. 在 main 函数中调用 server 函数。这里循环调用是为了看看 RabbitMQ 的性能,你可以只调用一次。

运行程序

运行 go run . 命令:

打开管理控制台:

可以看到目前有 8 个 Exchange,10 个 Queue,有 9636 个消息

切换到 Exchange 画面:

可以看到 Default Exchange 的消息速率。

切换到 Queues 画面:

可以看到 hello 这个 Queue 的运行信息。

打开 hello Queue

可以看到 hello Queue 的运行信息。

移动到下面,

点击 Get Messages 按钮

就可以看到 Queue 里面的消息。

接收消息

从 RabbitMQ 接收消息与向 RabbitMQ 发布消息很类似。 接收消息仍然需要 Connection,Channel,Queue,但是交互方式略有不同。

我将之前的程序代码更新一下,以便可以在同一个 Queue 里同时发送和接收消息:

  1. 首先在 18 行建立 client 函数,用于处理接收消息的逻辑

  2. 19 行,通过 getQueue() 获得 Connection,Channel,Queue

  3. 23 行,调用 Channel 上的 Consume 方法。这个方法返回一个 Go 的 Channel,每从服务器接收到消息,就可以通过这个 Go Channel 获得。

  4. Consume 方法的参数:

    1. 第一个参数是接收消息的 queue 的名称,注意接收消息不需要关心 exchange,exchange 只有在发布消息时才用到。

    2. 第二个参数 consumer string,它会唯一的识别出 Queue 对应的 Connection。它被 RabbitMQ 内部使用,来确定谁正在监听这个 Queue。当 Direct Exchange 的多个客户端都从同一个 Queue 接收消息的时候,这个就很重要了。RabbitMQ 会把消息分发到各个客户端,粗略的看作是一种负载均衡。它对于想要取消连接的客户端也很重要,这就可以让 RabbitMQ 知道以后不要再向这个接收者发送消息了。这里我们传进一个“”,让 RabbitMQ 为我们赋一个值,因为我们没有追踪的需求。

    3. 第三个参数 autoAck bool,表示是否想在成功接收消息后自动确认。这个值通常设置为 true,从而让服务器可以立即将这个消息移除,以便节省资源。但是如果接收的消息需要做其它可能失败的动作,例如存储到数据库,那么可能最好将它设置为 false,然后手动进行确认。

    4. 第四个参数 exclusive bool,它可以确认这个客户端是这个 Queue 的唯一消费者。如果它设为 true,而其它客户端已经注册了,或稍后有其它 Queue 将要监听这个 Queue,就会发生错误。这里设为 false。

    5. 第五个参数 noLocal bool,防止 RabbitMQ 发送消息给与发送者处于同一个 Connection 的客户端。这里设为 false。

    6. 第六个参数 noWait bool,如果为 true,就不会等待服务器确认请求,而会立即进行传送。这里设为 false。

    7. 第七个参数 args amqp.Table,对于 Queue 或 Server 有特定语义的参数可以放在这里。此例中,我放的是 nil。

  5. 第 33 行,使用 for 循环来处理 msgs 这个 Go Channel,来接收消息。这里在接收消息后,我只简单的将其打印。

  6. 第 11、12 行,分别使用 goroutine 来运行 client 和 server,注意目前必须把 client 放在前面。

  7. 第 15 行的目的就是让这两个 goroutine 保持存活。

运行

我把 server 函数做了调整,让其持续不断的发布消息:

然后使用 go run . 运行以后,你将看到终端里不断在刷新这样的消息:

通过管理控制台,我们可以看到发布和接收消息的速率: