在公司业务中有场景须要实时订阅Topic,也就是当有新的Topic呈现时,须要主动发现、监听、生产

诸多比拟之后抉择了用户群体最多的 sarama,然而遇到了一个问题,这个包并没有实现像Java的一样的正则匹配策略,不要说正则,连实时刷新机制都没有,所以须要咱们本人来实现 Java 客户端 subscribe(Pattern)的通配符模式,废话不多说,间接上代码:

package mainimport (    "context"    "errors"    "log"    "os"    "os/signal"    "reflect"    "sort"    "strings"    "sync"    "syscall"    "time"    "github.com/IBM/sarama")// Sarama configuration optionsvar (    brokers  = "127.0.0.1:9092"    version  = "2.8.1" // Note: kafka broker version (not Sarama version)    group    = "kfk_group_id"    assignor = "sticky")func main() {    keepRunning := true    log.Println("Starting a new Sarama consumer")    sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)    version, err := sarama.ParseKafkaVersion(version)    if err != nil {        log.Panicf("Error parsing Kafka version: %v", err)    }    /**     * Construct a new Sarama configuration.     * The Kafka cluster version has to be defined before the consumer/producer is initialized.     */    config := sarama.NewConfig()    config.Version = version    config.Consumer.Offsets.Initial = sarama.OffsetOldest    switch assignor {    case "sticky":        config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}    case "roundrobin":        config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}    case "range":        config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}    default:        log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)    }    /**     * Set up a new Sarama consumer group     */    consumer := Consumer{        ready: make(chan bool),    }    ctx, cancel := context.WithCancel(context.Background())    newClient, err := sarama.NewClient(strings.Split(brokers, ","), config)    consumerGroup, err := sarama.NewConsumerGroupFromClient(group, newClient)    if err != nil {        log.Fatalf("Error creating consumer group client: %v", err)    }    wg := &sync.WaitGroup{}    wg.Add(1)    // Get all the Topic    topics, err := newClient.Topics()    topics = filterTopics(topics)    go func() {        defer wg.Done()        for {            // `Consume` should be called inside an infinite loop, when a            // server-side rebalance happens, the consumer session will need to be            // recreated to get the new claims            if err := consumerGroup.Consume(ctx, topics, &consumer); err != nil {                if errors.Is(err, sarama.ErrClosedConsumerGroup) {                    return                }                log.Panicf("Error from consumer: %v", err)            }            // check if context was cancelled, signaling that the consumer should stop            if ctx.Err() != nil {                log.Printf("Context err from consumer: %v", ctx.Err())                return            }            consumer.ready = make(chan bool)        }    }()    <-consumer.ready // Await till the consumer has been set up    log.Println("Sarama consumer up and running!...")    go refreshTopics(newClient, consumerGroup, topics)    sigusr1 := make(chan os.Signal, 1)    signal.Notify(sigusr1, syscall.SIGUSR1)    sigterm := make(chan os.Signal, 1)    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)    for keepRunning {        select {        case <-ctx.Done():            log.Println("terminating: context cancelled")            keepRunning = false        case <-sigterm:            log.Println("terminating: via signal")            keepRunning = false        case <-sigusr1:        }    }    cancel()    wg.Wait()    if err = consumerGroup.Close(); err != nil {        log.Panicf("Error closing client: %v", err)    }}func EqualSlices(s1, s2 []string) bool {    if len(s1) != len(s2) {        return false    }    m1 := make(map[string]struct{})    m2 := make(map[string]struct{})    for _, v := range s1 {        m1[v] = struct{}{}    }    for _, v := range s2 {        m2[v] = struct{}{}    }    return reflect.DeepEqual(m1, m2)}func filterTopics(topics []string) []string {    filteredTopics := make([]string, 0)    for _, topic := range topics {        if topic != "__consumer_offsets" {            filteredTopics = append(filteredTopics, topic)        }    }    return filteredTopics}func refreshTopics(client sarama.Client, prevConsumerGroup sarama.ConsumerGroup, topicsOld []string) {    ticker := time.NewTicker(5 * time.Second)    for {        <-ticker.C        if err := client.RefreshMetadata(); err != nil {            log.Printf("Error refreshing metadata: %v", err)            continue        }        topics, err := client.Topics()        if err != nil {            log.Printf("Error refreshing topics: %v", err)            continue        }        filteredTopics := filterTopics(topics) // filter "__consumer_offsets"        sort.Strings(filteredTopics)        log.Printf("All Topics: %v", filteredTopics)        if !EqualSlices(filteredTopics, topicsOld) {            topicsOld = filteredTopics            if prevConsumerGroup != nil {                err := prevConsumerGroup.Close()                if err != nil {                    log.Printf("Error closing prev consumer group: %v", err)                }            }            newConsumer := Consumer{                ready: make(chan bool),            }            newConsumerGroup, err := sarama.NewConsumerGroupFromClient(group, client)            if err != nil {                log.Printf("Error creating new consumer group: %v", err)                return            }            defer func(newConsumerGroup sarama.ConsumerGroup) {                err := newConsumerGroup.Close()                if err != nil {                    log.Printf("Error closing new consumer group: %v", err)                }            }(newConsumerGroup)            go func() {                ctx, cancel := context.WithCancel(context.Background())                defer cancel()                wg := &sync.WaitGroup{}                wg.Add(1)                // start Consume                go func() {                    defer wg.Done()                    if err := newConsumerGroup.Consume(ctx, filteredTopics, &newConsumer); err != nil {                        log.Printf("Error from consumer: %v", err)                    }                }()                wg.Wait()            }()        }    }}// Consumer represents a Sarama consumer group consumertype Consumer struct {    ready chan bool}// Setup is run at the beginning of a new session, before ConsumeClaimfunc (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {    // Mark the consumer as ready    close(consumer.ready)    return nil}// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exitedfunc (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {    return nil}// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().// Once the Messages() channel is closed, the Handler must finish its processing// loop and exit.func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {    // NOTE:    // Do not move the code below to a goroutine.    // The `ConsumeClaim` itself is called within a goroutine, see:    // https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29    for {        select {        case message, ok := <-claim.Messages():            if !ok {                log.Printf("message channel was closed")                return nil            }            log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)            session.MarkMessage(message, "")        // Should return when `session.Context()` is done.        // If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:        // https://github.com/IBM/sarama/issues/1192        case <-session.Context().Done():            return nil        }    }}

代码很简略,置信大家都能看懂我是在做什么了,其实就是减少了一个 refreshTopics 来刷新Topic,当检测到新的Topic的时候关掉之前的消费者组、创立新的组、订阅新的主题

留神:此处有一个大坑!当新的Topic退出的时候,refreshTopics 监测到新的音讯进行生产,这里如果不进行配置,则只会生产订阅之后的音讯,你不要遗记啊,能触发订阅新的主题的条件是有了一条新的音讯,那这条触发订阅的音讯去哪了呢?首先必定不会被原消费者组生产,因为他们还没订阅,其次,新启动的生产这组尽管订阅了新的Topic,然而因为你没有配置,它默认没有生产记录的新的Topic会从音讯的最初的地位进行生产(有生产记录的Topic从未生产的地位开始生产),所以,基本就不会生产到那条音讯!那怎么办呢?你只须要在启动的时候减少一个配置即可:

config.Consumer.Offsets.Initial = sarama.OffsetOldest

问题解决,撒花···