关于rocketmq:RocketMQ之消费者启动与消费流程

120次阅读

共计 17662 个字符,预计需要花费 45 分钟才能阅读完成。

vivo 互联网服务器团队 – Li Kui

一、简介

1.1 RocketMQ 简介

RocketMQ 是由阿里巴巴开源的分布式消息中间件,反对程序音讯、定时音讯、自定义过滤器、负载平衡、pull/push 音讯等性能。RocketMQ 次要由 Producer、Broker、Consumer、NameServer 四局部组成,其中 Producer 负责生产音讯,Consumer 负责生产音讯,Broker 负责存储音讯。NameServer 充当名字路由服务,整体架构图如下所示:

  • roducer:负责生产音讯,个别由业务零碎生产音讯,可通过集群形式部署。RocketMQ 提供多种发送形式,同步发送、异步发送、程序发送、单向发送。同步和异步形式均须要 Broker 返回确认信息,单向发送不须要。
  • Consumer:负责生产音讯,个别是后盾零碎负责异步生产,可通过集群形式部署。一个音讯消费者会从 Broker 服务器拉取音讯、并将其提供给应用程序。提供 pull/push 两者生产模式。
  • Broker Server:负责存储音讯、转发音讯。RocketMQ 零碎中负责接管从生产者发送来的音讯并存储、同时为消费者的拉取申请作筹备,存储音讯相干的元数据,包含消费者组、生产进度偏移和主题和队列音讯等。
  • Name Server:名字服务,充当路由音讯的提供者。生产者或消费者可能通过名字服务查找各主题相应的 Broker IP 列表。多个 NameServer 实例组成集群,互相独立,没有信息替换。

本文基于 Apache RocketMQ 最新版本次要讲述 RocketMQ 的消费者机制,剖析其启动流程、pull/push 机制,音讯 ack 机制以及定时音讯和程序音讯的不同。

1.2 工作流程

(1)启动 NameServer。

NameServer 起来后监听端口,期待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

(2)启动 Broker。

跟所有的 NameServer 放弃长连贯,定时发送心跳包。心跳包中蕴含以后 Broker 信息 (IP+ 端口等) 以及存储所有 Topic 信息。注册胜利后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。

(3)创立 Topic。

创立 Topic 时须要指定该 Topic 要存储在哪些 Broker 上,也能够在发送音讯时主动创立 Topic。

(4)Producer 发送音讯。

启动时先跟 NameServer 集群中的其中一台建设长连贯,并从 NameServer 中获取以后发送的 Topic 存在哪些 Broker 上,轮询从队列列表中抉择一个队列,而后与队列所在的 Broker 建设长连贯从而向 Broker 发消息。

(5)Consumer 生产音讯。

跟其中一台 NameServer 建设长连贯,获取以后订阅 Topic 存在哪些 Broker 上,而后间接跟 Broker 建设连贯通道,开始生产音讯。

二、消费者启动流程

官网给出的消费者实现代码如下所示:

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");
        // 设置 NameServer 的地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅一个 Topic,以及 Tag 来过滤须要生产的音讯
        consumer.subscribe("Test", "*");
        // 注册回调实现类来解决从 broker 拉取回来的音讯
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 标记该音讯曾经被胜利生产
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

上面让咱们来剖析消费者在启动中每一阶段中做了什么吧,let’s go.

2.1 实例化消费者

第一步次要是实例化消费者,这里采取默认的 Push 消费者模式,结构器中参数为对应的消费者分组,指定同一分组能够生产同一类型的音讯,如果没有指定,将会采取默认的分组模式,这里实例化了一个 DefaultMQPushConsumerImpl 对象,它是前面生产性能的次要实现类。

// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");

次要通过 DefaultMQPushConsumer 实例化 DefaultMQPushConsumerImpl,它是次要的生产性能实现类。

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
       AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
       this.consumerGroup = consumerGroup;
       this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
       defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
   }

2.2 设置 NameServer 和订阅 topic 过程

// 设置 NameServer 的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或者多个 Topic,以及 Tag 来过滤须要生产的音讯
consumer.subscribe("Test", "*");

2.2.1 增加 tag

设置 NameServer 地址后,这个地址为你名字服务集群的地址,相似于 zookeeper 集群地址,样例给出的是单机本地地址,搭建集群后,能够设置为集群地址,接下来咱们须要订阅一个主题 topic 下的音讯,设置对应的 topic,能够进行分类,通过设置不同的 tag 来实现,但目前只反对 ”||” 进行连贯,如:”tag1 || tag2 || tag3″。归根在于结构订阅数据时,源码通过 ”||” 进行了字符串的宰割,如下所示:

public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
    String subString) throws Exception {SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);
 
    if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {subscriptionData.setSubString(SubscriptionData.SUB_ALL);
    } else {String[] tags = subString.split("\\|\\|");
        if (tags.length > 0) {for (String tag : tags) {if (tag.length() > 0) {String trimString = tag.trim();
                    if (trimString.length() > 0) {subscriptionData.getTagsSet().add(trimString);
                        subscriptionData.getCodeSet().add(trimString.hashCode());
                    }
                }
            }
        } else {throw new Exception("subString split error");
        }
    }
 
    return subscriptionData;
}

2.2.2 发送心跳至 Broker

后面结构好订阅主题和分类后,将其放入了一个 ConcurrentMap 中,并调用 sendHeartbeatToAllBrokerWithLock()办法,进行心跳检测和上传过滤器类至 broker 集群(生产者启动过程也会进行此步骤)。如下所示:

public void sendHeartbeatToAllBrokerWithLock() {if (this.lockHeartbeat.tryLock()) {
        try {this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();} catch (final Exception e) {log.error("sendHeartbeatToAllBroker exception", e);
        } finally {this.lockHeartbeat.unlock();
        }
    } else {log.warn("lock heartBeat, but failed.");
    }
}

首先会对 broker 集群进行心跳检测,在此过程中会施加锁,它会执行 sendHeartbeatToAllBroker 办法,构建心跳数据 heartbeatData,而后遍历生产和生产者 table,将消费者和生产者信息退出到 heartbeatData 中,当都存在消费者和生产者的状况下,会遍历 brokerAddrTable,往每个 broker 地址发送心跳,相当于往对应地址发送一次 http 申请,用于探测以后 broker 是否存活。

this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);

2.2.3 上传过滤器类至 FilterServer

之后会执行 uploadFilterClassSource()办法,只有 push 模式才会有此过程,在此模式下,它会循环遍历订阅数据 SubscriptionData,如果此订阅数据应用了类模式过滤,会调 uploadFilterClassToAllFilterServer()办法:上传用户自定义的过滤音讯实现类至过滤器服务器。

private void uploadFilterClassSource() {Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    while (it.hasNext()) {Entry<String, MQConsumerInner> next = it.next();
        MQConsumerInner consumer = next.getValue();
        if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {Set<SubscriptionData> subscriptions = consumer.subscriptions();
            for (SubscriptionData sub : subscriptions) {if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {final String consumerGroup = consumer.groupName();
                    final String className = sub.getSubString();
                    final String topic = sub.getTopic();
                    final String filterClassSource = sub.getFilterClassSource();
                    try {this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
                    } catch (Exception e) {log.error("uploadFilterClassToAllFilterServer Exception", e);
                    }
                }
            }
        }
    }
}

过滤器类的作用:生产端能够上传一个 Class 类文件到 FilterServer,Consumer 从 FilterServer 拉取音讯时,FilterServer 会把申请转发给 Broker,FilterServer 收取到 Broker 音讯后,依据上传的过滤类中的逻辑做过滤操作,过滤实现后再把音讯给到 Consumer,用户能够自定义过滤音讯的实现类。

2.3 注册回调实现类

接下来就是代码中的注册回调实现类了,当然,如果你是 pull 模式的话就不须要实现它了,push 模式须要定义,两者区别前面会讲到,它次要用于从 broker 实时获取音讯,这里有两种生产上下文类型,用于不同的生产类型。

ConsumeConcurrentlyContext:延时类音讯上下文,用于延时音讯,即定时音讯,默认不提早,能够设置提早等级,每个等级对应固定工夫刻度,RocketMQ 中不能自定义延迟时间,提早等级从 1 开始,对应的工夫距离如下所示:

"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

ConsumeOrderlyContext:程序类音讯上下文,管制发送音讯的程序,生产者设置分片路由规定后,雷同 key 只落到指定 queue 上,生产过程中会对程序音讯所在的 queue 加锁,保障音讯的有序性。

2.4 消费者启动

咱们先来看下消费者启动的过程,如下所示:

(1)this.checkConfig():首先是检测生产配置项,包含生产分组 group、音讯模型(集群、播送)、订阅数据、音讯监听器等是否存在,如果不存在的话,会抛出异样。

(2)copySubscription():构建主题订阅信息 SubscriptionData 并退出到 RebalanceImpl 负载平衡办法的订阅信息中。

(3)getAndCreateMQClientInstance():初始化 MQ 客户端实例。

(4)offsetStore.load():依据不同音讯模式创立生产进度 offsetStore 并加载:BROADCASTING- 播送模式,同一个生产 group 中的 consumer 都生产一次,CLUSTERING- 集群模式,默认形式,只被生产一次。

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    case CLUSTERING:
        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    default:
        break;
}

能够通过 setMessageModel 形式设置不同模式;播送模式下同生产组的消费者互相独立,生产进度在本地独自进行存储;集群模式下,同一条音讯只会被同一个生产组生产一次,生产进度会参加到负载平衡中,生产进度是共享在整个生产组中的。

(5)consumeMessageService.start():依据不同音讯监听类型实例化并启动。这里有延时音讯和程序音讯。

这里次要讲下程序音讯,RocketMQ 也帮咱们实现了,在启动时,如果是集群模式并是程序类型,它会启动定时工作,定时向 broker 发送批量锁,锁住以后程序生产发往的音讯队列,程序音讯因为生产者生产音讯时指定了分片策略和音讯上下文,只会发往一个生产队列。

定时工作发送批量锁,锁住以后程序音讯队列。

public void start() {if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }

发送锁住队列的音讯至 broker,broker 端返回锁住胜利的队列汇合 lockOKMQSet,程序音讯具体实现可查看前面第四节。

(6)mQClientFactory.registerConsumer():MQClientInstance 注册消费者,并启动 MQClientInstance,没有注册胜利会完结生产服务。

(7)mQClientFactory.start():最初会启动如下服务:近程客户端、定时工作、pull 音讯服务、负载平衡服务、push 音讯服务,而后将状态改为运行中。

switch (this.serviceState) {
    case CREATE_JUST:
        this.serviceState = ServiceState.START_FAILED;
        // If not specified,looking address from name server
        if (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();
        }
        // Start request-response channel
        this.mQClientAPIImpl.start();
        // Start various schedule tasks
        this.startScheduledTask();
        // Start pull service
        this.pullMessageService.start();
        // Start rebalance service
        this.rebalanceService.start();
        // Start push service
        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
        log.info("the client factory [{}] start OK", this.clientId);
        this.serviceState = ServiceState.RUNNING;
        break;
    case RUNNING:
        break;
    case SHUTDOWN_ALREADY:
        break;
    case START_FAILED:
        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
    default:
        break;
}

全副启动结束后,整个消费者也就启动好了,接下来就能够对生产者发送过去的音讯进行生产了,那么是如何进行音讯生产的呢?不同的音讯模式有何区别呢?

三、pull/push 模式生产

3.1 pull 模式 -DefaultMQPullConsumer

pull 拉取式生产:利用通常被动调用 Consumer 的拉音讯办法从 Broker 服务器拉音讯、主动权由利用程序控制,能够指定生产的位移,【伪代码】如下所示:

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("TestConsumer");
// 设置 NameServer 的地址
consumer.setNamesrvAddr("localhost:9876");
// 启动消费者实例
consumer.start();
// 获取主题下所有的音讯队列,这里依据主题从 nameserver 获取的
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Test");
for (MessageQueue queue : mqs) {
    // 获取以后队列的生产位移,指定生产进度 offset,fromstore:从 broker 中获取还是本地获取,true-broker
    long offset = consumer.fetchConsumeOffset(queue, true);
    PullResult pullResult = null;
    while (offset < pullResult.getMaxOffset()) {
        // 第二个参数为 tag,获取指定 topic 下的 tag
        // 第三个参数示意从哪个位移下开始生产音讯
        // 第四个参数示意一次最大拉取多少个音讯
        try {pullResult = consumer.pullBlockIfNotFound(queue, "*", offset, 32);
        } catch (Exception e) {e.printStackTrace();
            System.out.println("pull 拉取音讯失败");
        }
        // 代码省略,记录音讯位移
        offset = pullResult.getNextBeginOffset();
        // 代码省略,这里为生产音讯
    }
}

能够看到咱们是被动拉取 topic 对应下的音讯队列,而后遍历它们,获取以后生产进度并进行生产。

3.2 push 模式 -DefaultMQPushConsumer

该模式下 Broker 收到数据后会被动推送给生产端,该生产模式个别实时性较高,当初个别举荐应用该形式,具体示例能够观看第一章结尾的官网 demo。

它也是通过实现 pull 形式来实现的,首先,后面 2.4 消费者启动 之后,最初会启动拉取音讯服务 pullMessageService 和负载平衡 rebalanceService 服务,它们启动后会始终有线程进行生产。

case CREATE_JUST:
               //......
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                //.......
                this.serviceState = ServiceState.RUNNING;
                break;
  case RUNNING:

这外面调用 doRebalance()办法,进行负载平衡,默认每 20s 做一次,会轮询所有订阅该实例的 topic。

public class RebalanceService extends ServiceThread {
    // 初始化,省略....
 
    @Override
    public void run() {log.info(this.getServiceName() + "service started");
 
        while (!this.isStopped()) {this.waitForRunning(waitInterval);
            // 做负载平衡
            this.mqClientFactory.doRebalance();}
 
        log.info(this.getServiceName() + "service end");
    }
 
    @Override
    public String getServiceName() {return RebalanceService.class.getSimpleName();
    }
}

而后依据每个 topic,以及它是否程序音讯模式来做 rebalance。

具体做法就是先对 Topic 下的音讯生产队列、消费者 Id 进行排序,而后用音讯队列的平均分配算法,计算出待拉取的音讯队列,将调配到的音讯队列汇合与 processQueueTable 做一个过滤比对,新队列不蕴含或已过期,则进行移除。

public void doRebalance(final boolean isOrder) {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
      if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();
              try {
                  / 依据 / 每个 topic,以及它是否程序音讯模式来做 rebalance
                  this.rebalanceByTopic(topic, isOrder);
              } catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);
                  }
              }
          }
      }
 
      this.truncateMessageQueueNotMyTopic();}

rebalanceByTopic 中播送和集群模式都会执行 updateProcessQueueTableInRebalance()办法,最初会散发申请 dispatchPullRequest,通过 executePullRequestImmediately()办法将 pull 申请放入 pull 申请队列 pullRequestQueue 中,留神,pull 模式下散发申请办法 dispatchPullRequest()理论实现是一个空办法,这里两者很大不同,push 模式实现如下

@Override
 public void dispatchPullRequest(List<PullRequest> pullRequestList) {for (PullRequest pullRequest : pullRequestList) {this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
         log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
     }
 }

而后再 PullMessageService 中,因为后面 consumer 启动胜利了,PullMessageService 线程会实时去取 pullRequestQueue 中的 pull 申请。

@Override
  public void run() {log.info(this.getServiceName() + "service started");
 
      while (!this.isStopped()) {
          try {PullRequest pullRequest = this.pullRequestQueue.take();
              if (pullRequest != null) {this.pullMessage(pullRequest);
              }
          } catch (InterruptedException e) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);
          }
      }
 
      log.info(this.getServiceName() + "service end");
  }

取出来的 pull 申请又会经由 DefaultMQPushConsumerImpl 的音讯监听类,调用 pullMessage()办法。

private void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
     if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
         impl.pullMessage(pullRequest);
     } else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
     }
 }

pullMessage()中 pullKernelImpl()有一个 Pullback 办法用于执行音讯的回调,它会通过 submitConsumeRequest()这个办法来解决音讯,总而言之就是通过线程回调的形式让 push 模式下的监听器可能感知到。

//Pull 回调
PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {if (pullResult != null) {pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
 
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                         // 省略... 生产位移更新
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispathToConsume);

这个办法对应的不同生产模式有着不同实现,但都是会构建一个生产申请 ConsumeRequest,外面有一个 run()办法,构建结束后,会把它放入到 listener 监听器中。

// 监听音讯
 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

还记得后面咱们样例给出的注册监听器回调解决办法吗?

咱们能够点击下面的 consumeMessage 办法,查看它在源码中的实现地位,发现它就回到了咱们后面的 2.3 注册回调实现类外面了,整个流程是不是通顺了呢?这个监听器中就会收到 push 的音讯,拉取出来进行业务生产逻辑,上面是咱们本人定义的音讯回调解决办法。

// 注册回调实现类来解决从 broker 拉取回来的音讯
 consumer.registerMessageListener(new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
         // 标记该音讯曾经被胜利生产
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 });

3.3 小结

push 模式相比拟于 pull 模式不同的是,做负载平衡时,pullRequest 申请会放入 pullRequestQueue,而后 PullMessageService 线程会实时去取出这个申请,将音讯存入 ProcessQueue,通过线程回调的形式让 push 模式下的监听器可能感知到,这样音讯从散发申请到接管都是实时的,而 pull 模式是生产端被动去拉取指定音讯的,须要指定生产进度。

对于咱们开发者来说,选取哪种模式实现咱们的业务逻辑比拟适合呢?别急,先让咱们总结下他们的特点:

共同点:

  1. 两者底层理论一样,push 模式也是基于 pull 模式来实现的。
  2. pull 模式须要咱们通过程序被动通过 consumer 向 broker 拉音讯,而音讯的 push 模式则只须要咱们提供一个 listener 监听,实时获取音讯。

长处:

push 模式采纳长轮询阻塞的形式获取音讯,实时性十分高;

push 模式 rocketMQ 解决了获取音讯的细节,应用起来比较简单不便;

pull 模式能够指定生产进度,想生产多少就生产多少,灵活性大。

毛病:

  1. push 模式当消费者能力远远低于生产者能力的时候,会产生肯定的消费者音讯沉积;
  2. pull 模式实时性很低,频率不好设置;
  3. 拉取音讯的距离不好设置,太短则产生很多有效 Pull 申请的 RPC 开销,影响 MQ 整体的网络性能,太长则实时性差。

实用场景:

  1. 对于服务端生产音讯数据比拟大时,而生产端解决比较复杂,生产能力绝对较低时,这种状况就实用 pull 模式;
  2. 对于数据实时性要求高的场景,就比拟实用与 push 模式。

当初的你是否明确业务中该应用哪种模式了呢?

四、程序音讯

4.1 实现 MQ 程序音讯发送存在问题

(1)个别音讯发送会采取轮询形式把音讯发送到不同的 queue(分区队列);而生产音讯的时候从多个 queue 上拉取音讯,broker 之间是无感知的,这种状况发送和生产是不能保障程序。

(2)异步形式发送音讯时,发送的时候不是按着一条一条程序发送的,保障不了音讯达到 Broker 的工夫也是依照发送的程序来的。

音讯发送到存储,最初到生产要经验这么多步骤,咱们该如何在业务中应用程序音讯呢?让咱们来一步步拆解下吧。

4.2 实现 MQ 程序音讯关键点

既然扩散到多个 broker 上无奈追踪程序,那么能够管制发送的程序音讯只顺次发送到同一个 queue 中,生产的时候只从这个 queue 上顺次拉取,则就保障了程序。在发送时设置分片路由规定,让雷同 key 的音讯只落到指定 queue 上,而后生产过程中对程序音讯所在的 queue 加锁,保障音讯的有序性,让这个 queue 上的音讯就依照 FIFO 程序来进行生产。因而咱们满足以下三个条件是否就能够呢?

1)音讯程序发送:多线程发送的音讯无奈保障有序性,因而,须要业务方在发送时,针对同一个业务编号 (如同一笔订单) 的音讯须要保障在一个线程内程序发送,在上一个音讯发送胜利后,在进行下一个音讯的发送。对应到 mq 中,音讯发送办法就得应用同步发送,异步发送无奈保障程序性。

// 采纳的同步发送形式,在一个线程内程序发送, 异步发送形式为:producer.send(msg, new SendCallback() {...})
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {//…}

2)音讯顺序存储:MQ 的 topic 下会存在多个 queue,要保障音讯的顺序存储,同一个业务编号的音讯须要被发送到一个 queue 中。对应到 mq 中,须要应用 MessageQueueSelector 来抉择要发送的 queue。即能够对业务编号设置路由规定,像依据队列数量对业务字段 hash 取余,将音讯发送到一个 queue 中。

// 应用 "%" 操作,使得订单 id 取余后雷同的数据路由到同一个 queue 中,也能够自定义路由规定
long index = id % mqs.size();  
return mqs.get((int) index);

3)音讯程序生产:要保障音讯程序生产,同一个 queue 就只能被一个消费者所生产,因而对 broker 中生产队列加锁是无奈防止的。同一时刻,一个生产队列只能被一个消费者生产,消费者外部,也只能有一个生产线程来生产该队列。这里 RocketMQ 曾经为咱们实现好了。

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }
 
         //.... 省略
        }
    }

消费者从新负载,并且调配完生产队列后,须要向 mq 服务器发动音讯拉取申请,代码实现在 RebalanceImpl#updateProcessQueueTableInRebalance()中,针对程序音讯的音讯拉取,mq 做了以上判断,即生产客户端先向 broker 端发动对 messageQueue 的加锁申请,只有加锁胜利时才创立 pullRequest 进行音讯拉取,这里的 pullRequest 就是后面 pull 和 push 模式音讯体,而 updateProcessQueueTableInRebalance 这个办法也是在后面消费者启动过程中有讲到过哦。

具体加锁逻辑如下:

public boolean lock(final MessageQueue mq) {FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
     if (findBrokerResult != null) {LockBatchRequestBody requestBody = new LockBatchRequestBody();
         requestBody.setConsumerGroup(this.consumerGroup);
         requestBody.setClientId(this.mQClientFactory.getClientId());
         requestBody.getMqSet().add(mq);
 
         try {
             Set<MessageQueue> lockedMq =
                 this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
             for (MessageQueue mmqq : lockedMq) {ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                 if (processQueue != null) {processQueue.setLocked(true);
                     processQueue.setLastLockTimestamp(System.currentTimeMillis());
                 }
             }
 
             boolean lockOK = lockedMq.contains(mq);
             log.info("the message queue lock {}, {} {}",
                 lockOK ? "OK" : "Failed",
                 this.consumerGroup,
                 mq);
             return lockOK;
         } catch (Exception e) {log.error("lockBatchMQ exception," + mq, e);
         }
     }
 
     return false;
 }

能够看到,就是调用 lockBatchMQ 办法发送了一个加锁申请,胜利获取到音讯解决队列就设为获取到锁,返回锁定胜利,如果加锁胜利,同一时刻只有一个线程进行音讯生产。加锁失败,会提早 1000ms 从新尝试向 broker 端申请锁定 messageQueue,锁定胜利后从新提交生产申请。

怎么样,这样的加锁形式是不是很像咱们平时用到的分布式锁呢?由你来设计实现你会怎么做呢?

五、音讯 ack 机制

5.1 音讯生产失败解决

音讯被消费者生产了,那么如何保障被生产胜利呢?音讯生产失败会呈现什么状况呢?

音讯被生产,那么如何保障被生产胜利呢?这里只有应用方管制,只有应用方确认胜利了,才会生产胜利,否则会从新投递。

RocketMQ 其实是通过 ACK 机制来对失败音讯进行重试和告诉的,具体流程如下所示:

音讯胜利与否是由应用方管制,只有应用方确认胜利了,才会生产胜利,否则会从新投递,Consumer 会通过监听器监听回调过去的音讯,返回 ConsumeConcurrentlyStatus.CONSUME\_SUCCESS 示意生产胜利,如果生产失败,返回 ConsumeConcurrentlyStatus.RECONSUME\_LATER 状态(生产重试),RocketMQ 就会默认为这条音讯失败了,提早肯定工夫后(默认 10s,可配置),会再次投送到 ConsumerGroup,重试次数与间隔时间关系上图所示。如果继续这样,失败到肯定次数(默认 16 次),就会进入到 DLQ 死信队列,不再投递,此时能够通过监控人工来干涉。

5.2 音讯重投带来问题

RocketMQ 生产音讯因为音讯重投很大一个问题就是无奈保障音讯只被生产一次,因而须要开发人员在业务外面本人去解决。

六、总结

本文次要介绍了 RocketMQ 的消费者启动流程,联合官网源码和示例,一步步讲述消费者在启动和音讯生产中的的工作原理及内容,并联合平时业务工作中,对咱们所相熟的程序、push/pull 模式等进行详细分析,以及对于音讯生产失败和重投带来问题去进行剖析。

对于本人而言,心愿通过被动学习源码形式,可能明确其中启动的原理,学习外面优良的计划,像对于 pull/push,程序音讯这些,学习之后可能理解到 push 模式是何如做到实时拉取音讯的,程序音讯是如何保障的,再就是可能联想到平时遇到这种问题该如何解决,像程序音讯在音讯被生产时放弃和存储的程序统一,这里本人施加分布式锁写能不能实现等,文中也有很多引导性问题,心愿能引起读者本人的思考,可能对整个消费者启动和音讯生产流程有着较为直观的认知,但还有着一些技术细节因为篇幅起因没做出具体阐明,也欢送大家一起探讨交换~

参考资料:

  • RocketMQ 官网示例
  • RocketMQ 系列之 pull(拉)音讯模式(七)
  • RocketMQ 的程序音讯(程序生产)

正文完
 0