RocketMQ - 整体介绍
简介
- RocketMQ是一款分布式、队列模型的消息中间件。
- 反对集群模型、负载平衡、程度扩大能力。
- 采纳零拷贝的原理、程序写盘、随机读。
- 代码优良,底层通信框架应用 Netty 。
- 强调集群无单点,可扩大,任意一点高可用,程度可扩大。
- 音讯失败重试机制、音讯可查问。
RcoketMQ 是一款低提早、高牢靠、可伸缩、易于应用的消息中间件,具备以下个性:
- 反对公布/订阅(Pub/Sub)和点对点(P2P)音讯模型。
- 在一个队列中牢靠的先进先出(FIFO)和严格的程序传递。
- 反对拉(pull)和推(push)两种音讯模式。
- 繁多队列百万音讯的沉积能力。
- 反对多种音讯协定,如 JMS、MQTT 等。
- 分布式高可用的部署架构,满足至多一次消息传递语义。
- 提供 docker 镜像用于隔离测试和星散群部署。
- 提供配置、指标和监控等功能丰富的 Dashboard。
概念模型
Producer:音讯生产者,负责生产音讯,个别由业务零碎负责产生音讯。
Consumer:音讯消费者,负责生产音讯,个别是后盾零碎负责异步生产。
Push Consumer:Consumer的一种,须要向Consumer对象注册监听。
Pull Consumer:Consumer的一种,须要被动申请Broker拉取音讯。
Producer Group:生产者汇合,个别用于发送一类音讯。
Consumer Group:消费者汇合,个别用于接管一类音讯进行生产。
Broker:MQ音讯服务(直达角色,用于音讯存储于生产音讯转发)。
环境搭建
环境:JDK8、Centos7、RocketMQ 4.3
首先咱们编辑Hosts
vim /etc/hosts
退出上面两句话,批改为你本人的ip。
192.168.3.160 rocketmq-nameserver1192.168.3.160 rocketmq-master1
随后咱们将RocketMQ tar.gz传入服务器。
传入之后咱们创立文件夹。
# 创立文件夹mkdir /usr/local/apache-rocketmq# 而后解压tar -zxvf apache-rocketmq.tar.gz -C /usr/local/apache-rocketmq# 建设软连贯ln -s apache-rocketmq rocketmq
创立存储门路。
mkdir /usr/local/rocketmq/storemkdir /usr/local/rocketmq/store/commitlogmkdir /usr/local/rocketmq/store/consumequeuemkdir /usr/local/rocketmq/store/index
批改配置文件。
vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties
brokerClusterName=rocketmq-cluster#broker 名字,留神此处不同的配置文件填写的不一样brokerName=broker-a#0 示意 Master,>0 示意 SlavebrokerId=0#nameServer 地址,分号宰割 肯定要和咱们配置的hosts里的雷同namesrvAddr=rocketmq-nameserver1:9876#在发送音讯时,主动创立服务器不存在的 topic,默认创立的队列数defaultTopicQueueNums=4#是否容许 Broker 主动创立 Topic,倡议线下开启,线上敞开autoCreateTopicEnable=true#是否容许 Broker 主动创立订阅组,倡议线下开启,线上敞开autoCreateSubscriptionGroup=true#Broker 对外服务的监听端口listenPort=10911#删除文件工夫点,默认凌晨 4 点deleteWhen=04#文件保留工夫,默认 48 小时fileReservedTime=120#commitLog 每个文件的大小默认 1GmapedFileSizeCommitLog=1073741824#ConsumeQueue 每个文件默认存 30W 条,依据业务状况调整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储门路storePathRootDir=/usr/local/rocketmq/store#commitLog 存储门路storePathCommitLog=/usr/local/rocketmq/store/commitlog#生产队列存储门路存储门路storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue#音讯索引存储门路storePathIndex=/usr/local/rocketmq/store/index#checkpoint 文件存储门路storeCheckpoint=/usr/local/rocketmq/store/checkpoint#abort 文件存储门路abortFile=/usr/local/rocketmq/store/abort#限度的音讯大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制 Master#- SYNC_MASTER 同步双写 Master#- SLAVEbrokerRole=ASYNC_MASTER#刷盘形式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉音讯线程池数量#pullMessageThreadPoolNums=128
批改日志配置文件。
mkdir -p /usr/local/rocketmq/logscd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
批改启动脚本参数。
vim /usr/local/rocketmq/bin/runbroker.sh
#==============================================================================# 开发环境 JVM Configuration#==============================================================================JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g
vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:PermSize=128m -XX:MaxPermSize=320m"
启动 NameServer
cd /usr/local/rocketmq/binnohup sh mqnamesrv &# 应用jps查看[root@localhost bin]# jps22321 NamesrvStartup22335 Jps
启动BrokerServer
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &# jps查看[root@localhost bin]# jps22321 NamesrvStartup22535 Jps22440 BrokerStartup
控制台应用
下载代码:https://github.com/apache/roc...
关上rocketmq-console。
批改properties
rocketmq.config.namesrvAddr=192.168.3.160:9876
启动代码,拜访localhost:8080
RocketMQ - 急速入门
生产者应用
首先咱们创立一个SpringBoot我的项目,引入RocketMQ依赖。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version></dependency>
创立一个Producer类
import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;/** * @author 又坏又迷人 * 公众号: Java菜鸟程序员 * @date 2021/1/26 * @Description: */public class Producer { public static final String NAME_SRV_ADDR = "192.168.3.160:9876"; public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name"); producer.setNamesrvAddr(NAME_SRV_ADDR); producer.start(); for (int i = 0; i < 5; i++) { //1.创立音讯 Message message = new Message("test_quick_topic",//主题 "TagA_" + i, // 标签 "KeyA_" + i, //用户自定义key,惟一标识 "Hello RocketMQ".getBytes());//音讯内容实体 //2.发送音讯 SendResult result = producer.send(message); System.out.println("音讯发送后果:" + result); } producer.shutdown(); }}
点击运行后,咱们能够在控制台看到5条后果曾经发送胜利。
咱们关上web治理界面能够看到明天有五条音讯进来。
并且在Message里咱们能够看到五条音讯。
消费者应用
创立一个Consumer类
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.remoting.common.RemotingHelper;/** * @author 又坏又迷人 * 公众号: Java菜鸟程序员 * @date 2021/1/26 * @Description: */public class Consumer { public static final String NAME_SRV_ADDR = "192.168.3.160:9876"; public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name"); consumer.setNamesrvAddr(NAME_SRV_ADDR); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //从最初端开始生产 consumer.subscribe("test_quick_topic",//订阅的主题 "*");// *代表蕴含所有 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> { MessageExt messageExt = msgs.get(0); try { String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String keys = messageExt.getKeys(); String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("topic:" + topic + ", tags : " + tags + ", keys :" + keys + ", msgBody:" + msgBody); } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); }}
点击运行后,咱们能够在控制台看到曾经收到5条后果。
音讯失败重试
上面咱们测试一下音讯发送失败的状况。
/** * @author 又坏又迷人 * 公众号: Java菜鸟程序员 * @date 2021/1/26 * @Description: */public class Consumer { public static final String NAME_SRV_ADDR = "192.168.3.160:9876"; public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name"); consumer.setNamesrvAddr(NAME_SRV_ADDR); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //从最初端开始生产 consumer.subscribe("test_quick_topic",//订阅的主题 "*");// *代表蕴含所有 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> { MessageExt messageExt = msgs.get(0); try { String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String keys = messageExt.getKeys(); if (keys.equals("KeyA_1")) { int i = 1 / 0; // 抛出异样 } String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("topic:" + topic + ", tags : " + tags + ", keys :" + keys + ", msgBody:" + msgBody); } catch (Exception e) { e.printStackTrace(); int reconsumeTimes = messageExt.getReconsumeTimes(); // 失败次数 System.out.println("失败音讯已被重发次数:"+ reconsumeTimes); if(reconsumeTimes == 3){ // 记录日志... // 弥补机制... return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); }}
咱们重新启动Producer和Consumer。
能够看到RocketMQ会在一段时间距离后从新发送此音讯,直到达到三次咱们进行SUCCESS做日志或者弥补机制。
四种集群环境构建详解
Name Server
Name Server是一个简直无状态节点,可集群部署,节点之间无任何信息同步。
Broker
Broker部署绝对简单,Broker分为Master与Slave,一个Master能够对应多个Slave,然而一个Slave只能对应一个Master,Master与Slave的对应关系通过指定雷同的Broker Name,不同的Broker Id来定义,BrokerId为0示意Master,非0示意Slave。Master也能够部署多个。
每个Broker与Name Server集群中的所有节点建设长连贯,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连贯,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连贯。
Producer
Producer与Name Server集群中的其中一个节点(随机抉择)建设长连贯,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建设长连贯,且定时向Master发送心跳。Producer齐全无状态,可集群部署。
Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新状况,这意味着如果Broker不可用,Producer最多30s可能感知,在此期间内发往Broker的所有音讯都会失败。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连贯,如果Broker在2分钟内没有收到心跳数据,则敞开与Producer的连贯。
Consumer
Consumer与Name Server集群中的其中一个节点(随机抉择)建设长连贯,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建设长连贯,且定时向Master、Slave发送心跳。Consumer既能够从Master订阅音讯,也能够从Slave订阅音讯,订阅规定由Broker配置决定。
Consumer每隔30s从Name server获取topic的最新队列状况,这意味着Broker不可用时,Consumer最多最须要30s能力感知。
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连贯,若某个连贯2分钟内没有发送心跳数据,则敞开连贯;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,而后持续生产。
当Consumer失去master宕机告诉后,转向slave生产,slave不能保障master的音讯100%都同步过去了,因而会有大量的音讯失落。然而一旦master复原,未同步过来的音讯会被最终生产掉。
集群模式之-单点模式
这种模式很显著,一旦节点挂掉,整体服务就不可用了。
集群模式之-主从模式
多Master多Slave模式,同步双写(NM-NS,SYNC)
- 每个Master装备一个Slave,共有多对Master-Slave,HA采纳同步双写机制,主从都写入音讯胜利后,再向利用返回ACK。
- 长处:数据与服务都无单点故障问题,Master宕机状况下,音讯无提早,服务可用性和数据可用性都十分高。
- 毛病:性能比异步复制略低,大略低10%,发送单个音讯的RT会略高。目前宕机状况下,从节点不能主动切换成主节点,后续会反对主动切换性能。
多Master多Slave模式,异步复制(NM-NS,ASYNC)
- 每个Master装备一个Slave,共有多对Master-Slave,HA采纳异步复制形式,主从有短暂音讯提早,毫秒级别。
- 长处:即便磁盘损坏,音讯的失落也非常少,而且音讯的实时性不会受到影响,因为Master宕机后,消费者依然能够从Slave中生产音讯,此过程对利用齐全通明,不须要人工干预,性能同多Master模式简直一样。
- 毛病:Master宕机后,如果磁盘呈现损坏,可能失落大量音讯。
集群模式之-双主模式
双Master模式/多Master模式(2M)
- 一个集群无Slave,全是Master,例如2个Master或者3个Master。
- 长处:配置简略,单个Master宕机或者重启对利用无影响,在磁盘配置为RAID10时,即便机器宕机不可复原状况下,因为RAID10磁盘可靠性十分高,音讯也不会失落(异步刷盘失落大量音讯,同步刷盘齐全不失落),性能最高。
- 毛病:单台机器宕机时,这台机器上未被生产的音讯在机器复原之前不可订阅,音讯的实时性会受到影响。
另外还有双主双从模式、多主多从模式。
主从集群模式搭建
主节点:192.168.3.160
从节点:192.168.3.161
首先关上主节点
vim /etc/hosts
减少161节点数据
192.168.3.161 rocketmq-nameserver2192.168.3.161 rocketmq-master1-slave
而后将4条数据复制到161节点的hosts文件中。
接着咱们把tar.gz复制到161。
scp apache-rocketmq.tar.gz 192.168.3.161:/usr/local/
还是之前的操作
# 创立文件夹mkdir /usr/local/apache-rocketmq# 而后解压tar -zxvf apache-rocketmq.tar.gz -C /usr/local/apache-rocketmq# 建设软连贯ln -s apache-rocketmq rocketmq
创立存储门路。
mkdir /usr/local/rocketmq/storemkdir /usr/local/rocketmq/store/commitlogmkdir /usr/local/rocketmq/store/consumequeuemkdir /usr/local/rocketmq/store/index
批改日志配置文件。
mkdir -p /usr/local/rocketmq/logscd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
批改启动脚本参数。
vim /usr/local/rocketmq/bin/runbroker.sh
#==============================================================================# 开发环境 JVM Configuration#==============================================================================JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g
vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:PermSize=128m -XX:MaxPermSize=320m"
接下来批改配置
咱们进入160服务器。
cd /usr/local/rocketmq/conf/2m-2s-async
首先咱们批改
vim broker-a.properties
减少2节点的地址。
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
随后咱们批改
vim broker-a-s.properties
brokerClusterName=rocketmq-cluster#broker 名字,留神此处不同的配置文件填写的不一样brokerName=broker-a#0 示意 Master,>0 示意 SlavebrokerId=1#nameServer 地址,分号宰割 肯定要和咱们配置的hosts里的雷同namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876#在发送音讯时,主动创立服务器不存在的 topic,默认创立的队列数defaultTopicQueueNums=4#是否容许 Broker 主动创立 Topic,倡议线下开启,线上敞开autoCreateTopicEnable=true#是否容许 Broker 主动创立订阅组,倡议线下开启,线上敞开autoCreateSubscriptionGroup=true#Broker 对外服务的监听端口listenPort=10911#删除文件工夫点,默认凌晨 4 点deleteWhen=04#文件保留工夫,默认 48 小时fileReservedTime=120#commitLog 每个文件的大小默认 1GmapedFileSizeCommitLog=1073741824#ConsumeQueue 每个文件默认存 30W 条,依据业务状况调整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储门路storePathRootDir=/usr/local/rocketmq/store#commitLog 存储门路storePathCommitLog=/usr/local/rocketmq/store/commitlog#生产队列存储门路存储门路storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue#音讯索引存储门路storePathIndex=/usr/local/rocketmq/store/index#checkpoint 文件存储门路storeCheckpoint=/usr/local/rocketmq/store/checkpoint#abort 文件存储门路abortFile=/usr/local/rocketmq/store/abort#限度的音讯大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制 Master#- SYNC_MASTER 同步双写 Master#- SLAVEbrokerRole=SLAVE#刷盘形式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉音讯线程池数量#pullMessageThreadPoolNums=128
批改实现后保留,而后执行复制命令拷贝到161节点。
scp broker-a.properties broker-a-s.properties 192.168.3.161:/usr/local/rocketmq/conf/2m-2s-async/
查看没有问题后,咱们回到160节点。
进入bin目录,进行启动,同时161节点同样。
nohup sh mqnamesrv &
随后启动Broker
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
[root@localhost bin]# jps4642 NamesrvStartup4761 BrokerStartup4777 Jps
切换到161,咱们执行上面命令,留神启动的配置是broker-a-s.properties
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &
[root@localhost bin]# jps23377 NamesrvStartup23524 Jps23414 BrokerStartup
随后批改咱们的web我的项目配置。减少161地址。
rocketmq.config.namesrvAddr=192.168.3.160:9876;192.168.3.161:9876
重启后查看Cluster。能够看到曾经有两个节点。
主从模式高可用机制故障演练
Producer类:
/** * @author 又坏又迷人 * 公众号: Java菜鸟程序员 * @date 2021/1/26 * @Description: */public class Producer { public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161"; public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name"); producer.setNamesrvAddr(NAME_SRV_ADDR); producer.start(); //1.创立音讯 Message message = new Message("test_quick_topic",//主题 "TagA_", // 标签 "KeyA_", //用户自定义key,惟一标识 "Hello RocketMQ".getBytes());//音讯内容实体 //2.发送音讯 SendResult result = producer.send(message); System.out.println("音讯发送后果:" + result); producer.shutdown(); }}
Consumer类:
/** * @author 又坏又迷人 * 公众号: Java菜鸟程序员 * @date 2021/1/26 * @Description: */public class Consumer { public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161"; public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name"); consumer.setNamesrvAddr(NAME_SRV_ADDR); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //从最初端开始生产 consumer.subscribe("test_quick_topic",//订阅的主题 "*");// *代表蕴含所有 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> { MessageExt messageExt = msgs.get(0); try { String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String keys = messageExt.getKeys(); String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("topic:" + topic + ", tags : " + tags + ", keys :" + keys + ", msgBody:" + msgBody); } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); }}
咱们启动Producer类。查看控制台。没有问题。
音讯发送后果:SendResult [sendStatus=SEND_OK, msgId=C0A803A5174918B4AAC284383C9C0000, offsetMsgId=C0A803A000002A9F0000000000000FE0, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=3], queueOffset=3]
这个时候咱们进行160主节点。
sh mqshutdown broker
能够看到目前只有slave节点。
随后咱们启动Consumer类。能够看到音讯仍然能够被生产。