RocketMQ的消费者生产音讯,有2种模式:
- 负载平衡模式
- 播送模式
首先咱们先启动一个生产者:发送了10条音讯,主题是TopicTest,tag是TagA
public class SyncProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer defaultMQProducer = new DefaultMQProducer(); //设置生产者组 defaultMQProducer.setProducerGroup("syncProducer"); //设置nameserver defaultMQProducer.setNamesrvAddr("localhost:9876"); //启动生产者 defaultMQProducer.start(); for (int i = 0; i < 10; i++) { //构建音讯 topic tag 内容 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //同步发送,且返回后果 SendResult sendResult = defaultMQProducer.send(msg); System.out.println("发送后果"+sendResult); } //敞开生产者 defaultMQProducer.shutdown(); }}//运行后果发送后果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2040000, offsetMsgId=C0A81FF100002A9F0000000000045402, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=350]发送后果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2230001, offsetMsgId=C0A81FF100002A9F00000000000454CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=350]发送后果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2260002, offsetMsgId=C0A81FF100002A9F0000000000045594, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=350]发送后果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22A0003, offsetMsgId=C0A81FF100002A9F000000000004565D, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=350]发送后果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22C0004, offsetMsgId=C0A81FF100002A9F0000000000045726, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=351]发送后果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22E0005, offsetMsgId=C0A81FF100002A9F00000000000457EF, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=351]发送后果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2300006, offsetMsgId=C0A81FF100002A9F00000000000458B8, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=351]发送后果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2320007, offsetMsgId=C0A81FF100002A9F0000000000045981, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=351]发送后果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2340008, offsetMsgId=C0A81FF100002A9F0000000000045A4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=352]发送后果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2360009, offsetMsgId=C0A81FF100002A9F0000000000045B13, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=352]14:29:56.930 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true
接下来,咱们启动2个消费者,别离用负载平衡模式和播送模式去进行生产信息:
负载平衡(或叫集群模式)
public class ClusterConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer"); consumer.setNamesrvAddr("localhost:9876"); //设置集群模式,也就是负载平衡模式 consumer.setMessageModel(MessageModel.CLUSTERING); //订阅主题和标签 consumer.subscribe("TopicTest","TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("生产信息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }}//运行2个实例,后果如下:实例1:生产信息:Hello RocketMQ 3生产信息:Hello RocketMQ 2生产信息:Hello RocketMQ 7生产信息:Hello RocketMQ 6实例2:生产信息:Hello RocketMQ 1生产信息:Hello RocketMQ 0生产信息:Hello RocketMQ 4生产信息:Hello RocketMQ 5生产信息:Hello RocketMQ 8生产信息:Hello RocketMQ 9
播送模式
public class BoardConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer"); consumer.setNamesrvAddr("localhost:9876"); //设置集群模式,也就是负载平衡模式 consumer.setMessageModel(MessageModel.BROADCASTING); //订阅主题和标签 consumer.subscribe("TopicTest","TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("生产信息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }//运行2个实例,后果如下:实例1:生产信息:Hello RocketMQ 0生产信息:Hello RocketMQ 1生产信息:Hello RocketMQ 2生产信息:Hello RocketMQ 3生产信息:Hello RocketMQ 5生产信息:Hello RocketMQ 9生产信息:Hello RocketMQ 4生产信息:Hello RocketMQ 6生产信息:Hello RocketMQ 7生产信息:Hello RocketMQ 8实例2:生产信息:Hello RocketMQ 2生产信息:Hello RocketMQ 3生产信息:Hello RocketMQ 1生产信息:Hello RocketMQ 0生产信息:Hello RocketMQ 5生产信息:Hello RocketMQ 9生产信息:Hello RocketMQ 4生产信息:Hello RocketMQ 8生产信息:Hello RocketMQ 7生产信息:Hello RocketMQ 6
明天分享了消费者负载平衡模式和播送模式。
在生产中,个别都是用负载平衡模式。播送模式比拟少用。但还是得具体场景具体分析。
后续文章
- RocketMQ-入门(已更新)
- RocketMQ-音讯发送(已更新)
- RocketMQ-生产信息
- RocketMQ-消费者的播送模式和集群模式(已更新)
- RocketMQ-程序音讯
- RocketMQ-提早音讯
- RocketMQ-批量音讯
- RocketMQ-过滤音讯
- RocketMQ-事务音讯
- RocketMQ-音讯存储
- RocketMQ-高可用
- RocketMQ-高性能
- RocketMQ-主从复制
- RocketMQ-刷盘机制
- RocketMQ-幂等性
- RocketMQ-音讯重试
- RocketMQ-死信队列
...
欢送各位入(guan)股(zhu),后续文章干货多多。