一:RocketMQ总体理解与快速实战

2022年05月15日 阅读数:4
这篇文章主要向大家介绍一:RocketMQ总体理解与快速实战,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

目录java

 

1、MQ介绍linux

一、什么是MQ?为何要用MQ?git

二、MQ的优缺点程序员

三、几大MQ产品特色比较github

2、RocketMQ快速实战面试

一、下载RocketMQ 4.7.1版本apache

二、快速安装RocketMQ编程

三、 快速运行RocketMQbash

3.1 启动NameServer微信

3.2 启动Broker

3.3 命令行快速验证

3.4 关闭RocketMQ服务

3、RocketMQ集群架构

一、RocketMQ集群中的各个角色

二、RocketMQ集群搭建

三、RocketMQ的其余参考资料

总结


1、MQ介绍

一、什么是MQ?为何要用MQ?

MQ:MessageQueue,消息队列。 队列,是一种FIFO 先进先出的数据结构。消息由生产者发送到MQ进行排队,而后按原来的顺序交由消息的消费者进行处理。QQ和微信就是典型的MQ。

MQ的做用主要有如下三个方面:

  • 异步

    例子:快递员发快递,直接到客户家效率会很低。引入菜鸟驿站后,快递员只须要把快递放到菜鸟驿站,就能够继续发其余快递去了。客户再按本身的时间安排去菜鸟驿站取快递。

    做用:异步能提升系统的响应速度、吞吐量。

  • 解耦

    例子:《Thinking in JAVA》很经典,可是都是英文,咱们看不懂,因此须要编辑社,将文章翻译成其余语言,这样就能够完成英语与其余语言的交流。

    做用:

    一、服务之间进行解耦,才能够减小服务之间的影响。提升系统总体的稳定性以及可扩展性。

    二、另外,解耦后能够实现数据分发。生产者发送一个消息后,能够由一个或者多个消费者进行消费,而且消费者的增长或者减小对生产者没有影响。

  • 削峰

    例子:长江每一年都会涨水,可是下游出水口的速度是基本稳定的,因此会涨水。引入三峡大坝后,能够把水储存起来,下游慢慢排水。

    做用:以稳定的系统资源应对突发的流量冲击。

二、MQ的优缺点

​ 上面MQ的所用也就是使用MQ的优势。 可是引入MQ也是有他的缺点的:

  • 系统可用性下降

系统引入的外部依赖增多,系统的稳定性就会变差。一旦MQ宕机,对业务会产生影响。这就须要考虑如何保证MQ的高可用。

  • 系统复杂度提升

引入MQ后系统的复杂度会大大提升。之前服务之间能够进行同步的服务调用,引入MQ后,会变为异步调用,数据的链路就会变得更复杂。而且还会带来其余一些问题。好比:如何保证消费不会丢失?不会被重复调用?怎么保证消息的顺序性等问题。

  • 消息一致性问题

A系统处理完业务,经过MQ发送消息给B、C系统进行后续的业务处理。若是B系统处理成功,C系统处理失败怎么办?这就须要考虑如何保证消息数据处理的一致性。

三、几大MQ产品特色比较

​ 经常使用的MQ产品包括Kafka、RabbitMQ和RocketMQ。咱们对这三个产品作下简单的比较,重点须要理解他们的适用场景。

MQ产品比较

另外,关于这三大产品更详细的比较,能够参见《kafka vs rabbitmq vs rocketmq

2、RocketMQ快速实战

​ RocketMQ是阿里巴巴开源的一个消息中间件,在阿里内部历经了双十一等不少高并发场景的考验,可以处理亿万级别的消息。2016年开源后捐赠给Apache,如今是Apache的一个顶级项目。

​ 目前RocketMQ在阿里云上有一个购买便可用的商业版本,商业版本集成了阿里内部一些更深层次的功能及运维定制。咱们这里学习的是Apache的开源版本。开源版本相对于阿里云上的商业版本,功能上略有缺失,可是大致上功能是同样的。

​ RocketMQ的官网地址: http://rocketmq.apache.org ,github地址是 https://github.com/apache/rocketmq ,当前最新的版本是4.7.1。咱们就用这个4.7.1版原本进行学习。

一、下载RocketMQ 4.7.1版本

​ RocketMQ运行版本下载地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip

​ RocketMQ源码版本下载地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip

或者从个人资源下载:网速会快一点

RocketMQ安装包.zip

RocketMQ源码包.zip

​ 这两个版本咱们都下载下来。

二、快速安装RocketMQ

​ RocketMQ的安装很是简单,就是上传解压就能够了。

​ 而后咱们准备一台CentOS7的Linux机器,快速把RocketMQ给运行起来。我使用的Linux版本以下:

[oper@worker1 jdk1.8]$ uname -a
Linux worker1 3.10.0-1127.el7.x86_64 #1 SMP Tue Mar 31 23:36:51 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

 咱们须要建立一个操做用户用来运行本身的程序,与root用户区分开。使用root用户建立一个oper用户,并给他建立一个工做目录。

[root@worker1 ~]# useradd oper
[root@worker1 ~]# passwd oper 
设置用户密码
[root@worker1 ~]# mkdir /app
[root@worker1 ~]# chown oper:oper /app

​ 运行RocketMQ须要先安装JDK。咱们采用目前最稳定的JDK1.8版本。CentOS能够采用课件资料中的jdk-8u171-linux-x64.tar.gz,也能够自行去Oracle官网上下载。而后用FTP上传到oper用户的工做目录下。由oper用户解压到/app/jdk1.8目录下。

[oper@worker1 tools]$ tar -zxvf jdk-8u171-linux-x64.tar.gz
[oper@worker1 tools]$ mv jdk1.8.0_171/ /app/jdk1.8

配置环境变量。使用 vi ~/.bash_profile编辑文件,在下面加入如下内容:

export JAVA_HOME=/app/jdk1.8/
PATH=$JAVA_HOME/bin:$PATH:$HOME/.local/bin:$HOME/bin
export PATH

​ 编辑完成后,执行 source ~/.bash_profile让环境变量生效。输入java -version能查看到如下内容代表JDK安装成功了。

[oper@worker1 ~]$ java -version
java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)

而后咱们把下载的rocketmq-all-4.7.1-bin-release.zip在本地完成解压,并上传到/app/rocketmq目录。完成后,把rocketmq的bin目录也配置到环境变量当中。 vi ~/.bash_profile,加入如下内容,并执行source ~/.bash_profile让环境变量生效:

export JAVA_HOME=/app/jdk1.8/
export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-4.7.1-bin-release
PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME/bin:$PATH:$HOME/.local/bin:$HOME/bin
export PATH

​ 这样RocketMQ就安装完成了。咱们把他运行起来。

这个ROCKETMQ_HOME的环境变量是必需要单独配置的,若是不配置的话,启动NameSever和Broker都会报错。

这个环境变量的做用是用来加载$ROCKETMQ_HOME/conf下的除broker.conf之外的几个配置文件。因此实际状况中,能够不按这个配置,可是必定要能找到配置文件。

三、 快速运行RocketMQ

​ 运行以前,咱们须要对RocketMQ的组件结构有个大体的了解。

RocketMQ组件

​ RocketMQ由如下这几个组件组成

  • NameServer : 提供轻量级的Broker路由服务。
  • Broker:实际处理消息存储、转发等服务的核心组件。
  • Producer:消息生产者集群。一般是业务系统中的一个功能模块。
  • Consumer:消息消费者集群。一般也是业务系统中的一个功能模块。

因此咱们要启动RocketMQ服务,须要先启动NameServer。

3.1 启动NameServer

​ 启动NameServer很是简单, 在$ROCKETMQ_HOME/bin目录下有个mqnamesrv。直接执行这个脚本就能够启动RocketMQ的NameServer服务。

​ 可是要注意,RocketMQ默认预设的JVM内存是4G,这是RocketMQ给咱们的最佳配置。可是一般咱们用虚拟机的话都是不够4G内存的,因此须要调整下JVM内存大小。修改的方式是直接修改runserver.sh。 用vi runserver.sh编辑这个脚本,在脚本中找到这一行调整内存大小为512M

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -
XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

​ 而后咱们用静默启动的方式启动NameServer服务:

nohup bin/mqnamesrv & 

​ 启动完成后,在nohup.out里看到这一条关键日志就是启动成功了。而且使用jps指令能够看到有一个NamesrvStartup进程。

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS
collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and
will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

3.2 启动Broker

​ 启动Broker的脚本是runbroker.sh。Broker的默认预设内存是8G,启动前,若是内存不够,一样须要调整下JVM内存。vi runbroker.sh,找到这一行,进行内存调整

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

​ 而后咱们须要找到$ROCKETMQ_HOME/conf/broker.conf, vi指令进行编辑,在最下面加入一个配置:

autoCreateTopicEnable=true

​ 而后也以静默启动的方式启动runbroker.sh

nohup ./mqbroker &

​ 启动完成后,一样是检查nohup.out日志,有这一条关键日志就标识启动成功了。 而且jps指令能够看到一个BrokerStartup进程。

The broker[worker1, 192.168.232.128:10911] boot success. serializeType=JSON

在观察runserver.sh和runbroker.sh时,咱们还能够查看到其余的JVM执行参数,这些参数均可以进行定制。例如咱们观察到一个比较有意思的地方,nameServer使用的是CMS垃圾回收器,而Broker使用的是G1垃圾回收器。 关于垃圾回收器的知识你还记得吗?

3.3 命令行快速验证

​ 在RocketMQ的安装包中,提供了一个tools.sh工具能够用来在命令行快速验证RocketMQ服务。咱们在worker2上进入RocketMQ的安装目录:

首先须要配置一个环境变量NAMESRV_ADDR指向咱们启动的NameServer服务。

export NAMESRV_ADDR='localhost:9876'	

而后启动消息生产者发送消息:默认会发1000条消息

bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

咱们能够看到发送消息的日志:

.....
SendResult [sendStatus=SEND_OK, msgId=C0A8E88007AC3764951D891CE9A003E7, offsetMsgId=C0A8E88000002A9F00000000000317BF, messageQueue=MessageQueue [topic=TopicTest, brokerName=worker1, queueId=1], queueOffset=249]
14:59:33.418 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:59:33.423 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.232.128:10911] result: true

这日志中,上面部分就是咱们发送的消息的内容。后面两句标识消息生产者正常关闭。

而后启动消息消费者接收消息:

bin/tools.sh  org.apache.rocketmq.example.quickstart.Consumer

启动后,能够看到消费到的消息。

......
ConsumeMessageThread_19 Receive New Messages: [MessageExt [brokerName=worker1, queueId=2, storeSize=203, queueOffset=53, sysFlag=0, bornTimestamp=1606460371999, bornHost=/192.168.232.128:43436, storeTimestamp=1606460372000, storeHost=/192.168.232.128:10911, msgId=C0A8E88000002A9F000000000000A7AE, commitLogOffset=42926, bodyCRC=1968636794, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1606460450150, UNIQ_KEY=C0A8E88007AC3764951D891CE41F00D4, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50, 49, 50], transactionId='null'}]] 

日志中MessageExt后的整个内容就是一条完整的RocketMQ消息。咱们要对这个消息的结构有个大概的了解,后面会对这个消息进行深刻的理解。

其中比较关键的属性有:brokerName,queueId,msgId,topic,cluster,tags,body,transactionId。先找下这些属性在哪里。

而这个Consume指令并不会结束,他会继续挂起,等待消费其余的消息。咱们可使用CTRL+C中止该进程。

3.4 关闭RocketMQ服务

要关闭RocketMQ服务能够经过mqshutdown脚本直接关闭

# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker

3、RocketMQ集群架构

​ 刚才的演示中,咱们已经体验到了RocketMQ是如何工做的。这样,咱们回头看RocketMQ的集群架构,就可以有更全面的理解了。

RocketMQ组件

一、RocketMQ集群中的各个角色

一个完整的RocketMQ集群中,有以下几个角色

  • Producer:消息的发送者;举例:发信者
  • Consumer:消息接收者;举例:收信者
  • Broker:暂存和传输消息;举例:邮局
  • NameServer:管理Broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类;一个发送者能够发送消息给一个或者多个Topic;一个消息的接收者能够订阅一个或者多个Topic消息

咱们以前的测试案例中,Topic是什么?topic='TopicTest'

如今你能看懂咱们以前在broker.conf中添加的autoCreateTopicEnable=true这个属性的用处了吗?

  • Message Queue:至关因而Topic的分区;用于并行发送和接收消息

在咱们以前的测试案例中,一个queueId就表明了一个MessageQueue。有哪些queueId? 0,1,2,3四个MessageQueue,你都找到了吗?

二、RocketMQ集群搭建

为了方便阅读,RocketMQ集群以及RocketMQ配套的管理页面rocketmq-console的搭建写到了另一个文档中。参见《RocketMQ集群搭建

三、RocketMQ的其余参考资料

还记得咱们以前把RocketMQ的源代码也下载下来了吗?咱们如今不须要去看源代码,可是在源码中有个docs目录,里面有很是有用的资料。例如,在他的docs/cn/architecture.md文档中,有对RocketMQ架构的更详细的介绍。这里面的内容就再也不搬运了,咱们直接看看把。

总结

​ 到这里,咱们能够完整的搭建RocketMQ,并进行简单的使用了。

​ 首先,咱们要对MQ的优缺点以及适用场景开始要有逐渐清晰的概念。成熟的MQ产品上手使用都很简单,因此,使用和面试的重点历来都不会是怎么编程,而是能结合项目场景完整落地,这才是考验程序员功力的地方。而这个功力的要点就在于对异步消息驱动场景的理解深度。这一部分的学习最好可以结合kafka、RabbitMQ和RocketMQ这几个产品一块儿进行横向对比。固然,没有基础的同窗也不用着急,可是在之后的学习中要有这个意识。

​ 而后,咱们要对RocketMQ总体的产品架构以及应用生态有个大体的了解。商业版本的RocketMQ提供了购买即用的高可用特性,而且功能也比开源版本略有改进。而在RocketMQ的开源版本以外,围绕RocketMQ的扩展生态包括管理控制台,大都整合在了rocketmq-externals社区项目中。关于RocketMQ的周边生态,其实跟kafka和RabbitMQ仍是有差距的,可是RocketMQ相比这两个产品,无论是开发语言仍是架构思惟,对咱们都更为友好,并且周边生态发展也有后发优点,因此对RocketMQ要抱着学习,改进的态度,从点到面横向拓宽技术视野。

​ 最后,咱们要对RocketMQ的总体架构有一个全面的了解。而且在后续的细节学习时,要保持对第一个问题的好奇心。