共计 3274 个字符,预计需要花费 9 分钟才能阅读完成。
RocketMQ 生产端
明天要来跟大家学习怎么应用 RocketMQ 来进行音讯的生产
先简略创立个 Maven 我的项目应用
- 增加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
-
启动消费者
package mq.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.util.List; public class BroadcastConsumer {public static void main(String[] args) throws MQClientException { // 创立一个 push 模式的生产组 DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("pushConsumer"); pushConsumer.setNamesrvAddr("localhost:9876"); // 集群模式 pushConsumer.setMessageModel(MessageModel.CLUSTERING); // 订阅的 topic tag pushConsumer.subscribe("topic_test01","Tag1 || Tag2"); pushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf(Thread.currentThread().getName() + "Receive New Messages:" + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); pushConsumer.start(); System.out.printf("Broadcast Consumer Started.%n"); }
-
启动生产者
package mq.producer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; public class SyncProducerV2 { /** * 同步音讯发送 * * @param args * @throws MQClientException * @throws MQBrokerException * @throws RemotingException * @throws InterruptedException * @throws UnsupportedEncodingException */ public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException {System.out.println("SyncProducer start......"); DefaultMQProducer defaultMQProducer = new DefaultMQProducer("pg_sync_01"); defaultMQProducer.setNamesrvAddr("localhost:9876"); defaultMQProducer.start(); for (int i = 0; i < 10; i++) {send(defaultMQProducer, i, i % 3); } defaultMQProducer.shutdown(); System.out.println("SyncProducer end......"); } private static void send(DefaultMQProducer defaultMQProducer, Integer i, int tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {SendResult sendResult = defaultMQProducer.send(new Message("topic_test01", "Tag" + tag, ("hello this is sync message_" + i + "!").getBytes(RemotingHelper.DEFAULT_CHARSET))); System.out.println(sendResult); } }
- 消费者生产
能够看到生产了 Tag 为 Tag1、Tag2 的音讯
其它 Tag 会被过滤掉
生产分类
RocketMQ 的生产模式分为两种:BROADCASTING(播送)和CLUSTERING(集群)
那这两种模式有什么区别呢?
- 播送:雷同生产组下的实例会反复生产同一个 Topic 的音讯,能够了解为大家做同样的工作,生产进度存储在客户端,有可能会导致局部音讯没有被生产
- 集群:雷同生产组下的实例会负载平衡地生产同一个 Topic 的音讯,能够了解为分工合作,生产进度存储在 Broker 端
所以大部分零碎都会应用集群模式去生产信息,毕竟能够程度拓展消费者来接受更大的生产压力
播送模式相对来说应用比拟少,个别都是一些音讯告诉同步的场景,想同步刷新缓存等
本文由博客一文多发平台 OpenWrite 公布!
正文完