咱们晓得,kafka 如果要保障程序生产,必须保障音讯保留到同一个patition上,而且为了有序性,只能有一个消费者进行生产。这种状况下,Kafka 就进化成了繁多队列,毫无并发性可言,极大升高零碎性能。那么对于对业务比拟敌对的RocketMQ 是如何实现的呢?首先,咱们循序渐进的来理解下程序音讯的实现。
程序音讯业务应用场景
1、电商场景中传递订单状态。
2、同步mysql 的binlong 日志,数据库的操作是有程序的。
3、其余音讯之间有先后的依赖关系,后一条音讯须要依赖于前一条音讯的处理结果的状况。
等等。。。
消息中间件中的程序音讯
程序音讯(FIFO 音讯)是 MQ 提供的一种严格依照程序进行公布和生产的音讯类型。程序音讯由两个局部组成:程序公布和程序生产。
程序音讯蕴含两种类型:
分区程序:一个Partition(queue)内所有的音讯依照先进先出的程序进行公布和生产
全局程序:一个Topic内所有的音讯依照先进先出的程序进行公布和生产.然而全局程序极大的升高了零碎的吞吐量,不合乎mq的设计初衷。
那么折中的方法就是抉择分区程序。
【部分程序生产】
如何保障程序
在MQ的模型中,程序须要由3个阶段去保障:
- 音讯被发送时放弃程序
- 音讯被存储时放弃和发送的程序统一
- 音讯被生产时放弃和存储的程序统一
发送时放弃程序意味着对于有程序要求的音讯,用户应该在同一个线程中采纳同步的形式发送。存储放弃和发送的程序统一则要求在同一线程中被发送进去的音讯A和B,存储时在空间上A肯定在B之前。而生产放弃和存储统一则要求音讯A、B达到Consumer之后必须依照先A后B的程序被解决。
第一点,音讯程序发送,多线程发送的音讯无奈保障有序性,因而,须要业务方在发送时,针对同一个业务编号(如同一笔订单)的音讯须要保障在一个线程内程序发送,在上一个音讯发送胜利后,在进行下一个音讯的发送。对应到mq中,音讯发送办法就得应用同步发送,异步发送无奈保障程序性。
第二点,音讯顺序存储,mq的topic下会存在多个queue,要保障音讯的顺序存储,同一个业务编号的音讯须要被发送到一个queue中。对应到mq中,须要应用MessageQueueSelector来抉择要发送的queue,即对业务编号进行hash,而后依据队列数量对hash值取余,将音讯发送到一个queue中。
第三点,音讯程序生产,要保障音讯程序生产,同一个queue就只能被一个消费者所生产,因而对broker中生产队列加锁是无奈防止的。同一时刻,一个生产队列只能被一个消费者生产,消费者外部,也只能有一个生产线程来生产该队列。即,同一时刻,一个生产队列只能被一个消费者中的一个线程生产。
RocketMQ中程序的实现
【Producer端】
Producer端确保音讯程序惟一要做的事件就是将音讯路由到特定的分区,在RocketMQ中,通过MessageQueueSelector来实现分区的抉择。
/**
* 音讯队列选择器
*/
public interface MessageQueueSelector {
/**
* 抉择音讯队列
*
* @param mqs 音讯队列
* @param msg 音讯
* @param arg 参数
* @return 音讯队列
*/
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
- List<MessageQueue> mqs:音讯要发送的Topic下所有的分区
- Message msg:音讯对象
- 额定的参数:用户能够传递本人的参数
比方如下实现就能够保障雷同的订单的音讯被路由到雷同的分区:
long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());
【Consumer端】
尝试锁定锁定MessageQueue。
首先咱们如何保障一个队列只被一个消费者生产?
生产队列存在于broker端,如果想保障一个队列被一个消费者生产,那么消费者在进行音讯拉勾销费时就必须向mq服务器申请队列锁,消费者申请队列锁的代码存在于RebalanceService音讯队列负载的实现代码中。
消费者从新负载,并且调配完生产队列后,须要向mq服务器发动音讯拉取申请,代码实现在RebalanceImpl#updateProcessQueueTableInRebalance中,针对程序音讯的音讯拉取,mq做了如下判断:
// 减少 不在processQueueTable && 存在于mqSet 里的音讯队列。
List<PullRequest> pullRequestList = new ArrayList<>(); // 拉音讯申请数组
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) { // 程序音讯锁定音讯队列
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 发动音讯拉取申请
this.dispatchPullRequest(pullRequestList);
核心思想就是,生产客户端先向broker端发动对messageQueue的加锁申请,只有加锁胜利时才创立pullRequest进行音讯拉取,上面看下lock加锁申请办法:
/**
* 申请Broker取得指定音讯队列的分布式锁
*
* @param mq 队列
* @return 是否胜利
*/
public boolean lock(final MessageQueue mq) {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);
try {
// 申请Broker取得指定音讯队列的分布式锁
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
// 设置音讯解决队列锁定胜利。锁定音讯队列胜利,可能本地没有音讯解决队列,设置锁定胜利会在lockAll()办法。
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
boolean lockOK = lockedMq.contains(mq);
log.info("the message queue lock {}, {} {}",
lockOK ? "OK" : "Failed",
this.consumerGroup,
mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
}
return false;
}
代码实现逻辑比拟清晰,就是调用lockBatchMQ办法发送了一个加锁申请,那么broker端收到加锁申请后的解决逻辑又是怎么样?
【broker端实现】
broker端收到加锁申请的解决逻辑在RebalanceLockManager#tryLockBatch办法中,RebalanceLockManager中要害属性如下:
/**
* 音讯队列锁过期工夫,默认60s
*/
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
"rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
/**
* 锁
*/
private final Lock lock = new ReentrantLock();
/**
* 生产分组的音讯队列锁映射
*/
private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
new ConcurrentHashMap<>(1024);
LockEntry对象中要害属性如下:
/**
* 锁定记录
*/
static class LockEntry {
/**
* 客户端编号
*/
private String clientId;
/**
* 最初锁定工夫
*/
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public long getLastUpdateTimestamp() {
return lastUpdateTimestamp;
}
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
/**
* 是否锁定
*
* @param clientId 客户端编号
* @return 是否
*/
public boolean isLocked(final String clientId) {
boolean eq = this.clientId.equals(clientId);
return eq && !this.isExpired();
}
/**
* 锁定是否过期
*
* @return 是否
*/
public boolean isExpired() {
boolean expired =
(System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
return expired;
}
}
broker端通过对ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable的保护来达到messageQueue加锁的目标,使得同一时刻,一个messageQueue只能被一个消费者生产。
【再次回到Consumer端,拿到锁后】
消费者对messageQueue的加锁曾经胜利,那么就进入到了第二个步骤,创立pullRequest进行音讯拉取,音讯拉取局部的代码实现在PullMessageService中,音讯拉取完后,须要提交到ConsumeMessageService中进行生产,程序生产的实现为ConsumeMessageOrderlyService,提交音讯进行生产的办法为ConsumeMessageOrderlyService#submitConsumeRequest,具体实现如下:
@Override
public void submitConsumeRequest(//
final List<MessageExt> msgs, //
final ProcessQueue processQueue, //
final MessageQueue messageQueue, //
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
构建了一个ConsumeRequest对象,并提交给了ThreadPoolExecutor来并行生产,看下程序生产的ConsumeRequest的run办法实现:
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// 取得 Consumer 音讯队列锁
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// (播送模式) 或者 (集群模式 && Broker音讯队列锁无效)
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
// 循环
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
// 音讯队列分布式锁未锁定,提交提早取得锁并生产申请
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 音讯队列分布式锁曾经过期,提交提早取得锁并生产申请
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 以后周期生产工夫超过间断时长,默认:60s,提交提早生产申请。默认状况下,每生产1分钟劳动10ms。
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
// 获取生产音讯。此处和并发音讯申请不同,并发音讯申请曾经带了生产哪些音讯。
final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
// Hook:before
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
// 执行生产
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
this.processQueue.getLockConsume().lock(); // 锁定队列生产锁
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
RemotingHelper.exceptionSimpleDesc(e), //
ConsumeMessageOrderlyService.this.consumerGroup, //
msgs, //
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock(); // 锁定队列生产锁
}
if (null == status //
|| ConsumeOrderlyStatus.ROLLBACK == status//
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
ConsumeMessageOrderlyService.this.consumerGroup, //
msgs, //
messageQueue);
}
// 解析生产后果状态
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeOrderlyStatus.SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
// Hook:after
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
// 解决生产后果
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
获取到锁对象后,应用synchronized尝试申请线程级独占锁。
如果加锁胜利,同一时刻只有一个线程进行音讯生产。
如果加锁失败,会提早100ms从新尝试向broker端申请锁定messageQueue,锁定胜利后从新提交生产申请
至此,第三个关键点的解决思路也清晰了,基本上就两个步骤。
创立音讯拉取工作时,音讯客户端向broker端申请锁定MessageQueue,使得一个MessageQueue同一个时刻只能被一个生产客户端生产。
音讯生产时,多线程针对同一个音讯队列的生产先尝试应用synchronized申请独占锁,加锁胜利能力进行生产,使得一个MessageQueue同一个时刻只能被一个生产客户端中一个线程生产。
【程序生产问题拆解】
- broke 上要保障一个队列只有一个过程生产,即一个队列同一时间只有一个consumer 生产
- broker 给consumer 的音讯程序应该保持一致,这个通过 rpc传输,序列化后音讯程序不变,所以很容易实现
- consumer 上的队列音讯要保障同一个工夫只有一个线程生产
通过问题的拆分,问题变成同一个共享资源串行解决了,要解决这个问题,通常的做法都是拜访资源的时候加锁,即broker 上一个队列音讯在被consumer 拜访的必须加锁,单个consumer 端多线程并发解决音讯的时候须要加锁;这里还须要思考broker 锁的异常情况,如果一个broke 队列上的音讯被consumer 锁住了,万一consumer 解体了,这个锁就开释不了,所以broker 上的锁须要加上锁的过期工夫。
实际上 RocketMQ 生产端也就是照着下面的思路做:
RocketMQ中程序音讯注意事项
- 理论我的项目中并不是所有状况都须要用到程序音讯,但这也是设计方案的时候容易疏忽的一点
- 程序音讯是生产者和消费者配合协调作用的后果,然而生产端保障程序生产,是保障不了程序音讯的
- 生产端并行形式生产,只设置一次拉取音讯的数量为 1(即配置参数 consumeBatchSize ),是否能够实现程序生产 ?这里理论是不能的,并发生产在生产端有多个线程同时生产,consumeBatchSize 只是一个线程一次拉取音讯的数量,对程序生产没有意义,这里大家有趣味能够看 ConsumeMessageConcurrentlyService 的代码,并发生产的逻辑都在哪里。
在应用程序音讯时,肯定要留神其异常情况的呈现,对于程序音讯,当消费者生产音讯失败后,音讯队列 RocketMQ 版会主动一直地进行音讯重试(每次间隔时间为 1 秒),重试最大值是Integer.MAX\_VALUE.这时,利用会呈现音讯生产被阻塞的状况。因而,建议您应用程序音讯时,务必保障利用可能及时监控并解决生产失败的状况,防止阻塞景象的产生。
重要的事再强调一次:在应用程序音讯时,肯定要留神其异常情况的呈现!避免资源不开释!
小结
通过以上的理解,咱们晓得了实现程序音讯所必要的条件:程序发送、顺序存储、程序生产。RocketMQ的设计中思考到了这些,咱们只须要简略的应用API,不须要额定应用代码来束缚业务,使得实现程序音讯更加简略。
发表回复