vivo 互联网服务器团队 - Li Kui

一、简介

1.1 RocketMQ 简介

RocketMQ是由阿里巴巴开源的分布式消息中间件,反对程序音讯、定时音讯、自定义过滤器、负载平衡、pull/push音讯等性能。RocketMQ次要由 Producer、Broker、Consumer 、NameServer四局部组成,其中Producer 负责生产音讯,Consumer 负责生产音讯,Broker 负责存储音讯。NameServer充当名字路由服务,整体架构图如下所示:

  • roducer:负责生产音讯,个别由业务零碎生产音讯,可通过集群形式部署。RocketMQ提供多种发送形式,同步发送、异步发送、程序发送、单向发送。同步和异步形式均须要Broker返回确认信息,单向发送不须要。
  • Consumer:负责生产音讯,个别是后盾零碎负责异步生产,可通过集群形式部署。一个音讯消费者会从Broker服务器拉取音讯、并将其提供给应用程序。提供pull/push两者生产模式。
  • Broker Server:负责存储音讯、转发音讯。RocketMQ零碎中负责接管从生产者发送来的音讯并存储、同时为消费者的拉取申请作筹备,存储音讯相干的元数据,包含消费者组、生产进度偏移和主题和队列音讯等。
  • Name Server:名字服务,充当路由音讯的提供者。生产者或消费者可能通过名字服务查找各主题相应的Broker IP列表。多个NameServer实例组成集群,互相独立,没有信息替换。

本文基于Apache RocketMQ 最新版本次要讲述RocketMQ的消费者机制,剖析其启动流程、pull/push机制,音讯ack机制以及定时音讯和程序音讯的不同。

1.2 工作流程

(1)启动NameServer。

NameServer起来后监听端口,期待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

(2)启动Broker。

跟所有的NameServer放弃长连贯,定时发送心跳包。心跳包中蕴含以后Broker信息(IP+端口等)以及存储所有Topic信息。注册胜利后,NameServer集群中就有Topic跟Broker的映射关系。

(3)创立Topic。

创立Topic时须要指定该Topic要存储在哪些Broker上,也能够在发送音讯时主动创立Topic。

(4)Producer发送音讯。

启动时先跟NameServer集群中的其中一台建设长连贯,并从NameServer中获取以后发送的Topic存在哪些Broker上,轮询从队列列表中抉择一个队列,而后与队列所在的Broker建设长连贯从而向Broker发消息。

(5)Consumer生产音讯。

跟其中一台NameServer建设长连贯,获取以后订阅Topic存在哪些Broker上,而后间接跟Broker建设连贯通道,开始生产音讯。

二、消费者启动流程

官网给出的消费者实现代码如下所示:

public class Consumer {    public static void main(String[] args) throws InterruptedException, MQClientException {        // 实例化消费者        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");        // 设置NameServer的地址        consumer.setNamesrvAddr("localhost:9876");        // 订阅一个Topic,以及Tag来过滤须要生产的音讯        consumer.subscribe("Test", "*");        // 注册回调实现类来解决从broker拉取回来的音讯        consumer.registerMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);                // 标记该音讯曾经被胜利生产                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }        });        // 启动消费者实例        consumer.start();        System.out.printf("Consumer Started.%n");    }}

上面让咱们来剖析消费者在启动中每一阶段中做了什么吧,let’s go.

2.1 实例化消费者

第一步次要是实例化消费者,这里采取默认的Push消费者模式,结构器中参数为对应的消费者分组,指定同一分组能够生产同一类型的音讯,如果没有指定,将会采取默认的分组模式,这里实例化了一个DefaultMQPushConsumerImpl对象,它是前面生产性能的次要实现类。

// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");

次要通过DefaultMQPushConsumer实例化DefaultMQPushConsumerImpl,它是次要的生产性能实现类。

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,       AllocateMessageQueueStrategy allocateMessageQueueStrategy) {       this.consumerGroup = consumerGroup;       this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;       defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);   }

2.2 设置NameServer和订阅topic过程

// 设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个或者多个Topic,以及Tag来过滤须要生产的音讯consumer.subscribe("Test", "*");

2.2.1 增加tag

设置NameServer地址后,这个地址为你名字服务集群的地址,相似于zookeeper集群地址,样例给出的是单机本地地址,搭建集群后,能够设置为集群地址,接下来咱们须要订阅一个主题topic下的音讯,设置对应的topic,能够进行分类,通过设置不同的tag来实现,但目前只反对"||"进行连贯,如:"tag1 || tag2 || tag3"。归根在于结构订阅数据时,源码通过"||"进行了字符串的宰割,如下所示:

public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,    String subString) throws Exception {    SubscriptionData subscriptionData = new SubscriptionData();    subscriptionData.setTopic(topic);    subscriptionData.setSubString(subString);     if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {        subscriptionData.setSubString(SubscriptionData.SUB_ALL);    } else {        String[] tags = subString.split("\\|\\|");        if (tags.length > 0) {            for (String tag : tags) {                if (tag.length() > 0) {                    String trimString = tag.trim();                    if (trimString.length() > 0) {                        subscriptionData.getTagsSet().add(trimString);                        subscriptionData.getCodeSet().add(trimString.hashCode());                    }                }            }        } else {            throw new Exception("subString split error");        }    }     return subscriptionData;}

2.2.2 发送心跳至Broker

后面结构好订阅主题和分类后,将其放入了一个ConcurrentMap中,并调用sendHeartbeatToAllBrokerWithLock()办法,进行心跳检测和上传过滤器类至broker集群(生产者启动过程也会进行此步骤)。如下所示:

public void sendHeartbeatToAllBrokerWithLock() {    if (this.lockHeartbeat.tryLock()) {        try {            this.sendHeartbeatToAllBroker();            this.uploadFilterClassSource();        } catch (final Exception e) {            log.error("sendHeartbeatToAllBroker exception", e);        } finally {            this.lockHeartbeat.unlock();        }    } else {        log.warn("lock heartBeat, but failed.");    }}

首先会对broker集群进行心跳检测,在此过程中会施加锁,它会执行sendHeartbeatToAllBroker办法,构建心跳数据heartbeatData,而后遍历生产和生产者table,将消费者和生产者信息退出到heartbeatData中,当都存在消费者和生产者的状况下,会遍历brokerAddrTable,往每个broker 地址发送心跳,相当于往对应地址发送一次http申请,用于探测以后broker是否存活。

this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);

2.2.3上传过滤器类至FilterServer

之后会执行uploadFilterClassSource()办法,只有push模式才会有此过程,在此模式下,它会循环遍历订阅数据SubscriptionData,如果此订阅数据应用了类模式过滤,会调uploadFilterClassToAllFilterServer()办法:上传用户自定义的过滤音讯实现类至过滤器服务器。

private void uploadFilterClassSource() {    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();    while (it.hasNext()) {        Entry<String, MQConsumerInner> next = it.next();        MQConsumerInner consumer = next.getValue();        if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {            Set<SubscriptionData> subscriptions = consumer.subscriptions();            for (SubscriptionData sub : subscriptions) {                if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {                    final String consumerGroup = consumer.groupName();                    final String className = sub.getSubString();                    final String topic = sub.getTopic();                    final String filterClassSource = sub.getFilterClassSource();                    try {                        this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);                    } catch (Exception e) {                        log.error("uploadFilterClassToAllFilterServer Exception", e);                    }                }            }        }    }}

过滤器类的作用:生产端能够上传一个Class类文件到 FilterServer,Consumer从FilterServer拉取音讯时,FilterServer会把申请转发给Broker,FilterServer收取到Broker音讯后,依据上传的过滤类中的逻辑做过滤操作,过滤实现后再把音讯给到Consumer,用户能够自定义过滤音讯的实现类。

2.3 注册回调实现类

接下来就是代码中的注册回调实现类了,当然,如果你是pull模式的话就不须要实现它了,push模式须要定义,两者区别前面会讲到,它次要用于从broker实时获取音讯,这里有两种生产上下文类型,用于不同的生产类型。

ConsumeConcurrentlyContext:延时类音讯上下文,用于延时音讯,即定时音讯,默认不提早,能够设置提早等级,每个等级对应固定工夫刻度,RocketMQ中不能自定义延迟时间,提早等级从1开始,对应的工夫距离如下所示:

"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

ConsumeOrderlyContext :程序类音讯上下文,管制发送音讯的程序,生产者设置分片路由规定后,雷同key只落到指定queue上,生产过程中会对程序音讯所在的queue加锁,保障音讯的有序性。

2.4 消费者启动

咱们先来看下消费者启动的过程,如下所示:

(1)this.checkConfig():首先是检测生产配置项,包含生产分组group、音讯模型(集群、播送)、订阅数据、音讯监听器等是否存在,如果不存在的话,会抛出异样。

(2)copySubscription():构建主题订阅信息SubscriptionData并退出到RebalanceImpl负载平衡办法的订阅信息中。

(3)getAndCreateMQClientInstance():初始化MQ客户端实例。

(4)offsetStore.load():依据不同音讯模式创立生产进度offsetStore并加载:BROADCASTING-播送模式,同一个生产group中的consumer都生产一次,CLUSTERING-集群模式,默认形式,只被生产一次。

switch (this.defaultMQPushConsumer.getMessageModel()) {    case BROADCASTING:        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());        break;    case CLUSTERING:        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());        break;    default:        break;}

能够通过setMessageModel形式设置不同模式;播送模式下同生产组的消费者互相独立,生产进度在本地独自进行存储;集群模式下,同一条音讯只会被同一个生产组生产一次,生产进度会参加到负载平衡中,生产进度是共享在整个生产组中的。

(5)consumeMessageService.start():依据不同音讯监听类型实例化并启动。这里有延时音讯和程序音讯。

这里次要讲下程序音讯,RocketMQ也帮咱们实现了,在启动时,如果是集群模式并是程序类型,它会启动定时工作,定时向broker发送批量锁,锁住以后程序生产发往的音讯队列,程序音讯因为生产者生产音讯时指定了分片策略和音讯上下文,只会发往一个生产队列。

定时工作发送批量锁,锁住以后程序音讯队列。

public void start() {        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {                @Override                public void run() {                    ConsumeMessageOrderlyService.this.lockMQPeriodically();                }            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);        }    }

发送锁住队列的音讯至broker,broker端返回锁住胜利的队列汇合lockOKMQSet,程序音讯具体实现可查看前面第四节。

(6)mQClientFactory.registerConsumer():MQClientInstance注册消费者,并启动MQClientInstance,没有注册胜利会完结生产服务。

(7)mQClientFactory.start():最初会启动如下服务:近程客户端、定时工作、pull音讯服务、负载平衡服务、push音讯服务,而后将状态改为运行中。

switch (this.serviceState) {    case CREATE_JUST:        this.serviceState = ServiceState.START_FAILED;        // If not specified,looking address from name server        if (null == this.clientConfig.getNamesrvAddr()) {            this.mQClientAPIImpl.fetchNameServerAddr();        }        // Start request-response channel        this.mQClientAPIImpl.start();        // Start various schedule tasks        this.startScheduledTask();        // Start pull service        this.pullMessageService.start();        // Start rebalance service        this.rebalanceService.start();        // Start push service        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);        log.info("the client factory [{}] start OK", this.clientId);        this.serviceState = ServiceState.RUNNING;        break;    case RUNNING:        break;    case SHUTDOWN_ALREADY:        break;    case START_FAILED:        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);    default:        break;}

全副启动结束后,整个消费者也就启动好了,接下来就能够对生产者发送过去的音讯进行生产了,那么是如何进行音讯生产的呢?不同的音讯模式有何区别呢?

三、pull/push 模式生产

3.1 pull模式-DefaultMQPullConsumer

pull拉取式生产:利用通常被动调用Consumer的拉音讯办法从Broker服务器拉音讯、主动权由利用程序控制,能够指定生产的位移,【伪代码】如下所示:

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("TestConsumer");// 设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 启动消费者实例consumer.start();//获取主题下所有的音讯队列,这里依据主题从nameserver获取的Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Test");for (MessageQueue queue : mqs) {    //获取以后队列的生产位移,指定生产进度offset,fromstore:从broker中获取还是本地获取,true-broker    long offset = consumer.fetchConsumeOffset(queue, true);    PullResult pullResult = null;    while (offset < pullResult.getMaxOffset()) {        //第二个参数为tag,获取指定topic下的tag        //第三个参数示意从哪个位移下开始生产音讯        //第四个参数示意一次最大拉取多少个音讯        try {            pullResult = consumer.pullBlockIfNotFound(queue, "*", offset, 32);        } catch (Exception e) {            e.printStackTrace();            System.out.println("pull拉取音讯失败");        }        //代码省略,记录音讯位移        offset = pullResult.getNextBeginOffset();        //代码省略,这里为生产音讯    }}

能够看到咱们是被动拉取topic对应下的音讯队列,而后遍历它们,获取以后生产进度并进行生产。

3.2 push模式-DefaultMQPushConsumer

该模式下Broker收到数据后会被动推送给生产端,该生产模式个别实时性较高,当初个别举荐应用该形式,具体示例能够观看第一章结尾的官网demo。

它也是通过实现pull形式来实现的,首先,后面2.4消费者启动之后,最初会启动拉取音讯服务pullMessageService和负载平衡rebalanceService服务,它们启动后会始终有线程进行生产。

case CREATE_JUST:               //......                // Start pull service                this.pullMessageService.start();                // Start rebalance service                this.rebalanceService.start();                //.......                this.serviceState = ServiceState.RUNNING;                break;  case RUNNING:

这外面调用doRebalance()办法,进行负载平衡,默认每20s做一次,会轮询所有订阅该实例的topic。

public class RebalanceService extends ServiceThread {    //初始化,省略....     @Override    public void run() {        log.info(this.getServiceName() + " service started");         while (!this.isStopped()) {            this.waitForRunning(waitInterval);            //做负载平衡            this.mqClientFactory.doRebalance();        }         log.info(this.getServiceName() + " service end");    }     @Override    public String getServiceName() {        return RebalanceService.class.getSimpleName();    }}

而后依据每个topic,以及它是否程序音讯模式来做rebalance。

具体做法就是先对Topic下的音讯生产队列、消费者Id进行排序,而后用音讯队列的平均分配算法,计算出待拉取的音讯队列,将调配到的音讯队列汇合与processQueueTable做一个过滤比对,新队列不蕴含或已过期,则进行移除 。

public void doRebalance(final boolean isOrder) {      Map<String, SubscriptionData> subTable = this.getSubscriptionInner();      if (subTable != null) {          for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {              final String topic = entry.getKey();              try {                  /依据 /每个topic,以及它是否程序音讯模式来做rebalance                  this.rebalanceByTopic(topic, isOrder);              } catch (Throwable e) {                  if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                      log.warn("rebalanceByTopic Exception", e);                  }              }          }      }       this.truncateMessageQueueNotMyTopic();  }

rebalanceByTopic中播送和集群模式都会执行updateProcessQueueTableInRebalance()办法,最初会散发申请dispatchPullRequest,通过executePullRequestImmediately()办法将pull申请放入pull申请队列pullRequestQueue中,留神,pull模式下散发申请办法dispatchPullRequest()理论实现是一个空办法,这里两者很大不同,push模式实现如下

@Override public void dispatchPullRequest(List<PullRequest> pullRequestList) {     for (PullRequest pullRequest : pullRequestList) {         this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);         log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);     } }

而后再PullMessageService中,因为后面consumer启动胜利了,PullMessageService线程会实时去取pullRequestQueue中的pull申请。

@Override  public void run() {      log.info(this.getServiceName() + " service started");       while (!this.isStopped()) {          try {              PullRequest pullRequest = this.pullRequestQueue.take();              if (pullRequest != null) {                  this.pullMessage(pullRequest);              }          } catch (InterruptedException e) {          } catch (Exception e) {              log.error("Pull Message Service Run Method exception", e);          }      }       log.info(this.getServiceName() + " service end");  }

取出来的pull申请又会经由DefaultMQPushConsumerImpl的音讯监听类,调用pullMessage()办法。

private void pullMessage(final PullRequest pullRequest) {     final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());     if (consumer != null) {         DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;         impl.pullMessage(pullRequest);     } else {         log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);     } }

pullMessage()中pullKernelImpl()有一个Pullback办法用于执行音讯的回调,它会通过submitConsumeRequest()这个办法来解决音讯,总而言之就是通过线程回调的形式让push模式下的监听器可能感知到。

//Pull回调PullCallback pullCallback = new PullCallback() {            @Override            public void onSuccess(PullResult pullResult) {                if (pullResult != null) {                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,                        subscriptionData);                     switch (pullResult.getPullStatus()) {                        case FOUND:                         //省略...生产位移更新                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(                                    pullResult.getMsgFoundList(),                                    processQueue,                                    pullRequest.getMessageQueue(),                                    dispathToConsume);

这个办法对应的不同生产模式有着不同实现,但都是会构建一个生产申请ConsumeRequest,外面有一个run()办法,构建结束后,会把它放入到listener监听器中。

//监听音讯 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

还记得后面咱们样例给出的注册监听器回调解决办法吗?

咱们能够点击下面的consumeMessage办法,查看它在源码中的实现地位,发现它就回到了咱们后面的2.3注册回调实现类外面了,整个流程是不是通顺了呢?这个监听器中就会收到push的音讯,拉取出来进行业务生产逻辑,上面是咱们本人定义的音讯回调解决办法。

// 注册回调实现类来解决从broker拉取回来的音讯 consumer.registerMessageListener(new MessageListenerConcurrently() {     @Override     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {         System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);         // 标记该音讯曾经被胜利生产         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;     } });

3.3 小结

push模式相比拟于pull模式不同的是,做负载平衡时,pullRequest申请会放入pullRequestQueue,而后PullMessageService线程会实时去取出这个申请,将音讯存入ProcessQueue,通过线程回调的形式让push模式下的监听器可能感知到,这样音讯从散发申请到接管都是实时的,而pull模式是生产端被动去拉取指定音讯的,须要指定生产进度。

对于咱们开发者来说,选取哪种模式实现咱们的业务逻辑比拟适合呢?别急,先让咱们总结下他们的特点:

共同点:

  1. 两者底层理论一样,push模式也是基于pull模式来实现的。
  2. pull模式须要咱们通过程序被动通过consumer向broker拉音讯,而音讯的push模式则只须要咱们提供一个listener监听,实时获取音讯。

长处:

push模式采纳长轮询阻塞的形式获取音讯,实时性十分高;

push模式rocketMQ解决了获取音讯的细节,应用起来比较简单不便;

pull模式能够指定生产进度,想生产多少就生产多少,灵活性大。

毛病:

  1. push模式当消费者能力远远低于生产者能力的时候,会产生肯定的消费者音讯沉积;
  2. pull模式实时性很低,频率不好设置;
  3. 拉取音讯的距离不好设置,太短则产生很多有效Pull申请的RPC开销,影响MQ整体的网络性能,太长则实时性差。

实用场景:

  1. 对于服务端生产音讯数据比拟大时,而生产端解决比较复杂,生产能力绝对较低时,这种状况就实用pull模式;
  2. 对于数据实时性要求高的场景,就比拟实用与push模式。

当初的你是否明确业务中该应用哪种模式了呢?

四、程序音讯

4.1 实现MQ程序音讯发送存在问题

(1)个别音讯发送会采取轮询形式把音讯发送到不同的queue(分区队列);而生产音讯的时候从多个queue上拉取音讯,broker之间是无感知的,这种状况发送和生产是不能保障程序。

(2)异步形式发送音讯时,发送的时候不是按着一条一条程序发送的,保障不了音讯达到Broker的工夫也是依照发送的程序来的。

音讯发送到存储,最初到生产要经验这么多步骤,咱们该如何在业务中应用程序音讯呢?让咱们来一步步拆解下吧。

4.2 实现MQ程序音讯关键点

既然扩散到多个broker上无奈追踪程序,那么能够管制发送的程序音讯只顺次发送到同一个queue中,生产的时候只从这个queue上顺次拉取,则就保障了程序。在发送时设置分片路由规定,让雷同key的音讯只落到指定queue上,而后生产过程中对程序音讯所在的queue加锁,保障音讯的有序性,让这个queue上的音讯就依照FIFO程序来进行生产。因而咱们满足以下三个条件是否就能够呢?

1)音讯程序发送:多线程发送的音讯无奈保障有序性,因而,须要业务方在发送时,针对同一个业务编号(如同一笔订单)的音讯须要保障在一个线程内程序发送,在上一个音讯发送胜利后,在进行下一个音讯的发送。对应到mq中,音讯发送办法就得应用同步发送,异步发送无奈保障程序性。

//采纳的同步发送形式,在一个线程内程序发送,异步发送形式为:producer.send(msg, new SendCallback() {...})SendResult sendResult = producer.send(msg, new MessageQueueSelector() {//…}

2)音讯顺序存储:MQ 的topic下会存在多个queue,要保障音讯的顺序存储,同一个业务编号的音讯须要被发送到一个queue中。对应到mq中,须要应用MessageQueueSelector来抉择要发送的queue。即能够对业务编号设置路由规定,像依据队列数量对业务字段hash取余,将音讯发送到一个queue中。

//应用"%"操作,使得订单id取余后雷同的数据路由到同一个queue中,也能够自定义路由规定long index = id % mqs.size();  return mqs.get((int) index);

3)音讯程序生产:要保障音讯程序生产,同一个queue就只能被一个消费者所生产,因而对broker中生产队列加锁是无奈防止的。同一时刻,一个生产队列只能被一个消费者生产,消费者外部,也只能有一个生产线程来生产该队列。这里RocketMQ曾经为咱们实现好了。

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();    for (MessageQueue mq : mqSet) {        if (!this.processQueueTable.containsKey(mq)) {            if (isOrder && !this.lock(mq)) {                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);                continue;            }          //....省略        }    }

消费者从新负载,并且调配完生产队列后,须要向mq服务器发动音讯拉取申请,代码实现在RebalanceImpl#updateProcessQueueTableInRebalance()中,针对程序音讯的音讯拉取,mq做了以上判断,即生产客户端先向broker端发动对messageQueue的加锁申请,只有加锁胜利时才创立pullRequest进行音讯拉取,这里的pullRequest就是后面pull和push模式音讯体,而updateProcessQueueTableInRebalance这个办法也是在后面消费者启动过程中有讲到过哦。

具体加锁逻辑如下:

public boolean lock(final MessageQueue mq) {     FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);     if (findBrokerResult != null) {         LockBatchRequestBody requestBody = new LockBatchRequestBody();         requestBody.setConsumerGroup(this.consumerGroup);         requestBody.setClientId(this.mQClientFactory.getClientId());         requestBody.getMqSet().add(mq);          try {             Set<MessageQueue> lockedMq =                 this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);             for (MessageQueue mmqq : lockedMq) {                 ProcessQueue processQueue = this.processQueueTable.get(mmqq);                 if (processQueue != null) {                     processQueue.setLocked(true);                     processQueue.setLastLockTimestamp(System.currentTimeMillis());                 }             }              boolean lockOK = lockedMq.contains(mq);             log.info("the message queue lock {}, {} {}",                 lockOK ? "OK" : "Failed",                 this.consumerGroup,                 mq);             return lockOK;         } catch (Exception e) {             log.error("lockBatchMQ exception, " + mq, e);         }     }      return false; }

能够看到,就是调用lockBatchMQ办法发送了一个加锁申请,胜利获取到音讯解决队列就设为获取到锁,返回锁定胜利,如果加锁胜利,同一时刻只有一个线程进行音讯生产。加锁失败,会提早1000ms从新尝试向broker端申请锁定messageQueue,锁定胜利后从新提交生产申请。

怎么样,这样的加锁形式是不是很像咱们平时用到的分布式锁呢?由你来设计实现你会怎么做呢?

五、音讯ack机制

5.1 音讯生产失败解决

音讯被消费者生产了,那么如何保障被生产胜利呢?音讯生产失败会呈现什么状况呢?

音讯被生产,那么如何保障被生产胜利呢?这里只有应用方管制,只有应用方确认胜利了,才会生产胜利,否则会从新投递。

RocketMQ其实是通过ACK机制来对失败音讯进行重试和告诉的,具体流程如下所示:

音讯胜利与否是由应用方管制,只有应用方确认胜利了,才会生产胜利,否则会从新投递,Consumer会通过监听器监听回调过去的音讯,返回ConsumeConcurrentlyStatus.CONSUME\_SUCCESS示意生产胜利,如果生产失败,返回ConsumeConcurrentlyStatus.RECONSUME\_LATER状态(生产重试),RocketMQ就会默认为这条音讯失败了,提早肯定工夫后(默认10s,可配置),会再次投送到ConsumerGroup,重试次数与间隔时间关系上图所示。如果继续这样,失败到肯定次数(默认16次),就会进入到DLQ死信队列,不再投递,此时能够通过监控人工来干涉。

5.2 音讯重投带来问题

RocketMQ 生产音讯因为音讯重投很大一个问题就是无奈保障音讯只被生产一次,因而须要开发人员在业务外面本人去解决。

六、总结

本文次要介绍了RocketMQ的消费者启动流程,联合官网源码和示例,一步步讲述消费者在启动和音讯生产中的的工作原理及内容,并联合平时业务工作中,对咱们所相熟的程序、push/pull模式等进行详细分析,以及对于音讯生产失败和重投带来问题去进行剖析。

对于本人而言,心愿通过被动学习源码形式,可能明确其中启动的原理,学习外面优良的计划,像对于pull/push,程序音讯这些,学习之后可能理解到push模式是何如做到实时拉取音讯的,程序音讯是如何保障的,再就是可能联想到平时遇到这种问题该如何解决,像程序音讯在音讯被生产时放弃和存储的程序统一,这里本人施加分布式锁写能不能实现等,文中也有很多引导性问题,心愿能引起读者本人的思考,可能对整个消费者启动和音讯生产流程有着较为直观的认知,但还有着一些技术细节因为篇幅起因没做出具体阐明,也欢送大家一起探讨交换~

参考资料:

  • RocketMQ官网示例
  • RocketMQ系列之pull(拉)音讯模式(七)
  • RocketMQ的程序音讯(程序生产)