RocketMQ生产端

明天要来跟大家学习怎么应用RocketMQ来进行音讯的生产

先简略创立个Maven我的项目应用

  • 增加依赖
<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-client</artifactId>    <version>4.9.2</version></dependency>
  • 启动消费者

    package mq.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;public class BroadcastConsumer {    public static void main(String[] args) throws MQClientException {        //创立一个push模式的生产组        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("pushConsumer");        pushConsumer.setNamesrvAddr("localhost:9876");        //集群模式        pushConsumer.setMessageModel(MessageModel.CLUSTERING);        //  订阅的topic tag        pushConsumer.subscribe("topic_test01","Tag1 || Tag2");        pushConsumer.registerMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }        });        pushConsumer.start();        System.out.printf("Broadcast Consumer Started.%n");    }
  • 启动生产者

    package mq.producer;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;public class SyncProducerV2 {    /**     * 同步音讯发送     *     * @param args     * @throws MQClientException     * @throws MQBrokerException     * @throws RemotingException     * @throws InterruptedException     * @throws UnsupportedEncodingException     */    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException {        System.out.println("SyncProducer start......");        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("pg_sync_01");        defaultMQProducer.setNamesrvAddr("localhost:9876");        defaultMQProducer.start();        for (int i = 0; i < 10; i++) {            send(defaultMQProducer, i, i % 3);        }        defaultMQProducer.shutdown();        System.out.println("SyncProducer end......");    }    private static void send(DefaultMQProducer defaultMQProducer, Integer i, int tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {        SendResult sendResult = defaultMQProducer.send(new Message("topic_test01", "Tag" + tag, ("hello this is sync message_" + i + "!").getBytes(RemotingHelper.DEFAULT_CHARSET)));        System.out.println(sendResult);    }}
  • 消费者生产
能够看到生产了Tag为Tag1、Tag2的音讯

其它Tag会被过滤掉

生产分类

RocketMQ的生产模式分为两种:BROADCASTING(播送)和CLUSTERING(集群)

那这两种模式有什么区别呢?

  • 播送:雷同生产组下的实例会反复生产同一个Topic的音讯,能够了解为大家做同样的工作,生产进度存储在客户端,有可能会导致局部音讯没有被生产
  • 集群:雷同生产组下的实例会负载平衡地生产同一个Topic的音讯,能够了解为分工合作,生产进度存储在Broker端

所以大部分零碎都会应用集群模式去生产信息,毕竟能够程度拓展消费者来接受更大的生产压力

播送模式相对来说应用比拟少,个别都是一些音讯告诉同步的场景,想同步刷新缓存等

本文由博客一文多发平台 OpenWrite 公布!