乐趣区

关于java:RocketMQ负载均衡和广播模式

RocketMQ 的消费者生产音讯,有 2 种模式:

  • 负载平衡模式
  • 播送模式

首先咱们先启动一个生产者:发送了 10 条音讯,主题是 TopicTest,tag 是 TagA

public class SyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        // 设置生产者组
        defaultMQProducer.setProducerGroup("syncProducer");

        // 设置 nameserver
        defaultMQProducer.setNamesrvAddr("localhost:9876");

        // 启动生产者
        defaultMQProducer.start();

        for (int i = 0; i < 10; i++) {
            // 构建音讯 topic tag 内容
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ" +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 同步发送,且返回后果
            SendResult sendResult = defaultMQProducer.send(msg);
            System.out.println("发送后果"+sendResult);
        }
        // 敞开生产者
        defaultMQProducer.shutdown();}
}
// 运行后果
发送后果 SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2040000, offsetMsgId=C0A81FF100002A9F0000000000045402, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=350]
发送后果 SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2230001, offsetMsgId=C0A81FF100002A9F00000000000454CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=350]
发送后果 SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2260002, offsetMsgId=C0A81FF100002A9F0000000000045594, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=350]
发送后果 SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22A0003, offsetMsgId=C0A81FF100002A9F000000000004565D, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=350]
发送后果 SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22C0004, offsetMsgId=C0A81FF100002A9F0000000000045726, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=351]
发送后果 SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22E0005, offsetMsgId=C0A81FF100002A9F00000000000457EF, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=351]
发送后果 SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2300006, offsetMsgId=C0A81FF100002A9F00000000000458B8, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=351]
发送后果 SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2320007, offsetMsgId=C0A81FF100002A9F0000000000045981, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=351]
发送后果 SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2340008, offsetMsgId=C0A81FF100002A9F0000000000045A4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=352]
发送后果 SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2360009, offsetMsgId=C0A81FF100002A9F0000000000045B13, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=352]
14:29:56.930 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true

接下来,咱们启动 2 个消费者,别离用负载平衡模式和播送模式去进行生产信息:

负载平衡(或叫集群模式)

public class ClusterConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer");

        consumer.setNamesrvAddr("localhost:9876");

        // 设置集群模式,也就是负载平衡模式
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 订阅主题和标签
        consumer.subscribe("TopicTest","TagA");


        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("生产信息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();}
}
// 运行 2 个实例,后果如下:实例 1:生产信息:Hello RocketMQ 3
生产信息:Hello RocketMQ 2
生产信息:Hello RocketMQ 7
生产信息:Hello RocketMQ 6

实例 2:生产信息:Hello RocketMQ 1
生产信息:Hello RocketMQ 0
生产信息:Hello RocketMQ 4
生产信息:Hello RocketMQ 5
生产信息:Hello RocketMQ 8
生产信息:Hello RocketMQ 9

播送模式

public class BoardConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer");

        consumer.setNamesrvAddr("localhost:9876");

        // 设置集群模式,也就是负载平衡模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 订阅主题和标签
        consumer.subscribe("TopicTest","TagA");


        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("生产信息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();}
// 运行 2 个实例,后果如下:实例 1:生产信息:Hello RocketMQ 0
生产信息:Hello RocketMQ 1
生产信息:Hello RocketMQ 2
生产信息:Hello RocketMQ 3
生产信息:Hello RocketMQ 5
生产信息:Hello RocketMQ 9
生产信息:Hello RocketMQ 4
生产信息:Hello RocketMQ 6
生产信息:Hello RocketMQ 7
生产信息:Hello RocketMQ 8


实例 2:生产信息:Hello RocketMQ 2
生产信息:Hello RocketMQ 3
生产信息:Hello RocketMQ 1
生产信息:Hello RocketMQ 0
生产信息:Hello RocketMQ 5
生产信息:Hello RocketMQ 9
生产信息:Hello RocketMQ 4
生产信息:Hello RocketMQ 8
生产信息:Hello RocketMQ 7
生产信息:Hello RocketMQ 6

明天分享了消费者负载平衡模式和播送模式。
在生产中,个别都是用负载平衡模式。播送模式比拟少用。但还是得具体场景具体分析。

后续文章

  • RocketMQ- 入门(已更新)
  • RocketMQ- 音讯发送(已更新)
  • RocketMQ- 生产信息
  • RocketMQ- 消费者的播送模式和集群模式(已更新)
  • RocketMQ- 程序音讯
  • RocketMQ- 提早音讯
  • RocketMQ- 批量音讯
  • RocketMQ- 过滤音讯
  • RocketMQ- 事务音讯
  • RocketMQ- 音讯存储
  • RocketMQ- 高可用
  • RocketMQ- 高性能
  • RocketMQ- 主从复制
  • RocketMQ- 刷盘机制
  • RocketMQ- 幂等性
  • RocketMQ- 音讯重试
  • RocketMQ- 死信队列

欢送各位入 (guan) 股(zhu),后续文章干货多多。

退出移动版