关于golang:Golang正确使用kafka的姿势细节决定成败

16次阅读

共计 9635 个字符,预计需要花费 25 分钟才能阅读完成。

本文转自 跟我学 IM 后盾开发作者 杰克. 许 经 OpenIM 技术人员整顿订正后公布。
写在后面

Open-IM 是由前微信技术专家打造的 开源 的即时通讯组件。Open-IM 包含 IM 服务端和客户端 SDK,实现了高性能、轻量级、易扩大等重要个性。开发者通过集成 Open-IM 组件,并私有化部署服务端,能够将即时通讯、实时网络能力疾速集成到本身利用中,并确保业务数据的安全性和私密性。

Kafka 在 OpenIM 我的项目中承当重要的角色,感激作者在应用 OpenIM 中发现的 bug(应用 Kafka 不当的 bug)

理解更多原创文章:

【OpenIM 原创】开源 OpenIM:轻量、高效、实时、牢靠、低成本的音讯模型

【OpenIM 原创】C/C++ 调用 golang 函数,golang 回调 C /C++ 函数

【OpenIM 原创】简略轻松入门 一文解说 WebRTC 实现 1 对 1 音视频通信原理

【OpenIM 扩大】OpenIM 服务发现和负载平衡 golang 插件:gRPC 接入 etcdv3

【开源 OpenIM】高性能、可伸缩、易扩大的即时通讯架构

如果您有趣味能够在文章结尾理解到更多对于咱们的信息,期待着与您的交换单干。

01 背景

在一些业务零碎中,模块之间通过引入 Kafka 解耦,拿 IM 举例(图起源):

用户 A 给 B 发送音讯,msg_gateway 收到音讯后,投递音讯到 Kafka 后就给 A 返回发送胜利。这个时候,其实还没有长久化到 mysql 中,尽管最终会放弃一致性。所以,试想如果 Kafka 丢音讯了,是不是就出大问题了?A 认为给 B 发送音讯胜利了,然而在服务器内部消息失落了 B 并没有收到。

所以,在应用 Kafka 的时候,有一些业务对音讯失落问题十分的关注。

同样,常见的问题还有:

  • 反复生产的问题。
  • 乱序的问题。

上面咱们来一起看一下如何应用 sarama 包来解决这些问题。

02 Kafka 音讯失落问题形容

以下内容起源:

kafka 什么时候会丢音讯:https://blog.csdn.net/qrne06/…

下面咱们放心的点须要进一步明确一下丢音讯的定义:kafka 集群中的局部或全副 broker 挂了,导致 consumer 没有及时收到音讯,这不属于丢音讯。broker 挂了,只有音讯全副长久化到了硬盘上,重启 broker 集群之后,使消费者持续拉取音讯,音讯就没有失落,依然全量生产了。所以我的了解,所谓丢音讯,意味着:开发人员未感知到哪些音讯没有被生产。

作者把音讯的失落演绎了以下几种状况:

1)producer 把音讯发送给 broker,因为网络抖动,音讯没有达到 broker,且开发人员无感知。

解决方案:producer 设置 acks 参数,音讯同步到 master 之后返回 ack 信号,否则抛异样使应用程序感知到并在业务中进行重试发送。这种形式肯定水平保障了音讯的可靠性,producer 期待 broker 确认信号的时延也不高。

2)producer 把音讯发送给 broker-master,master 接管到音讯,在未将音讯同步给 follower 之前,挂掉了,且开发人员无感知。

解决方案:producer 设置 acks 参数,音讯同步到 master 且同步到所有 follower 之后返回 ack 信号,否则抛异样使应用程序感知到并在业务中进行重试发送。这样设置,在更大程度上保障了音讯的可靠性,毛病是 producer 期待 broker 确认信号的时延比拟高。

3)producer 把音讯发送给 broker-master,master 接管到音讯,master 未胜利将音讯同步给每个 follower,有音讯失落危险。

解决方案:同上。

4)某个 broker 音讯尚未从内存缓冲区长久化到磁盘,就挂掉了,这种状况无奈通过 ack 机制感知。

解决方案:设置参数,放慢音讯长久化的频率,能在肯定水平上缩小这种状况产生的概率。但进步频率天然也会影响性能。

5)consumer 胜利拉取到了音讯,consumer 挂了。

解决方案:设置手动 sync,生产胜利才提交

综上所述,集群 / 我的项目运行失常的状况下,kafka 不会丢音讯。一旦集群呈现问题,音讯的可靠性无奈齐全保障。要想尽可能保障音讯牢靠,根本只能在发现音讯有可能没有被生产时,重发消息来解决。所以在业务逻辑中,要思考音讯的反复生产问题,对于关键环节,要有幂等机制。

作者的几条倡议:

1)如果一个业务很要害,应用 kafka 的时候要思考丢音讯的老本和解决方案。

2)producer 端确认音讯是否达到集群,若有异样,进行重发。

3)consumer 端保障生产幂等性。

4)运维保障集群运行失常且高可用,保障网络状况良好。

03 生产端丢音讯问题解决

下面说了,只须要把 producer 设置 acks 参数,期待 Kafka 所有 follower 都胜利后再返回。咱们只须要进行如下设置:

  • \1. config := sarama.NewConfig() 2. config.Producer.RequiredAcks = sarama.WaitForAll // -1

ack 参数有如下取值:

1. const (
2. // NoResponse doesn't send any response, the TCP ACK is all you get. 3.   NoResponse RequiredAcks = 0
4. // WaitForLocal waits for only the local commit to succeed before         responding.    
5. WaitForLocal RequiredAcks = 1   
6. // WaitForAll waits for all in-sync replicas to commit before          responding.    
7. // The minimum number of in-sync replicas is configured on the             broker    via   
8. // the `min.insync.replicas` configuration key.    
9. WaitForAll RequiredAcks = -1
10.  )

04 生产端丢音讯问题

通常生产端丢音讯都是因为 Offset 主动提交了,然而数据并没有插入到 mysql(比方呈现 BUG 或者过程 Crash),导致下一次消费者重启后,音讯漏掉了,天然数据库中也查不到。这个时候,咱们能够通过手动提交解决,甚至在一些简单场景下,还要应用二阶段提交。

主动提交模式下的丢音讯问题

默认状况下,sarama 是主动提交的形式,距离为 1 秒钟

1.  // NewConfig returns a new configuration instance with sane                defaults.
2. func NewConfig() *Config {  
3. // …  
4. c.Consumer.Offsets.AutoCommit.Enable = true. // 主动提交 
5. c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // 距离 
6. c.Consumer.Offsets.Initial = OffsetNewest 
7. c.Consumer.Offsets.Retry.Max = 3 
8.  // ...
9.  }

这里的主动提交,是基于被标记过的音讯(sess.MarkMessage(msg,“”))

1. type exampleConsumerGroupHandler struct{}
2. func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession)        error   {return nil}
3. func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession)      error {return nil}
4. func (h exampleConsumerGroupHandler) ConsumeClaim(sess                  ConsumerGroupSession, claim ConsumerGroupClaim) error {5. for msg := range claim.Messages() {6. fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic,      msg.Partition, msg.Offset)      
7. // 标记音讯已解决,sarama 会主动提交     
8. sess.MarkMessage(msg, "") 
9. }   
10. return nil
11. }

如果不调用 sess.MarkMessage(msg,“”),即便启用了主动提交也没有成果,下次启动消费者会从上一次的 Offset 从新生产,咱们无妨正文掉 sess.MarkMessage(msg,“”),而后关上 Offset Explorer 查看:

那么这样,咱们就大略了解了 sarama 主动提交的原理:先标记再提交。咱们只须要放弃标记逻辑在插入 mysql 代码之后 即可确保不会呈现丢音讯的问题:

正确的调用程序:

1. func (h msgConsumerGroup) ConsumeClaim(sesssarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {2. for msg := range claim.Messages() {
3. // 插入 mysql
4. insertToMysql(msg)      
5. // 正确:插入 mysql 胜利后程序解体,下一次顶多反复生产一次,而不是因为 Offset 超         前,导致应用层音讯失落了     
6.  sess.MarkMessage(msg,“") 
7.  }  
8.  return nil
9.  }

谬误的程序:

1. func (h msgConsumerGroup) ConsumeClaim(sess                           sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {2. for msg := range claim.Messages() {     
3. // 谬误 1:不能先标记,再插入 mysql,可能标记的时候刚好主动提交 Offset,但 mysql 插入失败了,导致下一次这个音讯不会被生产,造成失落      
4. // 谬误 2:罗唆遗记调用 sess.MarkMessage(msg,“"),导致反复生产   
5. sess.MarkMessage(msg,“")      
6. // 插入 mysql      
7. insertToMysql(msg)  
8.  }  
9.  return nil
10. }

sarama 手动提交模式

当然,另外也能够通过手动提交来解决丢音讯的问题,然而集体不举荐,因为主动提交模式下曾经能解决丢音讯问题。

1. consumerConfig := sarama.NewConfig()
2. consumerConfig.Version = sarama.V2_8_0_0consumerConfig.
3. Consumer.Return.Errors = falseconsumerConfig.
4. Consumer.Offsets.AutoCommit.Enable = false  // 禁用主动提交,改为手动
5. consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
6. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {7. for msg := range claim.Messages() {8. fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))      9. // 插入 mysql     
10. insertToMysql(msg)      
11. // 手动提交模式下,也须要先进行标记     
12. sess.MarkMessage(msg, "")      
13. consumerCount++      
14. if consumerCount%3 == 0 {         
15. // 手动提交,不能频繁调用,耗时 9ms 左右,macOS i7 16GB         
16. t1 := time.Now().Nanosecond()         
17. sess.Commit()         
18. t2 := time.Now().Nanosecond()         
19.fmt.Println("commit cost:", (t2-t1)/(1000*1000), "ms")      
20. }   
21. }   
22. return nil
23. }

05 Kafka 音讯程序问题

投递 Kafka 之前,咱们通过一次 gRPC 调用解决了音讯序号的生成问题,然而这里其实还波及一个音讯程序问题:订阅 Kafka 的消费者如何依照音讯程序写入 mysql,而不是随机写入呢?

咱们晓得,Kafka 的音讯在一个 partition 中是有序的,所以只有确保发给某个人的音讯都在同一个 partition 中即可。

1. 全局一个 partition

这个最简略,然而在 kafka 中一个 partition 对应一个线程,所以这种模型下 Kafka 的吞吐是个问题。

2. 多个 partition,手动指定

1. msg := &sarama.ProducerMessage{   
2. Topic:“msgc2s",   
3. Value: sarama.StringEncoder(“hello”),   
4. Partition: toUserId % 10,
5. }
6. partition, offset, err := producer.SendMessage(msg)

生产音讯的时候,除了 Topic 和 Value,咱们能够通过手动指定 partition,比方总共有 10 个分区,咱们依据用户 ID 取余,这样发给同一个用户的音讯,每次都到 1 个 partition 外面去了,消费者写入 mysql 中的时候,天然也是有序的。

然而,因为分区总数是写死的,万一 Kafka 的分区数要调整呢?那不得从新编译代码?所以这个形式不够柔美。

3. 多个 partition,主动计算

kafka 客户端为咱们提供了这种反对。首先,在初始化的时候,设置抉择分区的策略为 Hash:

p.config.Producer.Partitioner = sarama.NewHashPartitioner

而后,在生成音讯之前,设置音讯的 Key 值:

1. msg := &sarama.ProducerMessage{   
2. Topic: "testAutoSyncOffset",   
3. Value: sarama.StringEncoder("hello"),   
4. Key: sarama.StringEncoder(strconv.Itoa(RecvID)),
5. }

Kafka 客户端会依据 Key 进行 Hash,咱们通过把接管用户 ID 作为 Key,这样就能让所有发给某个人的音讯落到同一个分区了,也就有序了。

4. 扩大常识:多线程状况下一个 partition 的乱序解决

咱们下面说了,Kafka 客户端针对一个 partition 开一个线程进行生产,如果解决比拟耗时的话,比方解决一条音讯耗时几十 ms,那么 1 秒钟就只能解决几十条音讯,这吞吐量太低了。这个时候,咱们可能就把逻辑挪动到其余线程外面去解决,这样的话,程序就可能会乱。

咱们能够通过写 N 个内存 queue,具备雷同 key 的数据都到同一个内存 queue;而后对于 N 个线程,每个线程别离生产一个内存 queue 即可,这样就能保障程序性。PS:就像 4 % 10 = 4,14 % 10 = 4,他们取余都是等于 4,所以落到了一个 partition,然而 key 值不一样啊,咱们能够本人再取余,放到不同的 queue 外面。

06 反复生产和音讯幂等

这篇文章中:

kafka 什么时候会丢音讯:https://blog.csdn.net/qrne06/…

具体了形容了各种丢音讯的状况,咱们通过设置 RequiredAcks = sarama.WaitForAll(-1),能够解决生产端丢音讯的问题。第六节中也对生产端丢音讯进行了阐明,只须要确保在插入数据库之后,调用 sess.MarkMessage(msg, “”) 即可。

如果呈现了插入 Mysql 胜利,然而因为主动提交有 1 秒的距离,如果此时解体,下次启动消费者势必会对这 1 秒的数据进行反复生产,咱们在应用层须要解决这个问题。

常见的有 2 种思路:

  1. 如果是存在 redis 中不须要长久化的数据,比方 string 类型,set 具备人造的幂等性,无需解决。
  2. 插入 mysql 之前,进行一次 query 操作,针对每个客户端发的音讯,咱们为它生成一个惟一的 ID(比方 GUID),或者间接把音讯的 ID 设置为惟一索引。

第 2 个计划的难点在于,全局惟一 ID 的生成,实践上 GUID 也是存在反复的可能性的,如果是客户端生成,那么插入失败,怎么让客户端感知呢?

所以,这里我认为还是须要自定义 ID 生产,比方通过组合法:用户 ID + 以后工夫 + 32 位 GUID,是不是简直不会反复了呢(试想,1 集体发 1 亿条文本须要多少年。。。)?

07 残缺代码实例

consumer.go

1. type msgConsumerGroup struct{}
2. 
3. func (msgConsumerGroup) Setup(_ sarama.ConsumerGroupSession) error   {return nil}
4. func (msgConsumerGroup) Cleanup(_ sarama.ConsumerGroupSession) error {return nil}
5. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {6. for msg := range claim.Messages() {7. fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
8. 
9. // 查 mysql 去重      
10. if check(msg) {          
11. // 插入 mysql          
12. insertToMysql()      
13. }
14.
15. // 标记,sarama 会主动进行提交,默认距离 1 秒      
16. sess.MarkMessage(msg, "")  
17. }   
18. return nil
19. }
20.
21. func main(){22. consumerConfig := sarama.NewConfig()    
23. consumerConfig.Version = sarama.V2_8_0_0 // specify appropriate version    
24. consumerConfig.Consumer.Return.Errors = false    
25. //consumerConfig.Consumer.Offsets.AutoCommit.Enable = true      
26. // 禁用主动提交,改为手动  //
27. consumerConfig.Consumer.Offsets.AutoCommit.Interval = time.Second * 1 // 测试 3 秒主动提交    consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
28.
29. cGroup, err := sarama.NewConsumerGroup([]string{"10.0.56.153:9092",    "10.0.56.153:9093", "10.0.56.153:9094"},"testgroup", consumerConfig)  30. if err != nil {31. panic(err)   
32. }
33. 
34. for {35. err := cGroup.Consume(context.Background(), []string{"testAutoSyncOffset"}, consumerGroup)       
36. if err != nil {37. fmt.Println(err.Error())         
38. break     
39. }   
40. }
41. 
42.  _ = cGroup.Close()
43. }

producer.go

1. func main(){2. config := sarama.NewConfig()    
3. config.Producer.RequiredAcks = sarama.WaitForAll // 期待所有 follower 都回复 ack,确保 Kafka 不会丢音讯    
4. config.Producer.Return.Successes = true    
5. config.Producer.Partitioner = sarama.NewHashPartitioner
6.
7.  // 对 Key 进行 Hash,同样的 Key 每次都落到一个分区,这样音讯是有序的
    // 应用同步 producer,异步模式下有更高的性能,然而解决更简单,这里倡议先从简略的动手    
8. producer, err := sarama.NewSyncProducer([]string{"10.0.56.153:9092"}, config)    
9. defer func() {10. _ = producer.Close()    
11. }()    
12. if err != nil {13. panic(err.Error())   
14. }
15.
16. msgCount := 4   
17. // 模仿 4 个音讯    
18. for i := 0; i < msgCount; i++ {19. rand.Seed(int64(time.Now().Nanosecond()))        
20. msg := &sarama.ProducerMessage{          
21. Topic: "testAutoSyncOffset",          
22. Value: sarama.StringEncoder("hello+" + strconv.Itoa(rand.Int())),   
23. Key:   sarama.StringEncoder("BBB”),        
24. }
25.
26.  t1 := time.Now().Nanosecond()        
27. partition, offset, err := producer.SendMessage(msg)        
28. t2 := time.Now().Nanosecond()
29.
30. if err == nil {31. fmt.Println("produce success, partition:", partition, ",offset:", offset, ",cost:", (t2-t1)/(1000*1000), "ms")        
32. } else {33. fmt.Println(err.Error())      
34.      }   
35.   }
36.}

完结

OpenIM github 开源地址:

https://github.com/OpenIMSDK/…

OpenIM 官网:https://www.rentsoft.cn

OpenIM 官方论坛:https://forum.rentsoft.cn/

咱们致力于通过开源模式,为寰球企业 / 开发者提供简略、易用、高效的 IM 服务和实时音视频通信能力,帮忙开发者升高我的项目的开发成本,并让开发者掌控业务的外围数据。

IM 作为外围业务数据,平安的重要性毋庸置疑,OpenIM 开源以及私有化部署让企业能更放心使用。

现在 IM 云服务商免费高企,如何让企业低成本、平安、牢靠接入 IM 服务,是 OpenIM 的历史使命,也是咱们后退的方向。

如您有技术下面的浅见请到咱们的论坛分割沟通,用户也可与咱们的技术人员谈讨应用方面的难题以及见解

正文完
 0