乐趣区

关于rocketmq:RocketMQ学习十一offset管理

通过上一篇咱们曾经晓得只有先从 ConsumerQueue 里获取到了 CommitLog 物理偏移量后才能够疾速的从 CommintLog 里找到对应的 message。那 ConsumerQueue 里的信息又是如何定位的呢?这里就波及到咱们明天的主题:offset 治理
在 RocketMQ 中,音讯生产实现后须要将 offset 存储下来,offset 用来治理每个生产队列的不同生产组的生产进度,依据生产模式的不同又有所差别:

  • 在播送模式下,因为每条音讯会被生产组内所有的消费者生产,同生产组的消费者互相独立,生产进度要独自存储,会以文本文件的模式存储在客户端,对应的数据结构为 LocalFileOffsetStore
  • 在集群模式下,同一条音讯只会被同一个生产组生产一次,生产进度会参加到负载平衡中,故生产进度是须要共享的,另外,消费者产生异样或重启为了保障能够从上一次生产的中央持续进行生产,这时的 offset 是对立保留到 broker 服务端的。对应的数据结构为 RemoteBrokerOffsetStore。

理论应用 RocketMQ 大多是集群模式,这里也只针对集群模式下的 offset 治理形容。

一,Broker 服务端

offset 存储与加载

rocketMQ 的 broker 端中,offset 的是以 json 的模式长久化到磁盘文件中,文件门路为 ${user.home}/store/config/consumerOffset.json。其内容示例如下:

{
    "offsetTable": {
        "test-topic@test-group": {
            "0": 88526, 
            "1": 88528
        }
    }
}

broker 端启动后,会调用 BrokerController.initialize()办法,办法中会对 offset 进行加载,加载办法 consumerOffsetManager.load()。获取文件内容后,最初会将其序列化为一个 ConsumerOffsetManager 对象,这个对象中要害的一个属性是

ConcurrentMap<String,ConcurrentMap<Integer, Long>> offsetTable

offsetTable 的,key 的模式为 topic@group(每个 topic 下不同生产组的生产进度),value 也是一个 ConcurrentMap,key 为 queueId,value 为生产位移(这里不是 offset 而是位移)。通过对全局 ConsumerOffsetManager 对象就能够对各个 topic 下不同生产组的生产位移进行获取与治理。

commitLog 与 offset

producer 发送音讯到 broker 之后,会将音讯具体内容长久化到 commitLog 文件中,再散发到 topic 下的生产队列 consume Queue,消费者提交生产申请时,broker 从该 consumer 负责的生产队列中依据申请参数起始 offset 获取待生产的音讯索引信息,再从 commitLog 中获取具体的音讯内容返回给 consumer。在这个过程中,consumer 提交的 offset 为本次申请的起始生产地位,即 beginOffset;consume Queue 中的 offset 定位了 commitLog 中具体音讯的地位。

nextBeginOffset

对于 consumer 的生产申请解决(PullMessageProcessor.processRequest()),除了待生产的音讯内容,broker 在 responseHeader(PullMessageResponseHeader)附带上以后生产队列的最小 offset(minOffset)、最大 offset(maxOffset)、及下次拉取的起始 offset(nextBeginOffset,也就是上图里的 consumerOffset)。

  • minOffset、maxOffset 是以后生产队列 consumeQueue 记录的最小及最大的 offset 信息。
  • nextBeginOffset 是 consumer 下次拉取音讯的 offset 信息,即 consumer 对该 consumeQueue 的生产进度。

其中 nextBeginOffset 是 consumer 在下一轮音讯拉取时 offset 的重要依据,无论当次拉取的音讯生产是否失常,nextBeginOffset 都不会回滚,这是因为 rocketMQ 对生产异样的音讯的解决是将音讯从新发回 broker 端的重试队列(会为每个 topic 创立一个重试队列,以 %RERTY% 结尾),达到重试工夫后将音讯投递到重试队列中进行生产重试。对生产异样的解决不是通过 offset 回滚,这使得客户端简化了 offset 的治理。

二,Consumer 客户端

offset 初始化

consumer 启动过程中(Consumer 主函数默认调用 DefaultMQPushConsumer.start()办法)依据 MessageModel(播送与集群模式)抉择对应的 offsetStore,而后调用 offsetStore.load()对 offset 进行加载,LocalFileOffsetStore 是对本地文件的加载,而 RemotebrokerOffsetStore 是没有本地文件的,因而 load()办法没有实现。在 rebalance 实现对 messageQueue 的调配之后会对 messageQueue 对应的生产地位 offset 进行更新。

/** RebalanceImpl */
/**
doRebalance() -> rebalanceByTopic() -> updateProcessQueueTableInRebalance() 
-> computePullFromWhere()
*/
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {// (省略局部代码)负载平衡获取以后 consumer 负责的音讯队列后对 processQueue 进行筛选,删除 processQueue 不必要的 messageQueue
    
    // 获取 topic 下 consumer 音讯拉取列表,List<PullRequest>
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }

                // 删除 messageQueue 旧的 offset 信息
                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                // 获取 nextOffset,即更新以后 messageQueue 对应申请的 offset
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
    }
    
}

Push 模式下,computePullFromWhere()办法的实现类为 RebalancePushImpl.class。依据配置信息 consumeFromWhere 进行不同的操作。ConsumeFromWhere 这个参数的含意是,首次启动从何处开始生产。更精确的表述是,如果查问不到音讯生产进度时,从什么中央开始生产。

ConsumeFromWhere 的类型枚举如下,其中有三个曾经被标记为 Deprecated(本文基于 4.5.0)

public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET,

    @Deprecated
    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
    @Deprecated
    CONSUME_FROM_MIN_OFFSET,
    @Deprecated
    CONSUME_FROM_MAX_OFFSET,
    CONSUME_FROM_FIRST_OFFSET,
    CONSUME_FROM_TIMESTAMP,
}
  • CONSUME_FROM_LAST_OFFSET 从最新的 offset 开始生产。
    获取 consumer 对以后音讯队列 messageQueue 的生产进度 lastOffset,如果 lastOffset>=0,从 lastOffset 开始生产;如果 lastOffset 小于 0 阐明是 first start,没有 offset 信息,topic 为重试 topic 时从 0 开始生产,否则申请获取该音讯队列对应的生产队列 consumeQueue 的最大 offset(maxOffset),从 maxOffset 开始生产
  • CONSUME_FROM_FIRST_OFFSET 从第一个 offset 开始生产。
    获取 consumer 对以后音讯队列 messageQueue 的生产进度 lastOffset,如果 lastOffset>=0,从 lastOffset 开始生产;否则从 0 开始生产。
  • CONSUME_FROM_TIMESTAMP 依据工夫戳申请查找 offset。
    获取 consumer 对以后音讯队列 messageQueue 的生产进度 lastOffset,如果 lastOffset>=0,从 lastOffset 开始生产;
    当 lastOffset<0,如果为重试 topic,获取 consumeQueue 的最大 offset;否则获取 ConsumeTimestamp(consumer 启动工夫),依据工夫戳申请查找 offset。

上述三种生产地位的设置流程有一个共同点,都申请获取 consumer 对以后音讯队列 messageQueue 的生产进度 lastOffset,如果 lastOffset 不小于 0,则从 lastOffset 开始生产。这也是有时候设置了 CONSUME_FROM_FIRST_OFFSET 却不是从 0 开始从新生产的起因,rocketMQ 缩小了因为配置起因造成的反复生产。

至于具体应用哪一种形式须要依据本人的业务场景来定。咱们能够思考上面的几种情景:

  1. 如果一个消费者启动运行了一段时间,因为版本公布等起因须要先停掉消费者,代码更新后,再启动消费者时消费者还能应用下面这三种策略,从新的一条音讯生产吗?如果是这样,在发版期间新发送的音讯将全副失落,这显然是不可承受的,要从上一次开始生产的时候生产,能力保障音讯不失落。
  2. 如果对于一个设置为 CONSUME_FROM_FIRST_OFFSET 的运行好久的消费者,以后版本的业务逻辑进行了重大重构,而且业务心愿是从最新的音讯开始生产。如果应用了上面的设置是不会胜利的。

    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    

    (第 2 点有点不明确)
    能够应用 RocketMQ 提供的重置位点,其命令如下:

    sh ./mqadmin resetOffsetByTime -n 127.0.0.1:9876  -g CID_CONSUMER_TEST -t TopicTest -s now
    

    或者在控制台重置位点。

offset 提交更新

consumer 从 broker 拉取音讯后,会将音讯的扩大信息 MessageExt 寄存到 ProcessQueue 的属性 TreeMap<Long, MessageExt> msgTreeMap 中,key 值为音讯对应的 queueOffset,value 为扩大信息(包含 queueID 等)。并发生产模式下(Concurrently),获取的待生产音讯会分批提交给生产线程进行生产,默认批次为 1,即每个生产线程生产一条音讯。生产实现后调用 ConsumerMessageConcurrentlyService.processConsumeResult()办法对后果进行解决:生产胜利确认 ack,生产失败发回 broker 进行重试。之后便是对 offset 的更新操作。
首先是调用 ProcessQueue.removeMessage()办法,将曾经生产实现的音讯从 msgTreeMap 中依据 queueOffset 移除,而后判断以后 msgTreeMap 是否为空,不为空则返回以后 msgTreeMap 第一个元素,即 offset 最小的元素,否则返回 -1。
如果 removeMessage()返回的 offset 大于 0,则更新到 offsetTable 中。offsetTable 的构造为 ConcurrentMap<MessageQueue, AtomicLong> offsetTable,是一个线程平安的 Map,key 为 MessageQueue,value 为 AtomicLong 对象,值为 offset,记录以后 messageQueue 的生产位移。

/** ConsumeMessageConcurrentlyService.class */
public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest) {
    // ....(省略局部代码)依据生产后果判断是否须要发回 broker 重试
    
    // 在 msgTreeMap 中删除 msg,标记以后音讯已被生产,msgTreeMap 不为空返回以后 msgTreeMap 中最小的 offset
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    
    // 更新 offsetTable 中的生产位移,offsetTable 记录每个 messageQueue 的生产进度
    // updateOffset()的最初一个参数 increaseOnly 为 true,示意枯燥减少,新值要大于旧值
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
     }
}

/** ProcessQueue.class */
public long removeMessage(final List<MessageExt> msgs) {
        long result = -1;
        final long now = System.currentTimeMillis();
        try {this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {if (!msgTreeMap.isEmpty()) {
                    result = this.queueOffsetMax + 1;
                    int removedCnt = 0;
                    // // 从 msgTreeMap 中删除该批次的 msg
                    for (MessageExt msg : msgs) {MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                        if (prev != null) {
                            removedCnt--;
                            msgSize.addAndGet(0 - msg.getBody().length);
                        }
                    }
                    msgCount.addAndGet(removedCnt);

                    // 删除后以后 msgTreeMap 不为空,返回第一个元素,即最小的 offset
                    if (!msgTreeMap.isEmpty()) {result = msgTreeMap.firstKey();
                    }
                }
            } finally {this.lockTreeMap.writeLock().unlock();}
        } catch (Throwable t) {log.error("removeMessage exception", t);
        }

        return result;
    }

/** RemoteBrokerOffsetStore */
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {if (mq != null) {AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                // offsetTable 中不存在 mq 对应的记录
                // putIfAbsent 如果传入 key 对应的 value 已存在,则返回存在的 value,不替换;如果不存在,则新增,返回 null
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }

            // offsetTable 存在记录,替换,这里 increaseOnly 为 true,offsetOld<offset 才替换
            if (null != offsetOld) {if (increaseOnly) {MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {offsetOld.set(offset);
                }
            }
        }
    }

到这里一条音讯的生产流程曾经完结,offset 更新到了本地缓存 offsetTable,而将 offset 上传到 broker 是由定时工作执行的。MQClientInstance.start()会启动客户端相干的定时工作,包含 NameService 通信、offset 提交等。

/** MQClientInstance.startScheduledTask() */
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    // 提交 offset 至 broker
                    MQClientInstance.this.persistAllConsumerOffset();} catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
  • LocalFileOffsetStore 模式下,将 offset 信息转化成 json 保留到本地文件中;
  • RemoteBrokerOffsetStore 则 offsetTable 将须要提交的 MessageQueue 的 offset 信息通过 MQClientAPIImpl 提供的接口 updateConsumerOffsetOneway()提交到 broker 进行长久化存储。

当利用失常敞开时,consumer 的 shutdown()办法会被动触发一次长久化 offset 到 broker 的操作。

client 对 offset 的更新是在音讯生产实现后将 offset 更新到 offsetTable,再由定时工作进行长久化。这个过程有须要留神的中央:

  1. 因为是先生产再更新 offset,因而存在生产实现后更新 offset 失败,但这种状况呈现的概率比拟低,更新 offset 只是写到缓存中,是一个简略的内存操作,出错的可能性较低。
  2. 因为 offset 先存到内存中,再由定时工作每隔 10s 提交一次,存在失落的危险,比方以后 client 宕机等,从而导致更新后的 offset 没有提交到 broker,再次负载时会反复生产。因而 consumer 的生产业务逻辑须要保障幂等性。

并发生产时 offset 的更新

问题:consumer 从 broker 拉取的待生产音讯时批量的(默认状况下 pullBatchSize=32),并发生产时,offset 的更新不是按大小程序的,比方拉取音讯 m1 到 m10,m1 可能是最初生产实现的,那提交的 offset 的正确性如何保障?m10 offset 的更新不会导致 m1 会误认为已生产实现。
上一大节提到生产实现后,会将线程生产的批次音讯从 msgTreeMap 中删除,并返回以后 msgTreeMap 的第一个元素,也就是拉取批次最小的 offset,offsetTable 更新的 offset 始终会是拉取批次中未生产的最小的 offset 值。也就是 m1 未生产实现,m10 生产实现的状况下,更新到 offsetTable 的以后 messageQueue 的生产进度为 m1 对应的 offset 值。

因而,offsetTable 中寄存的可能不是 messageQueue 真正生产的 offset 的最大值,然而 consumer 拉取音讯时应用的是上一次拉取申请返回的 nextBeginOffset,并不是根据 offsetTable,失常状况下不会反复拉取数据。当产生宕机等异样时,与 offsetTable 未提交宕机异样一样,须要通过业务流程来保障幂等性。业务流程的幂等性是 rocketMQ 始终强调的。

本文转载于 rocketMQ — offset 治理
也参考了:DefaultMQPushConsumer 应用示例与注意事项
通过这三个文件彻底搞懂 rocketmq 的存储原理

退出移动版