原文地址:https://github.com/WilburXu/b...

Kafka体系架构

Broker服务代理节点

服务代理节点。对于Kafka而言,Broker能够简略地看作一个独立的Kafka服务节点或Kafka服务实例。大多数状况下也能够将Broker看作一台Kafka服务器,前提是这台服务器上只部署了一个Kafka实例,一个或多个Broker组成了一个Kafka集群。

Producer和Consumer

Producer生产者

生产者,也就是发送音讯的一方。生产者负责创立音讯,而后将其投递到Kafka中。

一个失常的生产逻辑须要具备以下几个步骤:

  1. 创立生产者实例
  2. 构建待发送的音讯
  3. 发送音讯到指定的TopicPartitionKey
  4. 敞开生产者实例

Consumer消费者

消费者,也就是接管音讯的一方。消费者连贯到Kafka上并接管音讯,从而进行相应的业务逻辑解决。

生产个别有三种生产模式:

单线程模式

单个线程生产多个Partition

问题:

  • 效率低,并发上不去
  • 可用性差,单个线程挂了,将无奈生产

多线程模式

独立消费者模式

和单线程模式相似,区别就是为每一个Partition独自起一个线程进行生产。

问题:

  • 线程和并发减少了,然而单线程挂了,该线程的分区还是无奈生产。
生产组模式

也是目前最罕用的生产模式,咱们能够创立多个生产实例并设置同一个group-id来辨别生产组,同一个生产组能够指定一个或多个Topic进行生产:

  • 生产组自均衡(Rebalance),kafka会依据生产组实例数量和分区数量自均衡调配
  • 不会反复生产,同个组内kafka确保一个分区只会发往一个生产实例,防止反复生产
  • 高可用,当一个生产实例挂了,kafka会主动调整生产实例和分区的关系

Topic主题

Kafka中的音讯以主题为单位进行归类(逻辑概念,生产者负责将音讯发送到特定的主题(发送到Kafka集群中的每一条音讯都要指定一个主题),而消费者负责订阅主题并进行生产。

Partition分区

物理分区,主题细分为了1或多个分区,一个分区只能属于单个主题,个别也会把分区称为主题分区(Topic-Partition)。

Segment

理论存储数据的中央,Segment蕴含一个数据文件和一个索引文件。一个Partition有多个大小雷同的Segment,能够了解为Partition是在Segment之上进行的逻辑形象。

Kafka根本命令

zookeeper

broker节点保留在zookeeper,所有须要:

  1. 进入zookeeper,而后 ./bin/zkCli.sh
  2. 执行ls /brokers/ids

查看broker详情

kafka-log-dirs.sh --describe --bootstrap-server kafka:9092 --broker-list 1

topic

查看列表

kafka-topics.sh --list --zookeeper zookeeper:2181

创立

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic [topic_name]

查看详情

kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic [topic_name]

删除

kafka-topics.sh --zookeeper zookeeper:2181 --delete --topic [topic_name]

topic生产状况

topic offset 最小

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic [topic_name] --time -2

topic offset最大

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic [topic_name] --time -1

生产

增加数据

kafka-console-producer.sh --broker-list localhost:9092 --topic [topic_name]

生产

从头部开始生产

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --from-beginning

从尾部开始生产,必须要指定分区

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --offset latest --partition 0

从某个地位开始生产(--offset [n])

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --offset 100 --partition 0

生产指定个数(--max-messages [n])

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --offset latest --partition 0 --max-messages 2

生产组

查看生产组列表

kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

查看生产组状况

kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group [group_id]

offset 偏移设置为最早

kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-earliest --all-topics --execute

offset 偏移设置为新

kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-latest --all-topics --execute

offset 偏移设置为指定地位

kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-offset 2000 --all-topics --execute

offset 偏移设置某个工夫之后最早位移

kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-datetime 2020-12-28T00:00:00.000 --all-topics --execute

Go案例

基于https://github.com/Shopify/sarama的生产和生产案例

生产者

InitKafka.go

package kafkavar (    kafkaClient *Client)func InitKafka() {    var err error    var config = Config{        Host: []string{"kafka:9092"},    }    kafkaClient, err = NewClient(config)    if err != nil {        panic(err)    }}func GetClient() *Client {    return kafkaClient}

Producer.go

package kafkaimport (   "errors"   "github.com/Shopify/sarama")type Client struct {   sarama.AsyncProducer   msgPool chan *sarama.ProducerMessage}type Config struct {   Host          []string `json:"host"`   ReturnSuccess bool     `json:"return_success"`   ReturnErrors  bool     `json:"return_errors"`}func NewClient(cfg Config) (*Client, error) {   // create client   var err error   c := &Client{      msgPool: make(chan *sarama.ProducerMessage, 2000),   }   config := sarama.NewConfig()   config.Producer.Return.Errors = cfg.ReturnErrors   config.Producer.Return.Successes = cfg.ReturnSuccess   config.Version = sarama.V2_0_0_0   c.AsyncProducer, err = sarama.NewAsyncProducer(cfg.Host, config)   if err != nil {      return nil, err   }   return c, nil}// runfunc (c *Client) Run() {   for {      select {      case msg := <-c.msgPool:         c.Input() <- msg         logger.Info("%+v", msg)      }   }}// send msgfunc (c *Client) Send(topic string, msg []byte) error {   if topic == "" {      return errors.New("kafka producer send msg topic empty")   }   kafkaMsg := &sarama.ProducerMessage{      Topic: topic,      Value: sarama.ByteEncoder(msg),   }   c.msgPool <- kafkaMsg   return nil}

生产者初始化

// kafka initkafka.InitKafka()go kafka.GetClient().Run()

消费者

consumer.go

package kafka_consumerimport (   "context"   "github.com/Shopify/sarama"   "os"   "os/signal"   "sync"   "syscall")// Consumer represents a Sarama consumer group consumertype Consumer struct {   ready chan bool}func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error {   //panic("implement me")   return nil}func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {   //panic("implement me")   return nil}func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {   for message := range claim.Messages() {      logger.Info("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)      session.MarkMessage(message, "")      c.Handler(message.Topic, message.Value)   }      return nil}func (c *Consumer) Handler(topic string, msg []byte) {   switch topic {   case conscom.KafkaTopicGiftOrder:      GiftOrder(topic, msg)   case conscom.KafkaTopicFollow:      UserFollow(topic, msg)   }}func ConsumeInit(topics []string, groupID string) {   consumer := Consumer{      ready: make(chan bool),   }   brokerList := []string{"kafka:9092"}   config := sarama.NewConfig()   config.Version = sarama.V1_0_0_0   ctx, cancel := context.WithCancel(context.Background())   client, err := sarama.NewConsumerGroup(brokerList, groupID, config)   if err != nil {      log.Printf("kafka consumer err %v", err)      return   }   wg := &sync.WaitGroup{}   wg.Add(1)   go func() {      defer wg.Done()      for {         // server-side rebalance happens, the consumer session will need to be         if err := client.Consume(ctx, topics, &consumer); err != nil {            log.Printf("kafka consumer: %v", err)         }         // check if context was cancelled, signaling that the consumer should stop         if ctx.Err() != nil {            return         }         consumer.ready = make(chan bool)      }   }()   sigterm := make(chan os.Signal, 1)   signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)   select {   case <-ctx.Done():      log.Printf("kafka consume gift terminating: context cancelled")   case <-sigterm:      log.Printf("kafka consume gift terminating: via signal")   }   cancel()   wg.Wait()   if err = client.Close(); err != nil {      log.Printf("kafka consume gift Error closing client: %v", err)   }}

消费者初始化

// kafka consumergo kafka_consumer.ConsumeInit([]string{"topicA", "topicB", "group-name")

参考

《深刻了解Kafka:外围设计与实际原理》作者:朱忠华

https://github.com/Shopify/sa...

http://kafka.apache.org/docum...

https://crossoverjie.top/2018...