关于rocketmq:RocketMQ学习十三消息的PUSH与PULL消费方式

52次阅读

共计 4958 个字符,预计需要花费 13 分钟才能阅读完成。

在 RocketMQ 里生产形式虽有 PUSH 与 PULL 两种,但实现机制实为 PULL 模式,PUSH 模式是一种伪推送,是对 PULL 模式的封装,每拉去一批音讯后,提交到生产端的线程池(异步),而后马上向 Broker 拉取音讯,即实现相似“推”的成果。

在 RocketMQ 中绝大数场景中,通常会抉择应用 PUSH 模式,具体起因下方也会进行阐明。

上面别离介绍下两者的关联。

一,PUSH 与 PULL 别离是什么

  • PUSH 指的是客户端与服务端建设好网络长连贯,当服务端有数据时立刻通过连贯将数据推送给客户端。
  • PULL 指的是客户端被动向服务端申请,拉取数据。

二,PUSH 与 PULL 的特点

  • PUSH 的一个特点是及时,一旦有数据服务端立刻将数据推送给客户端;对客户端来说比拟敌对,毋庸解决无数据的情景;不过服务端并不知道客户端的解决能力,如果客户端解决能力会造成音讯沉积在客户端的问题。
  • PULL 因为是客户端被动去服务端拉取数据,所以不存在音讯沉积问题;但什么时候有数据客户端是无奈感知的,所以拉取工夫距离不好管制,距离长音讯生产不及时;距离短会呈现有效拉取的申请。
    在 PULL 模式下为了保障生产的实时性,采起了长轮询音讯服务器拉取音讯的形式,每隔肯定工夫客户端向服务端发动一次申请,如果有数据则取回进行生产,如果服务端没数据客户端线程会阻塞,阻塞工夫为 15S,有数据了就会被唤醒。长轮询还是由 consumer 发动的,因而就算 broker 端有大量数据也不会被动推送给 consumer。
    对于长轮询的实现在 PullRequestHoldService 类里。

三,PUSH 与 PULL 的实现

先看看 PULL 应用的一个示例:

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class PullConsumerTest {public static void main(String[] args) throws Exception {Semaphore semaphore = new Semaphore();
        Thread t = new Thread(new Task(semaphore));
        t.start();
        CountDownLatch cdh = new CountDownLatch(1);
        try {
            // 程序运行 120s
            cdh.await(120 * 1000, TimeUnit.MILLISECONDS);
        } finally {semaphore.running = false;}
    }
    /**
     * 音讯拉取外围实现逻辑
     */
    static class Task implements Runnable {Semaphore s = new Semaphore();
        public Task(Semaphore s) {this.s = s;}
        public void run() {
            try {
                DefaultMQPullConsumer consumer = new 
                    DefaultMQPullConsumer("dw_pull_consumer");
                consumer.setNamesrvAddr("127.0.01:9876");
                consumer.start();
                Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
                Set<MessageQueue> msgQueueList = consumer.
                    fetchSubscribeMessageQueues("TOPIC_TEST"); // 获取该 Topic 的所有队列
                if(msgQueueList != null && !msgQueueList.isEmpty()) {
                    boolean noFoundFlag = false;
                    while(this.s.running) {if(noFoundFlag) { // 没有找到音讯,暂停一下生产
                            Thread.sleep(1000);
                        }
                        for(MessageQueue q : msgQueueList) {
                            PullResult pullResult = consumer.pull(q, "*",                                          decivedPulloffset(offsetTable
                             , q, consumer) , 3000);
                            System.out.println("pullStatus:" + 
                                               pullResult.getPullStatus());
                            switch (pullResult.getPullStatus()) {
                                case FOUND:
                                    doSomething(pullResult.getMsgFoundList());
                                    break;
                                case NO_MATCHED_MSG:
                                    break;
                                case NO_NEW_MSG:
                                case OFFSET_ILLEGAL:
                                    noFoundFlag = true;
                                    break;
                                default:
                                    continue ;
                            }
                            // 提交位点
                            consumer.updateConsumeOffset(q, 
                                 pullResult.getNextBeginOffset());
                        }
                        System.out.println("balacne queue is empty:" + consumer.
                              fetchMessageQueuesInBalance("TOPIC_TEST").isEmpty());
                    }
                } else {System.out.println("end,because queue is enmpty");
                }
                consumer.shutdown();
                System.out.println("consumer shutdown");
            } catch (Throwable e) {e.printStackTrace();
            }
        }
    }
    /** 拉取到音讯后具体的解决逻辑 */
    private static void doSomething(List<MessageExt> msgs) {System.out.println("本次拉取到的音讯条数:" + msgs.size());
    }
    public static long decivedPulloffset(Map<MessageQueue, Long> offsetTable, 
             MessageQueue queue, DefaultMQPullConsumer consumer) throws Exception {long offset = consumer.fetchConsumeOffset(queue, false);
        if(offset < 0) {offset = 0;}
        System.out.println("offset:" + offset);
        return offset;
    }
    static class Semaphore {public volatile boolean running = true;}
}

音讯的拉取实现次要在工作 Task 的 run 办法中,重点看下:

  1. 首先依据 MQConsumer 的 fetchSubscribeMessageQueues 的办法获取 Topic 的所有队列信息
  2. 而后遍历所有队列,顺次通过 MQConsuemr 的 PULL 办法从 Broker 端拉取音讯。
  3. 对拉取的音讯进行生产解决
  4. 通过调用 MQConsumer 的 updateConsumeOffset 办法更新位点,但须要留神的是这个办法并不是实时向 Broker 提交,而是客户端会启用以线程,默认每隔 5s 向 Broker 集中上报一次。

下面的示例逻辑倒是挺清晰,不过以下这些问题咱们在应用时须要思考的:

  1. 从 broker 拉取了一批音讯后多个消费者须要手动实现队列的调配。上例是只是一个生产组且组里只有一个消费者,如果是多个咱们须要思考队列的分配情况
  2. 生产完音讯后咱们须要被动上报生产进度,而后拉取下一批。
  3. 如果遇到音讯生产失败,须要告知 Broker,该条音讯生产失败,后续须要重试,通过手动调用 sendMessageBack 办法实现

咱们再来看一下 PUSH 应用的示例

public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new 
            DefaultMQPushConsumer("dw_test_consumer_6");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TOPIC_TEST", "*");
        consumer.setAllocateMessageQueueStrategy(new 
               AllocateMessageQueueAveragelyByCircle());
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                try {
                    System.out.printf("%s Receive New Messages: %s %n", 
                          Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Throwable e) {e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

理下流程:

  1. 首先 new DefaultMQPushConsumer 对象,并指定一个生产组名。
  2. 而后设置相干参数,例如 nameSrvAdd、生产失败重试次数、线程数等
  3. 通过调用 setConsumeFromWhere 办法指定首次启动时从什么中央生产,默认是最新的音讯开始生产。
  4. 通过调用 setAllocateMessageQueueStrategy 指定队列负载机制,默认平均分配。
  5. 通过调用 registerMessageListener 设置音讯监听器,即音讯解决逻辑,最终返回 CONSUME_SUCCESS(胜利生产)或 RECONSUME_LATER(须要重试)。

相较于 PULL 形式,咱们在应用 PUSH 形式时只需指定好相干策略而后在 MessageListener 的回调里进行音讯解决就行。至于开始生产的偏移量,队列的负载咱们毋庸干涉太多,这些问题都被封装了。

本文次要介绍了生产的两种形式别离为 PULL 与 PUSH,也介绍了二者的特点,而后别离给出了相应的示例,相较于 PULL 形式,PUSH 形式封装了更多,应用起来对用户更敌对,大数场景通常会抉择应用 PUSH 的形式。

参考文章:08 音讯生产 API 与版本变迁阐明
【RocketMq 实战第四篇】- 不同类型消费者(DefaultMQPushConsumer&DefaultMQPullConsumer)
RocketMQ 音讯生产形式 推拉模式

正文完
 0