在公司业务中有场景需要实时订阅 Topic,也就是当有新的 Topic 出现时,需要自动发现、监听、消费。
诸多比较之后选择了用户群体最多的 sarama,然而遇到了一个问题:这个包并没有实现像 Java 客户端一样的正则匹配订阅策略,不要说正则,连实时刷新机制都没有,所以需要我们自己来实现 Java 客户端 subscribe(Pattern) 的通配符模式。废话不多说,直接上代码:
package main
import (
"context"
"errors"
"log"
"os"
"os/signal"
"reflect"
"sort"
"strings"
"sync"
"syscall"
"time"
"github.com/IBM/sarama"
)
// Connection and consumer-group settings used by main and refreshTopics.
var (
	// brokers is a comma-separated list of Kafka bootstrap addresses.
	brokers = "127.0.0.1:9092"

	// version is the Kafka broker version (not the Sarama library version).
	version = "2.8.1"

	// group is the consumer group ID shared by every consumer group created here.
	group = "kfk_group_id"

	// assignor selects the partition assignment strategy:
	// "sticky", "roundrobin" or "range".
	assignor = "sticky"
)
// main builds the Sarama configuration, creates a client and a consumer group,
// subscribes to every topic returned by filterTopics, starts the background
// topic refresher, and then blocks until the context is cancelled or
// SIGINT/SIGTERM is received.
func main() {
	keepRunning := true
	log.Println("Starting a new Sarama consumer")

	sarama.Logger = log.New(os.Stdout, "[sarama]", log.LstdFlags)

	// The right-hand side refers to the package-level version string; the
	// parsed value shadows it for the rest of main.
	version, err := sarama.ParseKafkaVersion(version)
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	// Construct a new Sarama configuration.
	// The Kafka cluster version has to be defined before the consumer/producer
	// is initialized.
	config := sarama.NewConfig()
	config.Version = version
	// Start from the oldest offset so that messages already present in a newly
	// discovered topic are not skipped.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	switch assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
	case "roundrobin":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
	case "range":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
	default:
		log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
	}

	// Set up a new Sarama consumer group.
	consumer := Consumer{
		ready: make(chan bool),
	}

	ctx, cancel := context.WithCancel(context.Background())

	// Check the client error before using the client: previously it was
	// overwritten by the next call without ever being inspected.
	newClient, err := sarama.NewClient(strings.Split(brokers, ","), config)
	if err != nil {
		log.Fatalf("Error creating client: %v", err)
	}
	consumerGroup, err := sarama.NewConsumerGroupFromClient(group, newClient)
	if err != nil {
		log.Fatalf("Error creating consumer group client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)

	// Get all the topics currently known to the client, minus the ones
	// filterTopics drops.
	topics, err := newClient.Topics()
	if err != nil {
		log.Fatalf("Error listing topics: %v", err)
	}
	topics = filterTopics(topics)

	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop, when a
			// server-side rebalance happens, the consumer session will need to be
			// recreated to get the new claims
			if err := consumerGroup.Consume(ctx, topics, &consumer); err != nil {
				// refreshTopics closes this group when the topic set changes;
				// that is an expected way for this loop to end.
				if errors.Is(err, sarama.ErrClosedConsumerGroup) {
					return
				}
				log.Panicf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				log.Printf("Context err from consumer: %v", ctx.Err())
				return
			}
			// Setup closes consumer.ready, so the next session needs a fresh channel.
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await till the consumer has been set up
	log.Println("Sarama consumer up and running!...")

	// Watch for topic changes in the background.
	go refreshTopics(newClient, consumerGroup, topics)

	sigusr1 := make(chan os.Signal, 1)
	signal.Notify(sigusr1, syscall.SIGUSR1)

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

	for keepRunning {
		select {
		case <-ctx.Done():
			log.Println("terminating: context cancelled")
			keepRunning = false
		case <-sigterm:
			log.Println("terminating: via signal")
			keepRunning = false
		case <-sigusr1:
			// SIGUSR1 is received but no action is taken for it.
		}
	}

	cancel()
	wg.Wait()
	if err = consumerGroup.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}
// EqualSlices reports whether s1 and s2 contain the same strings with the same
// number of occurrences, regardless of order.
//
// Occurrence counts are compared rather than plain membership sets, so that
// equal-length slices whose duplicates differ (for example ["a","a","b"] and
// ["a","b","b"]) are not reported as equal.
func EqualSlices(s1, s2 []string) bool {
	if len(s1) != len(s2) {
		return false
	}

	counts1 := make(map[string]int, len(s1))
	for _, v := range s1 {
		counts1[v]++
	}

	counts2 := make(map[string]int, len(s2))
	for _, v := range s2 {
		counts2[v]++
	}

	return reflect.DeepEqual(counts1, counts2)
}
// filterTopics returns a new slice holding every entry of topics except the
// "__consumer_offsets" topic, in the original order. The result is never nil.
func filterTopics(topics []string) []string {
	kept := []string{}
	for i := range topics {
		if topics[i] == "__consumer_offsets" {
			continue
		}
		kept = append(kept, topics[i])
	}
	return kept
}
// refreshTopics polls the cluster metadata every five seconds and, whenever
// the filtered topic list differs from the one currently subscribed, replaces
// the running consumer group with a new one subscribed to the current list.
//
// client is used to refresh metadata and to create replacement groups.
// prevConsumerGroup is the group that is consuming when the function starts
// (may be nil); it is closed before the first replacement is started.
// topicsOld is the topic list that group was subscribed to.
//
// The function never returns and is meant to be run in its own goroutine.
func refreshTopics(client sarama.Client, prevConsumerGroup sarama.ConsumerGroup, topicsOld []string) {
	ticker := time.NewTicker(5 * time.Second)

	for range ticker.C {
		if err := client.RefreshMetadata(); err != nil {
			log.Printf("Error refreshing metadata: %v", err)
			continue
		}

		topics, err := client.Topics()
		if err != nil {
			log.Printf("Error refreshing topics: %v", err)
			continue
		}

		filteredTopics := filterTopics(topics) // filter "__consumer_offsets"
		sort.Strings(filteredTopics)
		log.Printf("All Topics: %v", filteredTopics)

		if EqualSlices(filteredTopics, topicsOld) {
			continue
		}

		// Close the group that is currently consuming so that its Consume
		// call returns before the replacement joins.
		if prevConsumerGroup != nil {
			if err := prevConsumerGroup.Close(); err != nil {
				log.Printf("Error closing prev consumer group: %v", err)
			}
			prevConsumerGroup = nil
		}

		newConsumerGroup, err := sarama.NewConsumerGroupFromClient(group, client)
		if err != nil {
			// topicsOld is left untouched so the next tick sees the same
			// difference and retries instead of giving up for good.
			log.Printf("Error creating new consumer group: %v", err)
			continue
		}

		// Remember the replacement so that the NEXT topic change closes it.
		// Previously only the initial group was ever closed and every
		// replacement group was leaked.
		prevConsumerGroup = newConsumerGroup
		topicsOld = filteredTopics

		go consumeUntilClosed(newConsumerGroup, filteredTopics)
	}
}

// consumeUntilClosed keeps calling Consume on cg for the given topics until cg
// is closed or Consume reports another error.
func consumeUntilClosed(cg sarama.ConsumerGroup, topics []string) {
	handler := &Consumer{ready: make(chan bool)}
	for {
		// Consume returns when a session ends (e.g. on a rebalance), so it
		// has to be called again to keep consuming.
		if err := cg.Consume(context.Background(), topics, handler); err != nil {
			// A closed group is the expected way for this loop to end.
			if !errors.Is(err, sarama.ErrClosedConsumerGroup) {
				log.Printf("Error from consumer: %v", err)
			}
			return
		}
		// Setup closes handler.ready, so the next session needs a fresh channel.
		handler.ready = make(chan bool)
	}
}
// Consumer is the handler type passed to the consumer group's Consume call.
type Consumer struct {
	// ready is closed by Setup to signal that a session has been set up.
	ready chan bool
}
// Setup is run at the beginning of a new session, before ConsumeClaim.
// It marks the consumer as ready by closing the ready channel.
//
// The close is guarded so that Setup can run again on the same Consumer
// (for example for a new session after a rebalance) without panicking on an
// already-closed channel; a nil channel is left alone for the same reason.
// It always returns nil.
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	if consumer.ready == nil {
		return nil
	}
	select {
	case <-consumer.ready:
		// Already closed by an earlier session; nothing to do.
	default:
		// Mark the consumer as ready
		close(consumer.ready)
	}
	return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines
// have exited. This handler has nothing to release, so it always returns nil.
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim logs and marks every message delivered on claim.Messages().
// It returns nil when that channel is closed or when the session context is
// done.
//
// NOTE: do not move the loop below into a goroutine; ConsumeClaim itself is
// already called within a goroutine, see:
// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		// Should return when `session.Context()` is done.
		// If not, will raise `ErrRebalanceInProgress` or
		// `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
		// https://github.com/IBM/sarama/issues/1192
		case <-session.Context().Done():
			return nil

		case msg, open := <-claim.Messages():
			if !open {
				log.Printf("message channel was closed")
				return nil
			}
			log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic)
			session.MarkMessage(msg, "")
		}
	}
}
代码很简单,相信大家都能看懂我是在做什么了,其实就是增加了一个 refreshTopics 来刷新 Topic:当检测到新的 Topic 的时候,关掉之前的消费者组、创建新的消费者组、订阅新的主题。
注意:此处有一个大坑!当新的 Topic 加入的时候,refreshTopics 监测到新的消息并进行消费,这里如果不进行配置,则只会消费订阅之后的消息。你不要忘记啊,能触发订阅新主题的条件是有了一条新的消息,那这条触发订阅的消息去哪了呢?首先,肯定不会被原消费者组消费,因为他们还没订阅;其次,新启动的消费者组虽然订阅了新的 Topic,但是因为你没有配置,对于没有消费记录的新 Topic,它默认会从消息的最后位置开始消费(有消费记录的 Topic 则从未消费的位置开始消费),所以,根本就不会消费到那条消息!那怎么办呢?你只需要在启动的时候增加一个配置即可:
config.Consumer.Offsets.Initial = sarama.OffsetOldest
问题解决,撒花···