咱们晓得,kafka 如果要保障程序生产,必须保障音讯保留到同一个 patition 上,而且为了有序性,只能有一个消费者进行生产。这种状况下,Kafka 就进化成了繁多队列,毫无并发性可言,极大升高零碎性能。那么对于对业务比拟敌对的 RocketMQ 是如何实现的呢?首先,咱们循序渐进的来理解下程序音讯的实现。
程序音讯业务应用场景
1、电商场景中传递订单状态。
2、同步 mysql 的 binlong 日志,数据库的操作是有程序的。
3、其余音讯之间有先后的依赖关系,后一条音讯须要依赖于前一条音讯的处理结果的状况。
等等。。。
消息中间件中的程序音讯
程序音讯(FIFO 音讯)是 MQ 提供的一种严格依照程序进行公布和生产的音讯类型。程序音讯由两个局部组成:程序公布和程序生产。
程序音讯蕴含两种类型:
分区程序 :一个 Partition(queue) 内所有的音讯依照先进先出的程序进行公布和生产
全局程序:一个 Topic 内所有的音讯依照先进先出的程序进行公布和生产. 然而全局程序极大的升高了零碎的吞吐量,不合乎 mq 的设计初衷。
那么折中的方法就是抉择分区程序。
【部分程序生产】
如何保障程序
在 MQ 的模型中,程序须要由 3 个阶段去保障:
- 音讯被发送时放弃程序
- 音讯被存储时放弃和发送的程序统一
- 音讯被生产时放弃和存储的程序统一
发送时放弃程序意味着对于有程序要求的音讯,用户应该在同一个线程中采纳同步的形式发送。存储放弃和发送的程序统一则要求在同一线程中被发送进去的音讯 A 和 B,存储时在空间上 A 肯定在 B 之前。而生产放弃和存储统一则要求音讯 A、B 达到 Consumer 之后必须依照先 A 后 B 的程序被解决。
第一点,音讯程序发送 ,多线程发送的音讯无奈保障有序性,因而,须要业务方在发送时,针对同一个业务编号(如同一笔订单) 的音讯须要保障在一个线程内程序发送,在上一个音讯发送胜利后,在进行下一个音讯的发送。对应到 mq 中,音讯发送办法就得应用同步发送,异步发送无奈保障程序性。
第二点,音讯顺序存储,mq 的 topic 下会存在多个 queue,要保障音讯的顺序存储,同一个业务编号的音讯须要被发送到一个 queue 中。对应到 mq 中,须要应用 MessageQueueSelector 来抉择要发送的 queue,即对业务编号进行 hash,而后依据队列数量对 hash 值取余,将音讯发送到一个 queue 中。
第三点,音讯程序生产,要保障音讯程序生产,同一个 queue 就只能被一个消费者所生产,因而对 broker 中生产队列加锁是无奈防止的。同一时刻,一个生产队列只能被一个消费者生产,消费者外部,也只能有一个生产线程来生产该队列。即,同一时刻,一个生产队列只能被一个消费者中的一个线程生产。
RocketMQ 中程序的实现
【Producer 端】
Producer 端确保音讯程序惟一要做的事件就是将音讯路由到特定的分区,在 RocketMQ 中,通过 MessageQueueSelector 来实现分区的抉择。
/**
* 音讯队列选择器
*/
public interface MessageQueueSelector {
/**
* 抉择音讯队列
*
* @param mqs 音讯队列
* @param msg 音讯
* @param arg 参数
* @return 音讯队列
*/
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
- List<MessageQueue> mqs:音讯要发送的 Topic 下所有的分区
- Message msg:音讯对象
- 额定的参数:用户能够传递本人的参数
比方如下实现就 能够保障雷同的订单的音讯被路由到雷同的分区:
long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());
【Consumer 端】
尝试锁定锁定 MessageQueue。
首先咱们如何保障一个队列只被一个消费者生产?
生产队列存在于 broker 端,如果想保障一个队列被一个消费者生产,那么消费者在进行音讯拉勾销费时就 必须向 mq 服务器申请队列锁,消费者申请队列锁的代码存在于 RebalanceService 音讯队列负载的实现代码中。
消费者从新负载,并且调配完生产队列后,须要向 mq 服务器发动音讯拉取申请,代码实现在 RebalanceImpl#updateProcessQueueTableInRebalance 中,针对程序音讯的音讯拉取,mq 做了如下判断:
// 减少 不在 processQueueTable && 存在于 mqSet 里的音讯队列。List<PullRequest> pullRequestList = new ArrayList<>(); // 拉音讯申请数组
for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) { // 程序音讯锁定音讯队列
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 发动音讯拉取申请
this.dispatchPullRequest(pullRequestList);
核心思想就是,生产客户端先向 broker 端发动对 messageQueue 的加锁申请,只有加锁胜利时才创立 pullRequest 进行音讯拉取,上面看下 lock 加锁申请办法:
/**
* 申请 Broker 取得指定音讯队列的分布式锁
*
* @param mq 队列
* @return 是否胜利
*/
public boolean lock(final MessageQueue mq) {FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);
try {
// 申请 Broker 取得指定音讯队列的分布式锁
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
// 设置音讯解决队列锁定胜利。锁定音讯队列胜利,可能本地没有音讯解决队列,设置锁定胜利会在 lockAll()办法。for (MessageQueue mmqq : lockedMq) {ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
boolean lockOK = lockedMq.contains(mq);
log.info("the message queue lock {}, {} {}",
lockOK ? "OK" : "Failed",
this.consumerGroup,
mq);
return lockOK;
} catch (Exception e) {log.error("lockBatchMQ exception," + mq, e);
}
}
return false;
}
代码实现逻辑比拟清晰,就是调用 lockBatchMQ 办法发送了一个加锁申请,那么 broker 端收到加锁申请后的解决逻辑又是怎么样?
【broker 端实现】
broker 端收到加锁申请的解决逻辑在 RebalanceLockManager#tryLockBatch 办法中,RebalanceLockManager 中要害属性如下:
/**
* 音讯队列锁过期工夫,默认 60s
*/
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty("rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
/**
* 锁
*/
private final Lock lock = new ReentrantLock();
/**
* 生产分组的音讯队列锁映射
*/
private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
new ConcurrentHashMap<>(1024);
LockEntry 对象中要害属性如下:
/**
* 锁定记录
*/
static class LockEntry {
/**
* 客户端编号
*/
private String clientId;
/**
* 最初锁定工夫
*/
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
public String getClientId() {return clientId;}
public void setClientId(String clientId) {this.clientId = clientId;}
public long getLastUpdateTimestamp() {return lastUpdateTimestamp;}
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {this.lastUpdateTimestamp = lastUpdateTimestamp;}
/**
* 是否锁定
*
* @param clientId 客户端编号
* @return 是否
*/
public boolean isLocked(final String clientId) {boolean eq = this.clientId.equals(clientId);
return eq && !this.isExpired();}
/**
* 锁定是否过期
*
* @return 是否
*/
public boolean isExpired() {
boolean expired =
(System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
return expired;
}
}
broker 端通过对 ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable 的保护来达到 messageQueue 加锁的目标,使得同一时刻,一个 messageQueue 只能被一个消费者生产。
【再次回到 Consumer 端,拿到锁后】
消费者对 messageQueue 的加锁曾经胜利,那么就进入到了第二个步骤,创立 pullRequest 进行音讯拉取,音讯拉取局部的代码实现在 PullMessageService 中,音讯拉取完后,须要提交到 ConsumeMessageService 中进行生产,程序生产的实现为 ConsumeMessageOrderlyService,提交音讯进行生产的办法为 ConsumeMessageOrderlyService#submitConsumeRequest,具体实现如下:
@Override
public void submitConsumeRequest(//
final List<MessageExt> msgs, //
final ProcessQueue processQueue, //
final MessageQueue messageQueue, //
final boolean dispathToConsume) {if (dispathToConsume) {ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
构建了一个 ConsumeRequest 对象,并提交给了 ThreadPoolExecutor 来并行生产,看下程序生产的 ConsumeRequest 的 run 办法实现:
public void run() {if (this.processQueue.isDropped()) {log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// 取得 Consumer 音讯队列锁
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {// (播送模式) 或者 (集群模式 && Broker 音讯队列锁无效)
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {final long beginTime = System.currentTimeMillis();
// 循环
for (boolean continueConsume = true; continueConsume;) {if (this.processQueue.isDropped()) {log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
// 音讯队列分布式锁未锁定,提交提早取得锁并生产申请
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 音讯队列分布式锁曾经过期,提交提早取得锁并生产申请
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 以后周期生产工夫超过间断时长,默认:60s,提交提早生产申请。默认状况下,每生产 1 分钟劳动 10ms。long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
// 获取生产音讯。此处和并发音讯申请不同,并发音讯申请曾经带了生产哪些音讯。final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
// Hook:before
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
// 执行生产
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {this.processQueue.getLockConsume().lock(); // 锁定队列生产锁
if (this.processQueue.isDropped()) {log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
RemotingHelper.exceptionSimpleDesc(e), //
ConsumeMessageOrderlyService.this.consumerGroup, //
msgs, //
messageQueue);
hasException = true;
} finally {this.processQueue.getLockConsume().unlock(); // 锁定队列生产锁}
if (null == status //
|| ConsumeOrderlyStatus.ROLLBACK == status//
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
ConsumeMessageOrderlyService.this.consumerGroup, //
msgs, //
messageQueue);
}
// 解析生产后果状态
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeOrderlyStatus.SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}
// Hook:after
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
// 解决生产后果
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {continueConsume = false;}
}
} else {if (this.processQueue.isDropped()) {log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
获取到锁对象后,应用 synchronized 尝试申请线程级独占锁。
如果加锁胜利,同一时刻只有一个线程进行音讯生产。
如果加锁失败,会提早 100ms 从新尝试向 broker 端申请锁定 messageQueue,锁定胜利后从新提交生产申请
至此,第三个关键点的解决思路也清晰了,基本上就两个步骤。
创立音讯拉取工作时,音讯客户端向 broker 端申请锁定 MessageQueue,使得一个 MessageQueue 同一个时刻只能被一个生产客户端生产。
音讯生产时,多线程针对同一个音讯队列的生产先尝试应用 synchronized 申请独占锁,加锁胜利能力进行生产,使得一个 MessageQueue 同一个时刻只能被一个生产客户端中一个线程生产。
【程序生产问题拆解】
- broke 上要保障一个队列只有一个过程生产,即一个队列同一时间只有一个 consumer 生产
- broker 给 consumer 的音讯程序应该保持一致,这个通过 rpc 传输,序列化后音讯程序不变,所以很容易实现
- consumer 上的队列音讯要保障同一个工夫只有一个线程生产
通过问题的拆分,问题变成同一个共享资源串行解决了,要解决这个问题,通常的做法都是拜访资源的时候加锁,即 broker 上一个队列音讯在被 consumer 拜访的必须加锁,单个 consumer 端多线程并发解决音讯的时候须要加锁;这里还须要思考 broker 锁的异常情况,如果一个 broke 队列上的音讯被 consumer 锁住了,万一 consumer 解体了,这个锁就开释不了,所以 broker 上的锁须要加上锁的过期工夫。
实际上 RocketMQ 生产端也就是照着下面的思路做:
RocketMQ 中程序音讯注意事项
- 理论我的项目中并不是所有状况都须要用到程序音讯,但这也是设计方案的时候容易疏忽的一点
- 程序音讯是生产者和消费者配合协调作用的后果,然而生产端保障程序生产,是保障不了程序音讯的
- 生产端并行形式生产,只设置一次拉取音讯的数量为 1(即配置参数 consumeBatchSize),是否能够实现程序生产?这里理论是不能的,并发生产在生产端有多个线程同时生产,consumeBatchSize 只是一个线程一次拉取音讯的数量,对程序生产没有意义,这里大家有趣味能够看 ConsumeMessageConcurrentlyService 的代码,并发生产的逻辑都在哪里。
在应用程序音讯时,肯定要留神其异常情况的呈现,对于程序音讯,当消费者生产音讯失败后,音讯队列 RocketMQ 版会主动一直地进行音讯重试(每次间隔时间为 1 秒),重试最大值是 Integer.MAX\_VALUE. 这时,利用会呈现音讯生产被阻塞的状况。因而,建议您应用程序音讯时,务必保障利用可能及时监控并解决生产失败的状况,防止阻塞景象的产生。
重要的事再强调一次:在应用程序音讯时,肯定要留神其异常情况的呈现!避免资源不开释!
小结
通过以上的理解,咱们晓得了实现程序音讯所必要的条件:程序发送、顺序存储、程序生产。RocketMQ 的设计中思考到了这些,咱们只须要简略的应用 API,不须要额定应用代码来束缚业务,使得实现程序音讯更加简略。