乐趣区

关于java:深入学习RocketMQ之快速入门

RocketMQ – 整体介绍

简介

  • RocketMQ 是一款分布式、队列模型的消息中间件。
  • 反对集群模型、负载平衡、程度扩大能力。
  • 采纳零拷贝的原理、程序写盘、随机读。
  • 代码优良,底层通信框架应用 Netty。
  • 强调集群无单点,可扩大,任意一点高可用,程度可扩大。
  • 音讯失败重试机制、音讯可查问。

RcoketMQ 是一款低提早、高牢靠、可伸缩、易于应用的消息中间件,具备以下个性:

  1. 反对公布 / 订阅(Pub/Sub)和点对点(P2P)音讯模型。
  2. 在一个队列中牢靠的先进先出(FIFO)和严格的程序传递。
  3. 反对拉(pull)和推(push)两种音讯模式。
  4. 繁多队列百万音讯的沉积能力。
  5. 反对多种音讯协定,如 JMS、MQTT 等。
  6. 分布式高可用的部署架构, 满足至多一次消息传递语义。
  7. 提供 docker 镜像用于隔离测试和星散群部署。
  8. 提供配置、指标和监控等功能丰富的 Dashboard。

概念模型

Producer:音讯生产者,负责生产音讯,个别由业务零碎负责产生音讯。

Consumer:音讯消费者,负责生产音讯,个别是后盾零碎负责异步生产。

Push Consumer:Consumer 的一种,须要向 Consumer 对象注册监听。

Pull Consumer:Consumer 的一种,须要被动申请 Broker 拉取音讯。

Producer Group:生产者汇合,个别用于发送一类音讯。

Consumer Group:消费者汇合,个别用于接管一类音讯进行生产。

Broker:MQ 音讯服务(直达角色,用于音讯存储于生产音讯转发)。

环境搭建

环境:JDK8、Centos7、RocketMQ 4.3

首先咱们编辑 Hosts

vim /etc/hosts

退出上面两句话,批改为你本人的 ip。

192.168.3.160 rocketmq-nameserver1
192.168.3.160 rocketmq-master1

随后咱们将 RocketMQ tar.gz 传入服务器。

传入之后咱们创立文件夹。

# 创立文件夹
mkdir /usr/local/apache-rocketmq
# 而后解压
tar -zxvf apache-rocketmq.tar.gz -C /usr/local/apache-rocketmq
# 建设软连贯
ln -s apache-rocketmq rocketmq

创立存储门路。

mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index

批改配置文件。

vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties
brokerClusterName=rocketmq-cluster
#broker 名字,留神此处不同的配置文件填写的不一样
brokerName=broker-a
#0 示意 Master,>0 示意 Slave
brokerId=0
#nameServer 地址,分号宰割 肯定要和咱们配置的 hosts 里的雷同
namesrvAddr=rocketmq-nameserver1:9876
#在发送音讯时,主动创立服务器不存在的 topic,默认创立的队列数
defaultTopicQueueNums=4
#是否容许 Broker 主动创立 Topic,倡议线下开启,线上敞开
autoCreateTopicEnable=true
#是否容许 Broker 主动创立订阅组,倡议线下开启,线上敞开
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件工夫点,默认凌晨 4 点
deleteWhen=04
#文件保留工夫,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,依据业务状况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储门路
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储门路
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#生产队列存储门路存储门路
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#音讯索引存储门路
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储门路
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储门路
abortFile=/usr/local/rocketmq/store/abort
#限度的音讯大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘形式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉音讯线程池数量
#pullMessageThreadPoolNums=128

批改日志配置文件。

mkdir -p /usr/local/rocketmq/logs
cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

批改启动脚本参数。

vim /usr/local/rocketmq/bin/runbroker.sh
#==============================================================================
# 开发环境 JVM Configuration
#==============================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g 
vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:PermSize=128m -
XX:MaxPermSize=320m"

启动 NameServer

cd /usr/local/rocketmq/bin
nohup sh mqnamesrv &
# 应用 jps 查看
[root@localhost bin]# jps
22321 NamesrvStartup
22335 Jps

启动 BrokerServer

nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
# jps 查看
[root@localhost bin]# jps
22321 NamesrvStartup
22535 Jps
22440 BrokerStartup

控制台应用

下载代码:https://github.com/apache/roc…

关上 rocketmq-console。

批改 properties

rocketmq.config.namesrvAddr=192.168.3.160:9876

启动代码,拜访 localhost:8080

RocketMQ – 急速入门

生产者应用

首先咱们创立一个 SpringBoot 我的项目,引入 RocketMQ 依赖。

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.3.0</version>
</dependency>

创立一个 Producer 类

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * @author 又坏又迷人
 * 公众号: Java 菜鸟程序员
 * @date 2021/1/26
 * @Description:
 */
public class Producer {

    public static final String NAME_SRV_ADDR = "192.168.3.160:9876";

    public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
        producer.setNamesrvAddr(NAME_SRV_ADDR);
        producer.start();
        for (int i = 0; i < 5; i++) {
            //1. 创立音讯
            Message message = new Message("test_quick_topic",// 主题
                    "TagA_" + i, // 标签
                    "KeyA_" + i, // 用户自定义 key, 惟一标识
                    "Hello RocketMQ".getBytes());// 音讯内容实体
            //2. 发送音讯
            SendResult result = producer.send(message);
            System.out.println("音讯发送后果:" + result);
        }
        producer.shutdown();}

}

点击运行后,咱们能够在控制台看到 5 条后果曾经发送胜利。

咱们关上 web 治理界面能够看到明天有五条音讯进来。

并且在 Message 里咱们能够看到五条音讯。

消费者应用

创立一个 Consumer 类

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;


/**
 * @author 又坏又迷人
 * 公众号: Java 菜鸟程序员
 * @date 2021/1/26
 * @Description:
 */
public class Consumer {

    public static final String NAME_SRV_ADDR = "192.168.3.160:9876";

    public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
        consumer.setNamesrvAddr(NAME_SRV_ADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 从最初端开始生产
        consumer.subscribe("test_quick_topic",// 订阅的主题
                "*");// * 代表蕴含所有
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {MessageExt messageExt = msgs.get(0);
            try {String topic = messageExt.getTopic();
                String tags = messageExt.getTags();
                String keys = messageExt.getKeys();
                String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                System.out.println("topic:" + topic + ", tags :" + tags + ", keys :" + keys + ", msgBody:" + msgBody);
            } catch (Exception e) {e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();}

}

点击运行后,咱们能够在控制台看到曾经收到 5 条后果。

音讯失败重试

上面咱们测试一下音讯发送失败的状况。

/**
 * @author 又坏又迷人
 * 公众号: Java 菜鸟程序员
 * @date 2021/1/26
 * @Description:
 */
public class Consumer {

    public static final String NAME_SRV_ADDR = "192.168.3.160:9876";

    public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
        consumer.setNamesrvAddr(NAME_SRV_ADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 从最初端开始生产
        consumer.subscribe("test_quick_topic",// 订阅的主题
                "*");// * 代表蕴含所有
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {MessageExt messageExt = msgs.get(0);
            try {String topic = messageExt.getTopic();
                String tags = messageExt.getTags();
                String keys = messageExt.getKeys();
                if (keys.equals("KeyA_1")) {int i = 1 / 0; // 抛出异样}
                String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                System.out.println("topic:" + topic + ", tags :" + tags + ", keys :" + keys + ", msgBody:" + msgBody);
            } catch (Exception e) {e.printStackTrace();
                int reconsumeTimes = messageExt.getReconsumeTimes(); // 失败次数
                System.out.println("失败音讯已被重发次数:"+ reconsumeTimes);
                if(reconsumeTimes == 3){
                    // 记录日志...
                    // 弥补机制...
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();}

}

咱们重新启动 Producer 和 Consumer。

能够看到 RocketMQ 会在一段时间距离后从新发送此音讯,直到达到三次咱们进行 SUCCESS 做日志或者弥补机制。

四种集群环境构建详解

Name Server

Name Server 是一个简直无状态节点,可集群部署,节点之间无任何信息同步。

Broker

Broker 部署绝对简单,Broker 分为 Master 与 Slave,一个 Master 能够对应多个 Slave,然而一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定雷同的 Broker Name,不同的 Broker Id 来定义,BrokerId 为 0 示意 Master,非 0 示意 Slave。Master 也能够部署多个。

每个 Broker 与 Name Server 集群中的所有节点建设长连贯,定时 (每隔 30s) 注册 Topic 信息到所有 Name Server。Name Server 定时 (每隔 10s) 扫描所有存活 broker 的连贯,如果 Name Server 超过 2 分钟没有收到心跳,则 Name Server 断开与 Broker 的连贯。

Producer

Producer 与 Name Server 集群中的其中一个节点 (随机抉择) 建设长连贯,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建设长连贯,且定时向 Master 发送心跳。Producer 齐全无状态,可集群部署。

Producer 每隔 30s(由 ClientConfig 的 pollNameServerInterval)从 Name server 获取所有 topic 队列的最新状况,这意味着如果 Broker 不可用,Producer 最多 30s 可能感知,在此期间内发往 Broker 的所有音讯都会失败。

Producer 每隔 30s(由 ClientConfig 中 heartbeatBrokerInterval 决定)向所有关联的 broker 发送心跳,Broker 每隔 10s 中扫描所有存活的连贯,如果 Broker 在 2 分钟内没有收到心跳数据,则敞开与 Producer 的连贯。

Consumer

Consumer 与 Name Server 集群中的其中一个节点 (随机抉择) 建设长连贯,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建设长连贯,且定时向 Master、Slave 发送心跳。Consumer 既能够从 Master 订阅音讯,也能够从 Slave 订阅音讯,订阅规定由 Broker 配置决定。

Consumer 每隔 30s 从 Name server 获取 topic 的最新队列状况,这意味着 Broker 不可用时,Consumer 最多最须要 30s 能力感知。

Consumer 每隔 30s(由 ClientConfig 中 heartbeatBrokerInterval 决定)向所有关联的 broker 发送心跳,Broker 每隔 10s 扫描所有存活的连贯,若某个连贯 2 分钟内没有发送心跳数据,则敞开连贯;并向该 Consumer Group 的所有 Consumer 发出通知,Group 内的 Consumer 重新分配队列,而后持续生产。

当 Consumer 失去 master 宕机告诉后,转向 slave 生产,slave 不能保障 master 的音讯 100% 都同步过去了,因而会有大量的音讯失落。然而一旦 master 复原,未同步过来的音讯会被最终生产掉。

集群模式之 - 单点模式

这种模式很显著,一旦节点挂掉,整体服务就不可用了。

集群模式之 - 主从模式

多 Master 多 Slave 模式,同步双写(NM-NS,SYNC)

  • 每个 Master 装备一个 Slave,共有多对 Master-Slave,HA 采纳同步双写机制,主从都写入音讯胜利后,再向利用返回 ACK。
  • 长处:数据与服务都无单点故障问题,Master 宕机状况下,音讯无提早,服务可用性和数据可用性都十分高。
  • 毛病:性能比异步复制略低,大略低 10%,发送单个音讯的 RT 会略高。目前宕机状况下,从节点不能主动切换成主节点,后续会反对主动切换性能。

多 Master 多 Slave 模式,异步复制(NM-NS,ASYNC)

  • 每个 Master 装备一个 Slave,共有多对 Master-Slave,HA 采纳异步复制形式,主从有短暂音讯提早,毫秒级别。
  • 长处:即便磁盘损坏,音讯的失落也非常少,而且音讯的实时性不会受到影响,因为 Master 宕机后,消费者依然能够从 Slave 中生产音讯,此过程对利用齐全通明,不须要人工干预,性能同多 Master 模式简直一样。
  • 毛病:Master 宕机后,如果磁盘呈现损坏,可能失落大量音讯。

集群模式之 - 双主模式

双 Master 模式 / 多 Master 模式(2M)

  • 一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master。
  • 长处:配置简略,单个 Master 宕机或者重启对利用无影响,在磁盘配置为 RAID10 时,即便机器宕机不可复原状况下,因为 RAID10 磁盘可靠性十分高,音讯也不会失落(异步刷盘失落大量音讯,同步刷盘齐全不失落),性能最高。
  • 毛病:单台机器宕机时,这台机器上未被生产的音讯在机器复原之前不可订阅,音讯的实时性会受到影响。

另外还有双主双从模式、多主多从模式。

主从集群模式搭建

主节点:192.168.3.160

从节点:192.168.3.161

首先关上主节点

vim /etc/hosts

减少 161 节点数据

192.168.3.161 rocketmq-nameserver2
192.168.3.161 rocketmq-master1-slave

而后将 4 条数据复制到 161 节点的 hosts 文件中。

接着咱们把 tar.gz 复制到 161。

scp apache-rocketmq.tar.gz 192.168.3.161:/usr/local/

还是之前的操作

# 创立文件夹
mkdir /usr/local/apache-rocketmq
# 而后解压
tar -zxvf apache-rocketmq.tar.gz -C /usr/local/apache-rocketmq
# 建设软连贯
ln -s apache-rocketmq rocketmq

创立存储门路。

mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index

批改日志配置文件。

mkdir -p /usr/local/rocketmq/logs
cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

批改启动脚本参数。

vim /usr/local/rocketmq/bin/runbroker.sh
#==============================================================================
# 开发环境 JVM Configuration
#==============================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g 
vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:PermSize=128m -
XX:MaxPermSize=320m"

接下来批改配置

咱们进入 160 服务器。

cd /usr/local/rocketmq/conf/2m-2s-async

首先咱们批改

vim broker-a.properties

减少 2 节点的地址。

namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

随后咱们批改

vim broker-a-s.properties
brokerClusterName=rocketmq-cluster
#broker 名字,留神此处不同的配置文件填写的不一样
brokerName=broker-a
#0 示意 Master,>0 示意 Slave
brokerId=1
#nameServer 地址,分号宰割 肯定要和咱们配置的 hosts 里的雷同
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送音讯时,主动创立服务器不存在的 topic,默认创立的队列数
defaultTopicQueueNums=4
#是否容许 Broker 主动创立 Topic,倡议线下开启,线上敞开
autoCreateTopicEnable=true
#是否容许 Broker 主动创立订阅组,倡议线下开启,线上敞开
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件工夫点,默认凌晨 4 点
deleteWhen=04
#文件保留工夫,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,依据业务状况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储门路
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储门路
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#生产队列存储门路存储门路
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#音讯索引存储门路
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储门路
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储门路
abortFile=/usr/local/rocketmq/store/abort
#限度的音讯大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=SLAVE
#刷盘形式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉音讯线程池数量
#pullMessageThreadPoolNums=128

批改实现后保留,而后执行复制命令拷贝到 161 节点。

scp broker-a.properties broker-a-s.properties 192.168.3.161:/usr/local/rocketmq/conf/2m-2s-async/

查看没有问题后,咱们回到 160 节点。

进入 bin 目录,进行启动,同时 161 节点同样。

nohup sh mqnamesrv &

随后启动 Broker

nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
[root@localhost bin]# jps
4642 NamesrvStartup
4761 BrokerStartup
4777 Jps

切换到 161,咱们执行上面命令,留神启动的配置是broker-a-s.properties

nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &
[root@localhost bin]# jps
23377 NamesrvStartup
23524 Jps
23414 BrokerStartup

随后批改咱们的 web 我的项目配置。减少 161 地址。

rocketmq.config.namesrvAddr=192.168.3.160:9876;192.168.3.161:9876

重启后查看 Cluster。能够看到曾经有两个节点。

主从模式高可用机制故障演练

Producer 类:

/**
 * @author 又坏又迷人
 * 公众号: Java 菜鸟程序员
 * @date 2021/1/26
 * @Description:
 */
public class Producer {

    public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161";

    public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
        producer.setNamesrvAddr(NAME_SRV_ADDR);
        producer.start();
        //1. 创立音讯
        Message message = new Message("test_quick_topic",// 主题
                "TagA_", // 标签
                "KeyA_", // 用户自定义 key, 惟一标识
                "Hello RocketMQ".getBytes());// 音讯内容实体
        //2. 发送音讯
        SendResult result = producer.send(message);
        System.out.println("音讯发送后果:" + result);
        producer.shutdown();}

}

Consumer 类:

/**
 * @author 又坏又迷人
 * 公众号: Java 菜鸟程序员
 * @date 2021/1/26
 * @Description:
 */
public class Consumer {

    public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161";

    public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
        consumer.setNamesrvAddr(NAME_SRV_ADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 从最初端开始生产
        consumer.subscribe("test_quick_topic",// 订阅的主题
                "*");// * 代表蕴含所有
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {MessageExt messageExt = msgs.get(0);
            try {String topic = messageExt.getTopic();
                String tags = messageExt.getTags();
                String keys = messageExt.getKeys();
                String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                System.out.println("topic:" + topic + ", tags :" + tags + ", keys :" + keys + ", msgBody:" + msgBody);
            } catch (Exception e) {e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();}

}

咱们启动 Producer 类。查看控制台。没有问题。

音讯发送后果:SendResult [sendStatus=SEND_OK, msgId=C0A803A5174918B4AAC284383C9C0000, offsetMsgId=C0A803A000002A9F0000000000000FE0, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=3], queueOffset=3]

这个时候咱们进行 160 主节点。

sh mqshutdown broker

能够看到目前只有 slave 节点。

随后咱们启动 Consumer 类。能够看到音讯仍然能够被生产。

退出移动版