吐血总结——消息队列之RocketMQ知识梳理

2022年05月15日 阅读数:3
这篇文章主要向大家介绍吐血总结——消息队列之RocketMQ知识梳理,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

摘要

在说消息队列以前,咱们要明白为啥须要消息队列,知乎上有一篇文章写的不错,连接: 为何要使用消息队列?html

消息队列主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺乏的中间件。 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。今天我就首先分析一下RocketMQ,目前公司用的也是这个,所以在进行一下梳理,加深一下印象。java

RocketMQ概述

RocketMQ为分布式消息中间件,其高性能在于顺序写盘(CommitLog)、零拷贝和跳跃读(尽可能命中PageCache),高可靠性在于刷盘机制和Master/Slave,另外NameServer若是所有挂掉都不会影响已经运行的Broker,Producer,Consumer。web

RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具备如下特性:docker

一、支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
二、在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
三、支持拉(pull)和推(push)两种消息模式
四、单一队列百万消息的堆积能力
五、支持多种消息协议,如 JMS、MQTT 等
六、分布式高可用的部署架构,知足至少一次消息传递语义
七、提供 docker 镜像用于隔离测试和云集群部署
八、提供配置、指标和监控等功能丰富的Dashboard数据库

那么首先先明白NameServer、Broker、Producer、Consumer都是什么,其实后面当在分析Kafka的时候,就会发现两点很像,由于RocketMQ是阿里根据Kafka架构进行的自研开发,在一些功能结构上面保留了Kafka的一些特性。首先咱们先看一下RocketMQ的架构图:express

在这里插入图片描述
从图中就可以看出来这是一个双主双从的集群模式结构图,咱们根据这个图进行一下分析:apache

NameServer

为producer 和 consumer 提供路由信息,记录broker与topic的关系,而后在这个基础上对Broker进行每十秒的监测,判断Broker是否依然存活。
其优势以下:服务器

1、可集群部署
2、相互之间独立,没有通讯,没必要保障节点间的数据强一致性
3、其余角色同时向多个NameServer机器上报状态信息
4、自己是无状态的,NameServer中的BrokerTopic等状态信息不会持久存储,由各个角色定时上报并存储到内存中的(NameServer支持配置参数的持久化,通常用不到)
5、采用每30s心跳机制
6、长链接持续提供给ProducerConsumer Topic信息
7、存储当前集群全部的Broker信息、TopicBroker的对应关系(
1)broker的基本信息(ip port等)2)主题topic的地址信息3)broker集群信息4)存活的broker信息5)filter 过滤器
)
8、只作集群元数据存储和心跳工做,功能简单,稳定性高
9、多机热备,单台NameServer宕机不影响其余NameServer工做
10、每一个NameServer注册的信息都是同样的,并且是当前系统中的全部broker的元数据信息

须要注意的是,即便整个NameServer集群宕机了,已经正常工做的Producer、Consumer、Broker仍然能正常工做,但新起的Producer、Consumer、Broker就没法工做。架构

Broker(Master):

MQ 中最核心的部分,是 MQ 的服务端,核心逻辑几乎全在这里,它为生产者和消费者提供 RPC 接口,负责消息的存储、备份和删除,以及消费关系的维护,同时用来消息存储和生产消费转发。app

1、单个Broker跟全部Namesrv保持心跳请求,心跳间隔为302、心跳请求中包括当前Broker信息(IP+端口等)以及存储全部topic信息
3RocketMQ消息代理服务器主节点,负责接收Producer发送的消息、消息存储、Consumer拉取消息;
BrokerSlave):
RocketMQ消息代理服务器备份节点,主要是经过同步/异步的方式将主节点的消息同步过来进行备份,为RocketMQ集群的高可用性提供保障;

Producer:

1、负责生产消息,通常由业务系统负责生产消息。
2、一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。
3RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。
4、同步和异步方式均须要Broker返回确认信息,单向发送不须要

Consumer:

1、负责消费消息,通常是后台系统负责异步消费。
2、一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。
3、从用户应用的角度而言提供了两种消费形式:拉取式消费、推进式消费(其实都是拉取,保持长链接)

在这里插入图片描述

Queue

主要用于支持点对点的消息传递模式,即生产者将消息发送至队列,队列存在于Broker中,消息者从该队列中取出消息。这种传递方式的特色是,一个队列能够被多个生产者或消费者共用,可是某条消息一旦被某消费者取出,它将再也不存在于队列中。即一条消息只能传递给一个消费者。

Topic(主题):

1、主要用于发布/订阅的传递模式。
2、生产者可将消息发布到Topic中,该Topic可由多个消费者订阅,全部订阅该Topic的消费者都能收到生产者发布的消息。
3、全部订阅的消费者都接收消息后,消息才会从Topic中移除。即一条消息能够传递给多个消费者。
4、来表明一种数据的集合,Topic 并不具备真正的属性,它只是一类数据的集合,不一样类型的数据咱们应该放到不一样的 Topic5Topic 会分布式的进行存储;

Topic与Broker的对应关系
在这里插入图片描述

注意: 当咱们真正使用 MQ 时,第一步应该老是先建立一些 Topic,做为数据集合存放不一样类型的消息,其实本质上来说和使用数据库时老是先建立表结构是同样的。

什么是长轮询机制?

Consumer从消息队列获取消息的方式主要有两种:pull和push。两种都有一些问题,好比说pull的状况下,有时候可能致使消息在服务端堆积,消息处理延时较高,有时候又可能由于消息队列中没有消息而致使空拉取,形成资源浪费,而在push的状况下,可能致使超出客户端压力,形成客户端卡死甚至宕机。因而,把pull和push相结合,获得了长轮询。
长轮询的机制是由客户端发起pull请求,服务端接收到客户端的请求后,若是发现队列中没有消息,并不当即返回,而是持有该请求一段时间,在此期间,服务端不断轮询队列中是否有新消息,若是有,则用现有链接将消息返回给客户端,若是一段时间内仍是没有新消息,则返回空。长轮询机制的好处在于,其本质仍是pull,因此,消息处理的主动权仍是在客户端手中,客户端就能够根据本身的能力去作消息处理。而服务端持有请求一段时间的机制又很大程序的避免了空拉取,减小了资源的浪费。可是,这种机制也有必定问题,当客户端数量过多时,服务端可能在时间段内须要持有过多的链接,这种请求下,也会对服务端形成压力。不过,通常来讲,消息队列的承压能力仍是比较可靠的,再加上集群的保障,基本不用担忧这个问题。
在这里插入图片描述

Rocketmq的工做流程是怎样的?

一、先启动Namesrv,Namesrver启动后监听端口,等待Broker、Produer、Consumer链接上来,至关于一个路由控制中心。
二、Broker启动,跟全部的Namesrver保持长链接,每30s发送一次心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储全部topic信息。注册成功后,Namesrv集群中就有Topic跟Broker的映射关系。
三、收发消息前,先建立topic,建立topic时须要指定该topic要存储在哪些Broker上。也能够在发送消息时自动建立Topic。
四、Producer启动而且发送消息,启动时先跟Namesrv集群中的其中一台创建长链接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,而后跟对应的Broker建长链接,直接向Broker发消息。
五、Consumer跟Producer相似,跟其中一台Namesrv创建长链接,获取当前订阅Topic存在哪些Broker,而后直接跟Broker创建链接通道,开始消费消息。

RocketMq使用哪一种方式消费消息,pull仍是push?

RocketMq提供两种方式:pull和push进行消息的消费 而RocketMq的push方式,本质上也是采用pull的方式进行实现的。也就是说这两种方式本质上都是采用consumer轮询从broker拉取消息的 push方式里,consumer把轮询过程封装了一层,并注册了MessageListener监听器。当轮询取到消息后,便唤醒MessageListener的consumeMessage()来消费,对用户而言,感受好像消息是被推送过来的 其实想一想,消息统一都发到了broker,而broker又不会主动去push消息,那么消息确定都是须要消费者主动去拉的喽~

后面知识点太多了 仍是本身看吧,有时间再总结!!!

RocketMQ中的延迟消息

RocketMQ-延时消息Demo及实现原理分析

RocketMQ的消费模式

细谈RocketMQ的消费模式

RocketMQ 的核心 NameServer

不要和陌生人说话,消息中间件之 Topic

还在纠结秒杀?看看 MQ 如何搞定

为何要使用消息中间件?

《吃透MQ系列》核心基础全在这里了,一文啃透!

分布式消息队列

RocketMQ相关问题研究

Broker 主从同步机制

RocketMQ刷盘机制

RocketMQ练习:

若是以集群的形式来进行演示的话一定会使得我电脑不堪重负,因此我采用单机的模式来进行。
@rocketMQ配置
106.12.50.23 rocketmq-nameserver-1
106.12.50.23 rocketmq-master-1
18.191.180.186 rocketmq-nameserver-2
18.191.180.186 rocketmq-master-2

broker-a.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

启动命令

sed -i 's#${user.home}#/usr/local/mq/rocketmq#g' *.xml
启动nameserver
nohup sh mqnamesrv &
启动broker
nohup sh mqbroker -c  /usr/local/mq/rocketmq/conf/2m-2s-async/broker-a.properties > /dev/null 2>&1 &
jps

单机模式启动配置broker-a.properties

# 集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不一样的配置文件填写的不同
brokerName=broker-a
# 0 表示Master,>0 表示Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver-1:9876
# 在发送消息时,自动建立服务器不存在的Topic,默认建立的队列数
defaultTopicQueueNums=4
# 是否容许Broker 自动建立Topic, 建议线下开启, 线上关闭
autoCreateTopicEnable=true
# 是否容许Broker 自动建立订阅组, 建议线下开启, 线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认是凌晨4点
deleteWhen=04
# 文件保留时间,默认是48小时
fileReservedTime=48
# commitLog每一个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每一个文件默认存30w条, 根据业务状况调整
mapedFileSizeConsumeQueue=30000
# destroyMapedFileIntervalForcibly=12000
# redeleteHangedFileInterval=12000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/mq/rocketmq/store
# commitLog存储路径
storePathCommitLog=/usr/local/mq/rocketmq/store/commitlog
# 消息队列储存路径
storePathConsumeQueue=/usr/local/mq/rocketmq/store/consumequeue
# 消息索引粗存路径
storePathIndex=/usr/local/mq/rocketmq/store/index
# checkpoint 文件储存路径
storeCheckpoint=/usr/local/mq/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/local/mq/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker的角色
# -ASYNC_MASTER 异步复制Master
# -SYNC_MASTER 同步双写Master
# -SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# checkTransactionMessageEnable=false
# 发消息线程池数量
# sendMessageTreadPoolNums=128
# 拉消息线程池数量
# pullMessageTreadPoolNums=128lushDiskType=ASYNC_FLUSHH