consumer生产

consumer本地会记录两种offset:
• 生产拉取的offset, 初始化和rebalance时会fetchInitialOffset从broker获取上次生产的offset而后从指定的offset拉取音讯。接下来Consumer会一直通过fetchNewMessages() 到broker从指定offset拉取音讯。
• 生产提交的offset,生产后主动/手动提交曾经生产的offset

讲下提交步骤:
1.本地先标记offset, MarkOffset:

  • 留神此时只是本地标记曾经生产的, 并没有真正提交到broker

2.再执行CommitOffset提交到broker,而CommitOffset有上面两种形式:

  • 主动提交(默认都是这种),Consumer.Offsets.AutoCommit.Enable(默认关上)

    两种状况触发CommitOffset主动提交:
    1.定时提交,与Consumer.Offsets.AutoCommit.Interval相干
    2.进行consumer sessions中断生产时退出的时候会触发提交
  • 手动提交(不倡议)

_consumer_offsets
__consumer_offsets这个topic外面存储的是consumer offsets信息,清理策略log.cleanup.policy默认是compact,简略的说就是压缩雷同key, 保留最初一个。其余topic log默认log.cleanup.policy默认是delete。 log.retention参数针对的是delete的清理策略,对compact不失效。compact是压缩后会清理掉垃圾文件次要和log.cleaner配置无关。可浏览:
Kafka 2.2.0 消息日志清理机制:日志删除 日志压缩

演示

kafka broker版本:v2.0.0, sarama (go sdk)版本 v1.26.1
批改配置:
• log.retention.minutes: 5 (默认log.retention.hours=168)

日志保留工夫

• log.retention.check.interval.ms: 1000 (默认600000ms,10分钟)

日志过期查看频率

• offsets.retention.minutes: 1 (以前默认24小时,2.0版本后默认10080, 7天)

消费者的offset保留工夫

• offsets.retention.check.interval.ms: 1000 (默认300000ms, 5分钟)

消费者的过期offset查看频率

• 消费者初始化生产设置为OffsetOldest

消费者过期后从最老的log offset生产

consumer offset 过期工夫

1.关上生产者和消费者


其中offset10是上一轮生产的

2.读取__consumer_offsets音讯查看消费者offset的状况

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'|grep -v console-consumer-

其中ExpirationTime-CommitTime=1分钟,阐明保留工夫是一分钟了

3.敞开生产者,一分钟(与offsets.retention.minutes和offsets.retention.check.interval.ms无关) 后__consumer_offsets外面[my-group,sync_time_test3,0] 接管到了null音讯,代表consumer的offsets过期了。

4.重启生产者
consumer持续上次生产,此时如果不重启消费者是没事的。此时尽管broker外面曾经没有了consumer生产的offset,然而consumer本地记住了上次fetch的offset, 会持续拉取下一条音讯。只有初始化和rebalance时会fetchInitialOffset从broker获取上次生产的offset而后从指定的offset拉取音讯。

5.再次敞开生产者,期待consumer offset过期

6.马上重启consumer会从新生产

从新生产了,能够和第4步比照生产工夫

consumer offset 过期后onsumer为什么还在?

1.先关上生产者消费者

2.敞开生产者
consumer过期了,然而查看consumer还有:


敞开消费者,consumer隐没:

然而外面内容没了,内容能够应用上面命令查看:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe# 此命令在kafka v2.0.0会报错 https://github.com/apache/kafka/pull/4980

3.剖析应该是因为心跳放弃consumer始终在

Consumer.Group.Session.Timeout用于检测worker程序失败的超时。worker定期发送心跳,以向代理表明其活性。如果在此会话超时过期之前代理没有接管到心跳,则代理将从组中删除。请留神,该值必须位于broker配置中配置group.min.session.timeout.msgroup.max.session.timeout.ms之间。

咱们先设置成30s:

而后设置sleep长时间不让heartbeat:

4.只关上消费者
my-group 三十秒后因为没有心跳被剔除

log过期工夫

查看topic命令./kafka-run-class.sh kafka.tools.GetOffsetShell --topic sync_time_test3 --broker-list localhost:9092 --time -1 # 查看topic的offset(不是消费者的offset),最初的参数-1示意显示获取以后offset最大值,-2示意offset的最小值

1.先生产音讯,产生几条后敞开生产
查看topic offset

产生音讯后,查看topic offset

2.关上消费者,生产后敞开

3.隔一分多钟从新关上消费者,此时会从新生产

4.隔5分多钟后log的offset最大值和最小值统一, 其余的曾经过期,只会保留下一次的offset

5.此时再关上消费者也没有音讯了

consumer commit code

上面是局部对于CommitOffset的代码:

// Consume implements ConsumerGroup.func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {    // Ensure group is not closed    select {    case <-c.closed:        return ErrClosedConsumerGroup    default:    }    ......    // Refresh metadata for requested topics    if err := c.client.RefreshMetadata(topics...); err != nil {        return err    }    // Init session 留神这外面进入后会始终生产期待退出    sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)    if err == ErrClosedClient {        return ErrClosedConsumerGroup    } else if err != nil {        return err    }    ...    // Gracefully release session claims  外面会执行offsets.Close(), 会flushToBroker      return sess.release(true)}//sess.release(true)--> offsets.Close():func (om *offsetManager) Close() error {    om.closeOnce.Do(func() {        // exit the mainLoop        close(om.closing)        if om.conf.Consumer.Offsets.AutoCommit.Enable {            <-om.closed        }        // mark all POMs as closed        om.asyncClosePOMs()        // flush one last time,最初一次刷入        if om.conf.Consumer.Offsets.AutoCommit.Enable {            for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {                om.flushToBroker()                if om.releasePOMs(false) == 0 {                    break                }            }        }....    })    return nil}//提交offset到brokerfunc (om *offsetManager) Commit() {    om.flushToBroker()    om.releasePOMs(false)}func (om *offsetManager) flushToBroker() {    req := om.constructRequest() //这外面会查看是否有新的offset须要提交    ......    resp, err := broker.CommitOffset(req) //这里就是提交了    ......    om.handleResponse(broker, req, resp)}func (om *offsetManager) constructRequest() *OffsetCommitRequest {    var r *OffsetCommitRequest    var perPartitionTimestamp int64        //这个是是否手动设置offset保留工夫,如果为0,以broker的保留工夫为准    if om.conf.Consumer.Offsets.Retention == 0 {        perPartitionTimestamp = ReceiveTime        r = &OffsetCommitRequest{            Version:                 1,            ConsumerGroup:           om.group,            ConsumerID:              om.memberID,            ConsumerGroupGeneration: om.generation,        }    } else {        r = &OffsetCommitRequest{            Version:                 2,            RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),            ConsumerGroup:           om.group,            ConsumerID:              om.memberID,            ConsumerGroupGeneration: om.generation,        }    }    for _, topicManagers := range om.poms {        for _, pom := range topicManagers {            pom.lock.Lock()            if pom.dirty { //这里的dirty就是判断数据是否有更新,是否有新的offset须要提交                r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)            }            pom.lock.Unlock()        }    }    if len(r.blocks) > 0 {        return r    }    return nil}

总结:

  • topic日志的保留与log.retention.check.interval.ms和log.retention.check.interval.ms无关(可部分针对topic配置),即便log清理后也会保留最新offset信息,超过保留工夫后下次生产也会持续从上次开始。然而topic: __consumer_offsets默认是compact保留策略和log.retention无关。
  • 消费者信息的保留在__consumer_offsets。 与offsets.retention.minutes和offsets.retention.check.interval.ms无关(不可针对指定topic进行consumer配置,是全局设置), 如果非要独自改能够更改consumer客户端的参数Consumer.Offsets.Retention, 如设置为2分钟:

    此时__consumer_offsets中指定consumer的ExpirationTime-CommitTime=2分钟:
  • 举荐设置消费者的过期工夫offsets.retention > log保留工夫log.retention

举荐浏览:

• 【kafka原理】 消费者偏移量__consumer_offsets_相干解析
• Kafka 中的消费者位移 __consumer_offsets
• Kafka 2.2.0 消息日志清理机制:日志删除 日志压缩
• https://kafka.apache.org/20/documentation.html