关于面试:面试官Kafka-为什么会丢消息

  • 1、如何晓得有音讯失落?
  • 2、哪些环节可能丢音讯?
  • 3、如何确保音讯不失落?
    • *

引入 MQ 消息中间件最间接的目标:零碎解耦以及流量管制(削峰填谷)

  • 零碎解耦: 上下游零碎之间的通信相互依赖,利用 MQ 音讯队列能够隔离上下游环境变动带来的不稳固因素。
  • 流量管制: 超高并发场景中,引入 MQ 能够实现流量 “削峰填谷” 的作用以及服务异步解决,不至于打崩服务。

引入 MQ 同样带来其余问题:数据一致性。

在分布式系统中,如果两个节点之间存在数据同步,就会带来数据一致性的问题。音讯生产端发送音讯到 MQ 再到音讯生产端须要保障音讯不失落。

所以在应用 MQ 音讯队列时,须要思考这 3 个问题:

  • 如何晓得有音讯失落?
  • 哪些环节可能丢音讯?
  • 如何确保音讯不失落?图片

1、如何晓得有音讯失落?

如何感知音讯是否失落了?可总结如下:

  1. 别人反馈: 经营、PM 反馈音讯失落。
  2. 监控报警: 监控指定指标,即时报警人工调整。Kafka 集群异样、Broker 宕机、Broker 磁盘挂载问题、消费者异样导致音讯积压等都会给用户间接感觉是音讯失落了。

案例:舆情剖析中数据采集同步

  • PM 可本人下发采集调度指令,去采集特定数据。
  • PM 可通过 ES 近实时查问对应数据,若没相应数据可再次下发指令。

当感知音讯失落了,那就须要一种机制来查看音讯是否失落。

检索音讯

运维工具有:

  1. 查看 Kafka 生产地位:
> 基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后盾管理系统 + 用户小程序,反对 RBAC 动静权限、多租户、数据权限、工作流、三方登录、领取、短信、商城等性能
>
> * 我的项目地址:<https://gitee.com/zhijiantianya/ruoyi-vue-pro>
> * 视频教程:<https://doc.iocoder.cn/video/>

# 查看某个topic的message数量
$ ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_topic

> 基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后盾管理系统 + 用户小程序,反对 RBAC 动静权限、多租户、数据权限、工作流、三方登录、领取、短信、商城等性能
>
> * 我的项目地址:<https://gitee.com/zhijiantianya/yudao-cloud>
> * 视频教程:<https://doc.iocoder.cn/video/>

# 查看consumer Group列表
$ ./kafka-consumer-groups.sh  --list  --bootstrap-server 192.168.88.108:9092

# 查看 offset 生产状况
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-1152 --describe
GROUP                 TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                           HOST            CLIENT-ID
console-consumer-1152 test_topic      0          -               4               -               consumer-console-consumer-1152-1-2703ea2b-b62d-4cfd-8950-34e8c321b942 /127.0.0.1      consumer-console-consumer-1152-1

2.利用工具:Kafka Tools

3.其余可见化界面工具

2、哪些环节可能丢音讯?

一条音讯从生产到生产实现经验 3 个环节:音讯生产者、消息中间件、音讯消费者。

哪个环节都有可能呈现音讯失落问题。

1)生产端

首先要意识到 Kafka 生产端发送音讯流程:

调用 send() 办法时,不会立即把音讯发送进来,而是缓存起来,抉择失当机会把缓存里的音讯划分成一批数据,通过 Sender 线程按批次发送给服务端 Broker。

此环节失落音讯的场景有: 即导致 Producer 音讯没有发送胜利

  1. 网络稳定: 生产者与服务端之间的链路不可达,发送超时。景象是:各端状态失常,但生产端就是没有生产音讯,就像失落音讯一样。
  2. 解决措施: 重试 props.put(“retries”, “10”);
  3. 不失当配置: 发送音讯无 ack 确认; 发送音讯失败无回调,无日志。producer.send(new ProducerRecord<>(topic, messageKey, messageStr),
    new CallBack(){…});
  4. 解决措施: 设置 acks=1 或者 acks=all。发送音讯设置回调。

回顾下重要的参数: acks

  • acks=0:不须要期待服务器的确认. 这是 retries 设置有效. 响应里来自服务端的 offset 总是 -1,producer只管发不论发送胜利与否。提早低,容易失落数据。
  • acks=1:示意 leader 写入胜利(然而并没有刷新到磁盘)后即向 producer 响应。提早中等,一旦 leader 正本挂了,就会失落数据。
  • acks=all:期待数据实现正本的复制, 等同于 -1. 如果须要保障音讯不失落, 须要应用该设置. 同时须要设置 unclean.leader.election.enable 为 true, 保障当 ISR 列表为空时, 抉择其余存活的正本作为新的 leader.

2)服务端

先来理解下 Kafka Broker 写入数据的过程:

  1. Broker 接管到一批数据,会先写入内存 PageCache(OS Cache)中。
  2. 操作系统会隔段时间把 OS Cache 中数据进行刷盘,这个过程会是 「异步批量刷盘」

这里就有个隐患,如果数据写入 PageCache 后 Kafka Broker宕机会怎么?机子宕机/掉电?

  • Kafka Broker 宕机: 音讯不会失落。因为数据曾经写入 PageCache,只期待操作系统刷盘即可。
  • 机子宕机/掉电: 音讯会失落。因为数据仍在内存里,内存RAM 掉电后就会失落数据。

解决方案 :应用带蓄电池后备电源的缓存 cache,避免零碎断电异样。

比照学习 MySQL 的 “双1” 策略,根本不应用这个策略,因为 “双1” 会导致频繁的 I/O 操作,也是最慢的一种。

比照学习 Redis 的 AOF 策略,默认且举荐的策略:Everysec(AOF_FSYNC_EVERYSEC) 每一秒钟保留一次(默认): 。每个写命令执行完, 只是先把日志写到 AOF 文件的内存缓冲区, 每隔一秒把缓冲区中的内容写入磁盘。

拓展:Kafka 日志刷盘机制

# 举荐采纳默认值,即不配置该配置,交由操作系统自行决定何时落盘,以晋升性能。
# 针对 broker 配置:
log.flush.interval.messages=10000 # 日志落盘音讯条数距离,即每接管到肯定条数音讯,即进行log落盘。
log.flush.interval.ms=1000        # 日志落盘工夫距离,单位ms,即每隔肯定工夫,即进行log落盘。

# 针对 topic 配置:
flush.messages.flush.ms=1000  # topic下每1s刷盘
flush.messages=1              # topic下每个音讯都落盘

# 查看 Linux 后盾线程执行配置
$ sysctl -a | grep dirty
vm.dirty_background_bytes = 0
vm.dirty_background_ratio = 10      # 示意当脏页占总内存的的百分比超过这个值时,后盾线程开始刷新脏页。
vm.dirty_bytes = 0
vm.dirty_expire_centisecs = 3000    # 示意脏数据多久会被刷新到磁盘上(30秒)。
vm.dirty_ratio = 20
vm.dirty_writeback_centisecs = 500  # 示意多久唤醒一次刷新脏页的后盾线程(5秒)。
vm.dirtytime_expire_seconds = 43200

Broker 的可靠性须要依赖其多正本机制: 个别正本数 3 个(配置参数:replication.factor=3)

  • Leader Partition 正本:提供对外读写机制。
  • Follower Partition 正本:同步 Leader 数据。

正本之间的数据同步也可能呈现问题:数据失落问题和数据不统一问题。

解决方案:ISR 和 Epoch 机制

  • ISR(In-Sync Replicas) : 当 Le“ader 宕机,能够从 ISR 中抉择一个 Follower 作为 Leader。
  • Epoch 机制: 解决 Leader 正本高水位更新和 Follower 正本高水位更新在工夫上是存在错配问题。Tips: Kafka 0.11.x 版本才引入 leader epoch 机制解决高水位机制弊病。

对应须要的配置参数如下:

  1. acks=-1 或者 acks=all: 必须所有正本均同步到音讯,能力表明音讯发送胜利。
  2. replication.factor >= 3: 正本数至多有 3 个。
  3. min.insync.replicas > 1: 代表音讯至多写入 2个正本才算发送胜利。前提须要 acks=-1。举个栗子:Leader 宕机了,至多要保障 ISR 中有一个 Follower,这样这个Follwer被选举为Leader 且不会失落数据。公式:replication.factor = min.insync.replicas + 1
  4. unclean.leader.election.enable=false: 避免不在 ISR 中的 Follower 被选举为 Leader。Kafka 0.11.0.0版本开始默认 unclean.leader.election.enable=false

3)生产端

生产端音讯失落场景有:

  1. 音讯沉积: 几个分区的音讯都没生产,就跟丢音讯一样。
  2. 解决措施: 个别问题都出在生产端,尽量进步客户端的生产速度,生产逻辑另起线程进行解决。
  3. 主动提交: 生产端拉下一批数据,正在解决中主动提交了 offset,这时候生产端宕机了; 重启后,拉到新一批数据,而上一批数据却没解决完。
  4. 解决措施: 勾销主动提交 auto.commit = false,改为手动 ack。
  5. 心跳超时,引发 Rebalance: 客户端心跳超时,触发 Rebalance被踢出生产组。如果只有这一个客户端,那音讯就不会被生产了。同时防止两次 poll 的间隔时间超过阈值:
  6. max.poll.records:升高该参数值,倡议远远小于 <单个线程每秒生产的条数> <生产线程的个数> <max.poll.interval.ms> 的积。
  7. max.poll.interval.ms: 该值要大于 <max.poll.records> / (<单个线程每秒生产的条数> * <生产线程的个数>) 的值。
  8. 解决措施: 客户端版本升级至 0.10.2 以上版本。

案例:凡曾遇到数据同步时,音讯中的文本需通过 NLP 的 NER 剖析,再同步到 ES。

这个过程的次要流程是:

  1. 数据同步程序从 Kafka 中拉取音讯。
  2. 数据同步程序将音讯内的文本发送的 NER 进行剖析,失去特色数组。
  3. 数据同步程序将音讯同步给 ES。

景象:线上数据同步程序运行一段时间后,音讯就不生产了。

  • 排查日志: 发现有 Rebalance 日志,狐疑是客户端生产太慢被踢出了生产组。
  • 本地测试: 发现运行一段时间也会呈现 Rebalance,且 NLP的NER 服务拜访 HTTP 500 报错。
  • 得出结论: 因NER服务异样,导致数据同步程序生产超时。且过后客户端版本为 v0.10.1,Consumer 没有独立线程维持心跳,而是把心跳维持与 poll 接口耦合在一起,从而也会造成心跳超时。

过后解决措施是:

  1. session.timeout.ms: 设置为 25s,过后没有降级客户端版本,怕带来其余问题。
  2. 熔断机制: 减少 Hystrix,超过 3 次服务调用异样就熔断,爱护客户端失常生产数据。

3、如何确保音讯不失落?

把握这些技能:

  1. 相熟音讯从发送到生产的每个阶段
  2. 监控报警 Kafka 集群
  3. 相熟计划 “MQ 可靠消息投递”

怎么确保音讯 100% 不失落?

到这,总结下:

  1. 生产端:
  • 设置重试:props.put(“retries”, “10”);
  • 设置 acks=all
  • 设置回调:producer.send(msg, new CallBack(){…});

2.Broker:

  • 内存:应用带蓄电池后备电源的缓存 cache。
  • Kafka 版本 0.11.x 以上:反对 Epoch 机制。
  • replication.factor >= 3: 正本数至多有 3 个。
  • min.insync.replicas > 1: 代表音讯至多写入 2个正本才算发送胜利。前提须要 acks=-1。
  • unclean.leader.election.enable=false: 避免不在 ISR 中的 Follower 被选举为 Leader。

3.生产端

  • 客户端版本升级至 0.10.2 以上版本。
  • 勾销主动提交 auto.commit = false,改为手动 ack。
  • 尽量进步客户端的生产速度,生产逻辑另起线程进行解决。

【腾讯云】轻量 2核2G4M,首年65元

阿里云限时活动-云数据库 RDS MySQL  1核2G配置 1.88/月 速抢

本文由乐趣区整理发布,转载请注明出处,谢谢。

您可能还喜欢...

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据