咱们晓得,kafka 如果要保障程序生产,必须保障音讯保留到同一个patition上,而且为了有序性,只能有一个消费者进行生产。这种状况下,Kafka 就进化成了繁多队列,毫无并发性可言,极大升高零碎性能。那么对于对业务比拟敌对的RocketMQ 是如何实现的呢?首先,咱们循序渐进的来理解下程序音讯的实现。

程序音讯业务应用场景

1、电商场景中传递订单状态。

2、同步mysql 的binlong 日志,数据库的操作是有程序的。

3、其余音讯之间有先后的依赖关系,后一条音讯须要依赖于前一条音讯的处理结果的状况。

等等。。。

消息中间件中的程序音讯

程序音讯(FIFO 音讯)是 MQ 提供的一种严格依照程序进行公布和生产的音讯类型。程序音讯由两个局部组成:程序公布和程序生产。

程序音讯蕴含两种类型:

分区程序:一个Partition(queue)内所有的音讯依照先进先出的程序进行公布和生产

全局程序:一个Topic内所有的音讯依照先进先出的程序进行公布和生产.然而全局程序极大的升高了零碎的吞吐量,不合乎mq的设计初衷。

那么折中的方法就是抉择分区程序。

【部分程序生产】

如何保障程序

在MQ的模型中,程序须要由3个阶段去保障:

  1. 音讯被发送时放弃程序
  2. 音讯被存储时放弃和发送的程序统一
  3. 音讯被生产时放弃和存储的程序统一

发送时放弃程序意味着对于有程序要求的音讯,用户应该在同一个线程中采纳同步的形式发送。存储放弃和发送的程序统一则要求在同一线程中被发送进去的音讯A和B,存储时在空间上A肯定在B之前。而生产放弃和存储统一则要求音讯A、B达到Consumer之后必须依照先A后B的程序被解决。

第一点,音讯程序发送,多线程发送的音讯无奈保障有序性,因而,须要业务方在发送时,针对同一个业务编号(如同一笔订单)的音讯须要保障在一个线程内程序发送,在上一个音讯发送胜利后,在进行下一个音讯的发送。对应到mq中,音讯发送办法就得应用同步发送,异步发送无奈保障程序性。

第二点,音讯顺序存储,mq的topic下会存在多个queue,要保障音讯的顺序存储,同一个业务编号的音讯须要被发送到一个queue中。对应到mq中,须要应用MessageQueueSelector来抉择要发送的queue,即对业务编号进行hash,而后依据队列数量对hash值取余,将音讯发送到一个queue中。

第三点,音讯程序生产,要保障音讯程序生产,同一个queue就只能被一个消费者所生产,因而对broker中生产队列加锁是无奈防止的。同一时刻,一个生产队列只能被一个消费者生产,消费者外部,也只能有一个生产线程来生产该队列。即,同一时刻,一个生产队列只能被一个消费者中的一个线程生产。

RocketMQ中程序的实现

【Producer端】

Producer端确保音讯程序惟一要做的事件就是将音讯路由到特定的分区,在RocketMQ中,通过MessageQueueSelector来实现分区的抉择。

/** * 音讯队列选择器 */public interface MessageQueueSelector {    /**     * 抉择音讯队列     *     * @param mqs 音讯队列     * @param msg 音讯     * @param arg 参数     * @return 音讯队列     */    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);}
  • List<MessageQueue> mqs:音讯要发送的Topic下所有的分区
  • Message msg:音讯对象
  • 额定的参数:用户能够传递本人的参数

比方如下实现就能够保障雷同的订单的音讯被路由到雷同的分区:

long orderId = ((Order) object).getOrderId;return mqs.get(orderId % mqs.size());

【Consumer端】

尝试锁定锁定MessageQueue。

首先咱们如何保障一个队列只被一个消费者生产?

生产队列存在于broker端,如果想保障一个队列被一个消费者生产,那么消费者在进行音讯拉勾销费时就必须向mq服务器申请队列锁,消费者申请队列锁的代码存在于RebalanceService音讯队列负载的实现代码中。

消费者从新负载,并且调配完生产队列后,须要向mq服务器发动音讯拉取申请,代码实现在RebalanceImpl#updateProcessQueueTableInRebalance中,针对程序音讯的音讯拉取,mq做了如下判断:

 // 减少 不在processQueueTable && 存在于mqSet 里的音讯队列。        List<PullRequest> pullRequestList = new ArrayList<>(); // 拉音讯申请数组        for (MessageQueue mq : mqSet) {            if (!this.processQueueTable.containsKey(mq)) {                if (isOrder && !this.lock(mq)) { // 程序音讯锁定音讯队列                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);                    continue;                }                this.removeDirtyOffset(mq);                ProcessQueue pq = new ProcessQueue();                long nextOffset = this.computePullFromWhere(mq);                if (nextOffset >= 0) {                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);                    if (pre != null) {                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);                    } else {                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);                        PullRequest pullRequest = new PullRequest();                        pullRequest.setConsumerGroup(consumerGroup);                        pullRequest.setNextOffset(nextOffset);                        pullRequest.setMessageQueue(mq);                        pullRequest.setProcessQueue(pq);                        pullRequestList.add(pullRequest);                        changed = true;                    }                } else {                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);                }            }        }        // 发动音讯拉取申请        this.dispatchPullRequest(pullRequestList);

核心思想就是,生产客户端先向broker端发动对messageQueue的加锁申请,只有加锁胜利时才创立pullRequest进行音讯拉取,上面看下lock加锁申请办法:

 /**     * 申请Broker取得指定音讯队列的分布式锁     *     * @param mq 队列     * @return 是否胜利     */    public boolean lock(final MessageQueue mq) {        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);        if (findBrokerResult != null) {            LockBatchRequestBody requestBody = new LockBatchRequestBody();            requestBody.setConsumerGroup(this.consumerGroup);            requestBody.setClientId(this.mQClientFactory.getClientId());            requestBody.getMqSet().add(mq);            try {                // 申请Broker取得指定音讯队列的分布式锁                Set<MessageQueue> lockedMq =                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);                // 设置音讯解决队列锁定胜利。锁定音讯队列胜利,可能本地没有音讯解决队列,设置锁定胜利会在lockAll()办法。                for (MessageQueue mmqq : lockedMq) {                    ProcessQueue processQueue = this.processQueueTable.get(mmqq);                    if (processQueue != null) {                        processQueue.setLocked(true);                        processQueue.setLastLockTimestamp(System.currentTimeMillis());                    }                }                boolean lockOK = lockedMq.contains(mq);                log.info("the message queue lock {}, {} {}",                    lockOK ? "OK" : "Failed",                    this.consumerGroup,                    mq);                return lockOK;            } catch (Exception e) {                log.error("lockBatchMQ exception, " + mq, e);            }        }        return false;    }

代码实现逻辑比拟清晰,就是调用lockBatchMQ办法发送了一个加锁申请,那么broker端收到加锁申请后的解决逻辑又是怎么样?

【broker端实现】

broker端收到加锁申请的解决逻辑在RebalanceLockManager#tryLockBatch办法中,RebalanceLockManager中要害属性如下:

/**     * 音讯队列锁过期工夫,默认60s     */    private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(        "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));    /**     * 锁     */    private final Lock lock = new ReentrantLock();    /**     * 生产分组的音讯队列锁映射     */    private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =            new ConcurrentHashMap<>(1024);

LockEntry对象中要害属性如下:

 /**     * 锁定记录     */    static class LockEntry {        /**         * 客户端编号         */        private String clientId;        /**         * 最初锁定工夫         */        private volatile long lastUpdateTimestamp = System.currentTimeMillis();        public String getClientId() {            return clientId;        }        public void setClientId(String clientId) {            this.clientId = clientId;        }        public long getLastUpdateTimestamp() {            return lastUpdateTimestamp;        }        public void setLastUpdateTimestamp(long lastUpdateTimestamp) {            this.lastUpdateTimestamp = lastUpdateTimestamp;        }        /**         * 是否锁定         *         * @param clientId 客户端编号         * @return 是否         */        public boolean isLocked(final String clientId) {            boolean eq = this.clientId.equals(clientId);            return eq && !this.isExpired();        }        /**         * 锁定是否过期         *         * @return 是否         */        public boolean isExpired() {            boolean expired =                (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;            return expired;        }    }

broker端通过对ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable的保护来达到messageQueue加锁的目标,使得同一时刻,一个messageQueue只能被一个消费者生产。

【再次回到Consumer端,拿到锁后】

消费者对messageQueue的加锁曾经胜利,那么就进入到了第二个步骤,创立pullRequest进行音讯拉取,音讯拉取局部的代码实现在PullMessageService中,音讯拉取完后,须要提交到ConsumeMessageService中进行生产,程序生产的实现为ConsumeMessageOrderlyService,提交音讯进行生产的办法为ConsumeMessageOrderlyService#submitConsumeRequest,具体实现如下:

    @Override    public void submitConsumeRequest(//        final List<MessageExt> msgs, //        final ProcessQueue processQueue, //        final MessageQueue messageQueue, //        final boolean dispathToConsume) {        if (dispathToConsume) {            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);            this.consumeExecutor.submit(consumeRequest);        }    }

构建了一个ConsumeRequest对象,并提交给了ThreadPoolExecutor来并行生产,看下程序生产的ConsumeRequest的run办法实现:

 public void run() {            if (this.processQueue.isDropped()) {                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);                return;            }            // 取得 Consumer 音讯队列锁            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);            synchronized (objLock) {                // (播送模式) 或者 (集群模式 && Broker音讯队列锁无效)                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {                    final long beginTime = System.currentTimeMillis();                    // 循环                    for (boolean continueConsume = true; continueConsume; ) {                        if (this.processQueue.isDropped()) {                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);                            break;                        }                        // 音讯队列分布式锁未锁定,提交提早取得锁并生产申请                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())                            && !this.processQueue.isLocked()) {                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue);                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);                            break;                        }                        // 音讯队列分布式锁曾经过期,提交提早取得锁并生产申请                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())                            && this.processQueue.isLockExpired()) {                            log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);                            break;                        }                        // 以后周期生产工夫超过间断时长,默认:60s,提交提早生产申请。默认状况下,每生产1分钟劳动10ms。                        long interval = System.currentTimeMillis() - beginTime;                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);                            break;                        }                        // 获取生产音讯。此处和并发音讯申请不同,并发音讯申请曾经带了生产哪些音讯。                        final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();                        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);                        if (!msgs.isEmpty()) {                            final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);                            ConsumeOrderlyStatus status = null;                            // Hook:before                            ConsumeMessageContext consumeMessageContext = null;                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {                                consumeMessageContext = new ConsumeMessageContext();                                consumeMessageContext                                    .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());                                consumeMessageContext.setMq(messageQueue);                                consumeMessageContext.setMsgList(msgs);                                consumeMessageContext.setSuccess(false);                                // init the consume context type                                consumeMessageContext.setProps(new HashMap<String, String>());                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);                            }                            // 执行生产                            long beginTimestamp = System.currentTimeMillis();                            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;                            boolean hasException = false;                            try {                                this.processQueue.getLockConsume().lock(); // 锁定队列生产锁                                if (this.processQueue.isDropped()) {                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",                                        this.messageQueue);                                    break;                                }                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);                            } catch (Throwable e) {                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //                                    RemotingHelper.exceptionSimpleDesc(e), //                                    ConsumeMessageOrderlyService.this.consumerGroup, //                                    msgs, //                                    messageQueue);                                hasException = true;                            } finally {                                this.processQueue.getLockConsume().unlock(); // 锁定队列生产锁                            }                            if (null == status //                                || ConsumeOrderlyStatus.ROLLBACK == status//                                || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {                                log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //                                    ConsumeMessageOrderlyService.this.consumerGroup, //                                    msgs, //                                    messageQueue);                            }                            // 解析生产后果状态                            long consumeRT = System.currentTimeMillis() - beginTimestamp;                            if (null == status) {                                if (hasException) {                                    returnType = ConsumeReturnType.EXCEPTION;                                } else {                                    returnType = ConsumeReturnType.RETURNNULL;                                }                            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {                                returnType = ConsumeReturnType.TIME_OUT;                            } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {                                returnType = ConsumeReturnType.FAILED;                            } else if (ConsumeOrderlyStatus.SUCCESS == status) {                                returnType = ConsumeReturnType.SUCCESS;                            }                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {                                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());                            }                            if (null == status) {                                status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;                            }                            // Hook:after                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {                                consumeMessageContext.setStatus(status.toString());                                consumeMessageContext                                    .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);                            }                            ConsumeMessageOrderlyService.this.getConsumerStatsManager()                                .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);                            // 解决生产后果                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);                        } else {                            continueConsume = false;                        }                    }                } else {                    if (this.processQueue.isDropped()) {                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);                        return;                    }                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);                }            }        }

获取到锁对象后,应用synchronized尝试申请线程级独占锁。

如果加锁胜利,同一时刻只有一个线程进行音讯生产。

如果加锁失败,会提早100ms从新尝试向broker端申请锁定messageQueue,锁定胜利后从新提交生产申请

至此,第三个关键点的解决思路也清晰了,基本上就两个步骤。

创立音讯拉取工作时,音讯客户端向broker端申请锁定MessageQueue,使得一个MessageQueue同一个时刻只能被一个生产客户端生产。

音讯生产时,多线程针对同一个音讯队列的生产先尝试应用synchronized申请独占锁,加锁胜利能力进行生产,使得一个MessageQueue同一个时刻只能被一个生产客户端中一个线程生产。

【程序生产问题拆解】

  1. broke 上要保障一个队列只有一个过程生产,即一个队列同一时间只有一个consumer 生产
  2. broker 给consumer 的音讯程序应该保持一致,这个通过 rpc传输,序列化后音讯程序不变,所以很容易实现
  3. consumer 上的队列音讯要保障同一个工夫只有一个线程生产

通过问题的拆分,问题变成同一个共享资源串行解决了,要解决这个问题,通常的做法都是拜访资源的时候加锁,即broker 上一个队列音讯在被consumer 拜访的必须加锁,单个consumer 端多线程并发解决音讯的时候须要加锁;这里还须要思考broker 锁的异常情况,如果一个broke 队列上的音讯被consumer 锁住了,万一consumer 解体了,这个锁就开释不了,所以broker 上的锁须要加上锁的过期工夫。
实际上 RocketMQ 生产端也就是照着下面的思路做:

RocketMQ中程序音讯注意事项

  1. 理论我的项目中并不是所有状况都须要用到程序音讯,但这也是设计方案的时候容易疏忽的一点
  2. 程序音讯是生产者和消费者配合协调作用的后果,然而生产端保障程序生产,是保障不了程序音讯的
  3. 生产端并行形式生产,只设置一次拉取音讯的数量为 1(即配置参数 consumeBatchSize ),是否能够实现程序生产 ?这里理论是不能的,并发生产在生产端有多个线程同时生产,consumeBatchSize 只是一个线程一次拉取音讯的数量,对程序生产没有意义,这里大家有趣味能够看 ConsumeMessageConcurrentlyService 的代码,并发生产的逻辑都在哪里。

在应用程序音讯时,肯定要留神其异常情况的呈现,对于程序音讯,当消费者生产音讯失败后,音讯队列 RocketMQ 版会主动一直地进行音讯重试(每次间隔时间为 1 秒),重试最大值是Integer.MAX\_VALUE.这时,利用会呈现音讯生产被阻塞的状况。因而,建议您应用程序音讯时,务必保障利用可能及时监控并解决生产失败的状况,防止阻塞景象的产生。

重要的事再强调一次:在应用程序音讯时,肯定要留神其异常情况的呈现!避免资源不开释!

小结

通过以上的理解,咱们晓得了实现程序音讯所必要的条件:程序发送、顺序存储、程序生产。RocketMQ的设计中思考到了这些,咱们只须要简略的应用API,不须要额定应用代码来束缚业务,使得实现程序音讯更加简略。