go 操作RabbitMQ

docker run -d --hostname my-rabbit --name rmq -p 15672:15672 -p 5672:5672 -p 25672:25672 -e RABBITMQ_DEFAULT_USER=用户名 -e RABBITMQ_DEFAULT_PASS=密码 rabbitmq:3-management

  • 三个端口映射,分别表示
5672:连接生产者、消费者的端口
15672:WEB管理页面的端口
25672:分布式集群的端口

2.基本概念

  • amqp:高级消息队列协议,即一种消息中间件协议,RMQ是amqp协议的一个具体实现。RMQ使用Erlang语言实现的,具有很好的并发能力,具体历史请百度,这里主要关心怎么用。
  • 生产者将消息发送至交换器;交换器再发送至队列,最后发送至消费者
  • 交换器有四种类型,fanout、direct、topic三种类型,header类型没用过,不关注。
fanout
一对多,根据绑定发送到每一个队列,
常用于发布订阅

direct
默认模式,一对一关系,根据routingkey与bindingjkey
一一对应匹配,发送消息

关于topic模式
以 ‘.’ 来分割单词。
‘#’ 表示一个或多个单词。
‘*’ 表示一个单词。
如:
RoutingKey为:
aaa.bbb.ccc
BindingKey可以为:
*.bbb.ccc
aaa.#

3.库中重要的方法

  • 创建交换器
func (ch *Channel) ExchangeDeclare(
        name string,  //交换器的名称
        kind string, //表示交换器的类型。有四种常用类型:direct、fanout、topic、headers
        durable bool, //是否持久化,true表示是。持久化表示会把交换器的配置存盘,当RMQ Server重启后,会自动加载交换器
        autoDelete bool, //是否自动删除,true表示,当所有绑定都与交换器解绑后,会自动删除此交换器。
        internal bool,  //是否为内部,true表示是。客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器。
        noWait bool, //是否非阻塞, 阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ
        args Table
) error
  • 创建队列
func (ch *Channel) QueueDeclare(
        name string,  //队列名称
        durable bool,  //是否持久化,true为是。持久化会把队列存盘,服务器重启后,不会丢失队列以及队列内的信息
        autoDelete bool,  //是否删除,当所有消费者都断开时,队列会自动删除。
        exclusive bool,   //是否排他,true为是。如果设置为排他,则队列仅对首次声明他的连接可见,并在连接断开时自动删除。
        noWait bool, //是否非阻塞
        args Table) (Queue, error)
  • 队列与交换器绑定,key,表示要绑定的键,交换器以此来分发
func (ch *Channel) QueueBind(
        name,  //队列名字,确定哪个队列
        key, // 对应图中BandingKey,表示要绑定的键。
        exchange string,  //交换器的名字
        noWait bool,  //是否非阻塞
        args Table) error
  • 交换器之间的绑定
func (ch *Channel) ExchangeBind(
        destination,  //目的交换器,通常是内部交换器。
        key,    //对应BandingKey,表示要绑定的键。
        source string,  //源交换器
        noWait bool,   //是否非阻塞
        args Table) error
  • 发送消息
func (ch *Channel) Publish(
                exchange,  //要发送的交换机
                key string,  //路由键,与之相关的绑定键对应
                mandatory, 
                immediate bool, 
                msg Publishing   //要发送的消息,msg对应一个Publishing结构
                ) error
                
//Publishing 结构体
type Publishing struct {
        Headers Table
        // Properties
        ContentType     string  //消息的类型,通常为“text/plain”
        ContentEncoding string  //消息的编码,一般默认不用写
        DeliveryMode    uint8   //消息是否持久化,2表示持久化,0或1表示非持久化。
        Body []byte  //消息主体
        Priority        uint8  //消息的优先级 0 to 9
        CorrelationId   string    // correlation identifier
        ReplyTo         string    // address to to reply to (ex: RPC)
        Expiration      string    // message expiration spec
        MessageId       string    // message identifier
        Timestamp       time.Time // message timestamp
        Type            string    // message type name
        UserId          string    // creating user id - ex: "guest"
        AppId           string    // creating application id
}
                
  • 消费者接收消息--推模式
func (ch *Channel) Consume(
        queue string,  //队列名称 
        consumer string,  //消费者标签,用于区分不同的消费者
        autoAck string,  //是否自动回复ACK,true为是,回复ACK表示高速服务器我收到消息了。建议为false,手动回复,这样可控性强
        exclusive bool,  //设置是否排他,排他表示当前队列只能给一个消费者使用
        noLocal bool, //如果为true,表示生产者和消费者不能是同一个connect
        noWait bool, //是否非阻塞
        args Table) (<-chan Delivery, error)
  • 消费者接收消息--拉模式
func (ch *Channel) Get(
        queue string, 
        autoAck bool) (msg Delivery, ok bool, err error)
  • 手动回复消息
func (ch *Channel) Ack(tag uint64, multiple bool) error

func (me Delivery) Ack(multiple bool) error {
        if me.Acknowledger == nil {
                return errDeliveryNotInitialized
        }
        return me.Acknowledger.Ack(me.DeliveryTag, multiple)
}

func (d Delivery) Reject(requeue bool) error
Publish – mandatory参数
  • false:当消息无法通过交换器匹配到队列时,会丢弃消息。
  • true:当消息无法通过交换器匹配到队列时,会调用basic.return通知生产者。
  • 注:不建议使用,因会使程序逻辑变得复杂,可以通过备用交换机来实现类似的功能。
Publish – immediate参数
  • true:当消息到达Queue后,发现队列上无消费者时,通过basic.Return返回给生产者。

  • false:消息一直缓存在队列中,等待生产者。

  • 注:不建议使用此参数,遇到这种情况,可用TTL和DLX方法代替(后面会介绍

  • Qos

    func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error

  • 注意:这个在推送模式下非常重要,通过设置Qos用来防止消息堆积。

  • prefetchCount:消费者未确认消息的个数。

  • prefetchSize :消费者未确认消息的大小。

  • global :是否全局生效,true表示是。全局生效指的是针对当前connect里的所有channel都生效

4.代码示例

生产者

package main

import (
        "fmt"
        "log"
        "os"
        "strings"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs",   // name
                "fanout", // type
                true,     // durable
                false,    // auto-deleted
                false,    // internal
                false,    // no-wait
                nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "logs", // exchange
                "",     // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

消费者

package main

import (
        "github.com/streadway/amqp"
        "log"
)

func main() {
        conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        DealWithError(err,"Failed to connect to RabbitMQ")
        defer conn.Close()

        ch,err := conn.Channel()
        DealWithError(err,"Failed to open a channel")
        defer ch.Close()
        //声明交换器
        ch.ExchangeDeclare(
                "logs",
                "fanout",
                true,
                false,
                false,
                false,
                nil,
                )
        DealWithError(err,"Failed to declare an exchange")
        //声明了队列
        q,err := ch.QueueDeclare(
                "", //队列名字为rabbitMQ自动生成
                false,
                false,
                true,
                false,
                nil,
                )
        DealWithError(err,"Failed to declare an exchange")
        //交换器跟队列进行绑定,交换器将接收到的消息放进队列中
        err = ch.QueueBind(
                q.Name,
                "",
                "logs",
                false,
                nil,
                )
        DealWithError(err,"Failed to bind a queue")
        msgs,err := ch.Consume(
                q.Name,
                "",
                true,
                false,
                false,
                false,
                nil,
                )
        DealWithError(err,"Failed to register a consumer")
        forever := make(chan bool)
        go func() {
                for d := range msgs{
                        log.Printf(" [x] %s",d.Body)
                }
        }()
        log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
        <-forever
}

func DealWithError(err error,msg string)  {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}