乐趣区

关于rocketmq:顺序消息的实现RocketMQ知识体系5

咱们晓得,kafka 如果要保障程序生产,必须保障音讯保留到同一个 patition 上,而且为了有序性,只能有一个消费者进行生产。这种状况下,Kafka 就进化成了繁多队列,毫无并发性可言,极大升高零碎性能。那么对于对业务比拟敌对的 RocketMQ 是如何实现的呢?首先,咱们循序渐进的来理解下程序音讯的实现。

程序音讯业务应用场景

1、电商场景中传递订单状态。

2、同步 mysql 的 binlong 日志,数据库的操作是有程序的。

3、其余音讯之间有先后的依赖关系,后一条音讯须要依赖于前一条音讯的处理结果的状况。

等等。。。

消息中间件中的程序音讯

程序音讯(FIFO 音讯)是 MQ 提供的一种严格依照程序进行公布和生产的音讯类型。程序音讯由两个局部组成:程序公布和程序生产。

程序音讯蕴含两种类型:

分区程序 :一个 Partition(queue) 内所有的音讯依照先进先出的程序进行公布和生产

全局程序:一个 Topic 内所有的音讯依照先进先出的程序进行公布和生产. 然而全局程序极大的升高了零碎的吞吐量,不合乎 mq 的设计初衷。

那么折中的方法就是抉择分区程序。

【部分程序生产】

如何保障程序

在 MQ 的模型中,程序须要由 3 个阶段去保障:

  1. 音讯被发送时放弃程序
  2. 音讯被存储时放弃和发送的程序统一
  3. 音讯被生产时放弃和存储的程序统一

发送时放弃程序意味着对于有程序要求的音讯,用户应该在同一个线程中采纳同步的形式发送。存储放弃和发送的程序统一则要求在同一线程中被发送进去的音讯 A 和 B,存储时在空间上 A 肯定在 B 之前。而生产放弃和存储统一则要求音讯 A、B 达到 Consumer 之后必须依照先 A 后 B 的程序被解决。

第一点,音讯程序发送 ,多线程发送的音讯无奈保障有序性,因而,须要业务方在发送时,针对同一个业务编号(如同一笔订单) 的音讯须要保障在一个线程内程序发送,在上一个音讯发送胜利后,在进行下一个音讯的发送。对应到 mq 中,音讯发送办法就得应用同步发送,异步发送无奈保障程序性。

第二点,音讯顺序存储,mq 的 topic 下会存在多个 queue,要保障音讯的顺序存储,同一个业务编号的音讯须要被发送到一个 queue 中。对应到 mq 中,须要应用 MessageQueueSelector 来抉择要发送的 queue,即对业务编号进行 hash,而后依据队列数量对 hash 值取余,将音讯发送到一个 queue 中。

第三点,音讯程序生产,要保障音讯程序生产,同一个 queue 就只能被一个消费者所生产,因而对 broker 中生产队列加锁是无奈防止的。同一时刻,一个生产队列只能被一个消费者生产,消费者外部,也只能有一个生产线程来生产该队列。即,同一时刻,一个生产队列只能被一个消费者中的一个线程生产。

RocketMQ 中程序的实现

【Producer 端】

Producer 端确保音讯程序惟一要做的事件就是将音讯路由到特定的分区,在 RocketMQ 中,通过 MessageQueueSelector 来实现分区的抉择。

/**
 * 音讯队列选择器
 */
public interface MessageQueueSelector {

    /**
     * 抉择音讯队列
     *
     * @param mqs 音讯队列
     * @param msg 音讯
     * @param arg 参数
     * @return 音讯队列
     */
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
  • List<MessageQueue> mqs:音讯要发送的 Topic 下所有的分区
  • Message msg:音讯对象
  • 额定的参数:用户能够传递本人的参数

比方如下实现就 能够保障雷同的订单的音讯被路由到雷同的分区:

long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());

【Consumer 端】

尝试锁定锁定 MessageQueue。

首先咱们如何保障一个队列只被一个消费者生产?

生产队列存在于 broker 端,如果想保障一个队列被一个消费者生产,那么消费者在进行音讯拉勾销费时就 必须向 mq 服务器申请队列锁,消费者申请队列锁的代码存在于 RebalanceService 音讯队列负载的实现代码中。

消费者从新负载,并且调配完生产队列后,须要向 mq 服务器发动音讯拉取申请,代码实现在 RebalanceImpl#updateProcessQueueTableInRebalance 中,针对程序音讯的音讯拉取,mq 做了如下判断:

 // 减少 不在 processQueueTable && 存在于 mqSet 里的音讯队列。List<PullRequest> pullRequestList = new ArrayList<>(); // 拉音讯申请数组
        for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) { // 程序音讯锁定音讯队列
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }

                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }

        // 发动音讯拉取申请
        this.dispatchPullRequest(pullRequestList);

核心思想就是,生产客户端先向 broker 端发动对 messageQueue 的加锁申请,只有加锁胜利时才创立 pullRequest 进行音讯拉取,上面看下 lock 加锁申请办法:

 /**
     * 申请 Broker 取得指定音讯队列的分布式锁
     *
     * @param mq 队列
     * @return 是否胜利
     */
    public boolean lock(final MessageQueue mq) {FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
        if (findBrokerResult != null) {LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.getMqSet().add(mq);

            try {
                // 申请 Broker 取得指定音讯队列的分布式锁
                Set<MessageQueue> lockedMq =
                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);

                // 设置音讯解决队列锁定胜利。锁定音讯队列胜利,可能本地没有音讯解决队列,设置锁定胜利会在 lockAll()办法。for (MessageQueue mmqq : lockedMq) {ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                    if (processQueue != null) {processQueue.setLocked(true);
                        processQueue.setLastLockTimestamp(System.currentTimeMillis());
                    }
                }

                boolean lockOK = lockedMq.contains(mq);
                log.info("the message queue lock {}, {} {}",
                    lockOK ? "OK" : "Failed",
                    this.consumerGroup,
                    mq);
                return lockOK;
            } catch (Exception e) {log.error("lockBatchMQ exception," + mq, e);
            }
        }

        return false;
    }

代码实现逻辑比拟清晰,就是调用 lockBatchMQ 办法发送了一个加锁申请,那么 broker 端收到加锁申请后的解决逻辑又是怎么样?

【broker 端实现】

broker 端收到加锁申请的解决逻辑在 RebalanceLockManager#tryLockBatch 办法中,RebalanceLockManager 中要害属性如下:

/**
     * 音讯队列锁过期工夫,默认 60s
     */
    private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty("rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
    /**
     * 锁
     */
    private final Lock lock = new ReentrantLock();
    /**
     * 生产分组的音讯队列锁映射
     */
    private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
            new ConcurrentHashMap<>(1024);

LockEntry 对象中要害属性如下:

 /**
     * 锁定记录
     */
    static class LockEntry {
        /**
         * 客户端编号
         */
        private String clientId;
        /**
         * 最初锁定工夫
         */
        private volatile long lastUpdateTimestamp = System.currentTimeMillis();

        public String getClientId() {return clientId;}

        public void setClientId(String clientId) {this.clientId = clientId;}

        public long getLastUpdateTimestamp() {return lastUpdateTimestamp;}

        public void setLastUpdateTimestamp(long lastUpdateTimestamp) {this.lastUpdateTimestamp = lastUpdateTimestamp;}

        /**
         * 是否锁定
         *
         * @param clientId 客户端编号
         * @return 是否
         */
        public boolean isLocked(final String clientId) {boolean eq = this.clientId.equals(clientId);
            return eq && !this.isExpired();}

        /**
         * 锁定是否过期
         *
         * @return 是否
         */
        public boolean isExpired() {
            boolean expired =
                (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;

            return expired;
        }
    }

broker 端通过对 ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable 的保护来达到 messageQueue 加锁的目标,使得同一时刻,一个 messageQueue 只能被一个消费者生产。

【再次回到 Consumer 端,拿到锁后】

消费者对 messageQueue 的加锁曾经胜利,那么就进入到了第二个步骤,创立 pullRequest 进行音讯拉取,音讯拉取局部的代码实现在 PullMessageService 中,音讯拉取完后,须要提交到 ConsumeMessageService 中进行生产,程序生产的实现为 ConsumeMessageOrderlyService,提交音讯进行生产的办法为 ConsumeMessageOrderlyService#submitConsumeRequest,具体实现如下:

    @Override
    public void submitConsumeRequest(//
        final List<MessageExt> msgs, //
        final ProcessQueue processQueue, //
        final MessageQueue messageQueue, //
        final boolean dispathToConsume) {if (dispathToConsume) {ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }

构建了一个 ConsumeRequest 对象,并提交给了 ThreadPoolExecutor 来并行生产,看下程序生产的 ConsumeRequest 的 run 办法实现:

 public void run() {if (this.processQueue.isDropped()) {log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }

            // 取得 Consumer 音讯队列锁
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {// (播送模式) 或者 (集群模式 && Broker 音讯队列锁无效)
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {final long beginTime = System.currentTimeMillis();
                    // 循环
                    for (boolean continueConsume = true; continueConsume;) {if (this.processQueue.isDropped()) {log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break;
                        }

                        // 音讯队列分布式锁未锁定,提交提早取得锁并生产申请
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && !this.processQueue.isLocked()) {log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }
                        // 音讯队列分布式锁曾经过期,提交提早取得锁并生产申请
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && this.processQueue.isLockExpired()) {log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }

                        // 以后周期生产工夫超过间断时长,默认:60s,提交提早生产申请。默认状况下,每生产 1 分钟劳动 10ms。long interval = System.currentTimeMillis() - beginTime;
                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                            break;
                        }

                        // 获取生产音讯。此处和并发音讯申请不同,并发音讯申请曾经带了生产哪些音讯。final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                        if (!msgs.isEmpty()) {final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                            ConsumeOrderlyStatus status = null;

                            // Hook:before
                            ConsumeMessageContext consumeMessageContext = null;
                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();
                                consumeMessageContext
                                    .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                                consumeMessageContext.setMq(messageQueue);
                                consumeMessageContext.setMsgList(msgs);
                                consumeMessageContext.setSuccess(false);
                                // init the consume context type
                                consumeMessageContext.setProps(new HashMap<String, String>());
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                            }

                            // 执行生产
                            long beginTimestamp = System.currentTimeMillis();
                            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                            boolean hasException = false;
                            try {this.processQueue.getLockConsume().lock(); // 锁定队列生产锁

                                if (this.processQueue.isDropped()) {log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }

                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
                                    RemotingHelper.exceptionSimpleDesc(e), //
                                    ConsumeMessageOrderlyService.this.consumerGroup, //
                                    msgs, //
                                    messageQueue);
                                hasException = true;
                            } finally {this.processQueue.getLockConsume().unlock(); // 锁定队列生产锁}

                            if (null == status //
                                || ConsumeOrderlyStatus.ROLLBACK == status//
                                || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
                                    ConsumeMessageOrderlyService.this.consumerGroup, //
                                    msgs, //
                                    messageQueue);
                            }

                            // 解析生产后果状态
                            long consumeRT = System.currentTimeMillis() - beginTimestamp;
                            if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}
                            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeOrderlyStatus.SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}

                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                            }

                            if (null == status) {status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}

                            // Hook:after
                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());
                                consumeMessageContext
                                    .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                            }

                            ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                                .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                            // 解决生产后果
                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {continueConsume = false;}
                    }
                } else {if (this.processQueue.isDropped()) {log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        return;
                    }

                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

获取到锁对象后,应用 synchronized 尝试申请线程级独占锁。

如果加锁胜利,同一时刻只有一个线程进行音讯生产。

如果加锁失败,会提早 100ms 从新尝试向 broker 端申请锁定 messageQueue,锁定胜利后从新提交生产申请

至此,第三个关键点的解决思路也清晰了,基本上就两个步骤。

创立音讯拉取工作时,音讯客户端向 broker 端申请锁定 MessageQueue,使得一个 MessageQueue 同一个时刻只能被一个生产客户端生产。

音讯生产时,多线程针对同一个音讯队列的生产先尝试应用 synchronized 申请独占锁,加锁胜利能力进行生产,使得一个 MessageQueue 同一个时刻只能被一个生产客户端中一个线程生产。

【程序生产问题拆解】

  1. broke 上要保障一个队列只有一个过程生产,即一个队列同一时间只有一个 consumer 生产
  2. broker 给 consumer 的音讯程序应该保持一致,这个通过 rpc 传输,序列化后音讯程序不变,所以很容易实现
  3. consumer 上的队列音讯要保障同一个工夫只有一个线程生产

通过问题的拆分,问题变成同一个共享资源串行解决了,要解决这个问题,通常的做法都是拜访资源的时候加锁,即 broker 上一个队列音讯在被 consumer 拜访的必须加锁,单个 consumer 端多线程并发解决音讯的时候须要加锁;这里还须要思考 broker 锁的异常情况,如果一个 broke 队列上的音讯被 consumer 锁住了,万一 consumer 解体了,这个锁就开释不了,所以 broker 上的锁须要加上锁的过期工夫。
实际上 RocketMQ 生产端也就是照着下面的思路做:

RocketMQ 中程序音讯注意事项

  1. 理论我的项目中并不是所有状况都须要用到程序音讯,但这也是设计方案的时候容易疏忽的一点
  2. 程序音讯是生产者和消费者配合协调作用的后果,然而生产端保障程序生产,是保障不了程序音讯的
  3. 生产端并行形式生产,只设置一次拉取音讯的数量为 1(即配置参数 consumeBatchSize),是否能够实现程序生产?这里理论是不能的,并发生产在生产端有多个线程同时生产,consumeBatchSize 只是一个线程一次拉取音讯的数量,对程序生产没有意义,这里大家有趣味能够看 ConsumeMessageConcurrentlyService 的代码,并发生产的逻辑都在哪里。

在应用程序音讯时,肯定要留神其异常情况的呈现,对于程序音讯,当消费者生产音讯失败后,音讯队列 RocketMQ 版会主动一直地进行音讯重试(每次间隔时间为 1 秒),重试最大值是 Integer.MAX\_VALUE. 这时,利用会呈现音讯生产被阻塞的状况。因而,建议您应用程序音讯时,务必保障利用可能及时监控并解决生产失败的状况,防止阻塞景象的产生。

重要的事再强调一次:在应用程序音讯时,肯定要留神其异常情况的呈现!避免资源不开释!

小结

通过以上的理解,咱们晓得了实现程序音讯所必要的条件:程序发送、顺序存储、程序生产。RocketMQ 的设计中思考到了这些,咱们只须要简略的应用 API,不须要额定应用代码来束缚业务,使得实现程序音讯更加简略。

退出移动版