关于后端:sarama的消费者组分析使用

8次阅读

共计 8999 个字符,预计需要花费 23 分钟才能阅读完成。

kafka 的 go 客户端,应用最多的应该是 sarama,但以前老的 sarama 版本不反对消费者组的生产形式,所以大多数人都用 sarama-cluster。

起初 sarama 反对了消费者组的生产形式,sarama-cluster 也进行保护了,但网上对于 sarama 的消费者组的解析很少,且官网的样例很简略,所以这里剖析一下。

一、官网样例

官网样例比较简单:

1、通过 sarama.NewConfig 创立一个配置

2、通过 NewConsumerGroup 创立一个消费者组

3、通过 Consume 创立消费者组的会话,该函数的第三个参数即为该会话三个阶段的回调:Setup CleanupConsumeClaim,别离在创立会话之前、会话完结之后 和 会话生存中(次要就是在此阶段进行音讯读取)进行调用。

二、问题

1、当指定的 topic 在 kafka 中不存的时候,kafka 会新建该 topic,如果只想让用户生产已存在的 topic,那么该如何获取 kafka 中曾经存在的 topic?

2、setupCleanup 的调用流程是怎么的?会在哪些状况下被调用?

3、既然是消费者组,那如何查看组里某个消费者领有哪些 topic 和 partition?

4、如何应用指定的 offset 来生产某个 topic?

5、如何实现生产的 Exactly-once?

注:以上测试应用的示例代码是本人写的样例代码的局部内容,残缺的样例代码见文章最初

三、剖析
1、在 sarama 中,获取 topic 的接口在 Client interface 中,所以须要先通过 NewClient 接口创立一个 client,而后就能够通过该 client 的 Topics 接口获取到 kafka 中所有的 topic。但消费者组应用的类型是 ConsumerGroup,那该如何获取该类型呢?sarama 中提供 NewConsumerGroupFromClient 接口,能够从一个现存的 client 创立一个 ConsumerGroup,所以,批改后的流程,由原先的 NewConsumerGroup 间接创立,变成:

a、应用 NewClient 创立一个 client

b、应用 NewConsumerGroupFromClient 创立 ConsumerGroup

具体代码实现如下:

// 创立 client
newClient, err := sarama.NewClient(brokers, config)
if err != nil {log.Fatal(err)
}

// 获取所有的 topic
topics, err := newClient.Topics()
if err != nil {log.Fatal(err)
}
log.Info("topics:", topics)

// 依据 client 创立 consumerGroup
client, err := sarama.NewConsumerGroupFromClient(k.group, newClient)
if err != nil {log.Fatalf("Error creating consumer group client: %v", err)
}

这么做的益处就是:能够应用 client 的接口,获取一些信息,例如 kafka 的以后配置有哪些,controller 有哪些,brokers 有哪些,topic 总共有哪些,特定的 topic 有哪些 partitions,partition 以后的 offset 是多少 等等,具体性能可查看 Client interface

type Client interface {
    // Config returns the Config struct of the client. This struct should not be
    // altered after it has been created.
    Config() *Config

    // Controller returns the cluster controller broker. It will return a
    // locally cached value if it's available. You can call RefreshController
    // to update the cached value. Requires Kafka 0.10 or higher.
    Controller() (*Broker, error)

    // RefreshController retrieves the cluster controller from fresh metadata
    // and stores it in the local cache. Requires Kafka 0.10 or higher.
    RefreshController() (*Broker, error)

    // Brokers returns the current set of active brokers as retrieved from cluster metadata.
    Brokers() []*Broker
  ......
}

2、setupCleanupConsumeClaim 是 s.handler.ConsumeClaim 的三个接口,须要用户本人实现。能够简略了解为:当须要创立一个会话时,先运行 setup,而后在 ConsumerClaim 中解决音讯,最初运行 Cleanup

setup 会在一个新会话开始之前运行,且也在 ConsumerClaim 接口之前运行。调用流程为:Consume —> newSession —> newConsumerGroupSession —> handler.Setup

在调用了 Setup 之后,前面会创立一个协程,该协程外面其实调用的就是 ConsumeClaim 接口,所以咱们实现的 ConsumerClaim 其实是一个独自的协程,其调用流程为:Consume —> newSession —> newConsumerGroupSession —> consume —> s.handler.ConsumeClaim

Cleanup 会在一个会话完结的时候运行。调用流程为:Consume —>release —> s.handler.Cleanup

理解了调用流程之后,哪些状况又会调用到他们呢?—> 1、新建 consumeGroup 的时候。2、产生 rebalance 的时候。

咱们能够在 setup 和 cleanup 中加一个打印:

func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {log.Info("setup")
  close(k.ready)
    return nil
}

func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error {log.Info("cleanup")
    return nil
}

而后启动一个 consumer,能够察看到打印:

INFO[0000] setup
而后按 Ctrl + C 敞开 consumer,能够察看到打印:

INFO[0101] cleanup
阐明新建 consumer 而后退出时,会调用 setup 和 cleanup。

咱们再试一下产生 rebalance 的状况:先启动一个 consumer,而后再启动一个同一组的 consumer,能够看到打印为:

第一个启动的 consumer 打印为:INFO[0000] setup
INFO[0006] cleanup
INFO[0006] setup

第二个启动的 consumer 打印为:INFO[0002] setup

阐明在产生 reblance 的时候,会先敞开原先的会话,并调用 cleanup,而后再调用 setup,最初生成一个新的会话。

3、在 ConsumerGroupSession 接口中,有一个 Claims 接口,能够用来查看以后 consumer 被调配到哪些 topic 和 partition。咱们能够在 Setup 接口中进行打印:

func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {log.Info("setup")
    log.Info(session.Claims())
    close(k.ready)
    return nil
}

这里应用 range 分区策略,订阅的 topic 为 t1p4 和 t2p4,每个 topic 都有 4 个分区,而后创立 3 个 consumer,产生的打印为:

consumer1:
INFO[0000] setup
INFO[0000] map[t1p4:[0 1 2 3] t2p4:[0 1 2 3]]
INFO[0009] cleanup
INFO[0009] setup
INFO[0009] map[t1p4:[0 1] t2p4:[2 3]]
INFO[0015] cleanup
INFO[0015] setup
INFO[0015] map[t1p4:[0] t2p4:[3]]

consumer2:
INFO[0002] setup
INFO[0002] map[t1p4:[2 3] t2p4:[0 1]]
INFO[0009] cleanup
INFO[0009] setup
INFO[0009] map[t1p4:[1 2] t2p4:[0]]

consumer3:
INFO[0000] setup
INFO[0000] map[t1p4:[3] t2p4:[1 2]]

当只有 consumer1 的时候,它被调配到所有的分区:t1p4:[0 1 2 3] t2p4:[0 1 2 3]

当 consumer2 退出的时候,consumer1 被调配的是:t1p4:[0 1] t2p4:[2 3],consumer2 被调配的是:t1p4:[2 3] t2p4:[0 1]

当 consumer3 退出的时候,consumert1 被调配的是:t1p4:[0] t2p4:[3],consumer2 被调配的是:t1p4:[1 2] t2p4:[0],consumer3 被调配的是:t1p4:[3] t2p4:[1 2]

有趣味的能够再顺次删除 consumer1,consumer2。

4、kafka 的 config 配置中,指定生产的 offset 只有两个:OffsetNewestOffsetOldest ,如果想指定 offset 进行生产,该怎么做呢?

后面说过,Setup 是运行在会话最一开始的中央,且这个时候曾经可能获取到所有的 topic 和 partition,所以这里能够应用 ConsumerGroupSessionResetOffset 接口进行设置,具体实现如下:(这里应用的主题:t2p4 已存在,且 0 分区中的 offset 曾经到 18)

func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {log.Info("setup")
    session.ResetOffset("t2p4", 0, 13, "")
    log.Info(session.Claims())
    close(k.ready)
    return nil
}

func (k *Kafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for message := range claim.Messages() {log.Infof("[topic:%s] [partiton:%d] [offset:%d] [value:%s] [time:%v]",
            message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp)
        session.MarkMessage(message, "")
    }
    return nil
}

此时,无论运行多少次,都能够生产 13 到 18 之间的音讯:

INFO[0000] setup
INFO[0000] map[t1p4:[0 1 2 3] t2p4:[0 1 2 3]]
INFO[0000] [topic:t2p4] [partiton:0] [offset:13] [value:a] [time:2021-10-12 23:02:35.058 -0400 EDT]
INFO[0000] [topic:t2p4] [partiton:0] [offset:14] [value:b] [time:2021-10-12 23:02:35.087 -0400 EDT]
INFO[0000] [topic:t2p4] [partiton:0] [offset:15] [value:c] [time:2021-10-12 23:02:35.092 -0400 EDT]
INFO[0000] [topic:t2p4] [partiton:0] [offset:16] [value:d] [time:2021-10-12 23:03:18.882 -0400 EDT]
INFO[0000] [topic:t2p4] [partiton:0] [offset:17] [value:e] [time:2021-10-12 23:03:18.898 -0400 EDT]
INFO[0000] [topic:t2p4] [partiton:0] [offset:18] [value:f] [time:2021-10-12 23:03:18.903 -0400 EDT]

5、后面曾经剖析了 Setup 的调用流程,以及能够在 Setup 中能够做的事件,那么就能够手动记录 topic 的 offset 到磁盘中(比方文本、数据库等),在 Setup 的接口中,读取之前记录的 offset,通过 ResetOffset 接口进行从新设置即可。当然,更新 offset 与 音讯解决这部分的一致性,须要业务本人保障(例如应用数据库的事务来实现)。

四、残缺样例代码

package main

import (
    "context"
    "os"
    "os/signal"
    "sync"
    "syscall"

    "github.com/Shopify/sarama"
    log "github.com/sirupsen/logrus"
)

type Kafka struct {brokers           []string
    topics            []string
    startOffset       int64
    version           string
    ready             chan bool
    group             string
    channelBufferSize int
    assignor          string
}

var brokers = []string{"192.168.1.101:9092"}
var topics = []string{"t1p4", "t2p4"}
var group = "grp1"
var assignor = "range"

func NewKafka() *Kafka {
    return &Kafka{
        brokers:           brokers,
        topics:            topics,
        group:             group,
        channelBufferSize: 1000,
        ready:             make(chan bool),
        version:           "2.8.0",
        assignor:          assignor,
    }
}

func (k *Kafka) Connect() func() {log.Infoln("kafka init...")

    version, err := sarama.ParseKafkaVersion(k.version)
    if err != nil {log.Fatalf("Error parsing Kafka version: %v", err)
    }

    config := sarama.NewConfig()
    config.Version = version
    // 分区调配策略
    switch assignor {
    case "sticky":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
    case "roundrobin":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
    case "range":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
    default:
        log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
    }
    config.Consumer.Offsets.Initial = sarama.OffsetNewest
    config.ChannelBufferSize = k.channelBufferSize // channel 长度

    // 创立 client
    newClient, err := sarama.NewClient(brokers, config)
    if err != nil {log.Fatal(err)
    }
    // 获取所有的 topic
    topics, err := newClient.Topics()
    if err != nil {log.Fatal(err)
    }
    log.Info("topics:", topics)

    // 依据 client 创立 consumerGroup
    client, err := sarama.NewConsumerGroupFromClient(k.group, newClient)
    if err != nil {log.Fatalf("Error creating consumer group client: %v", err)
    }

    ctx, cancel := context.WithCancel(context.Background())
    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func() {defer wg.Done()
        for {if err := client.Consume(ctx, k.topics, k); err != nil {
                // 当 setup 失败的时候,error 会返回到这里
                log.Errorf("Error from consumer: %v", err)
                return
            }
            // check if context was cancelled, signaling that the consumer should stop
            if ctx.Err() != nil {log.Println(ctx.Err())
                return
            }
            k.ready = make(chan bool)
        }
    }()
    <-k.ready
    log.Infoln("Sarama consumer up and running!...")
    // 保障在零碎退出时,通道外面的音讯被生产
    return func() {log.Info("kafka close")
        cancel()
        wg.Wait()
        if err = client.Close(); err != nil {log.Errorf("Error closing client: %v", err)
        }
    }
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {log.Info("setup")
    session.ResetOffset("t2p4", 0, 13, "")
    log.Info(session.Claims())
    // Mark the consumer as ready
    close(k.ready)
    return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error {log.Info("cleanup")
    return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (k *Kafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

    // NOTE:
    // Do not move the code below to a goroutine.
    // The `ConsumeClaim` itself is called within a goroutine, see:
    // <https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29>
    // 具体生产音讯
    for message := range claim.Messages() {log.Infof("[topic:%s] [partiton:%d] [offset:%d] [value:%s] [time:%v]",
            message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp)
        // 更新位移
        session.MarkMessage(message, "")
    }
    return nil
}

func main() {k := NewKafka()
    c := k.Connect()

    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    select {
    case <-sigterm:
        log.Warnln("terminating: via signal")
    }
    c()}

五、在滴普 ADB 产品中的利用

ADB 产品作为实时数仓应用,一个重要的性能就是实时导入数据,该性能次要由 ADB 产品下的 IS 组件实现,次要流程为:

1、从各个数据源采集源数据。

2、flink 实时处理这些源数据后写至 kafka。

3、IS 组件读取 kafka,通过并行数据导入协定,将数据实时写入数仓中。

IS 组件应用 kafka 的 consumerGroup 形式进行生产,可依据数据量的大小,程度扩大 IS 的生产能力,并利用数仓的事务实现导入的数据的 Exactly-once,实现端到端的数据交付。

正文完
 0