关于go:Golang使用Kafka自动刷新主题

44次阅读

共计 6232 个字符,预计需要花费 16 分钟才能阅读完成。

在公司业务中有场景须要实时订阅 Topic,也就是当有新的 Topic 呈现时,须要主动发现、监听、生产

诸多比拟之后抉择了用户群体最多的 sarama,然而遇到了一个问题,这个包并没有实现像 Java 的一样的正则匹配策略,不要说正则,连实时刷新机制都没有,所以须要咱们本人来实现 Java 客户端 subscribe(Pattern) 的通配符模式,废话不多说,间接上代码:


package main

import (
    "context"
    "errors"
    "log"
    "os"
    "os/signal"
    "reflect"
    "sort"
    "strings"
    "sync"
    "syscall"
    "time"

    "github.com/IBM/sarama"
)

// Sarama configuration options
var (
    brokers  = "127.0.0.1:9092"
    version  = "2.8.1" // Note: kafka broker version (not Sarama version)
    group    = "kfk_group_id"
    assignor = "sticky"
)

func main() {
    keepRunning := true

    log.Println("Starting a new Sarama consumer")
    sarama.Logger = log.New(os.Stdout, "[sarama]", log.LstdFlags)
    version, err := sarama.ParseKafkaVersion(version)
    if err != nil {log.Panicf("Error parsing Kafka version: %v", err)
    }

    /**
     * Construct a new Sarama configuration.
     * The Kafka cluster version has to be defined before the consumer/producer is initialized.
     */
    config := sarama.NewConfig()
    config.Version = version
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    switch assignor {
    case "sticky":
        config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
    case "roundrobin":
        config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
    case "range":
        config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
    default:
        log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
    }

    /**
     * Set up a new Sarama consumer group
     */
    consumer := Consumer{ready: make(chan bool),
    }

    ctx, cancel := context.WithCancel(context.Background())

    newClient, err := sarama.NewClient(strings.Split(brokers, ","), config)
    consumerGroup, err := sarama.NewConsumerGroupFromClient(group, newClient)
    if err != nil {log.Fatalf("Error creating consumer group client: %v", err)
    }

    wg := &sync.WaitGroup{}
    wg.Add(1)

    // Get all the Topic
    topics, err := newClient.Topics()

    topics = filterTopics(topics)

    go func() {defer wg.Done()
        for {
            // `Consume` should be called inside an infinite loop, when a
            // server-side rebalance happens, the consumer session will need to be
            // recreated to get the new claims
            if err := consumerGroup.Consume(ctx, topics, &consumer); err != nil {if errors.Is(err, sarama.ErrClosedConsumerGroup) {return}
                log.Panicf("Error from consumer: %v", err)
            }
            // check if context was cancelled, signaling that the consumer should stop
            if ctx.Err() != nil {log.Printf("Context err from consumer: %v", ctx.Err())
                return
            }
            consumer.ready = make(chan bool)
        }
    }()

    <-consumer.ready // Await till the consumer has been set up

    log.Println("Sarama consumer up and running!...")

    go refreshTopics(newClient, consumerGroup, topics)

    sigusr1 := make(chan os.Signal, 1)
    signal.Notify(sigusr1, syscall.SIGUSR1)

    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

    for keepRunning {
        select {case <-ctx.Done():
            log.Println("terminating: context cancelled")
            keepRunning = false

        case <-sigterm:
            log.Println("terminating: via signal")
            keepRunning = false

        case <-sigusr1:

        }
    }

    cancel()

    wg.Wait()
    if err = consumerGroup.Close(); err != nil {log.Panicf("Error closing client: %v", err)
    }
}

func EqualSlices(s1, s2 []string) bool {if len(s1) != len(s2) {return false}

    m1 := make(map[string]struct{})
    m2 := make(map[string]struct{})

    for _, v := range s1 {m1[v] = struct{}{}
    }

    for _, v := range s2 {m2[v] = struct{}{}
    }

    return reflect.DeepEqual(m1, m2)
}

func filterTopics(topics []string) []string {filteredTopics := make([]string, 0)
    for _, topic := range topics {
        if topic != "__consumer_offsets" {filteredTopics = append(filteredTopics, topic)
        }
    }
    return filteredTopics
}

func refreshTopics(client sarama.Client, prevConsumerGroup sarama.ConsumerGroup, topicsOld []string) {ticker := time.NewTicker(5 * time.Second)

    for {
        <-ticker.C

        if err := client.RefreshMetadata(); err != nil {log.Printf("Error refreshing metadata: %v", err)
            continue
        }

        topics, err := client.Topics()
        if err != nil {log.Printf("Error refreshing topics: %v", err)
            continue
        }

        filteredTopics := filterTopics(topics) // filter "__consumer_offsets"
        sort.Strings(filteredTopics)
        log.Printf("All Topics: %v", filteredTopics)

        if !EqualSlices(filteredTopics, topicsOld) {
            topicsOld = filteredTopics

            if prevConsumerGroup != nil {err := prevConsumerGroup.Close()
                if err != nil {log.Printf("Error closing prev consumer group: %v", err)
                }
            }

            newConsumer := Consumer{ready: make(chan bool),
            }

            newConsumerGroup, err := sarama.NewConsumerGroupFromClient(group, client)
            if err != nil {log.Printf("Error creating new consumer group: %v", err)
                return
            }

            defer func(newConsumerGroup sarama.ConsumerGroup) {err := newConsumerGroup.Close()
                if err != nil {log.Printf("Error closing new consumer group: %v", err)
                }
            }(newConsumerGroup)

            go func() {ctx, cancel := context.WithCancel(context.Background())
                defer cancel()

                wg := &sync.WaitGroup{}
                wg.Add(1)

                // start Consume
                go func() {defer wg.Done()
                    if err := newConsumerGroup.Consume(ctx, filteredTopics, &newConsumer); err != nil {log.Printf("Error from consumer: %v", err)
                    }
                }()

                wg.Wait()}()}
    }
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {ready chan bool}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
    // Mark the consumer as ready
    close(consumer.ready)
    return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {return nil}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    // NOTE:
    // Do not move the code below to a goroutine.
    // The `ConsumeClaim` itself is called within a goroutine, see:
    // https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
    for {
        select {case message, ok := <-claim.Messages():
            if !ok {log.Printf("message channel was closed")

                return nil
            }

            log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)

            session.MarkMessage(message, "")
        // Should return when `session.Context()` is done.
        // If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
        // https://github.com/IBM/sarama/issues/1192
        case <-session.Context().Done():
            return nil
        }
    }
}

代码很简略,置信大家都能看懂我是在做什么了,其实就是减少了一个 refreshTopics 来刷新 Topic,当检测到新的 Topic 的时候关掉之前的消费者组、创立新的组、订阅新的主题

留神:此处有一个大坑!当新的 Topic 退出的时候,refreshTopics 监测到新的音讯进行生产,这里如果不进行配置,则只会生产订阅之后的音讯,你不要遗记啊,能触发订阅新的主题的条件是有了一条新的音讯,那这条触发订阅的音讯去哪了呢?首先必定不会被原消费者组生产,因为他们还没订阅,其次, 新启动的生产这组尽管订阅了新的 Topic,然而因为你没有配置,它默认没有生产记录的新的 Topic 会从音讯的最初的地位进行生产(有生产记录的 Topic 从未生产的地位开始生产),所以,基本就不会生产到那条音讯!那怎么办呢?你只须要在启动的时候减少一个配置即可:

config.Consumer.Offsets.Initial = sarama.OffsetOldest

问题解决,撒花···

正文完
 0