通过上一篇咱们曾经晓得只有先从ConsumerQueue里获取到了CommitLog物理偏移量后才能够疾速的从CommintLog里找到对应的message。那ConsumerQueue里的信息又是如何定位的呢?这里就波及到咱们明天的主题:offset治理。
在RocketMQ中,音讯生产实现后须要将offset存储下来,offset用来治理每个生产队列的不同生产组的生产进度,依据生产模式的不同又有所差别:
- 在播送模式下,因为每条音讯会被生产组内所有的消费者生产,同生产组的消费者互相独立,生产进度要独自存储,会以文本文件的模式存储在客户端,对应的数据结构为LocalFileOffsetStore
- 在集群模式下,同一条音讯只会被同一个生产组生产一次,生产进度会参加到负载平衡中,故生产进度是须要共享的,另外,消费者产生异样或重启为了保障能够从上一次生产的中央持续进行生产,这时的offset是对立保留到broker服务端的。对应的数据结构为RemoteBrokerOffsetStore。
理论应用RocketMQ大多是集群模式,这里也只针对集群模式下的offset治理形容。
一,Broker服务端
offset存储与加载
rocketMQ的broker端中,offset的是以json的模式长久化到磁盘文件中,文件门路为${user.home}/store/config/consumerOffset.json。其内容示例如下:
{ "offsetTable": { "test-topic@test-group": { "0": 88526, "1": 88528 } }}
broker端启动后,会调用BrokerController.initialize()办法,办法中会对offset进行加载,加载办法consumerOffsetManager.load()。获取文件内容后,最初会将其序列化为一个ConsumerOffsetManager对象,这个对象中要害的一个属性是
ConcurrentMap<String,ConcurrentMap<Integer, Long>> offsetTable
offsetTable的,key的模式为topic@group(每个topic下不同生产组的生产进度),value也是一个ConcurrentMap,key为queueId,value为生产位移(这里不是offset而是位移)。通过对全局ConsumerOffsetManager对象就能够对各个topic下不同生产组的生产位移进行获取与治理。
commitLog与offset
producer发送音讯到broker之后,会将音讯具体内容长久化到commitLog文件中,再散发到topic下的生产队列consume Queue,消费者提交生产申请时,broker从该consumer负责的生产队列中依据申请参数起始offset获取待生产的音讯索引信息,再从commitLog中获取具体的音讯内容返回给consumer。在这个过程中,consumer提交的offset为本次申请的起始生产地位,即beginOffset;consume Queue中的offset定位了commitLog中具体音讯的地位。
nextBeginOffset
对于consumer的生产申请解决(PullMessageProcessor.processRequest()),除了待生产的音讯内容,broker在responseHeader(PullMessageResponseHeader)附带上以后生产队列的最小offset(minOffset)、最大offset(maxOffset)、及下次拉取的起始offset(nextBeginOffset,也就是上图里的consumerOffset)。
- minOffset、maxOffset是以后生产队列consumeQueue记录的最小及最大的offset信息。
- nextBeginOffset是consumer下次拉取音讯的offset信息,即consumer对该consumeQueue的生产进度。
其中nextBeginOffset是consumer在下一轮音讯拉取时offset的重要依据,无论当次拉取的音讯生产是否失常,nextBeginOffset都不会回滚,这是因为rocketMQ对生产异样的音讯的解决是将音讯从新发回broker端的重试队列(会为每个topic创立一个重试队列,以%RERTY%结尾),达到重试工夫后将音讯投递到重试队列中进行生产重试。对生产异样的解决不是通过offset回滚,这使得客户端简化了offset的治理。
二,Consumer客户端
offset初始化
consumer启动过程中(Consumer主函数默认调用DefaultMQPushConsumer.start()办法)依据MessageModel(播送与集群模式)抉择对应的offsetStore,而后调用offsetStore.load()对offset进行加载,LocalFileOffsetStore是对本地文件的加载,而RemotebrokerOffsetStore是没有本地文件的,因而load()办法没有实现。在rebalance实现对messageQueue的调配之后会对messageQueue对应的生产地位offset进行更新。
/** RebalanceImpl *//**doRebalance() -> rebalanceByTopic() -> updateProcessQueueTableInRebalance() -> computePullFromWhere()*/private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { // (省略局部代码)负载平衡获取以后consumer负责的音讯队列后对processQueue进行筛选,删除processQueue不必要的messageQueue // 获取topic下consumer音讯拉取列表,List<PullRequest> List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } // 删除messageQueue旧的offset信息 this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); // 获取nextOffset,即更新以后messageQueue对应申请的offset long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } }
Push模式下,computePullFromWhere()办法的实现类为RebalancePushImpl.class。依据配置信息consumeFromWhere进行不同的操作。ConsumeFromWhere 这个参数的含意是,首次启动从何处开始生产。更精确的表述是,如果查问不到音讯生产进度时,从什么中央开始生产。
ConsumeFromWhere的类型枚举如下,其中有三个曾经被标记为Deprecated(本文基于4.5.0)
public enum ConsumeFromWhere { CONSUME_FROM_LAST_OFFSET, @Deprecated CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST, @Deprecated CONSUME_FROM_MIN_OFFSET, @Deprecated CONSUME_FROM_MAX_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP,}
- CONSUME_FROM_LAST_OFFSET 从最新的offset开始生产。
获取consumer对以后音讯队列messageQueue的生产进度lastOffset,如果lastOffset>=0,从lastOffset开始生产;如果lastOffset小于0阐明是first start,没有offset信息,topic为重试topic时从0开始生产,否则申请获取该音讯队列对应的生产队列consumeQueue的最大offset(maxOffset),从maxOffset开始生产 - CONSUME_FROM_FIRST_OFFSET 从第一个offset开始生产。
获取consumer对以后音讯队列messageQueue的生产进度lastOffset,如果lastOffset>=0,从lastOffset开始生产;否则从0开始生产。 - CONSUME_FROM_TIMESTAMP 依据工夫戳申请查找offset。
获取consumer对以后音讯队列messageQueue的生产进度lastOffset,如果lastOffset>=0,从lastOffset开始生产;
当lastOffset<0,如果为重试topic,获取consumeQueue的最大offset;否则获取ConsumeTimestamp(consumer启动工夫),依据工夫戳申请查找offset。
上述三种生产地位的设置流程有一个共同点,都申请获取consumer对以后音讯队列messageQueue的生产进度lastOffset,如果lastOffset不小于0,则从lastOffset开始生产。这也是有时候设置了CONSUME_FROM_FIRST_OFFSET却不是从0开始从新生产的起因,rocketMQ缩小了因为配置起因造成的反复生产。
至于具体应用哪一种形式须要依据本人的业务场景来定。咱们能够思考上面的几种情景:
- 如果一个消费者启动运行了一段时间,因为版本公布等起因须要先停掉消费者,代码更新后,再启动消费者时消费者还能应用下面这三种策略,从新的一条音讯生产吗?如果是这样,在发版期间新发送的音讯将全副失落,这显然是不可承受的,要从上一次开始生产的时候生产,能力保障音讯不失落。
如果对于一个设置为 CONSUME_FROM_FIRST_OFFSET 的运行好久的消费者,以后版本的业务逻辑进行了重大重构,而且业务心愿是从最新的音讯开始生产。如果应用了上面的设置是不会胜利的。
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
(第2点有点不明确)
能够应用 RocketMQ 提供的重置位点,其命令如下:sh ./mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g CID_CONSUMER_TEST -t TopicTest -s now
或者在控制台重置位点。
offset提交更新
consumer从broker拉取音讯后,会将音讯的扩大信息MessageExt寄存到ProcessQueue的属性TreeMap<Long, MessageExt> msgTreeMap中,key值为音讯对应的queueOffset,value为扩大信息(包含queueID等)。并发生产模式下(Concurrently),获取的待生产音讯会分批提交给生产线程进行生产,默认批次为1,即每个生产线程生产一条音讯。生产实现后调用ConsumerMessageConcurrentlyService.processConsumeResult()办法对后果进行解决:生产胜利确认ack,生产失败发回broker进行重试。之后便是对offset的更新操作。
首先是调用ProcessQueue.removeMessage()办法,将曾经生产实现的音讯从msgTreeMap中依据queueOffset移除,而后判断以后msgTreeMap是否为空,不为空则返回以后msgTreeMap第一个元素,即offset最小的元素,否则返回-1。
如果removeMessage()返回的offset大于0,则更新到offsetTable中。offsetTable的构造为ConcurrentMap<MessageQueue, AtomicLong> offsetTable,是一个线程平安的Map,key为MessageQueue,value为AtomicLong对象,值为offset,记录以后messageQueue的生产位移。
/** ConsumeMessageConcurrentlyService.class */public void processConsumeResult( final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest) { // .... (省略局部代码)依据生产后果判断是否须要发回broker重试 // 在msgTreeMap中删除msg,标记以后音讯已被生产,msgTreeMap不为空返回以后msgTreeMap中最小的offset long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); // 更新offsetTable中的生产位移,offsetTable记录每个messageQueue的生产进度 // updateOffset()的最初一个参数increaseOnly为true,示意枯燥减少,新值要大于旧值 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); }}/** ProcessQueue.class */public long removeMessage(final List<MessageExt> msgs) { long result = -1; final long now = System.currentTimeMillis(); try { this.lockTreeMap.writeLock().lockInterruptibly(); this.lastConsumeTimestamp = now; try { if (!msgTreeMap.isEmpty()) { result = this.queueOffsetMax + 1; int removedCnt = 0; // // 从msgTreeMap中删除该批次的msg for (MessageExt msg : msgs) { MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); if (prev != null) { removedCnt--; msgSize.addAndGet(0 - msg.getBody().length); } } msgCount.addAndGet(removedCnt); // 删除后以后msgTreeMap不为空,返回第一个元素,即最小的offset if (!msgTreeMap.isEmpty()) { result = msgTreeMap.firstKey(); } } } finally { this.lockTreeMap.writeLock().unlock(); } } catch (Throwable t) { log.error("removeMessage exception", t); } return result; }/** RemoteBrokerOffsetStore */public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { if (mq != null) { AtomicLong offsetOld = this.offsetTable.get(mq); if (null == offsetOld) { // offsetTable中不存在mq对应的记录 // putIfAbsent 如果传入key对应的value已存在,则返回存在的value,不替换;如果不存在,则新增,返回null offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); } // offsetTable存在记录,替换,这里increaseOnly为true,offsetOld<offset才替换 if (null != offsetOld) { if (increaseOnly) { MixAll.compareAndIncreaseOnly(offsetOld, offset); } else { offsetOld.set(offset); } } } }
到这里一条音讯的生产流程曾经完结,offset更新到了本地缓存offsetTable,而将offset上传到broker是由定时工作执行的。MQClientInstance.start()会启动客户端相干的定时工作,包含NameService通信、offset提交等。
/** MQClientInstance.startScheduledTask() */this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 提交offset至broker MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
- LocalFileOffsetStore模式下,将offset信息转化成json保留到本地文件中;
- RemoteBrokerOffsetStore则offsetTable将须要提交的MessageQueue的offset信息通过MQClientAPIImpl提供的接口updateConsumerOffsetOneway()提交到broker进行长久化存储。
当利用失常敞开时,consumer的shutdown()办法会被动触发一次长久化offset到broker的操作。
client对offset的更新是在音讯生产实现后将offset更新到offsetTable,再由定时工作进行长久化。这个过程有须要留神的中央:
- 因为是先生产再更新offset,因而存在生产实现后更新offset失败,但这种状况呈现的概率比拟低,更新offset只是写到缓存中,是一个简略的内存操作,出错的可能性较低。
- 因为offset先存到内存中,再由定时工作每隔10s提交一次,存在失落的危险,比方以后client宕机等,从而导致更新后的offset没有提交到broker,再次负载时会反复生产。因而consumer的生产业务逻辑须要保障幂等性。
并发生产时offset的更新
问题:consumer从broker拉取的待生产音讯时批量的(默认状况下pullBatchSize=32),并发生产时,offset的更新不是按大小程序的,比方拉取音讯m1到m10,m1可能是最初生产实现的,那提交的offset的正确性如何保障?m10 offset的更新不会导致m1会误认为已生产实现。
上一大节提到生产实现后,会将线程生产的批次音讯从msgTreeMap中删除,并返回以后msgTreeMap的第一个元素,也就是拉取批次最小的offset,offsetTable更新的offset始终会是拉取批次中未生产的最小的offset值。也就是m1未生产实现,m10生产实现的状况下,更新到offsetTable的以后messageQueue的生产进度为m1对应的offset值。
因而,offsetTable中寄存的可能不是messageQueue真正生产的offset的最大值,然而consumer拉取音讯时应用的是上一次拉取申请返回的nextBeginOffset,并不是根据offsetTable,失常状况下不会反复拉取数据。当产生宕机等异样时,与offsetTable未提交宕机异样一样,须要通过业务流程来保障幂等性。业务流程的幂等性是rocketMQ始终强调的。
本文转载于rocketMQ -- offset治理
也参考了:DefaultMQPushConsumer 应用示例与注意事项
通过这三个文件彻底搞懂rocketmq的存储原理