在公司业务中有场景需要实时订阅Topic,也就是当有新的Topic出现时,需要自动发现、监听、消费
诸多比较之后选择了用户群体最多的 sarama,但是遇到了一个问题,这个包并没有实现像Java客户端一样的正则匹配策略,不要说正则,连实时刷新机制都没有,所以需要我们自己来实现 Java 客户端 subscribe(Pattern)的通配符模式,废话不多说,直接上代码:
package main

import (
	"context"
	"errors"
	"log"
	"os"
	"os/signal"
	"reflect"
	"sort"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/IBM/sarama"
)

// Sarama configuration options
var (
	brokers  = "127.0.0.1:9092"
	version  = "2.8.1" // Note: kafka broker version (not Sarama version)
	group    = "kfk_group_id"
	assignor = "sticky"
)

// topicRefreshInterval is how often refreshTopics polls the cluster metadata
// for topic changes.
const topicRefreshInterval = 5 * time.Second

// main starts a consumer group subscribed to every topic currently known to
// the cluster (except "__consumer_offsets"), starts a background refresher
// that re-subscribes whenever the topic set changes, and then blocks until
// SIGINT/SIGTERM arrives or the context is cancelled.
func main() {
	keepRunning := true
	log.Println("Starting a new Sarama consumer")

	sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

	kafkaVersion, err := sarama.ParseKafkaVersion(version)
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	/**
	 * Construct a new Sarama configuration.
	 * The Kafka cluster version has to be defined before the consumer/producer is initialized.
	 */
	config := sarama.NewConfig()
	config.Version = kafkaVersion
	// Start from the oldest offset so that messages published to a topic
	// before this group subscribed to it are not skipped.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	switch assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
	case "roundrobin":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
	case "range":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
	default:
		log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
	}

	/**
	 * Set up a new Sarama consumer group
	 */
	consumer := Consumer{
		ready: make(chan bool),
	}

	newClient, err := sarama.NewClient(strings.Split(brokers, ","), config)
	if err != nil {
		log.Fatalf("Error creating client: %v", err)
	}
	// Consumer groups created from an existing client do not close that
	// client, so it has to be closed here.
	defer func() {
		if err := newClient.Close(); err != nil {
			log.Printf("Error closing Kafka client: %v", err)
		}
	}()

	consumerGroup, err := sarama.NewConsumerGroupFromClient(group, newClient)
	if err != nil {
		log.Fatalf("Error creating consumer group client: %v", err)
	}

	// Get all the Topic
	topics, err := newClient.Topics()
	if err != nil {
		log.Fatalf("Error listing topics: %v", err)
	}
	topics = filterTopics(topics)

	ctx, cancel := context.WithCancel(context.Background())

	// Capture the channel before the consume goroutine starts: consumeLoop
	// replaces consumer.ready after every session, so reading the field from
	// this goroutine later would be a data race.
	ready := consumer.ready

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := consumeLoop(ctx, consumerGroup, topics, &consumer); err != nil {
			log.Panicf("Error from consumer: %v", err)
		}
	}()

	<-ready // Await till the consumer has been set up
	log.Println("Sarama consumer up and running!...")

	wg.Add(1)
	go func() {
		defer wg.Done()
		refreshTopics(ctx, newClient, consumerGroup, topics)
	}()

	sigusr1 := make(chan os.Signal, 1)
	signal.Notify(sigusr1, syscall.SIGUSR1)

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

	for keepRunning {
		select {
		case <-ctx.Done():
			log.Println("terminating: context cancelled")
			keepRunning = false
		case <-sigterm:
			log.Println("terminating: via signal")
			keepRunning = false
		case <-sigusr1:
			// SIGUSR1 is received but currently triggers no action.
		}
	}

	cancel()
	wg.Wait()

	// refreshTopics may already have closed this group when the topic set
	// changed; closing it again here is expected to be harmless.
	if err = consumerGroup.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}

// consumeLoop repeatedly calls Consume on cg for the given topics until the
// group is closed or ctx is cancelled. It returns nil on either of those
// shutdown paths and the Consume error otherwise.
//
// `Consume` should be called inside an infinite loop: when a server-side
// rebalance happens, the consumer session needs to be recreated to get the
// new claims.
func consumeLoop(ctx context.Context, cg sarama.ConsumerGroup, topics []string, handler *Consumer) error {
	for {
		if err := cg.Consume(ctx, topics, handler); err != nil {
			// A closed group or a cancelled context is a normal shutdown,
			// not a failure.
			if errors.Is(err, sarama.ErrClosedConsumerGroup) || ctx.Err() != nil {
				return nil
			}
			return err
		}
		// check if context was cancelled, signaling that the consumer should stop
		if ctx.Err() != nil {
			log.Printf("Context err from consumer: %v", ctx.Err())
			return nil
		}
		// Setup closes handler.ready once per session, so the next session
		// needs a fresh, open channel.
		handler.ready = make(chan bool)
	}
}

// EqualSlices reports whether s1 and s2 have the same length and contain the
// same set of strings, regardless of order.
func EqualSlices(s1, s2 []string) bool {
	if len(s1) != len(s2) {
		return false
	}
	m1 := make(map[string]struct{}, len(s1))
	m2 := make(map[string]struct{}, len(s2))
	for _, v := range s1 {
		m1[v] = struct{}{}
	}
	for _, v := range s2 {
		m2[v] = struct{}{}
	}
	return reflect.DeepEqual(m1, m2)
}

// filterTopics returns a new slice holding every entry of topics except the
// internal "__consumer_offsets" topic.
func filterTopics(topics []string) []string {
	filteredTopics := make([]string, 0, len(topics))
	for _, topic := range topics {
		if topic != "__consumer_offsets" {
			filteredTopics = append(filteredTopics, topic)
		}
	}
	return filteredTopics
}

// refreshTopics polls the cluster metadata every topicRefreshInterval and,
// whenever the set of topics differs from topicsOld, closes the consumer
// group currently in use, creates a new one from client and starts consuming
// the new topic list with it.
//
// prevConsumerGroup is the group that is consuming when refreshTopics starts;
// it stays owned by the caller, apart from being closed here on the first
// topic change. Groups created by refreshTopics are owned by it and are
// closed when a later change replaces them or when ctx is cancelled.
// refreshTopics returns only after ctx is cancelled and every consumer
// goroutine it started has finished.
func refreshTopics(ctx context.Context, client sarama.Client, prevConsumerGroup sarama.ConsumerGroup, topicsOld []string) {
	ticker := time.NewTicker(topicRefreshInterval)
	defer ticker.Stop()

	var (
		current   = prevConsumerGroup // group that is consuming right now; nil if none
		owned     bool                // true once current was created by this function
		consumers sync.WaitGroup      // consumer goroutines started by this function
	)
	defer func() {
		consumers.Wait()
		if owned && current != nil {
			if err := current.Close(); err != nil {
				log.Printf("Error closing consumer group: %v", err)
			}
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		if err := client.RefreshMetadata(); err != nil {
			log.Printf("Error refreshing metadata: %v", err)
			continue
		}
		topics, err := client.Topics()
		if err != nil {
			log.Printf("Error refreshing topics: %v", err)
			continue
		}
		filteredTopics := filterTopics(topics) // filter "__consumer_offsets"
		sort.Strings(filteredTopics)
		log.Printf("All Topics: %v", filteredTopics)

		if EqualSlices(filteredTopics, topicsOld) {
			continue
		}

		// Close the group that is consuming now so that its member leaves
		// before the replacement joins.
		if current != nil {
			if err := current.Close(); err != nil {
				log.Printf("Error closing prev consumer group: %v", err)
			}
			current = nil
		}

		next, err := sarama.NewConsumerGroupFromClient(group, client)
		if err != nil {
			// topicsOld is left untouched so the change is detected again
			// and creation is retried on the next tick.
			log.Printf("Error creating new consumer group: %v", err)
			continue
		}
		current, owned = next, true
		topicsOld = filteredTopics

		handler := &Consumer{
			ready: make(chan bool),
		}
		consumers.Add(1)
		// start Consume
		go func(cg sarama.ConsumerGroup, subscribed []string) {
			defer consumers.Done()
			if err := consumeLoop(ctx, cg, subscribed, handler); err != nil {
				log.Printf("Error from consumer: %v", err)
			}
		}(next, filteredTopics)
	}
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	// ready is closed by Setup to signal that a session has started.
	ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(c.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				log.Printf("message channel was closed")
				return nil
			}
			log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
			session.MarkMessage(message, "")
		// Should return when `session.Context()` is done.
		// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
		// https://github.com/IBM/sarama/issues/1192
		case <-session.Context().Done():
			return nil
		}
	}
}
代码很简单,相信大家都能看懂我是在做什么了,其实就是增加了一个 refreshTopics
来刷新Topic,当检测到新的Topic的时候关掉之前的消费者组、创建新的组、订阅新的主题
注意:此处有一个大坑!当新的Topic加入的时候,refreshTopics
监测到新的消息进行消费,这里如果不进行配置,则只会消费订阅之后的消息,你不要忘记啊,能触发订阅新的主题的条件是有了一条新的消息,那这条触发订阅的消息去哪了呢?首先肯定不会被原消费者组消费,因为他们还没订阅,其次,新启动的消费者组虽然订阅了新的Topic,但是因为你没有配置,它默认对没有消费记录的新Topic会从消息的最后的位置进行消费(有消费记录的Topic从未消费的位置开始消费),所以,根本就不会消费到那条消息!那怎么办呢?你只需要在启动的时候增加一个配置即可:
config.Consumer.Offsets.Initial = sarama.OffsetOldest
问题解决,撒花···