RabbitMQ学习整理

2019年12月12日 阅读数:66
这篇文章主要向大家介绍RabbitMQ学习整理,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

一、什么是消息队列?

概念:html

消息队列(Message Queue,简称MQ),本质是个队列,FIFO先入先出,只不过队列中存放的内容是一些Messagejava

二、为何要用消息队列,应用场景?

不一样系统、进程或者线程直接进行通讯。

 

系统解耦,将要作的部分放入队列,便于模块分离。

传统模式spring

 

使用队列解耦json

 

 

缓冲功能,当有大量请求要处理时,能够先入队而后依次处理,保证系统可靠性。

例如:淘宝秒杀活动等。服务器

 

 

异步操做,有时候为了快速响应,可使用队列来实现异步,好比邮箱验证,手机验证码发送等。

 

三、消息队列的几种模式。

3-1、简单模式(单生产者单消费者)

            

一个线程负责生产,一个线程负责总队列里取出来消费。app

3-2、单生产者多消费者

 

一个线程生产,多个线程取出来消费。异步

3-3、订阅/发布模式

 

 一个发布者发送消息,多个订阅者能够同时获取到发布的消息ide

3-4、路由模式

 

3-5、主题模式,按规则模糊匹配

 

3-5、广播模式

生产者发送的消息会发往每一个与其绑定的队列。学习

 

4RabbitMQ的几个组成部分

AMQP协议: Advanced Message Queuing Protocol 高级消息队列协议,是一个异步消息传递所使用的应用层协议规范。fetch

 

组成:

         服务主机:接收客户端请求,并做出相应处理

         虚拟主机:一个服务器能够开启多个Virtual Host,每一个虚拟主机都能提供完整的服务,有本身的权限控制。

         生产者:发送消息

         消费者:接收消息并处理

         交换器:RabbitMQ中,消息不是直接发往队列,而是要先给交换器,而后交换器按照必定的路由规则发送到相对应的队列上。

         路由Key:发送消息时要指定交换器和路由Key

         绑定:将队列和交换器绑定起来路由Key做为绑定时的关键字

         队列:消息的载体,消费者从队列中获取消息,路由器根据路由规则把消息发往对应的队列。

         消息:队列中存储的信息单元。

5、工做原理

生产者发送消息时指定交换器名和路由Key,而后交换器根据路由Key与绑定信息进行比对,找到对应的队列后将信息发送出去。

消费者监听某个队列,若是有消息就取出来作对应操做,没有就阻塞。


 

 

6、交换器的几个工做模式

Direct:固定名称匹配,只有路由Key与绑定的Key一致才会将消息发送到该队列。

Topic:主题模式,路由Key能够用*#来填充

 

#能够匹配任意多个单词,*只能匹配一个单词

好比bingdKey  x.y  x.y.z  a.y  a.b.z

x.*只能匹配x.y,而x.#能够匹配x.y x.y.z

Fanout:广播模式

7Demo

安装RabbitMQ,并启动服务。默认用户名密码guest。建立VirtualHost

7-1、单生产者单消费者

publicclass Recv {

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //创建到代理服务器到链接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //声明交换器

       String exchangeName = "hello-exchange";

       channel.exchangeDeclare(exchangeName, "direct", true);

       //声明队列

       String queueName = channel.queueDeclare().getQueue();

       String routingKey = "hola";

       //绑定队列,经过键 hola将队列和交换器绑定起来

       channel.queueBind(queueName, exchangeName, routingKey);

 

           //消费消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消费的路由键:" + routingKey);

                  System.out.println("消费的内容类型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //确认消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消费的消息体内容:");

                  String bodyStr = new String(body, "UTF-8");

                  System.out.println(bodyStr);

 

              }

           });

    }

}

publicclass Send {

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立链接工厂

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //设置 RabbitMQ 地址

        factory.setHost("localhost");

        //创建到代理服务器到链接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //声明交换器

        String exchangeName = "hello-exchange";

        channel.exchangeDeclare(exchangeName, "direct", true);

 

        String routingKey = "hola";

        //发布消息

        byte[] messageBodyBytes = "quit".getBytes();

        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

 

        channel.close();

        conn.close();

 

    }

}

 

7-2、单生产者多消费者

11的相似,只不过两个消费者共同消费一个队列内的信息,复制一份消费者便可。

7-3、订阅/发布模式

生产者发送的消息,发往每一个订阅他的消费者那里。全部消费者均可以获取相同的信息。

发布者A

publicclass Send {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立链接工厂

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //设置 RabbitMQ 地址

        factory.setHost("localhost");

        //创建到代理服务器到链接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //声明交换器

        String exchangeName = "王力宏";

        channel.exchangeDeclare(exchangeName, "fanout", true);

 

        //发布消息

        byte[] messageBodyBytes = "王力宏发布的消息:啦啦啦啦啦".getBytes();

        channel.basicPublish(exchangeName, "", null, messageBodyBytes);

 

        channel.close();

        conn.close();

    }

}

发布者B

publicclass Send2 {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立链接工厂

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //设置 RabbitMQ 地址

        factory.setHost("localhost");

        //创建到代理服务器到链接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //声明交换器

        String exchangeName = "赵薇";

        channel.exchangeDeclare(exchangeName, "fanout", true);

 

        //发布消息

        byte[] messageBodyBytes = "赵薇发布的消息:啊啊啊啊".getBytes();

        channel.basicPublish(exchangeName, "", null, messageBodyBytes);

 

        channel.close();

        conn.close();

    }

}

订阅者A1A2

publicclass Recv {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //创建到代理服务器到链接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //声明交换器

       String exchangeName = "王力宏";

       channel.exchangeDeclare(exchangeName, "fanout", true);

       //声明队列

       String queueName = channel.queueDeclare().getQueue();

       //绑定队列,经过键 hola将队列和交换器绑定起来

       channel.queueBind(queueName, exchangeName, "");

 

       while(true) {

           //消费消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消费的路由键:" + routingKey);

                  System.out.println("消费的内容类型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //确认消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消费的消息体内容:");

                  String bodyStr = new String(body, "UTF-8");

                  System.out.println(bodyStr);

               }

           });

       }

    }

}

订阅者B1B2

publicclass Recv3 {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //创建到代理服务器到链接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       channel.exchangeDeclare("赵薇", "fanout", true);

       //声明交换器

       String exchangeName = "赵薇";

       //声明队列

       String queueName = channel.queueDeclare().getQueue();

       //绑定队列,经过键 hola将队列和交换器绑定起来

       channel.queueBind(queueName, exchangeName, "");

 

       while(true) {

           //消费消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消费的路由键:" + routingKey);

                  System.out.println("消费的内容类型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //确认消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消费的消息体内容:");

                  String bodyStr = new String(body, "UTF-8");

                  System.out.println(bodyStr);

 

              }

           });

       }

    }

 

}

 

7-4、主题模式

生产者

publicclass Send {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立链接工厂

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //设置 RabbitMQ 地址

        factory.setHost("localhost");

        //创建到代理服务器到链接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //声明交换器

        String exchangeName = "topic-exchange";

        channel.exchangeDeclare(exchangeName, "topic", true);

 

        //

        String routingKey = "#.B";

       

        //发布消息

        for (inti = 0; i < 3; i++) {

            channel.basicPublish(exchangeName, routingKey, null, "ss".getBytes());

       }

        //byte[] messageBodyBytes = "匹配消息".getBytes();

        //channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        channel.close();

        conn.close();

    }

}

消费者A

publicclass Recv {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //创建到代理服务器到链接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //声明交换器

       String exchangeName = "topic-exchange";

       channel.exchangeDeclare(exchangeName, "topic", true);

       //声明队列

       String queueName = channel.queueDeclare("X", false, false, false, null).getQueue();

      

       String routingKey = "X.A";

       //绑定队列,经过键 hola将队列和交换器绑定起来

       channel.queueBind(queueName, exchangeName, routingKey);

 

       while(true) {

           //消费消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消费的路由键:" + routingKey);

                  System.out.println("消费的内容类型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //确认消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消费的消息体内容:");

                  String bodyStr = new String(body);

                  System.out.println(bodyStr);

              }

           });

       }

    }

}

消费者BroutingKey routingKey.A

消费者CroutingKey  routingKey.B

7-5、广播模式,不须要管routingKeybindingKey是否匹配。

生产者

publicclass Send {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立链接工厂

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //设置 RabbitMQ 地址

        factory.setHost("localhost");

        //创建到代理服务器到链接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //声明交换器

        String exchangeName = "fanout-exchange";

        channel.exchangeDeclare(exchangeName, "fanout", true);

 

        //

        String routingKey = "hola";

       

        //发布消息

        byte[] messageBodyBytes = "群发消息".getBytes();

        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

 

        channel.close();

        conn.close();

    }

}

消费者

publicclass Recv {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //创建到代理服务器到链接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //声明交换器

       String exchangeName = "fanout-exchange";

       channel.exchangeDeclare(exchangeName, "fanout", true);

       //声明队列

       String queueName = channel.queueDeclare().getQueue();

      

       String routingKey = "hola2";

       //绑定队列,经过键 hola将队列和交换器绑定起来

       channel.queueBind(queueName, exchangeName, routingKey);

 

       while(true) {

           //消费消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消费的路由键:" + routingKey);

                  System.out.println("消费的内容类型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //确认消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消费的消息体内容:");

                  String bodyStr = new String(body);

                  System.out.println(bodyStr);

 

              }

           });

       }

    }

}

 

8RabbitMQSpring整合

8-1、添加依赖

<dependency>

  <groupId>com.rabbitmq</groupId>

  <artifactId>amqp-client</artifactId>

  <version>x.x.x</version>

</dependency>

<dependency>

    <groupId>org.springframework.amqp</groupId>

    <artifactId>spring-rabbit</artifactId>

    <version>x.x.xRELEASE</version>

</dependency>

8-2、配置文件中加入rabbit服务链接配置

mq.host=real_host

mq.username=guest

mq.password=guest

mq.port=5672

mq.vhost=real_vhost

8-3、新建application-mq.xml文件,添加配置信息

主要用来配置链接信息、Producer配置、队列声明、交换器声明、队列与交换器的绑定、队列的监听器配置(即消费者)等。

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"

xsi:schemaLocation="http://www.springframework.org/schema/rabbit

http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd

http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">

    <!-- 全局配置 -->

    <!-- 定义RabbitMQ的链接工厂 -->

    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"  virtual-host="${mq.vhost}"/>

    <!-- MQ的管理,包括队列、交换器等 -->

    <rabbit:admin connection-factory="connectionFactory"/>

 

    <!-- Sender配置 -->

    <!-- spring template声明-->

    <!-- 能够不指定交换器,在每次发送请求时须要指明发给哪一个交换器 <rabbit:template id="amqpTemplate"  connection-factory="connectionFactory"/> -->

    <rabbit:template exchange="test" id="amqpTemplate"  connection-factory="connectionFactory"/><!-- message-converter="jsonMessageConverter" />    -->

    <!-- 消息对象json转换类 -->

    <!-- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> 

    -->

 

    <!--定义queue  说明:durable:是否持久化 exclusive: 仅建立者可使用的私有队列,断开后自动删除 auto_delete: 当全部消费客户端链接断开后,是否自动删除队列-->

    <rabbit:queue name="mq.A" durable="true" auto-delete="false" exclusive="false" />

    <rabbit:queue name="mq.B" durable="true" auto-delete="false" exclusive="false" />

    <rabbit:queue name="mq.C" durable="true" auto-delete="false" exclusive="false" />

   

    <!-- 定义交换机,而且完成队列和交换机的绑定 -->

    <rabbit:direct-exchange name="test" durable="true" auto-delete="false" id="test">

        <rabbit:bindings>

            <rabbit:binding queue="mq.A" key="key.A"/>

            <rabbit:binding queue="mq.B" key="key.B"/>

            <rabbit:binding queue="mq.C" key="key.C"/>

        </rabbit:bindings>

    </rabbit:direct-exchange>

   

    <!--

         queues:监听的队列,多个的话用逗号(,)分隔

        ref:监听器

     -->

    <!-- 配置监听  acknowledeg = "manual"   设置手动应答  当消息处理失败时:会一直重发  直到消息处理成功  -->

    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">

    <!-- 配置监听器 -->

        <rabbit:listener queues="mq.A" ref="listenerA"/>

        <rabbit:listener queues="mq.B" ref="listenerB"/>

        <rabbit:listener queues="mq.C" ref="listenerC"/>

    </rabbit:listener-container>

</beans>

 

生产者:Spring提供的AmqpTemplate,使用注解注入便可使用

@Autowired

private AmqpTemplate amqpTemplate;

publicvoid sendMsg(String msg) {

    amqpTemplate.convertAndSend("routingKey", "发送了消息A");

}

 

监听器:即收到队列的消息后做何处理,要实现ChannelAwareMessageListener

例如:监听器listenerA

@Component

publicclass ListenerA implements ChannelAwareMessageListener {

 

      privatefinalstatic Log logger = LogFactory.getLog(ListenerA.class);

   

    @Override

    publicvoid onMessage(Message message, Channel channel) throws Exception {

       // TODO Auto-generated method stub

       String msg = new String(message.getBody());

       System.out.println("A received : " + msg);

       logger.error(msg);

    }

}

 

 

 

9、持久化机制

有可能遇到程序崩溃或者Rabbit服务器宕机的状况,那么若是没有持久化机制,全部数据都会丢失。

交换器持久化

Durable:是否持久化参数设为True便可

channel.exchangeDeclare(exchangeName, type, true);

队列持久化channel.queueDeclare("A",true,false,false,null).getQueue();

消息持久化

在以前,消息分发给consumer后当即就会被标记为已消费,这时候若是consumber接到了一个消息可是尚未来的及处理就异常退出,那么这个消息的状态是已被消费的,因而就会形成消息丢失的问题。

 

处理的代码也很简单,一共有两个步骤。第一个把autoAck改为false

//消费结果须要进行确认

 channel.BasicConsume("firstTest", false, consumer);

第二部分就是在咱们消费完成后进行确认

//进行交付,肯定此消息已经处理完成

channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);

若是没有进行确认queue会把这个消息交给其它的consumer去处理,若是没有交付的代码,那么这个消息会一直存在。

 

消息持久化步骤:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)

                    throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props,        byte[] body)

                    throws IOException;

exchange表示exchange的名称
routingKey
表示routingKey的名称
body
表明发送的消息体

MessageProperties.PERSISTENT_TEXT_PLAIN  能够设置为持久化,类型为文本

MessageProperties.PERSISTENT_BASIC    类型为二进制数据

mandatorymandatory标志位设置为true时,若是exchange没法找到一个队列取转发,就返回给生产者。

immediateimmediate标志位设置为true时,若是exchange要转发的队列上没有消费者时,就返回给生产者。

10、消息确认机制

概述

RabbitMQ可能会遇到的一个问题,即生成者不知道消息是否真正到达broker,那么有没有高效的解决方式呢?答案是采用Confirm模式。

producerconfirm模式的实现原理

生产者将信道设置成confirm模式,一旦信道进入confirm模式,全部在该信道上面发布的消息都会被指派一个惟一的ID(1开始),一旦消息被投递到全部匹配的队列以后,broker就会发送一个确认给生产者(包含消息的惟一ID,这就使得生产者知道消息已经正确到达目的队列了,若是消息和队列是可持久化的,那么确认消息会将消息写入磁盘以后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也能够设置basic.ackmultiple域,表示到这个序列号以前的全部消息都已经获得了处理。

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就能够在等信道返回确认的同时继续发送下一条消息,当消息最终获得确认以后,生产者应用即可以经过回调方法来处理该确认消息,若是RabbitMQ由于自身内部错误致使消息丢失,就会发送一条nack消息,生产者应用程序一样能够在回调方法中处理该nack消息。

channel 被设置成 confirm 模式以后,全部被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。可是没有对消息被 confirm 的快慢作任何保证,而且同一条消息不会既被 confirm又被nack

确认机制的三种实现

  1. 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。其实是一种串行confirm了。
  2. 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm
  3. 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。

普通confirm

//开启Procedure确认机制

channel.confirmSelect();

//发布消息

channel.basicPublish(exchangeName, routingKey,null,message);

//消息发送成功的确认,也能够设置超时时间

if (channel.waitForConfirms([long timeOut]) {

    System.out.println("send success...");

} else {

    System.out.println("send failed...");

}

批量Confirm

批量发送消息后再进行确认。

异步Confirm

//待确认的序列

SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

//开启确认机制

channel.confirmSelect();

//添加处理事件

channel.addConfirmListener(new ConfirmListener() {

    publicvoid handleAck(longdeliveryTag, booleanmultiple) throws IOException {

       if (multiple) {

           confirmSet.headSet(deliveryTag + 1).clear();

       } else {

           confirmSet.remove(deliveryTag);

       }

       System.out.println("发送成功...");

    }

    publicvoid handleNack(longdeliveryTag, booleanmultiple) throws IOException {

       System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);

       if (multiple) {

           confirmSet.headSet(deliveryTag + 1).clear();

       } else {

           confirmSet.remove(deliveryTag);

       }

       System.err.println("发送失败...");

    }

});

 

//发送消息

for (inti = 0; i < 10; i++) {

    //获取下个发送序号

    longnextSeqNo = channel.getNextPublishSeqNo();

    channel.basicPublish(exchangeName, routingKey, null, ("车票ID:" + i).getBytes());

    //加入待处理集合中

    confirmSet.add(nextSeqNo);

    //休息0.2s

    Thread.sleep(200);

}

 

 

 

 

 

 

Consumer端的确认

自动确认,默认是自动确认,即获取消息后,直接确认。

手动确认,给当前消息设置状态,当手动ack后服务端才会删除该消息,若是返回nack,从新入队。

//手动确认

booleanautoAck = false;

       channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {

           @Override

           publicvoid handleDelivery(String consumerTag,

                  Envelope envelope,  

                  AMQP.BasicProperties properties,

                  byte[] body) throws IOException {

              //…其余处理操做

longdeliveryTag = envelope.getDeliveryTag();

              //确认消息

              channel.basicAck(deliveryTag, false);

           }

       });

 

附件:

https://blog.csdn.net/wangshuminjava/article/details/80998992

深刻解读RabbitMQ工做原理及简单使用

https://blog.csdn.net/boonya/article/details/64904706

RabbitMQ深刻学习指导

 

一、什么是消息队列?

概念:

消息队列(Message Queue,简称MQ),本质是个队列,FIFO先入先出,只不过队列中存放的内容是一些Message

二、为何要用消息队列,应用场景?

不一样系统、进程或者线程直接进行通讯。

 

系统解耦,将要作的部分放入队列,便于模块分离。

传统模式

 

使用队列解耦

 

 

缓冲功能,当有大量请求要处理时,能够先入队而后依次处理,保证系统可靠性。

例如:淘宝秒杀活动等。

 

 

异步操做,有时候为了快速响应,可使用队列来实现异步,好比邮箱验证,手机验证码发送等。

 

三、消息队列的几种模式。

3-1、简单模式(单生产者单消费者)

            

一个线程负责生产,一个线程负责总队列里取出来消费。

3-2、单生产者多消费者

 

一个线程生产,多个线程取出来消费。

3-3、订阅/发布模式

 

 一个发布者发送消息,多个订阅者能够同时获取到发布的消息

3-4、路由模式

 

3-5、主题模式,按规则模糊匹配

 

3-5、广播模式

生产者发送的消息会发往每一个与其绑定的队列。

 

4RabbitMQ的几个组成部分

AMQP协议: Advanced Message Queuing Protocol 高级消息队列协议,是一个异步消息传递所使用的应用层协议规范。

 

组成:

         服务主机:接收客户端请求,并做出相应处理

         虚拟主机:一个服务器能够开启多个Virtual Host,每一个虚拟主机都能提供完整的服务,有本身的权限控制。

         生产者:发送消息

         消费者:接收消息并处理

         交换器:RabbitMQ中,消息不是直接发往队列,而是要先给交换器,而后交换器按照必定的路由规则发送到相对应的队列上。

         路由Key:发送消息时要指定交换器和路由Key

         绑定:将队列和交换器绑定起来路由Key做为绑定时的关键字

         队列:消息的载体,消费者从队列中获取消息,路由器根据路由规则把消息发往对应的队列。

         消息:队列中存储的信息单元。

5、工做原理

生产者发送消息时指定交换器名和路由Key,而后交换器根据路由Key与绑定信息进行比对,找到对应的队列后将信息发送出去。

消费者监听某个队列,若是有消息就取出来作对应操做,没有就阻塞。


 

 

6、交换器的几个工做模式

Direct:固定名称匹配,只有路由Key与绑定的Key一致才会将消息发送到该队列。

Topic:主题模式,路由Key能够用*#来填充

 

#能够匹配任意多个单词,*只能匹配一个单词

好比bingdKey  x.y  x.y.z  a.y  a.b.z

x.*只能匹配x.y,而x.#能够匹配x.y x.y.z

Fanout:广播模式

7Demo

安装RabbitMQ,并启动服务。默认用户名密码guest。建立VirtualHost

7-1、单生产者单消费者

publicclass Recv {

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //创建到代理服务器到链接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //声明交换器

       String exchangeName = "hello-exchange";

       channel.exchangeDeclare(exchangeName, "direct", true);

       //声明队列

       String queueName = channel.queueDeclare().getQueue();

       String routingKey = "hola";

       //绑定队列,经过键 hola将队列和交换器绑定起来

       channel.queueBind(queueName, exchangeName, routingKey);

 

           //消费消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消费的路由键:" + routingKey);

                  System.out.println("消费的内容类型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //确认消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消费的消息体内容:");

                  String bodyStr = new String(body, "UTF-8");

                  System.out.println(bodyStr);

 

              }

           });

    }

}

publicclass Send {

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立链接工厂

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //设置 RabbitMQ 地址

        factory.setHost("localhost");

        //创建到代理服务器到链接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //声明交换器

        String exchangeName = "hello-exchange";

        channel.exchangeDeclare(exchangeName, "direct", true);

 

        String routingKey = "hola";

        //发布消息

        byte[] messageBodyBytes = "quit".getBytes();

        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

 

        channel.close();

        conn.close();

 

    }

}

 

7-2、单生产者多消费者

11的相似,只不过两个消费者共同消费一个队列内的信息,复制一份消费者便可。

7-3、订阅/发布模式

生产者发送的消息,发往每一个订阅他的消费者那里。全部消费者均可以获取相同的信息。

发布者A

publicclass Send {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立链接工厂

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //设置 RabbitMQ 地址

        factory.setHost("localhost");

        //创建到代理服务器到链接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //声明交换器

        String exchangeName = "王力宏";

        channel.exchangeDeclare(exchangeName, "fanout", true);

 

        //发布消息

        byte[] messageBodyBytes = "王力宏发布的消息:啦啦啦啦啦".getBytes();

        channel.basicPublish(exchangeName, "", null, messageBodyBytes);

 

        channel.close();

        conn.close();

    }

}

发布者B

publicclass Send2 {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立链接工厂

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //设置 RabbitMQ 地址

        factory.setHost("localhost");

        //创建到代理服务器到链接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //声明交换器

        String exchangeName = "赵薇";

        channel.exchangeDeclare(exchangeName, "fanout", true);

 

        //发布消息

        byte[] messageBodyBytes = "赵薇发布的消息:啊啊啊啊".getBytes();

        channel.basicPublish(exchangeName, "", null, messageBodyBytes);

 

        channel.close();

        conn.close();

    }

}

订阅者A1A2

publicclass Recv {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //创建到代理服务器到链接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //声明交换器

       String exchangeName = "王力宏";

       channel.exchangeDeclare(exchangeName, "fanout", true);

       //声明队列

       String queueName = channel.queueDeclare().getQueue();

       //绑定队列,经过键 hola将队列和交换器绑定起来

       channel.queueBind(queueName, exchangeName, "");

 

       while(true) {

           //消费消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消费的路由键:" + routingKey);

                  System.out.println("消费的内容类型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //确认消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消费的消息体内容:");

                  String bodyStr = new String(body, "UTF-8");

                  System.out.println(bodyStr);

               }

           });

       }

    }

}

订阅者B1B2

publicclass Recv3 {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //创建到代理服务器到链接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       channel.exchangeDeclare("赵薇", "fanout", true);

       //声明交换器

       String exchangeName = "赵薇";

       //声明队列

       String queueName = channel.queueDeclare().getQueue();

       //绑定队列,经过键 hola将队列和交换器绑定起来

       channel.queueBind(queueName, exchangeName, "");

 

       while(true) {

           //消费消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消费的路由键:" + routingKey);

                  System.out.println("消费的内容类型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //确认消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消费的消息体内容:");

                  String bodyStr = new String(body, "UTF-8");

                  System.out.println(bodyStr);

 

              }

           });

       }

    }

 

}

 

7-4、主题模式

生产者

publicclass Send {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立链接工厂

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //设置 RabbitMQ 地址

        factory.setHost("localhost");

        //创建到代理服务器到链接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //声明交换器

        String exchangeName = "topic-exchange";

        channel.exchangeDeclare(exchangeName, "topic", true);

 

        //

        String routingKey = "#.B";

       

        //发布消息

        for (inti = 0; i < 3; i++) {

            channel.basicPublish(exchangeName, routingKey, null, "ss".getBytes());

       }

        //byte[] messageBodyBytes = "匹配消息".getBytes();

        //channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        channel.close();

        conn.close();

    }

}

消费者A

publicclass Recv {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //创建到代理服务器到链接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //声明交换器

       String exchangeName = "topic-exchange";

       channel.exchangeDeclare(exchangeName, "topic", true);

       //声明队列

       String queueName = channel.queueDeclare("X", false, false, false, null).getQueue();

      

       String routingKey = "X.A";

       //绑定队列,经过键 hola将队列和交换器绑定起来

       channel.queueBind(queueName, exchangeName, routingKey);

 

       while(true) {

           //消费消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消费的路由键:" + routingKey);

                  System.out.println("消费的内容类型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //确认消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消费的消息体内容:");

                  String bodyStr = new String(body);

                  System.out.println(bodyStr);

              }

           });

       }

    }

}

消费者BroutingKey routingKey.A

消费者CroutingKey  routingKey.B

7-5、广播模式,不须要管routingKeybindingKey是否匹配。

生产者

publicclass Send {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立链接工厂

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //设置 RabbitMQ 地址

        factory.setHost("localhost");

        //创建到代理服务器到链接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //声明交换器

        String exchangeName = "fanout-exchange";

        channel.exchangeDeclare(exchangeName, "fanout", true);

 

        //

        String routingKey = "hola";

       

        //发布消息

        byte[] messageBodyBytes = "群发消息".getBytes();

        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

 

        channel.close();

        conn.close();

    }

}

消费者

publicclass Recv {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //创建到代理服务器到链接

       Connection conn = factory.newConnection();

       //