共计 9635 个字符,预计需要花费 25 分钟才能阅读完成。
本文转自 跟我学 IM 后盾开发作者 杰克. 许 经 OpenIM 技术人员整顿订正后公布。
写在后面
Open-IM 是由前微信技术专家打造的 开源 的即时通讯组件。Open-IM 包含 IM 服务端和客户端 SDK,实现了高性能、轻量级、易扩大等重要个性。开发者通过集成 Open-IM 组件,并私有化部署服务端,能够将即时通讯、实时网络能力疾速集成到本身利用中,并确保业务数据的安全性和私密性。
Kafka 在 OpenIM 我的项目中承当重要的角色,感激作者在应用 OpenIM 中发现的 bug(应用 Kafka 不当的 bug)
理解更多原创文章:
【OpenIM 原创】开源 OpenIM:轻量、高效、实时、牢靠、低成本的音讯模型
【OpenIM 原创】C/C++ 调用 golang 函数,golang 回调 C /C++ 函数
【OpenIM 原创】简略轻松入门 一文解说 WebRTC 实现 1 对 1 音视频通信原理
【OpenIM 扩大】OpenIM 服务发现和负载平衡 golang 插件:gRPC 接入 etcdv3
【开源 OpenIM】高性能、可伸缩、易扩大的即时通讯架构
如果您有趣味能够在文章结尾理解到更多对于咱们的信息,期待着与您的交换单干。
01 背景
在一些业务零碎中,模块之间通过引入 Kafka 解耦,拿 IM 举例(图起源):
用户 A 给 B 发送音讯,msg_gateway 收到音讯后,投递音讯到 Kafka 后就给 A 返回发送胜利。这个时候,其实还没有长久化到 mysql 中,尽管最终会放弃一致性。所以,试想如果 Kafka 丢音讯了,是不是就出大问题了?A 认为给 B 发送音讯胜利了,然而在服务器内部消息失落了 B 并没有收到。
所以,在应用 Kafka 的时候,有一些业务对音讯失落问题十分的关注。
同样,常见的问题还有:
- 反复生产的问题。
- 乱序的问题。
上面咱们来一起看一下如何应用 sarama 包来解决这些问题。
02 Kafka 音讯失落问题形容
以下内容起源:
kafka 什么时候会丢音讯:https://blog.csdn.net/qrne06/…
下面咱们放心的点须要进一步明确一下丢音讯的定义:kafka 集群中的局部或全副 broker 挂了,导致 consumer 没有及时收到音讯,这不属于丢音讯。broker 挂了,只有音讯全副长久化到了硬盘上,重启 broker 集群之后,使消费者持续拉取音讯,音讯就没有失落,依然全量生产了。所以我的了解,所谓丢音讯,意味着:开发人员未感知到哪些音讯没有被生产。
作者把音讯的失落演绎了以下几种状况:
1)producer 把音讯发送给 broker,因为网络抖动,音讯没有达到 broker,且开发人员无感知。
解决方案:producer 设置 acks 参数,音讯同步到 master 之后返回 ack 信号,否则抛异样使应用程序感知到并在业务中进行重试发送。这种形式肯定水平保障了音讯的可靠性,producer 期待 broker 确认信号的时延也不高。
2)producer 把音讯发送给 broker-master,master 接管到音讯,在未将音讯同步给 follower 之前,挂掉了,且开发人员无感知。
解决方案:producer 设置 acks 参数,音讯同步到 master 且同步到所有 follower 之后返回 ack 信号,否则抛异样使应用程序感知到并在业务中进行重试发送。这样设置,在更大程度上保障了音讯的可靠性,毛病是 producer 期待 broker 确认信号的时延比拟高。
3)producer 把音讯发送给 broker-master,master 接管到音讯,master 未胜利将音讯同步给每个 follower,有音讯失落危险。
解决方案:同上。
4)某个 broker 音讯尚未从内存缓冲区长久化到磁盘,就挂掉了,这种状况无奈通过 ack 机制感知。
解决方案:设置参数,放慢音讯长久化的频率,能在肯定水平上缩小这种状况产生的概率。但进步频率天然也会影响性能。
5)consumer 胜利拉取到了音讯,consumer 挂了。
解决方案:设置手动 sync,生产胜利才提交。
综上所述,集群 / 我的项目运行失常的状况下,kafka 不会丢音讯。一旦集群呈现问题,音讯的可靠性无奈齐全保障。要想尽可能保障音讯牢靠,根本只能在发现音讯有可能没有被生产时,重发消息来解决。所以在业务逻辑中,要思考音讯的反复生产问题,对于关键环节,要有幂等机制。
作者的几条倡议:
1)如果一个业务很要害,应用 kafka 的时候要思考丢音讯的老本和解决方案。
2)producer 端确认音讯是否达到集群,若有异样,进行重发。
3)consumer 端保障生产幂等性。
4)运维保障集群运行失常且高可用,保障网络状况良好。
03 生产端丢音讯问题解决
下面说了,只须要把 producer 设置 acks 参数,期待 Kafka 所有 follower 都胜利后再返回。咱们只须要进行如下设置:
- \1. config := sarama.NewConfig() 2. config.Producer.RequiredAcks = sarama.WaitForAll // -1
ack 参数有如下取值:
1. const (
2. // NoResponse doesn't send any response, the TCP ACK is all you get. 3. NoResponse RequiredAcks = 0
4. // WaitForLocal waits for only the local commit to succeed before responding.
5. WaitForLocal RequiredAcks = 1
6. // WaitForAll waits for all in-sync replicas to commit before responding.
7. // The minimum number of in-sync replicas is configured on the broker via
8. // the `min.insync.replicas` configuration key.
9. WaitForAll RequiredAcks = -1
10. )
04 生产端丢音讯问题
通常生产端丢音讯都是因为 Offset 主动提交了,然而数据并没有插入到 mysql(比方呈现 BUG 或者过程 Crash),导致下一次消费者重启后,音讯漏掉了,天然数据库中也查不到。这个时候,咱们能够通过手动提交解决,甚至在一些简单场景下,还要应用二阶段提交。
主动提交模式下的丢音讯问题
默认状况下,sarama 是主动提交的形式,距离为 1 秒钟
1. // NewConfig returns a new configuration instance with sane defaults.
2. func NewConfig() *Config {
3. // …
4. c.Consumer.Offsets.AutoCommit.Enable = true. // 主动提交
5. c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // 距离
6. c.Consumer.Offsets.Initial = OffsetNewest
7. c.Consumer.Offsets.Retry.Max = 3
8. // ...
9. }
这里的主动提交,是基于被标记过的音讯(sess.MarkMessage(msg,“”))
1. type exampleConsumerGroupHandler struct{}
2. func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error {return nil}
3. func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error {return nil}
4. func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {5. for msg := range claim.Messages() {6. fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
7. // 标记音讯已解决,sarama 会主动提交
8. sess.MarkMessage(msg, "")
9. }
10. return nil
11. }
如果不调用 sess.MarkMessage(msg,“”),即便启用了主动提交也没有成果,下次启动消费者会从上一次的 Offset 从新生产,咱们无妨正文掉 sess.MarkMessage(msg,“”),而后关上 Offset Explorer 查看:
那么这样,咱们就大略了解了 sarama 主动提交的原理:先标记再提交。咱们只须要放弃标记逻辑在插入 mysql 代码之后 即可确保不会呈现丢音讯的问题:
正确的调用程序:
1. func (h msgConsumerGroup) ConsumeClaim(sesssarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {2. for msg := range claim.Messages() {
3. // 插入 mysql
4. insertToMysql(msg)
5. // 正确:插入 mysql 胜利后程序解体,下一次顶多反复生产一次,而不是因为 Offset 超 前,导致应用层音讯失落了
6. sess.MarkMessage(msg,“")
7. }
8. return nil
9. }
谬误的程序:
1. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {2. for msg := range claim.Messages() {
3. // 谬误 1:不能先标记,再插入 mysql,可能标记的时候刚好主动提交 Offset,但 mysql 插入失败了,导致下一次这个音讯不会被生产,造成失落
4. // 谬误 2:罗唆遗记调用 sess.MarkMessage(msg,“"),导致反复生产
5. sess.MarkMessage(msg,“")
6. // 插入 mysql
7. insertToMysql(msg)
8. }
9. return nil
10. }
sarama 手动提交模式
当然,另外也能够通过手动提交来解决丢音讯的问题,然而集体不举荐,因为主动提交模式下曾经能解决丢音讯问题。
1. consumerConfig := sarama.NewConfig()
2. consumerConfig.Version = sarama.V2_8_0_0consumerConfig.
3. Consumer.Return.Errors = falseconsumerConfig.
4. Consumer.Offsets.AutoCommit.Enable = false // 禁用主动提交,改为手动
5. consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
6. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {7. for msg := range claim.Messages() {8. fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value)) 9. // 插入 mysql
10. insertToMysql(msg)
11. // 手动提交模式下,也须要先进行标记
12. sess.MarkMessage(msg, "")
13. consumerCount++
14. if consumerCount%3 == 0 {
15. // 手动提交,不能频繁调用,耗时 9ms 左右,macOS i7 16GB
16. t1 := time.Now().Nanosecond()
17. sess.Commit()
18. t2 := time.Now().Nanosecond()
19.fmt.Println("commit cost:", (t2-t1)/(1000*1000), "ms")
20. }
21. }
22. return nil
23. }
05 Kafka 音讯程序问题
投递 Kafka 之前,咱们通过一次 gRPC 调用解决了音讯序号的生成问题,然而这里其实还波及一个音讯程序问题:订阅 Kafka 的消费者如何依照音讯程序写入 mysql,而不是随机写入呢?
咱们晓得,Kafka 的音讯在一个 partition 中是有序的,所以只有确保发给某个人的音讯都在同一个 partition 中即可。
1. 全局一个 partition
这个最简略,然而在 kafka 中一个 partition 对应一个线程,所以这种模型下 Kafka 的吞吐是个问题。
2. 多个 partition,手动指定
1. msg := &sarama.ProducerMessage{
2. Topic:“msgc2s",
3. Value: sarama.StringEncoder(“hello”),
4. Partition: toUserId % 10,
5. }
6. partition, offset, err := producer.SendMessage(msg)
生产音讯的时候,除了 Topic 和 Value,咱们能够通过手动指定 partition,比方总共有 10 个分区,咱们依据用户 ID 取余,这样发给同一个用户的音讯,每次都到 1 个 partition 外面去了,消费者写入 mysql 中的时候,天然也是有序的。
然而,因为分区总数是写死的,万一 Kafka 的分区数要调整呢?那不得从新编译代码?所以这个形式不够柔美。
3. 多个 partition,主动计算
kafka 客户端为咱们提供了这种反对。首先,在初始化的时候,设置抉择分区的策略为 Hash:
p.config.Producer.Partitioner = sarama.NewHashPartitioner
而后,在生成音讯之前,设置音讯的 Key 值:
1. msg := &sarama.ProducerMessage{
2. Topic: "testAutoSyncOffset",
3. Value: sarama.StringEncoder("hello"),
4. Key: sarama.StringEncoder(strconv.Itoa(RecvID)),
5. }
Kafka 客户端会依据 Key 进行 Hash,咱们通过把接管用户 ID 作为 Key,这样就能让所有发给某个人的音讯落到同一个分区了,也就有序了。
4. 扩大常识:多线程状况下一个 partition 的乱序解决
咱们下面说了,Kafka 客户端针对一个 partition 开一个线程进行生产,如果解决比拟耗时的话,比方解决一条音讯耗时几十 ms,那么 1 秒钟就只能解决几十条音讯,这吞吐量太低了。这个时候,咱们可能就把逻辑挪动到其余线程外面去解决,这样的话,程序就可能会乱。
咱们能够通过写 N 个内存 queue,具备雷同 key 的数据都到同一个内存 queue;而后对于 N 个线程,每个线程别离生产一个内存 queue 即可,这样就能保障程序性。PS:就像 4 % 10 = 4,14 % 10 = 4,他们取余都是等于 4,所以落到了一个 partition,然而 key 值不一样啊,咱们能够本人再取余,放到不同的 queue 外面。
06 反复生产和音讯幂等
这篇文章中:
kafka 什么时候会丢音讯:https://blog.csdn.net/qrne06/…
具体了形容了各种丢音讯的状况,咱们通过设置 RequiredAcks = sarama.WaitForAll(-1),能够解决生产端丢音讯的问题。第六节中也对生产端丢音讯进行了阐明,只须要确保在插入数据库之后,调用 sess.MarkMessage(msg, “”) 即可。
如果呈现了插入 Mysql 胜利,然而因为主动提交有 1 秒的距离,如果此时解体,下次启动消费者势必会对这 1 秒的数据进行反复生产,咱们在应用层须要解决这个问题。
常见的有 2 种思路:
- 如果是存在 redis 中不须要长久化的数据,比方 string 类型,set 具备人造的幂等性,无需解决。
- 插入 mysql 之前,进行一次 query 操作,针对每个客户端发的音讯,咱们为它生成一个惟一的 ID(比方 GUID),或者间接把音讯的 ID 设置为惟一索引。
第 2 个计划的难点在于,全局惟一 ID 的生成,实践上 GUID 也是存在反复的可能性的,如果是客户端生成,那么插入失败,怎么让客户端感知呢?
所以,这里我认为还是须要自定义 ID 生产,比方通过组合法:用户 ID + 以后工夫 + 32 位 GUID,是不是简直不会反复了呢(试想,1 集体发 1 亿条文本须要多少年。。。)?
07 残缺代码实例
consumer.go
1. type msgConsumerGroup struct{}
2.
3. func (msgConsumerGroup) Setup(_ sarama.ConsumerGroupSession) error {return nil}
4. func (msgConsumerGroup) Cleanup(_ sarama.ConsumerGroupSession) error {return nil}
5. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {6. for msg := range claim.Messages() {7. fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
8.
9. // 查 mysql 去重
10. if check(msg) {
11. // 插入 mysql
12. insertToMysql()
13. }
14.
15. // 标记,sarama 会主动进行提交,默认距离 1 秒
16. sess.MarkMessage(msg, "")
17. }
18. return nil
19. }
20.
21. func main(){22. consumerConfig := sarama.NewConfig()
23. consumerConfig.Version = sarama.V2_8_0_0 // specify appropriate version
24. consumerConfig.Consumer.Return.Errors = false
25. //consumerConfig.Consumer.Offsets.AutoCommit.Enable = true
26. // 禁用主动提交,改为手动 //
27. consumerConfig.Consumer.Offsets.AutoCommit.Interval = time.Second * 1 // 测试 3 秒主动提交 consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
28.
29. cGroup, err := sarama.NewConsumerGroup([]string{"10.0.56.153:9092", "10.0.56.153:9093", "10.0.56.153:9094"},"testgroup", consumerConfig) 30. if err != nil {31. panic(err)
32. }
33.
34. for {35. err := cGroup.Consume(context.Background(), []string{"testAutoSyncOffset"}, consumerGroup)
36. if err != nil {37. fmt.Println(err.Error())
38. break
39. }
40. }
41.
42. _ = cGroup.Close()
43. }
producer.go
1. func main(){2. config := sarama.NewConfig()
3. config.Producer.RequiredAcks = sarama.WaitForAll // 期待所有 follower 都回复 ack,确保 Kafka 不会丢音讯
4. config.Producer.Return.Successes = true
5. config.Producer.Partitioner = sarama.NewHashPartitioner
6.
7. // 对 Key 进行 Hash,同样的 Key 每次都落到一个分区,这样音讯是有序的
// 应用同步 producer,异步模式下有更高的性能,然而解决更简单,这里倡议先从简略的动手
8. producer, err := sarama.NewSyncProducer([]string{"10.0.56.153:9092"}, config)
9. defer func() {10. _ = producer.Close()
11. }()
12. if err != nil {13. panic(err.Error())
14. }
15.
16. msgCount := 4
17. // 模仿 4 个音讯
18. for i := 0; i < msgCount; i++ {19. rand.Seed(int64(time.Now().Nanosecond()))
20. msg := &sarama.ProducerMessage{
21. Topic: "testAutoSyncOffset",
22. Value: sarama.StringEncoder("hello+" + strconv.Itoa(rand.Int())),
23. Key: sarama.StringEncoder("BBB”),
24. }
25.
26. t1 := time.Now().Nanosecond()
27. partition, offset, err := producer.SendMessage(msg)
28. t2 := time.Now().Nanosecond()
29.
30. if err == nil {31. fmt.Println("produce success, partition:", partition, ",offset:", offset, ",cost:", (t2-t1)/(1000*1000), "ms")
32. } else {33. fmt.Println(err.Error())
34. }
35. }
36.}
完结
OpenIM github 开源地址:
https://github.com/OpenIMSDK/…
OpenIM 官网:https://www.rentsoft.cn
OpenIM 官方论坛:https://forum.rentsoft.cn/
咱们致力于通过开源模式,为寰球企业 / 开发者提供简略、易用、高效的 IM 服务和实时音视频通信能力,帮忙开发者升高我的项目的开发成本,并让开发者掌控业务的外围数据。
IM 作为外围业务数据,平安的重要性毋庸置疑,OpenIM 开源以及私有化部署让企业能更放心使用。
现在 IM 云服务商免费高企,如何让企业低成本、平安、牢靠接入 IM 服务,是 OpenIM 的历史使命,也是咱们后退的方向。
如您有技术下面的浅见请到咱们的论坛分割沟通,用户也可与咱们的技术人员谈讨应用方面的难题以及见解