咱们晓得,kafka 如果要保障程序生产,必须保障音讯保留到同一个patition上,而且为了有序性,只能有一个消费者进行生产。这种状况下,Kafka 就进化成了繁多队列,毫无并发性可言,极大升高零碎性能。那么对于对业务比拟敌对的RocketMQ 是如何实现的呢?首先,咱们循序渐进的来理解下程序音讯的实现。
程序音讯业务应用场景
1、电商场景中传递订单状态。
2、同步mysql 的binlong 日志,数据库的操作是有程序的。
3、其余音讯之间有先后的依赖关系,后一条音讯须要依赖于前一条音讯的处理结果的状况。
等等。。。
消息中间件中的程序音讯
程序音讯(FIFO 音讯)是 MQ 提供的一种严格依照程序进行公布和生产的音讯类型。程序音讯由两个局部组成:程序公布和程序生产。
程序音讯蕴含两种类型:
分区程序:一个Partition(queue)内所有的音讯依照先进先出的程序进行公布和生产
全局程序:一个Topic内所有的音讯依照先进先出的程序进行公布和生产.然而全局程序极大的升高了零碎的吞吐量,不合乎mq的设计初衷。
那么折中的方法就是抉择分区程序。
【部分程序生产】
如何保障程序
在MQ的模型中,程序须要由3个阶段去保障:
- 音讯被发送时放弃程序
- 音讯被存储时放弃和发送的程序统一
- 音讯被生产时放弃和存储的程序统一
发送时放弃程序意味着对于有程序要求的音讯,用户应该在同一个线程中采纳同步的形式发送。存储放弃和发送的程序统一则要求在同一线程中被发送进去的音讯A和B,存储时在空间上A肯定在B之前。而生产放弃和存储统一则要求音讯A、B达到Consumer之后必须依照先A后B的程序被解决。
第一点,音讯程序发送,多线程发送的音讯无奈保障有序性,因而,须要业务方在发送时,针对同一个业务编号(如同一笔订单)的音讯须要保障在一个线程内程序发送,在上一个音讯发送胜利后,在进行下一个音讯的发送。对应到mq中,音讯发送办法就得应用同步发送,异步发送无奈保障程序性。
第二点,音讯顺序存储,mq的topic下会存在多个queue,要保障音讯的顺序存储,同一个业务编号的音讯须要被发送到一个queue中。对应到mq中,须要应用MessageQueueSelector来抉择要发送的queue,即对业务编号进行hash,而后依据队列数量对hash值取余,将音讯发送到一个queue中。
第三点,音讯程序生产,要保障音讯程序生产,同一个queue就只能被一个消费者所生产,因而对broker中生产队列加锁是无奈防止的。同一时刻,一个生产队列只能被一个消费者生产,消费者外部,也只能有一个生产线程来生产该队列。即,同一时刻,一个生产队列只能被一个消费者中的一个线程生产。
RocketMQ中程序的实现
【Producer端】
Producer端确保音讯程序惟一要做的事件就是将音讯路由到特定的分区,在RocketMQ中,通过MessageQueueSelector来实现分区的抉择。
/** * 音讯队列选择器 */public interface MessageQueueSelector { /** * 抉择音讯队列 * * @param mqs 音讯队列 * @param msg 音讯 * @param arg 参数 * @return 音讯队列 */ MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);}
- List<MessageQueue> mqs:音讯要发送的Topic下所有的分区
- Message msg:音讯对象
- 额定的参数:用户能够传递本人的参数
比方如下实现就能够保障雷同的订单的音讯被路由到雷同的分区:
long orderId = ((Order) object).getOrderId;return mqs.get(orderId % mqs.size());
【Consumer端】
尝试锁定锁定MessageQueue。
首先咱们如何保障一个队列只被一个消费者生产?
生产队列存在于broker端,如果想保障一个队列被一个消费者生产,那么消费者在进行音讯拉勾销费时就必须向mq服务器申请队列锁,消费者申请队列锁的代码存在于RebalanceService音讯队列负载的实现代码中。
消费者从新负载,并且调配完生产队列后,须要向mq服务器发动音讯拉取申请,代码实现在RebalanceImpl#updateProcessQueueTableInRebalance中,针对程序音讯的音讯拉取,mq做了如下判断:
// 减少 不在processQueueTable && 存在于mqSet 里的音讯队列。 List<PullRequest> pullRequestList = new ArrayList<>(); // 拉音讯申请数组 for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { // 程序音讯锁定音讯队列 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } // 发动音讯拉取申请 this.dispatchPullRequest(pullRequestList);
核心思想就是,生产客户端先向broker端发动对messageQueue的加锁申请,只有加锁胜利时才创立pullRequest进行音讯拉取,上面看下lock加锁申请办法:
/** * 申请Broker取得指定音讯队列的分布式锁 * * @param mq 队列 * @return 是否胜利 */ public boolean lock(final MessageQueue mq) { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); if (findBrokerResult != null) { LockBatchRequestBody requestBody = new LockBatchRequestBody(); requestBody.setConsumerGroup(this.consumerGroup); requestBody.setClientId(this.mQClientFactory.getClientId()); requestBody.getMqSet().add(mq); try { // 申请Broker取得指定音讯队列的分布式锁 Set<MessageQueue> lockedMq = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); // 设置音讯解决队列锁定胜利。锁定音讯队列胜利,可能本地没有音讯解决队列,设置锁定胜利会在lockAll()办法。 for (MessageQueue mmqq : lockedMq) { ProcessQueue processQueue = this.processQueueTable.get(mmqq); if (processQueue != null) { processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } boolean lockOK = lockedMq.contains(mq); log.info("the message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq); return lockOK; } catch (Exception e) { log.error("lockBatchMQ exception, " + mq, e); } } return false; }
代码实现逻辑比拟清晰,就是调用lockBatchMQ办法发送了一个加锁申请,那么broker端收到加锁申请后的解决逻辑又是怎么样?
【broker端实现】
broker端收到加锁申请的解决逻辑在RebalanceLockManager#tryLockBatch办法中,RebalanceLockManager中要害属性如下:
/** * 音讯队列锁过期工夫,默认60s */ private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty( "rocketmq.broker.rebalance.lockMaxLiveTime", "60000")); /** * 锁 */ private final Lock lock = new ReentrantLock(); /** * 生产分组的音讯队列锁映射 */ private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable = new ConcurrentHashMap<>(1024);
LockEntry对象中要害属性如下:
/** * 锁定记录 */ static class LockEntry { /** * 客户端编号 */ private String clientId; /** * 最初锁定工夫 */ private volatile long lastUpdateTimestamp = System.currentTimeMillis(); public String getClientId() { return clientId; } public void setClientId(String clientId) { this.clientId = clientId; } public long getLastUpdateTimestamp() { return lastUpdateTimestamp; } public void setLastUpdateTimestamp(long lastUpdateTimestamp) { this.lastUpdateTimestamp = lastUpdateTimestamp; } /** * 是否锁定 * * @param clientId 客户端编号 * @return 是否 */ public boolean isLocked(final String clientId) { boolean eq = this.clientId.equals(clientId); return eq && !this.isExpired(); } /** * 锁定是否过期 * * @return 是否 */ public boolean isExpired() { boolean expired = (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME; return expired; } }
broker端通过对ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable的保护来达到messageQueue加锁的目标,使得同一时刻,一个messageQueue只能被一个消费者生产。
【再次回到Consumer端,拿到锁后】
消费者对messageQueue的加锁曾经胜利,那么就进入到了第二个步骤,创立pullRequest进行音讯拉取,音讯拉取局部的代码实现在PullMessageService中,音讯拉取完后,须要提交到ConsumeMessageService中进行生产,程序生产的实现为ConsumeMessageOrderlyService,提交音讯进行生产的办法为ConsumeMessageOrderlyService#submitConsumeRequest,具体实现如下:
@Override public void submitConsumeRequest(// final List<MessageExt> msgs, // final ProcessQueue processQueue, // final MessageQueue messageQueue, // final boolean dispathToConsume) { if (dispathToConsume) { ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } }
构建了一个ConsumeRequest对象,并提交给了ThreadPoolExecutor来并行生产,看下程序生产的ConsumeRequest的run办法实现:
public void run() { if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } // 取得 Consumer 音讯队列锁 final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { // (播送模式) 或者 (集群模式 && Broker音讯队列锁无效) if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { final long beginTime = System.currentTimeMillis(); // 循环 for (boolean continueConsume = true; continueConsume; ) { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } // 音讯队列分布式锁未锁定,提交提早取得锁并生产申请 if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && !this.processQueue.isLocked()) { log.warn("the message queue not locked, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } // 音讯队列分布式锁曾经过期,提交提早取得锁并生产申请 if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && this.processQueue.isLockExpired()) { log.warn("the message queue lock expired, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } // 以后周期生产工夫超过间断时长,默认:60s,提交提早生产申请。默认状况下,每生产1分钟劳动10ms。 long interval = System.currentTimeMillis() - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); break; } // 获取生产音讯。此处和并发音讯申请不同,并发音讯申请曾经带了生产哪些音讯。 final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); if (!msgs.isEmpty()) { final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue); ConsumeOrderlyStatus status = null; // Hook:before ConsumeMessageContext consumeMessageContext = null; if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); // init the consume context type consumeMessageContext.setProps(new HashMap<String, String>()); ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); } // 执行生产 long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { this.processQueue.getLockConsume().lock(); // 锁定队列生产锁 if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", // RemotingHelper.exceptionSimpleDesc(e), // ConsumeMessageOrderlyService.this.consumerGroup, // msgs, // messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); // 锁定队列生产锁 } if (null == status // || ConsumeOrderlyStatus.ROLLBACK == status// || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", // ConsumeMessageOrderlyService.this.consumerGroup, // msgs, // messageQueue); } // 解析生产后果状态 long consumeRT = System.currentTimeMillis() - beginTimestamp; if (null == status) { if (hasException) { returnType = ConsumeReturnType.EXCEPTION; } else { returnType = ConsumeReturnType.RETURNNULL; } } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeOrderlyStatus.SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); } if (null == status) { status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } // Hook:after if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status); ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } ConsumeMessageOrderlyService.this.getConsumerStatsManager() .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); // 解决生产后果 continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); } else { continueConsume = false; } } } else { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); } } }
获取到锁对象后,应用synchronized尝试申请线程级独占锁。
如果加锁胜利,同一时刻只有一个线程进行音讯生产。
如果加锁失败,会提早100ms从新尝试向broker端申请锁定messageQueue,锁定胜利后从新提交生产申请
至此,第三个关键点的解决思路也清晰了,基本上就两个步骤。
创立音讯拉取工作时,音讯客户端向broker端申请锁定MessageQueue,使得一个MessageQueue同一个时刻只能被一个生产客户端生产。
音讯生产时,多线程针对同一个音讯队列的生产先尝试应用synchronized申请独占锁,加锁胜利能力进行生产,使得一个MessageQueue同一个时刻只能被一个生产客户端中一个线程生产。
【程序生产问题拆解】
- broke 上要保障一个队列只有一个过程生产,即一个队列同一时间只有一个consumer 生产
- broker 给consumer 的音讯程序应该保持一致,这个通过 rpc传输,序列化后音讯程序不变,所以很容易实现
- consumer 上的队列音讯要保障同一个工夫只有一个线程生产
通过问题的拆分,问题变成同一个共享资源串行解决了,要解决这个问题,通常的做法都是拜访资源的时候加锁,即broker 上一个队列音讯在被consumer 拜访的必须加锁,单个consumer 端多线程并发解决音讯的时候须要加锁;这里还须要思考broker 锁的异常情况,如果一个broke 队列上的音讯被consumer 锁住了,万一consumer 解体了,这个锁就开释不了,所以broker 上的锁须要加上锁的过期工夫。
实际上 RocketMQ 生产端也就是照着下面的思路做:
RocketMQ中程序音讯注意事项
- 理论我的项目中并不是所有状况都须要用到程序音讯,但这也是设计方案的时候容易疏忽的一点
- 程序音讯是生产者和消费者配合协调作用的后果,然而生产端保障程序生产,是保障不了程序音讯的
- 生产端并行形式生产,只设置一次拉取音讯的数量为 1(即配置参数 consumeBatchSize ),是否能够实现程序生产 ?这里理论是不能的,并发生产在生产端有多个线程同时生产,consumeBatchSize 只是一个线程一次拉取音讯的数量,对程序生产没有意义,这里大家有趣味能够看 ConsumeMessageConcurrentlyService 的代码,并发生产的逻辑都在哪里。
在应用程序音讯时,肯定要留神其异常情况的呈现,对于程序音讯,当消费者生产音讯失败后,音讯队列 RocketMQ 版会主动一直地进行音讯重试(每次间隔时间为 1 秒),重试最大值是Integer.MAX\_VALUE.这时,利用会呈现音讯生产被阻塞的状况。因而,建议您应用程序音讯时,务必保障利用可能及时监控并解决生产失败的状况,防止阻塞景象的产生。
重要的事再强调一次:在应用程序音讯时,肯定要留神其异常情况的呈现!避免资源不开释!
小结
通过以上的理解,咱们晓得了实现程序音讯所必要的条件:程序发送、顺序存储、程序生产。RocketMQ的设计中思考到了这些,咱们只须要简略的应用API,不须要额定应用代码来束缚业务,使得实现程序音讯更加简略。