共计 5250 个字符,预计需要花费 14 分钟才能阅读完成。
RocketMQ 有两种生产模式,集群模式和播送模式。
集群模式是指 RocketMQ 中的一条音讯只能被同一个消费者组中的一个消费者生产。如下图,Producer 向 TopicTest 这个 Topic 并发写入 3 条新音讯,别离被调配到了 MessageQueue1~MessageQueue3 这 3 个队列,而后 Group 中的三个 Consumer 别离生产了一条音讯:
播送模式是 RocketMQ 中的音讯会被生产组中的每个消费者都生产一次,如下图:
应用 RocketMQ 的播送模式时,须要在生产端进行定义,上面是一段官网示例:
public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
从代码中能够看到,在定义 Consumer 时,通过 messageModel 这个属性指定生产模式,这里指定为 BROADCASTING,也就启动了播送模式的消费者。
1 消费者启动
以 RocketMQ 推模式为例,看一下消费者调用关系类图:
DefaultMQPushConsumer 作为启动入口类,它的 start 办法调用了 DefaultMQPushConsumerImpl 类的 start 办法,上面重点看一下这个办法。
1.1 拷贝订阅关系
start 办法中调用了 copySubscription 办法,代码如下:
private void copySubscription() throws MQClientException {
try {
// 拷贝订阅关系
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {throw new MQClientException("subscription exception", e);
}
}
这里的代码有一点须要留神:集群模式会创立一个重试 Topic 的订阅关系,而播送模式是不会创立这个订阅关系的。也就是说播送模式不思考重试。
1.2 初始化偏移量
上面是初始化 offset 的代码:
if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
从下面的代码能够看到,播送模式应用了 LocalFileOffsetStore,也就是说偏移量保留在客户端本地,除了在内存中会保留,在本地文件中也会保留。
2 音讯拉取
ConsumeMessageService 是真正拉取音讯的中央,消费者初始化时会初始化 ConsumeMessageService,并且这里会辨别并发音讯还是程序音讯。
2.1 程序音讯
在集群模式下,须要获取到 processQueue 的锁才会拉取音讯,而在播送模式下,不必获取锁,间接就能够拉取音讯。判断逻辑如下:
//ConsumeMessageOrderlyService.ConsumeRequest
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {}}
这里有个疑难,对于程序音讯,获取锁是必须的,这样能力保障一个 processQueue 只能由一个线程进行解决,从而保障生产的程序性。那对于播送模式,为什么不必获取 processQueue 的 锁呢?难道播送模式不反对程序音讯?
2.2 并发音讯
对于并发音讯,播送模式不同的是,对生产后果的解决。集群模式生产失败后须要把音讯发送回 Broker 期待再次被拉取,而播送模式则不须要重试。代码如下:
//ConsumeMessageConcurrentlyService.rocessConsumeResult
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
这再次阐明,播送模式是不反对音讯重试的。
3 重均衡
在消费者启动过程中,会调用 RebalanceService 的 start 办法,进行重均衡。从重均衡的代码中能够看到,播送模式消费者会生产所有 MessageQueue,而集群模式下会依据负载平衡策略抉择其中几个 MessageQueue。代码如下:
private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {
case BROADCASTING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
// 省略局部逻辑
} else { }
break;
}
case CLUSTERING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
// 省略局部逻辑
if (mqSet != null && cidAll != null) {
// 省略局部逻辑
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {return;}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
// 省略局部逻辑
}
break;
}
default:
break;
}
}
下面 updateProcessQueueTableInRebalance 这个办法调用前,要获取到须要生产的 MessageQueue 汇合。播送模式下,间接取了订阅的 Topic 下的所有汇合元素,而集群模式下,则须要通过负责平衡获取以后消费者本人要生产的 MessageQueue 汇合。
4 总结
本文次要解说了 RocketMQ 播送音讯的实现机制,了解播送音讯,要把握上面几点:
1. 偏移量保留在消费者本地内存和文件中;
2. 播送音讯不反对重试;
3. 从源码上看,播送模式并不能反对程序音讯;
4. 播送模式消费者订阅了 Topic 下的所有 MessageQueue,不会重均衡。