乐趣区

关于rocketmq:RocketMQ一RocketMQ简介

RocketMQ 装置

装置配置 jdk8

1. 上传 jdk 压缩文件

将文件 jdk-8u212-linux-x64.tar.gz 上传到 /root 目录

2. 解压缩

执行解压命令

# 将 jdk 解压到 /usr/local/ 目录
tar -xf jdk-8u212-linux-x64.tar.gz -C /usr/local/

# 切换到 /usr/local/ 目录, 显示列表, 查看解压缩的 jdk 目录
cd /usr/local
ll

3. 配置环境变量

批改 /etc/profile 配置文件, 配置环境变量

vim /etc/profile

# 在文件开端增加以下内容:
export JAVA_HOME=/usr/local/jdk1.8.0_212
export PATH=$JAVA_HOME/bin:$PATH

批改完后, 让环境变量立刻失效

source /etc/profile

4. 验证

java -version

----------------------------------------------------------------
java version "1.8.0_212"
Java(TM) SE Runtime Environment (build 1.8.0_212-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.212-b10, mixed mode)

javac -version

---------------
javac 1.8.0_212

装置 RocketMQ

1. 下载 rocketmq 二进制文件

wget https://mirror.bit.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip

2. 解压缩 rocketmq

将 rocketmq 解压到 /usr/local/ 目录

unzip rocketmq-all-4.7.0-bin-release.zip -d /usr/local/

# 批改一下文件夹名,改成 rocketmq 方便使用
mv /usr/local/rocketmq-all-4.7.0-bin-release /usr/local/rocketmq

3. 配置环境变量 ROCKETMQ_HOME 和 PATH

为了后续操作不便能够配置环境变量,之后在任意地位都能够执行 rocketmq 的操作命令。

vim /etc/profile

# 在文件开端增加以下内容:
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=$ROCKETMQ_HOME/bin:$PATH

批改完后, 让环境变量立刻失效

source /etc/profile

4. 减小 rocketmq 应用的内存

rocketmq 须要启动两个服务: name serverbroker, name server 默认配置 JVM 应用的内存是 4g, broker默认配置 JVM 应用的内存是 8g.

开发环境中如果内存不足, 服务可能会无奈启动, 能够通过升高两个服务的内存, 使服务能够失常启动, 也能够节俭内存.

批改 name server 内存改为 256m

cd /usr/local/rocketmq/

# 编辑 bin/runserver.sh
vim bin/runserver.sh

# 找到文件中上面这一行:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

# 将 -Xms4g -Xmx4g -Xmn2g 批改为 -Xms256m -Xmx256m -Xmn128m
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

批改 broker 内存改为 256m

# 编辑 bin/runbroker.sh
vim bin/runbroker.sh

# 找到文件中上面这一行:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

# 将 -Xms8g -Xmx8g -Xmn4g 批改为 -Xms256m -Xmx256m -Xmn128m
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

5. 启动 rocketmq

先启动 name server

# 进入 rocketmq 目录
cd /usr/local/rocketmq/

# 启动 name server
nohup sh bin/mqnamesrv &

# 查看运行日志, 看到 "The Name Server boot success." 示意启动胜利
tail -f ~/logs/rocketmqlogs/namesrv.log

再启动 broker

# 启动 broker, 连贯 name server: localhost:9876
nohup sh bin/mqbroker -n localhost:9876 &

# 查看运行日志, 看到 "The broker[......:10911] boot success." 示意启动胜利
tail -f ~/logs/rocketmqlogs/broker.log

6. 敞开防火墙

rocketmq 的通信会用到多个端口, 为了不便测试咱们敞开防火墙

# 敞开防火墙
systemctl stop firewalld.service

# 禁止防火墙开机启动
systemctl disable firewalld.service

测试

运行测试, 启动生产者发送音讯, 启动消费者接管音讯

# 通过环境变量, 通知客户端程序 name server 的地址
export NAMESRV_ADDR=localhost:9876

# 启动生产者来测试发送音讯
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

# 启动消费者来测试接管音讯
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

RocketMQ 的敞开命令

敞开 broker

mqshutdown broker

敞开 nameserver

mqshutdown namesrv

治理界面

在开源我的项目 rocketmq-externals 中提供了 rocketmq 的治理界面: 地址为: https://github.com/apache/rocketmq-externals

github 在国内拜访迟缓, 也能够应用码云的镜像我的项目, 地址为: https://gitee.com/mirrors/RocketMQ-Externals

1. 克隆我的项目

cd /usr/local/rocketmq/

# 克隆 rocketmq-externals 我的项目
git clone https://gitee.com/mirrors/RocketMQ-Externals

2. maven 打包治理界面我的项目

如果没有装置 maven, 请先执行 maven 装置命令

yum install -y maven

打包治理界面我的项目 rocketmq-console.
打包过程中会下载各种依赖, 比拟迟缓, 请急躁期待

# 进入治理界面我的项目的文件夹
cd RocketMQ-Externals/rocketmq-console

# 执行 maven 打包命令, 执行工夫较长, 请急躁期待
mvn clean package -Dmaven.test.skip=true

3. 运行启动治理界面

打包的 jar 文件在 target 目录, 进入目录执行 jar 文件

# 进入 target 目录
cd target

# 运行治理界面
nohup java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 --rocketmq.config.namesrvAddr=localhost:9876 &

拜访治理界面:

http://192.168.64.141:8080

RocketMQ 双主双从同步复制集群计划

部署环境

作为测试环境,咱们应用两台虚拟机来部署双主双从环境,具体构造如下:


整个集群由 两个 name server 实例和 四个 broker 实例组成

name server:

  • 两台服务器别离启动两个 name server

broker A 主从:

  • 服务器 1 部署 broker A 主服务
  • 服务器 2 部署 broker A 从服务

broker B 主从:

  • 服务器 2 部署 broker B 主服务
  • 服务器 1 部署 broker B 从服务

装置 Rocketmq

首先参照《RocketMQ (一) 装置》笔记,在两台虚构机上安装 Rocketmq。或在一台虚拟机上装好后进行克隆。

建文件夹

在一台服务器上启动两个 broker 实例,须要为不同实例设置独自的数据存储目录。

为了不便起见,咱们在两台服务器上都创立这四个实例所须要的的目录。

mkdir /usr/local/rocketmq/store/
mkdir /usr/local/rocketmq/store/broker-a
mkdir /usr/local/rocketmq/store/broker-a/commitlog
mkdir /usr/local/rocketmq/store/broker-b
mkdir /usr/local/rocketmq/store/broker-b/commitlog
mkdir /usr/local/rocketmq/store/broker-as
mkdir /usr/local/rocketmq/store/broker-as/commitlog
mkdir /usr/local/rocketmq/store/broker-bs
mkdir /usr/local/rocketmq/store/broker-bs/commitlog

配置

rocketmq/conf 目录下提供了四种集群计划的配置样例

  • 2m-2s-async:双主双从异步复制
  • 2m-2s-sync:双主双从同步复制
  • 2m-noslave:双主
  • dledger:raft 主从切换

这里咱们抉择 双主双从同步复制 计划。

  1. broker-a,a 主服务器配置

服务器 1 批改样例配置文件:rocketmq/conf/2m-2s-sync/broker-a.properties

在样例配置文件中,增加三项配置:

  • listenPort:咱们在一台服务器上要运行两个 broker 实例,所以两个实例的端口要有所辨别。这里 broker- a 主服务器的端口应用默认的 10911。
  • storePathRootDir:数据存储目录
  • storePathCommitLog:提交日志存储目录
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH

listenPort=10911
storePathRootDir=/usr/local/rocketmq/store/broker-a
storePathCommitLog=/usr/local/rocketmq/store/broker-a/commitlog
  1. broker-a slave,a 从服务器配置

服务器 2 批改样例配置文件:rocketmq/conf/2m-2s-sync/broker-a-s.properties

在样例配置文件中,增加三项配置:

  • listenPort:咱们在一台服务器上要运行两个 broker 实例,所以两个实例的端口要有所辨别。这里 broker-a slave 从服务器的端口应用 11911。
  • storePathRootDir:数据存储目录
  • storePathCommitLog:提交日志存储目录
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

listenPort=11911
storePathRootDir=/usr/local/rocketmq/store/broker-as
storePathCommitLog=/usr/local/rocketmq/store/broker-as/commitlog
  1. broker-b,b 主服务器配置

服务器 2 批改样例配置文件:rocketmq/conf/2m-2s-sync/broker-b.properties

在样例配置文件中,增加三项配置:

  • listenPort:咱们在一台服务器上要运行两个 broker 实例,所以两个实例的端口要有所辨别。这里 broker- b 主服务器的端口应用默认的 10911。
  • storePathRootDir:数据存储目录
  • storePathCommitLog:提交日志存储目录
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH

listenPort=10911
storePathRootDir=/usr/local/rocketmq/store/broker-b
storePathCommitLog=/usr/local/rocketmq/store/broker-b/commitlog
  1. broker-b slave,b 从服务器配置

服务器 1 批改样例配置文件:rocketmq/conf/2m-2s-sync/broker-b-s.properties

在样例配置文件中,增加三项配置:

  • listenPort:咱们在一台服务器上要运行两个 broker 实例,所以两个实例的端口要有所辨别。这里 broker-b slave 从服务器的端口应用 11911。
  • storePathRootDir:数据存储目录
  • storePathCommitLog:提交日志存储目录
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

listenPort=11911
storePathRootDir=/usr/local/rocketmq/store/broker-bs
storePathCommitLog=/usr/local/rocketmq/store/broker-bs/commitlog

配置要点阐明

  1. 四台服务器的集群名 brokerClusterName 雷同。集群名称雷同的服务器独特组成服务集群。
  2. 从服务器通过名字与主服务器关联在一起,brokerName 与主服务器雷同。
  3. brokerId为 0 是主服务器。从服务器的值是非零值,例如如果有四个从服务器,他们的 brokerId 应该是 1,2,3,4。
  4. brokerRole的值为 SYNC_MASTER 是同步复制的主服务器。如果是 ASYNC_MASTER 则为异步复制的主服务器。
  • 同步复制:音讯复制到从服务器后才向生产者发回反馈信息。
  • 异步复制:音讯发到主服务器就向生产者发回反馈信息,之后再向从服务器复制。

启动

  1. 启动两个 name server

在两台服务器上启动两个 name server,它们不必做任何集群的配置,都是作为独立服务运行,它们之间也不会进行数据复制。

所有 broker 服务启动后,要同时连贯这两个 name server,向两个 name server 进行注册。

在两台服务器上都启动 name server

nohup sh mqnamesrv &
  1. 启动 broker a 的主从两台服务器

在服务器 1 上启动 broker a 主服务器

参数阐明:

  • - n 参数:指定 name server 地址列表,多个地址用分号分隔
  • - c 参数:指定配置文件,应用指定的配置文件启动 broker
nohup sh mqbroker 
-n '192.168.64.151:9876;192.168.64.152:9876' 
-c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties 
&

在服务器 2 上启动 broker a 从服务器

nohup sh mqbroker 
-n '192.168.64.151:9876;192.168.64.152:9876' 
-c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties 
&
  1. 启动 broker b 的主从两台服务器

在服务器 2 上启动 broker b 主服务器

nohup sh mqbroker 
-n '192.168.64.151:9876;192.168.64.152:9876' 
-c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties 
&

在服务器 1 上启动 broker b 从服务器

nohup sh mqbroker 
-n '192.168.64.151:9876;192.168.64.152:9876' 
-c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties 
&

查看启动的服务

在两台服务器上别离查看 java 过程,确认两台服务器上是否各启动了三个 java 过程,别离运行 name server 和两个 broker。

# 查看 java 过程
jps 

---------------------
12081 NamesrvStartup
15745 BrokerStartup
15595 BrokerStartup
16655 Jps

启动治理界面

# 进入 rocketmq-console 我的项目打包文件目录
cd /usr/local/rocketmq/RocketMQ-Externals/rocketmq-console/target/

# 启动治理界面
nohup java -jar rocketmq-console-ng-1.0.1.jar 
--server.port=8080 
--rocketmq.config.namesrvAddr='192.168.64.151:9876;192.168.64.152:9876' 
&

查看集群状态

RocketMQ 基本原理

Topic 基本原理

在 Rocketmq 集群中新建 Topic1

在治理界面中新建 主题 Topic1,为了不便察看测试成果,这里把 写队列 读队列 的数量都设置成 3。

这样,在 broker-a 和 broker-b 上都创立了 Topic1 主题,并各创立了 3 写 3 读队列,共 6 写 6 读,如下图所示:


你也能够批改 Topic1 别离配置 broker-a 和 borker-b 上的队列数量。

perm 参数的含意

perm 参数是设置队列的读写权限,上面表格列出了可配置的值及其含意:

取值 含意
6 同时开启读写
4 禁写
2 禁读

Topic 收发音讯原理


生产者将音讯发送到 Topic1 的其中一个 写队列 ,消费者从对应的一个 读队列 接管音讯。

生产者的负载平衡


生产者以 轮询 的形式向所有写队列发送音讯,这些队列可能会散布在多个 broker 实例上。

消费者的负载平衡

一个 group 中的多个消费者,能够以负载平衡的形式来接管音讯。

读取队列 被平均调配给这些消费者,它们从指定的队列来接管音讯。队列的调配能够采纳不同的策略,这里简略介绍以下三种策略:

AllocateMessageQueueAveragely 平均分配

这是默认策略,它是这样调配队列的:

AllocateMessageQueueAveragelyByCircle 环形调配

如果应用环形调配,在消费者的代码中须要设置调配策略,代码如下:

consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());

这种调配策略的逻辑很简略,所有 0 号队列分给 0 号消费者,所有 1 号队列分给 1 号消费者,以此类推。

AllocateMessageQueueConsistentHash 一致性哈希

如果应用一致性哈希算法进行调配,在消费者的代码中须要设置调配策略,代码如下:

consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueConsistentHash());

这种算法依附一致性哈希算法,看以后消费者能够落到哪个虚构节点,该虚构节点对应哪个队列。

问题

思考一下,如果写队列比读队列多会怎么?反之会怎么?

NameServer 基本原理

NameServer 是 rocketmq 本人开发的一个轻型注册核心,他的作用相当于是 zk、eureka 等。

rocketmq 为什么不应用 zk 呢?实际上 rocketmq 的晚期版本应用的就是 zookeeper。

而 rocketmq 的架构设计决定了只须要一个轻量级的元数据服务器就足够了。杀鸡焉用牛刀?小区里,搞个货架就行了,建个仓库,又占中央,保护老本又高。

甚至,NameServer 都不须要有一个集群的管理者。以至于,NameServer 看起来都不像一个集群。事实上,NameServer 实质上来看,也不是一个集群。因为它的各个节点是独立的,不相干的。每个 NameServer 都是独立和 Producer、Consumer 打交道。

根本意识

  1. NameServer 次要用于存储 Topic,Broker 关系信息,性能简略,稳定性高。
  2. 各个 NameServer 节点之间不相干,不须要通信,单台宕机不影响其它节点。
  3. NameServer 集群整体宕机不影响已建设关系的 Concumer,Producer,Broker。

Broker、Producer、Consumer 与 NameServer 的通信

  1. 每个 Borker 和所有 NameServer 放弃长连贯,心跳距离为 30 秒。每次心跳时还会携带以后的 Topic 信息。当某个 Broker 两分钟之内没有心跳,则认为该 Broker 下线,并调整内存中与该 Broker 相干的 Topic 信息。
  2. Consumer 从 NameServer 取得 Topic 的路由信息,与对应的 Broker 建设长连贯。距离 30 秒发送心跳至 Broker。Broker 查看若发现某 Consumer 两分钟内无心跳则认为该 Consumer 下线,并告诉该 Consumer 所有的消费者集群中的其余实例,触发该消费者集群从新负载平衡。
  3. Producer 与消费者一样,也是从 NameServer 取得 Topic 的路由信息,与对应的 Broker 建设长连贯,30 秒发送一次心跳。Broker 也会认为两分钟内没有心跳的 Producer 下线。
退出移动版