共计 6380 个字符,预计需要花费 16 分钟才能阅读完成。
原文地址:https://github.com/WilburXu/b…
Kafka 体系架构
Broker 服务代理节点
服务代理节点。对于 Kafka 而言,Broker 能够简略地看作一个独立的 Kafka 服务节点或 Kafka 服务实例。大多数状况下也能够将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka 实例,一个或多个 Broker 组成了一个 Kafka 集群。
Producer 和 Consumer
Producer 生产者
生产者,也就是发送音讯的一方。生产者负责创立音讯,而后将其投递到 Kafka 中。
一个失常的生产逻辑须要具备以下几个步骤:
- 创立生产者实例
- 构建待发送的音讯
- 发送音讯到指定的
Topic
、Partition
、Key
- 敞开生产者实例
Consumer 消费者
消费者,也就是接管音讯的一方。消费者连贯到 Kafka 上并接管音讯,从而进行相应的业务逻辑解决。
生产个别有三种生产模式:
单线程模式
单个线程生产多个Partition
问题:
- 效率低,并发上不去
- 可用性差,单个线程挂了,将无奈生产
多线程模式
独立消费者模式
和单线程模式相似,区别就是为每一个 Partition
独自起一个线程进行生产。
问题:
- 线程和并发减少了,然而单线程挂了,该线程的分区还是无奈生产。
生产组模式
也是目前最罕用的生产模式,咱们能够创立多个生产实例并设置同一个 group-id
来辨别生产组,同一个生产组能够指定一个或多个 Topic
进行生产:
- 生产组自均衡(Rebalance),kafka 会依据生产组实例数量和分区数量自均衡调配
- 不会反复生产,同个组内 kafka 确保一个分区只会发往一个生产实例,防止反复生产
- 高可用,当一个生产实例挂了,kafka 会主动调整生产实例和分区的关系
Topic 主题
Kafka 中的音讯以主题为单位进行归类(逻辑概念,生产者负责将音讯发送到特定的主题(发送到 Kafka 集群中的每一条音讯都要指定一个主题),而消费者负责订阅主题并进行生产。
Partition 分区
物理分区,主题细分为了 1 或多个分区,一个分区只能属于单个主题,个别也会把分区称为主题分区(Topic-Partition)。
Segment
理论存储数据的中央,Segment
蕴含一个数据文件和一个索引文件。一个 Partition
有多个大小雷同的 Segment
,能够了解为Partition
是在 Segment
之上进行的逻辑形象。
Kafka 根本命令
zookeeper
broker 节点保留在 zookeeper,所有须要:
- 进入 zookeeper,而后
./bin/zkCli.sh
- 执行
ls /brokers/ids
查看 broker 详情
kafka-log-dirs.sh --describe --bootstrap-server kafka:9092 --broker-list 1
topic
查看列表
kafka-topics.sh --list --zookeeper zookeeper:2181
创立
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic [topic_name]
查看详情
kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic [topic_name]
删除
kafka-topics.sh --zookeeper zookeeper:2181 --delete --topic [topic_name]
topic 生产状况
topic offset 最小
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic [topic_name] --time -2
topic offset 最大
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic [topic_name] --time -1
生产
增加数据
kafka-console-producer.sh --broker-list localhost:9092 --topic [topic_name]
生产
从头部开始生产
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --from-beginning
从尾部开始生产,必须要指定分区
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --offset latest --partition 0
从某个地位开始生产(–offset [n])
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --offset 100 --partition 0
生产指定个数(–max-messages [n])
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --offset latest --partition 0 --max-messages 2
生产组
查看生产组列表
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
查看生产组状况
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group [group_id]
offset 偏移设置为最早
kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-earliest --all-topics --execute
offset 偏移设置为新
kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-latest --all-topics --execute
offset 偏移设置为指定地位
kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-offset 2000 --all-topics --execute
offset 偏移设置某个工夫之后最早位移
kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-datetime 2020-12-28T00:00:00.000 --all-topics --execute
Go 案例
基于 https://github.com/Shopify/sarama
的生产和生产案例
生产者
InitKafka.go
package kafka
var (kafkaClient *Client)
func InitKafka() {
var err error
var config = Config{Host: []string{"kafka:9092"},
}
kafkaClient, err = NewClient(config)
if err != nil {panic(err)
}
}
func GetClient() *Client {return kafkaClient}
Producer.go
package kafka
import (
"errors"
"github.com/Shopify/sarama"
)
type Client struct {
sarama.AsyncProducer
msgPool chan *sarama.ProducerMessage
}
type Config struct {Host []string `json:"host"`
ReturnSuccess bool `json:"return_success"`
ReturnErrors bool `json:"return_errors"`
}
func NewClient(cfg Config) (*Client, error) {
// create client
var err error
c := &Client{msgPool: make(chan *sarama.ProducerMessage, 2000),
}
config := sarama.NewConfig()
config.Producer.Return.Errors = cfg.ReturnErrors
config.Producer.Return.Successes = cfg.ReturnSuccess
config.Version = sarama.V2_0_0_0
c.AsyncProducer, err = sarama.NewAsyncProducer(cfg.Host, config)
if err != nil {return nil, err}
return c, nil
}
// run
func (c *Client) Run() {
for {
select {
case msg := <-c.msgPool:
c.Input() <- msg
logger.Info("%+v", msg)
}
}
}
// send msg
func (c *Client) Send(topic string, msg []byte) error {
if topic == "" {return errors.New("kafka producer send msg topic empty")
}
kafkaMsg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(msg),
}
c.msgPool <- kafkaMsg
return nil
}
生产者初始化:
// kafka init
kafka.InitKafka()
go kafka.GetClient().Run()
消费者
consumer.go
package kafka_consumer
import (
"context"
"github.com/Shopify/sarama"
"os"
"os/signal"
"sync"
"syscall"
)
// Consumer represents a Sarama consumer group consumer
type Consumer struct {ready chan bool}
func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error {//panic("implement me")
return nil
}
func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {//panic("implement me")
return nil
}
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for message := range claim.Messages() {logger.Info("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
c.Handler(message.Topic, message.Value)
}
return nil
}
func (c *Consumer) Handler(topic string, msg []byte) {
switch topic {
case conscom.KafkaTopicGiftOrder:
GiftOrder(topic, msg)
case conscom.KafkaTopicFollow:
UserFollow(topic, msg)
}
}
func ConsumeInit(topics []string, groupID string) {
consumer := Consumer{ready: make(chan bool),
}
brokerList := []string{"kafka:9092"}
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(brokerList, groupID, config)
if err != nil {log.Printf("kafka consumer err %v", err)
return
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {defer wg.Done()
for {
// server-side rebalance happens, the consumer session will need to be
if err := client.Consume(ctx, topics, &consumer); err != nil {log.Printf("kafka consumer: %v", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {return}
consumer.ready = make(chan bool)
}
}()
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {case <-ctx.Done():
log.Printf("kafka consume gift terminating: context cancelled")
case <-sigterm:
log.Printf("kafka consume gift terminating: via signal")
}
cancel()
wg.Wait()
if err = client.Close(); err != nil {log.Printf("kafka consume gift Error closing client: %v", err)
}
}
消费者初始化:
// kafka consumer
go kafka_consumer.ConsumeInit([]string{"topicA", "topicB", "group-name")
参考
《深刻了解 Kafka: 外围设计与实际原理》作者: 朱忠华
https://github.com/Shopify/sa…
http://kafka.apache.org/docum…
https://crossoverjie.top/2018…