乐趣区

关于go:Kafka-log和consumer-offset过期测试

consumer 生产

consumer 本地会记录两种 offset:
• 生产拉取的 offset, 初始化和 rebalance 时会 fetchInitialOffset 从 broker 获取上次生产的 offset 而后从指定的 offset 拉取音讯。接下来 Consumer 会一直通过 fetchNewMessages() 到 broker 从指定 offset 拉取音讯。
• 生产提交的 offset,生产后主动 / 手动提交曾经生产的 offset

讲下提交步骤:
1. 本地先标记 offset, MarkOffset:

  • 留神此时只是本地标记曾经生产的, 并没有真正提交到 broker

2. 再执行CommitOffse t 提交到 broker, 而 CommitOffset 有上面两种形式:

  • 主动提交(默认都是这种),Consumer.Offsets.AutoCommit.Enable(默认关上)

    两种状况触发 CommitOffset 主动提交:
    1. 定时提交,与 Consumer.Offsets.AutoCommit.Interval 相干
    2. 进行 consumer sessions 中断生产时退出的时候会触发提交

  • 手动提交(不倡议)

_consumer_offsets
__consumer_offsets 这个 topic 外面存储的是 consumer offsets 信息,清理策略 log.cleanup.policy 默认是 compact,简略的说就是压缩雷同 key, 保留最初一个。其余 topic log 默认 log.cleanup.policy 默认是 delete。log.retention 参数针对的是 delete 的清理策略,对 compact 不失效。compact 是压缩后会清理掉垃圾文件次要和 log.cleaner 配置无关。可浏览:
Kafka 2.2.0 消息日志清理机制:日志删除 日志压缩

演示

kafka broker 版本:v2.0.0, sarama (go sdk)版本 v1.26.1
批改配置:
• log.retention.minutes: 5 (默认 log.retention.hours=168)

日志保留工夫

• log.retention.check.interval.ms: 1000 (默认 600000ms,10 分钟)

日志过期查看频率

• offsets.retention.minutes: 1 (以前默认 24 小时,2.0 版本后默认 10080, 7 天)

消费者的 offset 保留工夫

• offsets.retention.check.interval.ms: 1000 (默认 300000ms, 5 分钟)

消费者的过期 offset 查看频率

• 消费者初始化生产设置为OffsetOldest

消费者过期后从最老的 log offset 生产

consumer offset 过期工夫

1. 关上生产者和消费者

其中 offset10 是上一轮生产的

2. 读取__consumer_offsets 音讯查看消费者 offset 的状况

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'|grep -v console-consumer-

其中 ExpirationTime-CommitTime= 1 分钟, 阐明保留工夫是一分钟了

3. 敞开生产者,一分钟 (与 offsets.retention.minutes 和 offsets.retention.check.interval.ms 无关) 后__consumer_offsets 外面 [my-group,sync_time_test3,0] 接管到了null 音讯,代表 consumer 的 offsets 过期了。

4. 重启生产者
consumer 持续上次生产,此时如果不重启消费者是没事的。此时尽管 broker 外面曾经没有了 consumer 生产的 offset,然而 consumer 本地记住了上次 fetch 的 offset, 会持续拉取下一条音讯。只有初始化和 rebalance 时会fetchInitialOffset 从 broker 获取上次生产的 offset 而后从指定的 offset 拉取音讯。

5. 再次敞开生产者, 期待 consumer offset 过期

6. 马上重启 consumer 会从新生产

从新生产了,能够和第 4 步比照生产工夫

consumer offset 过期后 onsumer 为什么还在?

1. 先关上生产者消费者

2. 敞开生产者
consumer 过期了,然而查看 consumer 还有:

敞开消费者,consumer 隐没:

然而外面内容没了,内容能够应用上面命令查看:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe
# 此命令在 kafka v2.0.0 会报错 https://github.com/apache/kafka/pull/4980

3. 剖析应该是因为心跳放弃 consumer 始终在

Consumer.Group.Session.Timeout 用于检测 worker 程序失败的超时。worker 定期发送心跳,以向代理表明其活性。如果在此会话超时过期之前代理没有接管到心跳,则代理将从组中删除。请留神,该值必须位于 broker 配置中配置 group.min.session.timeout.msgroup.max.session.timeout.ms之间。

咱们先设置成 30s:

而后设置 sleep 长时间不让 heartbeat:

4. 只关上消费者
my-group 三十秒后因为没有心跳被剔除

log 过期工夫

查看 topic 命令
./kafka-run-class.sh kafka.tools.GetOffsetShell --topic sync_time_test3 --broker-list localhost:9092 --time -1 
# 查看 topic 的 offset(不是消费者的 offset), 最初的参数 - 1 示意显示获取以后 offset 最大值,- 2 示意 offset 的最小值

1. 先生产音讯,产生几条后敞开生产
查看 topic offset

产生音讯后,查看 topic offset

2. 关上消费者,生产后敞开

3. 隔一分多钟从新关上消费者,此时会从新生产

4. 隔 5 分多钟后 log 的 offset 最大值和最小值统一, 其余的曾经过期,只会保留下一次的 offset

5. 此时再关上消费者也没有音讯了

consumer commit code

上面是局部对于 CommitOffset 的代码:

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
    // Ensure group is not closed
    select {
    case <-c.closed:
        return ErrClosedConsumerGroup
    default:
    }
    ......
    // Refresh metadata for requested topics
    if err := c.client.RefreshMetadata(topics...); err != nil {return err}

    // Init session 留神这外面进入后会始终生产期待退出
    sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
    if err == ErrClosedClient {return ErrClosedConsumerGroup} else if err != nil {return err}
    ...
    // Gracefully release session claims  外面会执行 offsets.Close(), 会 flushToBroker  
    return sess.release(true)
}

//sess.release(true)--> offsets.Close():
func (om *offsetManager) Close() error {om.closeOnce.Do(func() {
        // exit the mainLoop
        close(om.closing)
        if om.conf.Consumer.Offsets.AutoCommit.Enable {<-om.closed}

        // mark all POMs as closed
        om.asyncClosePOMs()

        // flush one last time, 最初一次刷入
        if om.conf.Consumer.Offsets.AutoCommit.Enable {
            for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {om.flushToBroker()
                if om.releasePOMs(false) == 0 {break}
            }
        }
....
    })
    return nil
}

// 提交 offset 到 broker
func (om *offsetManager) Commit() {om.flushToBroker()
    om.releasePOMs(false)
}

func (om *offsetManager) flushToBroker() {req := om.constructRequest() // 这外面会查看是否有新的 offset 须要提交
    ......
    resp, err := broker.CommitOffset(req) // 这里就是提交了
    ......
    om.handleResponse(broker, req, resp)
}


func (om *offsetManager) constructRequest() *OffsetCommitRequest {
    var r *OffsetCommitRequest
    var perPartitionTimestamp int64
    
    // 这个是是否手动设置 offset 保留工夫,如果为 0,以 broker 的保留工夫为准
    if om.conf.Consumer.Offsets.Retention == 0 {
        perPartitionTimestamp = ReceiveTime
        r = &OffsetCommitRequest{
            Version:                 1,
            ConsumerGroup:           om.group,
            ConsumerID:              om.memberID,
            ConsumerGroupGeneration: om.generation,
        }
    } else {
        r = &OffsetCommitRequest{
            Version:                 2,
            RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
            ConsumerGroup:           om.group,
            ConsumerID:              om.memberID,
            ConsumerGroupGeneration: om.generation,
        }
    }

    for _, topicManagers := range om.poms {
        for _, pom := range topicManagers {pom.lock.Lock()
            if pom.dirty { // 这里的 dirty 就是判断数据是否有更新, 是否有新的 offset 须要提交
                r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)
            }
            pom.lock.Unlock()}
    }

    if len(r.blocks) > 0 {return r}

    return nil
}

总结:

  • topic 日志的保留与 log.retention.check.interval.ms 和 log.retention.check.interval.ms 无关(可部分针对 topic 配置),即便 log 清理后也会保留最新 offset 信息,超过保留工夫后下次生产也会持续从上次开始。然而 topic: __consumer_offsets 默认是 compact 保留策略和 log.retention 无关。
  • 消费者信息的保留在__consumer_offsets。与 offsets.retention.minutes 和 offsets.retention.check.interval.ms 无关(不可针对指定 topic 进行 consumer 配置,是全局设置), 如果非要独自改能够更改 consumer 客户端的参数Consumer.Offsets.Retention, 如设置为 2 分钟:

    此时__consumer_offsets 中指定 consumer 的 ExpirationTime-CommitTime= 2 分钟:

  • 举荐设置消费者的过期工夫 offsets.retention > log 保留工夫 log.retention

举荐浏览:

•【kafka 原理】消费者偏移量__consumer_offsets_相干解析
• Kafka 中的消费者位移 __consumer_offsets
• Kafka 2.2.0 消息日志清理机制:日志删除 日志压缩
• https://kafka.apache.org/20/documentation.html

退出移动版