- 1、如何晓得有音讯失落?
- 2、哪些环节可能丢音讯?
- 3、如何确保音讯不失落?
-
- *
引入 MQ 消息中间件最间接的目标:零碎解耦以及流量管制(削峰填谷)
- 零碎解耦: 上下游零碎之间的通信相互依赖,利用 MQ 音讯队列能够隔离上下游环境变动带来的不稳固因素。
- 流量管制: 超高并发场景中,引入 MQ 能够实现流量“削峰填谷”的作用以及服务异步解决,不至于打崩服务。
引入 MQ 同样带来其余问题:数据一致性。
在分布式系统中,如果两个节点之间存在数据同步,就会带来数据一致性的问题。音讯生产端发送音讯到 MQ 再到音讯生产端须要保障音讯不失落。
所以在应用 MQ 音讯队列时,须要思考这 3 个问题:
- 如何晓得有音讯失落?
- 哪些环节可能丢音讯?
- 如何确保音讯不失落?图片
1、如何晓得有音讯失落?
如何感知音讯是否失落了?可总结如下:
- 别人反馈: 经营、PM 反馈音讯失落。
- 监控报警: 监控指定指标,即时报警人工调整。Kafka 集群异样、Broker 宕机、Broker 磁盘挂载问题、消费者异样导致音讯积压等都会给用户间接感觉是音讯失落了。
案例:舆情剖析中数据采集同步
- PM 可本人下发采集调度指令,去采集特定数据。
- PM 可通过 ES 近实时查问对应数据,若没相应数据可再次下发指令。
当感知音讯失落了,那就须要一种机制来查看音讯是否失落。
检索音讯
运维工具有:
- 查看 Kafka 生产地位:
> 基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后盾管理系统 + 用户小程序,反对 RBAC 动静权限、多租户、数据权限、工作流、三方登录、领取、短信、商城等性能
>
> * 我的项目地址:<https://gitee.com/zhijiantianya/ruoyi-vue-pro>
> * 视频教程:<https://doc.iocoder.cn/video/>
# 查看某个 topic 的 message 数量
$ ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_topic
> 基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后盾管理系统 + 用户小程序,反对 RBAC 动静权限、多租户、数据权限、工作流、三方登录、领取、短信、商城等性能
>
> * 我的项目地址:<https://gitee.com/zhijiantianya/yudao-cloud>
> * 视频教程:<https://doc.iocoder.cn/video/>
# 查看 consumer Group 列表
$ ./kafka-consumer-groups.sh --list --bootstrap-server 192.168.88.108:9092
# 查看 offset 生产状况
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-1152 --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-1152 test_topic 0 - 4 - consumer-console-consumer-1152-1-2703ea2b-b62d-4cfd-8950-34e8c321b942 /127.0.0.1 consumer-console-consumer-1152-1
2. 利用工具:Kafka Tools
3. 其余可见化界面工具
2、哪些环节可能丢音讯?
一条音讯从生产到生产实现经验 3 个环节:音讯生产者、消息中间件、音讯消费者。
哪个环节都有可能呈现音讯失落问题。
1)生产端
首先要意识到 Kafka 生产端发送音讯流程:
调用 send() 办法时,不会立即把音讯发送进来,而是缓存起来,抉择失当机会把缓存里的音讯划分成一批数据,通过 Sender 线程按批次发送给服务端 Broker。
此环节失落音讯的场景有: 即导致 Producer 音讯没有发送胜利
- 网络稳定: 生产者与服务端之间的链路不可达,发送超时。景象是:各端状态失常,但生产端就是没有生产音讯,就像失落音讯一样。
- 解决措施: 重试 props.put(“retries”, “10”);
- 不失当配置: 发送音讯无 ack 确认; 发送音讯失败无回调,无日志。producer.send(new ProducerRecord<>(topic, messageKey, messageStr),
new CallBack(){…}); - 解决措施: 设置 acks=1 或者 acks=all。发送音讯设置回调。
回顾下重要的参数: acks
- acks=0:不须要期待服务器的确认. 这是 retries 设置有效. 响应里来自服务端的 offset 总是 -1,producer 只管发不论发送胜利与否。提早低,容易失落数据。
- acks=1:示意 leader 写入胜利(然而并没有刷新到磁盘)后即向 producer 响应。提早中等,一旦 leader 正本挂了,就会失落数据。
- acks=all:期待数据实现正本的复制, 等同于 -1. 如果须要保障音讯不失落, 须要应用该设置. 同时须要设置 unclean.leader.election.enable 为 true, 保障当 ISR 列表为空时, 抉择其余存活的正本作为新的 leader.
2)服务端
先来理解下 Kafka Broker 写入数据的过程:
- Broker 接管到一批数据,会先写入内存 PageCache(OS Cache)中。
- 操作系统会隔段时间把 OS Cache 中数据进行刷盘,这个过程会是 「异步批量刷盘」。
这里就有个隐患,如果数据写入 PageCache 后 Kafka Broker 宕机会怎么?机子宕机 / 掉电?
- Kafka Broker 宕机: 音讯不会失落。因为数据曾经写入 PageCache,只期待操作系统刷盘即可。
- 机子宕机 / 掉电: 音讯会失落。因为数据仍在内存里,内存 RAM 掉电后就会失落数据。
解决方案 :应用带蓄电池后备电源的缓存 cache,避免零碎断电异样。
比照学习 MySQL 的“双 1”策略,根本不应用这个策略,因为“双 1”会导致频繁的 I/O 操作,也是最慢的一种。
比照学习 Redis 的 AOF 策略,默认且举荐的策略:Everysec(AOF_FSYNC_EVERYSEC) 每一秒钟保留一次(默认):。每个写命令执行完, 只是先把日志写到 AOF 文件的内存缓冲区, 每隔一秒把缓冲区中的内容写入磁盘。
拓展:Kafka 日志刷盘机制
# 举荐采纳默认值,即不配置该配置,交由操作系统自行决定何时落盘,以晋升性能。# 针对 broker 配置:log.flush.interval.messages=10000 # 日志落盘音讯条数距离,即每接管到肯定条数音讯,即进行 log 落盘。log.flush.interval.ms=1000 # 日志落盘工夫距离,单位 ms,即每隔肯定工夫,即进行 log 落盘。# 针对 topic 配置:flush.messages.flush.ms=1000 # topic 下每 1s 刷盘
flush.messages=1 # topic 下每个音讯都落盘
# 查看 Linux 后盾线程执行配置
$ sysctl -a | grep dirty
vm.dirty_background_bytes = 0
vm.dirty_background_ratio = 10 # 示意当脏页占总内存的的百分比超过这个值时,后盾线程开始刷新脏页。vm.dirty_bytes = 0
vm.dirty_expire_centisecs = 3000 # 示意脏数据多久会被刷新到磁盘上(30 秒)。vm.dirty_ratio = 20
vm.dirty_writeback_centisecs = 500 # 示意多久唤醒一次刷新脏页的后盾线程(5秒)。vm.dirtytime_expire_seconds = 43200
Broker 的可靠性须要依赖其多正本机制: 个别正本数 3 个(配置参数:replication.factor=3)
- Leader Partition 正本:提供对外读写机制。
- Follower Partition 正本:同步 Leader 数据。
正本之间的数据同步也可能呈现问题:数据失落问题和数据不统一问题。
解决方案:ISR 和 Epoch 机制
- ISR(In-Sync Replicas): 当 Le“ader 宕机,能够从 ISR 中抉择一个 Follower 作为 Leader。
- Epoch 机制: 解决 Leader 正本高水位更新和 Follower 正本高水位更新在工夫上是存在错配问题。Tips: Kafka 0.11.x 版本才引入 leader epoch 机制解决高水位机制弊病。
对应须要的配置参数如下:
- acks=-1 或者 acks=all: 必须所有正本均同步到音讯,能力表明音讯发送胜利。
- replication.factor >= 3: 正本数至多有 3 个。
- min.insync.replicas > 1: 代表音讯至多写入 2 个正本才算发送胜利。前提须要 acks=-1。举个栗子:Leader 宕机了,至多要保障 ISR 中有一个 Follower,这样这个 Follwer 被选举为 Leader 且不会失落数据。公式:replication.factor = min.insync.replicas + 1
- unclean.leader.election.enable=false: 避免不在 ISR 中的 Follower 被选举为 Leader。Kafka 0.11.0.0 版本开始默认 unclean.leader.election.enable=false
3)生产端
生产端音讯失落场景有:
- 音讯沉积: 几个分区的音讯都没生产,就跟丢音讯一样。
- 解决措施: 个别问题都出在生产端,尽量进步客户端的生产速度,生产逻辑另起线程进行解决。
- 主动提交: 生产端拉下一批数据,正在解决中主动提交了 offset,这时候生产端宕机了; 重启后,拉到新一批数据,而上一批数据却没解决完。
- 解决措施: 勾销主动提交 auto.commit = false,改为手动 ack。
- 心跳超时,引发 Rebalance: 客户端心跳超时,触发 Rebalance 被踢出生产组。如果只有这一个客户端,那音讯就不会被生产了。 同时防止两次 poll 的间隔时间超过阈值:
- max.poll.records:升高该参数值,倡议远远小于 < 单个线程每秒生产的条数 > < 生产线程的个数 > <max.poll.interval.ms> 的积。
- max.poll.interval.ms: 该值要大于 <max.poll.records> / (< 单个线程每秒生产的条数 > * < 生产线程的个数 >) 的值。
- 解决措施: 客户端版本升级至 0.10.2 以上版本。
案例:凡曾遇到数据同步时,音讯中的文本需通过 NLP 的 NER 剖析,再同步到 ES。
这个过程的次要流程是:
- 数据同步程序从 Kafka 中拉取音讯。
- 数据同步程序将音讯内的文本发送的 NER 进行剖析,失去特色数组。
- 数据同步程序将音讯同步给 ES。
景象:线上数据同步程序运行一段时间后,音讯就不生产了。
- 排查日志: 发现有 Rebalance 日志,狐疑是客户端生产太慢被踢出了生产组。
- 本地测试: 发现运行一段时间也会呈现 Rebalance,且 NLP 的 NER 服务拜访 HTTP 500 报错。
- 得出结论: 因 NER 服务异样,导致数据同步程序生产超时。 且过后客户端版本为 v0.10.1,Consumer 没有独立线程维持心跳,而是把心跳维持与 poll 接口耦合在一起,从而也会造成心跳超时。
过后解决措施是:
- session.timeout.ms: 设置为 25s,过后没有降级客户端版本,怕带来其余问题。
- 熔断机制: 减少 Hystrix,超过 3 次服务调用异样就熔断,爱护客户端失常生产数据。
3、如何确保音讯不失落?
把握这些技能:
- 相熟音讯从发送到生产的每个阶段
- 监控报警 Kafka 集群
- 相熟计划“MQ 可靠消息投递”
怎么确保音讯 100% 不失落?
到这,总结下:
- 生产端:
- 设置重试:props.put(“retries”, “10”);
- 设置 acks=all
- 设置回调:producer.send(msg, new CallBack(){…});
2.Broker:
- 内存:应用带蓄电池后备电源的缓存 cache。
- Kafka 版本 0.11.x 以上:反对 Epoch 机制。
- replication.factor >= 3: 正本数至多有 3 个。
- min.insync.replicas > 1: 代表音讯至多写入 2 个正本才算发送胜利。前提须要 acks=-1。
- unclean.leader.election.enable=false: 避免不在 ISR 中的 Follower 被选举为 Leader。
3. 生产端
- 客户端版本升级至 0.10.2 以上版本。
- 勾销主动提交 auto.commit = false,改为手动 ack。
- 尽量进步客户端的生产速度,生产逻辑另起线程进行解决。