关于kafka:如何更好地使用Kafka

5次阅读

共计 14827 个字符,预计需要花费 38 分钟才能阅读完成。

引言 | 要确保 Kafka 在应用过程中的稳定性,须要从 kafka 在业务中的应用周期进行顺次保障。次要能够分为:当时预防(通过标准的应用、开发,预防问题产生)、运行时监控(保障集群稳固,出问题能及时发现)、故障时解决(有残缺的应急预案)这三阶段。

当时预防

当时预防即通过标准的应用、开发,预防问题产生。次要蕴含集群 / 生产端 / 生产端的一些最佳实际、上线前测试以及一些针对紧急情况(如音讯积压等)的长期开关性能。

Kafka 调优准则:

1. 确定优化指标,并且定量给出指标(Kafka 常见的优化指标是吞吐量、延时、持久性和可用性)。

2. 确定了指标之后,须要明确优化的维度。

通用性优化:操作系统、JVM 等。

针对性优化:优化 Kafka 的 TPS、处理速度、延时等。

(一)生产端最佳实际

  • 参数调优

  • 应用 Java 版的 Client;
  • 应用 kafka-producer-perf-test.sh 测试你的环境;
  • 设置内存、CPU、batch 压缩;
  • batch.size:该值设置越大,吞吐越大,但提早也会越大;
  • linger.ms:示意 batch 的超时工夫,该值越大,吞吐越大、但提早也会越大;
  • max.in.flight.requests.per.connection:默认为 5,示意 client 在 blocking 之前向单个连贯(broker)发送的未确认申请的最大数,超过 1 时,将会影响数据的程序性;
  • compression.type:压缩设置,会进步吞吐量;
  • acks:数据 durability 的设置;
  • 防止大音讯(占用过多内存、升高 broker 处理速度);
  • broker 调整:减少 num.replica.fetchers,晋升 Follower 同步 TPS,防止 Broker Full GC 等;
  • 当吞吐量小于网络带宽时:减少线程、进步 batch.size、减少更多 producer 实例、减少 partition 数;
  • 设置 acks=-1 时,如果提早增大:能够增大 num.replica.fetchers(follower 同步数据的线程数)来调解;
  • 跨数据中心的传输:减少 socket 缓冲区设置以及 OS tcp 缓冲区设置。
  • 开发实际

a. 做好 Topic 隔离

依据具体场景(是否容许肯定提早、实时音讯、定时周期工作等)辨别 kafka topic,防止挤占或阻塞实时业务音讯的解决。

b. 做好音讯流控

如果上游音讯生产存在瓶颈或者集群负载过低等,须要在生产端(或音讯网关)施行流量生产速率的管制或者延时 / 暂定音讯发送等策略,防止短时间内发送大量音讯。

c. 做好音讯补推

手动去查问失落的那局部数据,而后将音讯从新发送到 mq 外面,把失落的数据从新补回来。

d. 做好音讯程序性保障

如果须要在保障 Kafka 在分区内严格有序的话(即须要保障两个音讯是有严格的先后顺序),须要设置 key,让某类音讯依据指定规定路由到同一个 topic 的同一个分区中(能解决大部分生产程序的问题)。

然而,须要防止分区内音讯歪斜的问题(例如,依照店铺 Id 进行路由,容易导致音讯不平衡的问题)。

1. 生产端:音讯发送指定 key,确保雷同 key 的音讯发送到同一个 partition。

2. 生产端:单线程生产或者写 N 个内存 queue,具备雷同 key 的数据都到同一个内存 queue;而后对于 N 个线程,每个线程别离生产一个内存 queue。

e. 适当进步音讯发送效率

批量发送:kafka 先将音讯缓存在内存中的双端队列(buffer)中,当音讯量达到 batch size 指定大小时进行批量发送,缩小了网络传输频次,进步了传输效率;

端到端压缩音讯:将一批音讯打包后进行压缩,发送给 Broker 服务器后,但频繁的压缩和解压也会升高性能,最终还是以压缩的形式传递到消费者的手上,在 Consumer 端进行解压;

异步发送:将生产者革新为异步的形式,能够晋升发送效率,然而如果音讯异步产生过快,会导致挂起线程过多,内存不足,最终导致音讯失落;

索引分区并行生产:当一个工夫绝对长的工作在执行时,它会占用该音讯所在索引分区被锁定,前面的工作不能及时派发给闲暇的客户端解决,若服务端如果启用索引分区并行生产的个性,就能够及时的把前面的工作派发给其余的客户端去执行,同时也不须要调整索引的分区数(但此类音讯仅实用于无需保障音讯程序关系的音讯)。

f. 保障音讯发送可靠性

Producer:如果对数据可靠性要求很高的话,在发送音讯的时候,须要抉择带有 callBack 的 api 进行发送,并设置 acks、retries、factor 等等些参数来保障 Producer 发送的音讯不失落。

Broker:kafka 为了失去更高的性能和吞吐量,将数据异步批量的存储在磁盘中,并采纳了批量刷盘的做法,如果对数据可靠性要求很高的话,能够批改为同步刷盘的形式进步音讯的可靠性。

(二)生产端最佳实际

  • 参数调优

  • 吞吐量:调整 partition 数、OS page cache(调配足够的内存来缓存数据);
  • offset topic(\_\_consumer\_offsets):offsets.topic.replication.factor(默认为 3)、offsets.retention.minutes(默认为 1440,即 1day);
  • offset commit 较慢:异步 commit 或 手动 commit;
  • fetch.min.bytes、fetch.max.wait.ms;
  • max.poll.interval.ms:调用 poll() 之后提早的最大工夫,超过这个工夫没有调用 poll() 的话,就会认为这个 consumer 挂掉了,将会进行 rebalance;
  • max.poll.records:当调用 poll() 之后返回最大的 record 数,默认为 500;
  • session.timeout.ms;
  • Consumer Rebalance:check timeouts、check processing times/logic、GC Issues;
  • 网络配置。
  • 开发实际

a. 做好音讯生产幂等

音讯生产的幂等次要依据业务逻辑做调整。

以解决订单音讯为例

1. 由订单编号 + 订单状态惟一的幂等 key,并存入 redis;

2. 在解决之前,首先会去查 Redis 是否存在该 Key,如果存在,则阐明曾经解决过了,间接丢掉;

3. 如果 Redis 没解决过,则将解决过的数据插入到业务 DB 上,再到最初把幂等 Key 插入到 Redis 上;

简而言之,即通过 Redis 做前置解决 + DB 惟一索引做最终保障来实现幂等性。

b. 做好 Consumer 隔离

在音讯量十分大的状况下,实时和离线消费者同时生产一个集群,离线数据沉重的磁盘 IO 操作会间接影响实时业务的实时性和集群的稳定性。

依据生产的实时性能够将音讯消费者行为划分两类:实时消费者和离线消费者。

实时消费者:对数据实时性要求较高;在实时生产的场景下,Kafka 会利用零碎的 page cache 缓存,间接从内存转发给实时消费者(热读),磁盘压力为零,适宜广告、举荐等业务场景。

离线消费者(定时周期性消费者):通常是生产数分钟前或是数小时前的音讯,这类音讯通常存储在磁盘中,生产时会触发磁盘的 IO 操作(冷读),适宜报表计算、批量计算等周期性执行的业务场景。

c. 防止音讯生产沉积
  • 提早解决、管制速度,工夫范畴内摊派音讯(针对实时性不高的音讯);
  • 生产速度大于生产速度,这样能够适当减少分区,减少 consumer 数量,晋升生产 TPS;
  • 防止很重的生产逻辑,优化 consumer TPS:

是否有大量 DB 操作;

上游 / 内部服务接口调用超时;

是否有 lock 操作(导致线程阻塞);

须要特地关注 kafka 异步链路中的波及音讯放大的逻辑。

  • 如果有较重的生产逻辑,须要调整 xx 参数,防止音讯没生产完时,生产组退出,造成 reblance 等问题;
  • 确保 consumer 端没有因为异样而导致生产 hang 住;
  • 如果应用的是消费者组,确保没有频繁地产生 rebalance;
  • 多线程生产,批量拉取解决。

注:批量拉取解决时,需注意下 kafka 版本,spring-kafka 2.2.11.RELEASE 版本以下,如果配置 kafka.batchListener=true,然而将音讯接管的元素设置为单个元素(非批量 List),可能会导致 kafka 在拉取一批音讯后,仅仅生产了头部的第一个音讯。

d. 防止 Rebalance 问题
  • 触发条件:

1. 消费者数量变动:新消费者退出、消费者下线(未能及时发送心跳,被“踢出”Group)、消费者被动退出生产组(Consumer 生产工夫过长导致);

2. 生产组内订阅的主题或者主题的分区数量发生变化;

3. 生产组对应的 GroupCoorinator 节点发生变化。

  • 如何防止非必要 rebalance(消费者下线、消费者被动退出生产组导致的 reblance):

1. 须要认真地设置 session.timeout.ms(决定了 Consumer 存活性的工夫距离)和 heartbeat.interval.ms(管制发送心跳申请频率的参数)的值。

2.max.poll.interval.ms 参数配置:管制 Consumer 理论生产能力对 Rebalance 的影响,限定了 Consumer 端应用程序两次调用 poll 办法的最大工夫距离。默认值是 5 分钟,示意 Consumer 程序如果在 5 分钟之内无奈生产完 poll 办法返回的音讯,那么 Consumer 会被动发动“来到组”的申请,Coordinator 也会开启新一轮 Rebalance。具体能够统计下历史的工夫破费,把最长的工夫为参考进行设置。

e. 保障音讯生产可靠性

个别状况下,还是 client 生产 broker 丢音讯的场景比拟多,想 client 端生产数据不能丢,必定是不能应用 autoCommit 的,所以必须是手动提交的。

Consumer 主动提交的机制是依据肯定的工夫距离,将收到的音讯进行 commit。commit 过程和生产音讯的过程是异步的。也就是说,可能存在生产过程未胜利(比方抛出异样),commit 音讯曾经提交了,则此时音讯就失落了。

f. 保障音讯生产程序性

1. 不同 topic(乱序音讯):如果领取与订单生成对应不同的 topic,只能在 consumer 层面去解决了。

2. 同一个 topic(乱序音讯):一个 topic 能够对应多个分区,别离对应了多个 consumer,与“不同 topic”没什么实质上的差异。(能够了解为咱们的服务有多个 pod,生产者程序发送音讯,但被路由到不同分区,就可能变得乱序了,服务生产的就是无序的音讯)。

3. 同一个 topic,同一个分区(程序音讯):Kafka 的音讯在分区内是严格有序的,例如把同一笔订单的所有音讯,依照生成的程序一个个发送到同一个 topic 的同一个分区。

针对乱序音讯

例如:订单和领取别离封装了各自的音讯,然而生产端的业务场景须要按订单音讯 -> 领取音讯的程序顺次生产音讯。

宽表(业务主题相干的指标、维度、属性关联在一起的一张数据库表):生产音讯时,只更新对应的字段就好,音讯只会存在短暂的状态不统一问题,然而状态最终是统一的。例如订单,领取有本人的状态字段,订单有本人的状态字段,售后有本人的状态字段,就不须要保障领取、订单、售后音讯的有序,即便音讯无序,也只会更新本人的状态字段,不会影响到其余状态;

音讯弥补机制:将音讯与 DB 进行比照,如果发现数据不统一,再从新发送音讯至主过程解决,保障最终一致性;

MQ 队列:一个中间方(比方 redis 的队列)来保护 MQ 的程序;

业务保障:通过业务逻辑保障生产程序;

针对程序音讯

两者都是通过将音讯绑定到定向的分区或者队列来保障程序性,通过减少分区或者线程来晋升生产能力。

1.Consumer 单线程程序生产

生产者在发送音讯时,已保障音讯在分区内有序,一个分区对应了一个消费者,保障了音讯生产的程序性。

2.Consumer 多线程程序生产(具体策略在前面章节)

单线程程序生产的扩大能力很差。为了晋升消费者的处理速度,除了横向扩大分区数,减少消费者外,还能够应用多线程程序生产。

将接管到的 kafka 数据进行 hash 取模(留神:如果 kafka 分区承受音讯曾经是取模的了,这里肯定要对 id 做一次 hash 再取模)发送到不同的队列,而后开启多个线程去生产对应队列外面的数据。

此外,这里通过配置核心进行开关、动静扩容 / 缩容线程池。

g. 解决 Consumer 的事务

通过事务音讯,能够很好的保障一些业务场景的事务逻辑,不会因为网络不可用等起因呈现零碎之间状态不统一。

当更新任何一个服务呈现故障时就抛出异样,事务音讯不会被提交或回滚,音讯服务器会回调发送端的事务查问接口,确定事务状态,发送端程序能够依据音讯的内容对未做完的工作从新执行,而后通知音讯服务器该事务的状态。

(三)集群配置最佳实际

  • 集群配置

Broker 评估:每个 Broker 的 Partition 数不应该超过 2k、管制 partition 大小(不要超过 25GB)。

集群评估(Broker 的数量依据以下条件配置):数据保留工夫、集群的流量大小。

集群扩容:磁盘使用率应该在 60% 以下、网络使用率应该在 75% 以下。

集群监控:放弃负载平衡、确保 topic 的 partition 均匀分布在所有 Broker 上、确保集群的阶段没有耗尽磁盘或带宽。

  • Topic 评估

1.Partition 数:

Partition 数应该至多与最大 consumer group 中 consumer 线程数统一;

对于应用频繁的 topic,应该设置更多的 partition;

管制 partition 的大小(25GB 左右);

思考利用将来的增长(能够应用一种机制进行主动扩容);

2. 应用带 key 的 topic;

3.partition 扩容:当 partition 的数据量超过一个阈值时应该主动扩容(实际上还应该思考网络流量)。

  • 分区配置

设置多个分区在肯定水平上是能够进步消费者生产的并发度,然而分区数量过多时可能会带来:句柄开销过大、生产端占用内存过大、可能减少端到端的提早、影响零碎可用性、故障复原工夫较长等问题。

依据吞吐量的要求设置 partition 数:

1. 假如 Producer 单 partition 的吞吐量为 P

2.consumer 生产一个 partition 的吞吐量为 C

3. 而要求的吞吐量为 T

4. 那么 partition 数至多应该大于 T/P、T/c 的最大值

(四)性能调优

调优指标:高吞吐量、低延时。

  • 分层调优

自上而下分为应用程序层、框架层、JVM 层和操作系统层,层级越靠上,调优的成果越显著。

|
调优类型

|

倡议

操作系统

|

挂载文件系统时禁掉 atime 更新;抉择 ext4 或 XFS 文件系统;swap 空间的设置;页缓存大小

|
|

JVM(堆设置和 GC 收集器)

|

将 JVM 堆大小设置成 6~8GB;倡议应用 G1 收集器,不便省事,比 CMS 收集器的优化难度小

|
|

Broker 端

|

放弃服务器端和客户端版本统一

|
|

应用层

|

要频繁地创立 Producer 和 Consumer 对象实例;用完及时敞开;正当利用多线程来改善性能

|

  • 吞吐量 (TPS) 调优

|
| 参数列表

Broker 端

|

适当减少 num.replica.fetchers 参数值,但不超过 CPU 核数

|
|
|

调优 GC 参数以防止经常性的 Full GC

|
|

Producer 端

|

适当减少 batch.size 参数值,比方从默认的 16KB 减少到 512KB 或 1MB

|
|
|

适当减少 linger.ms 参数值,比方 10~100

|
|
|

设置 compression.type=lz4 或 zstd

|
|
|

设置 acks= 0 或 1

|
|
|

设置 retries=0

|
|
|

如果多线程共享同一个 Producer 实例,则减少 buffer.memory 参数值

|
|

Consumer 端

|

采纳多 Consumer 过程或线程同时生产数据

|
|
|

减少 fetch.min.bytes 参数值,比方设置成 1KB 或更大

|

  • 延时调优

|

参数列表

Broker 端

|

适当设置 num.replica.fetchers 值

|
|

Producer 端

|

设置 linger.ms=0

|
|
|

不启用压缩,即设置 compression.type=none

|
|
|

设置 ackes=1

|
|

Consumer 端

|

设置 fetch.min.bytes=1

|

(五)稳定性测试

kafka 的稳定性测试次要在业务上线前针对 Kafka 实例 / 集群衰弱性、高可用性的测试。

  • 衰弱性查看

1. 查看实例:查看 Kafka 实例对象中拿到所有的信息(例如 IP、端口等);

2. 测试可用性:拜访生产者和消费者,测试连贯。

  • 高可用测试

单节点异样测试:重启 Leader 正本或 Follower 正本所在 Pod

步骤:

1. 查看 topic 的正本信息

2. 删除相应 pod

3. 脚本检测 Kafka 的可用性

预期:对生产者和消费者的可用性均无影响。

集群异样测试:重启所有 pod

步骤:

1. 删除所有 pod

2. 脚本检测 Kafka 的可用性

预期:所有 broker ready 后服务失常。

运行时监控

运行时监控次要蕴含集群稳定性配置与 Kafka 监控的最佳实际,旨在及时发现 Kafka 在运行时产生的相干问题与异样。

(一)集群稳定性监控

  • 腾讯云 CKafka 集群配置

正当进行 kafka 实例配,次要关注这几个数据:

  1. 磁盘容量和峰值带宽
  2. 音讯保留时长;
  3. 动静保留策略;
a. 磁盘容量和峰值带宽

可依据理论业务的音讯内容大小、发送音讯 qps 等进行预估,能够尽量设置大点;具体数值可依据实例监控查看,如果短时间内磁盘应用百分比就达到较高值,则需扩容。

峰值带宽 = 最大生产流量 * 正本数

b. 音讯保留时长

音讯即便被生产,也会长久化到磁盘存储保留时长的工夫。该设置会占用磁盘空间,如果每天音讯量很大的话,可适当缩短保留工夫。

c. 动静保留策略

举荐开启动静保留设置。当磁盘容量达到阈值,则删除最早的音讯,最多删除到保底时长范畴外的音讯(淘汰策略),能够很大水平防止磁盘被打满的状况。

但有调整时不会被动告诉,但咱们能够通过配置告警感知磁盘容量的变动。

  • 自建 Kafka 集群配置

1. 设置日志配置参数以使日志易于治理;

2. 理解 kafka 的 (低) 硬件需要;

3. 充分利用 Apache ZooKeeper;

4. 以正确的形式设置复制和冗余;

5. 留神主题配置;

6. 应用并行处理;

7. 带着安全性思维配置和隔离 Kafka;

8. 通过进步限度防止停机;

9. 放弃低网络提早;

10. 利用无效的监控和警报。

  • 资源隔离

a.Broker 级别物理隔离

如果不同业务线的 topic 会共享一块磁盘,若某个 consumer 呈现问题而导致生产产生 lag,进而导致频繁读盘,会影响在同一块磁盘的其余业务线 TP 的写入。

解决:Broker 级别物理隔离:创立 Topic、迁徙 Topic、宕机复原流程

b.RPC 队列隔离

Kafka RPC 队列短少隔离,一旦某个 topic 解决慢,会导致所有申请 hang 住。

解决:须要依照控制流、数据流拆散,且数据流要可能依照 topic 做隔离。

1. 将 call 队列依照拆解成多个,并且为每个 call 队列都调配一个线程池。

2. 一个队列独自解决 controller 申请的队列(隔离控制流),其余多个队列依照 topic 做 hash 的扩散开(数据流之间隔离)。

如果一个 topic 呈现问题,则只会阻塞其中的一个 RPC 解决线程池,以及 call 队列,能够保障其余的解决链路是畅通的。

  • 智能限速

整个限速逻辑实现在 RPC 工作线程解决的末端,一旦 RPC 处理完毕,则通过限速管制模块进行限速检测。

1. 配置等待时间,之后放入到 delayed queue 中,否则放到 response queue 中。

2. 放入到 delayed queue 中的申请,等待时间达到后,会被 delayed 线程放入到 response queue 中。

3. 最终在 response queue 中的申请被返回给 consumer。

(二)Kafka 监控

白盒监控:服务或零碎本身指标,如 CPU 负载、堆栈信息、连接数等;

黑盒监控:个别是通过模仿内部用户对其可见的零碎性能进行监控的一种监控形式,相干指标如音讯的提早、错误率和反复率等性能和可用性指标。

|
监控

|

性能 / 指标

|

详情

黑盒监控

|

操作

|

主题操作:创立、预览、查看、更新、删除

|
|
|

服务

|

数据写入、是否生产胜利

|
|
|

零碎

|

CPU 负载、堆栈信息、连接数等

|
|

白盒监控

|

容量

|

总存储空间、已用存储空间、最大分区应用、集群资源、分区数量、主题数量;

|
|
|

流量

|

音讯写入、生产速率、集群网络进出;

|
|
|

提早

|

音讯写入、生产耗时(平均值、99 分位、最大耗时)、主题生产提早量(offset lag)

|
|
|

谬误

|

集群异样节点数量、音讯写入回绝量、音讯生产失败量、依赖 zookeeper 的相干谬误

|

  • 腾讯云 CKafka 告警

针对 CKafka,须要配置告警(此类告警个别为音讯积压、可用性、集群 / 机器衰弱性等查看)。

a. 指标

如:实例衰弱状态、节点数量、衰弱节点数量、问题分区数、生产音讯数、生产申请数、jvm 内存利用率、均匀生产响应工夫、分区生产偏移量等。

具体指标能够参考:https://cloud.tencent.com/doc…

b. 配置

配置文档:https://cloud.tencent.com/doc…

抉择监控实例,配置告警内容和阈值。

个别会对以后服务本身的 kafka 集群做告警配置,然而如果是依赖本身音讯的上游服务呈现生产问题,咱们是感知不到了;而且针对生产端服务不共用同一个集群的状况,呈现音讯反复发送的问题,服务本身是很难发现的。

c. 预案

在业务上线前,最好梳理下本身服务所波及的 topic 音讯(上游生产端和上游生产端),并细化告警配置,如果呈现上游 kafka 异样或者上游 kafka 音讯沉积能够及时感知。特地须要把可能有刹时大量音讯的场景(如批量数据导入、定时全量数据同步等)做肯定的告警或者预案,防止服务不可用或者影响失常业务音讯。

  • 自建告警平台

通过自建告警平台配置对服务本身的异样告警,其中包含对框架在应用 kafka 组件时抛出与 kafka 生产逻辑过程中抛出的业务异样。

其中,可能须要异样降级的状况(因为)独自做下解决(针对 spring kafka):

1. 自定义 kafka 异样处理器:实现 KafkaListenerErrorHandler 接口的办法,注册自定义异样监听器,辨别业务异样并抛出;

2. 生产 Kafka 音讯时,将 @KafkaListener 的 errorHandler 参数设置为定义的 Kafka 异样处理器;

3. 尔后,指定的业务异样会被抛出,而不会被封装成 Spring kafka 的框架异样,导致不能清晰地理解具体异样信息。

  • Kafka 监控组件

目前业界并没有公认的解决方案,各家都有各自的监控之道。

Kafka Manager:应该算是最有名的专属 Kafka 监控框架了,是独立的监控零碎。

Kafka Monitor:LinkedIn 开源的收费框架,反对对集群进行零碎测试,并实时监控测试后果。

CruiseControl:也是 LinkedIn 公司开源的监控框架,用于实时监测资源使用率,以及提供罕用运维操作等。无 UI 界面,只提供 REST API。

JMX 监控:因为 Kafka 提供的监控指标都是基于 JMX 的,因而,市面上任何可能集成 JMX 的框架都能够应用,比方 Zabbix 和 Prometheus。已有大数据平台本人的监控体系:像 Cloudera 提供的 CDH 这类大数据平台,人造就提供 Kafka 监控计划。

JMXTool:社区提供的命令行工具,可能实时监控 JMX 指标。答上这一条,属于相对的加分项,因为晓得的人很少,而且会给人一种你对 Kafka 工具十分相熟的感觉。如果你临时不理解它的用法,能够在命令行以无参数形式执行一下 kafka-run-class.sh kafka.tools.JmxTool,学习下它的用法。

  • Kafka Monitor

其中,Kafka Monitor 通过模仿客户端行为,生产和生产数据并采集音讯的提早、错误率和反复率等性能和可用性指标,能够很好地发现上游的音讯生产状况进而能够动静地调整音讯的发送。(应用过程中需注意对样本覆盖率、性能覆盖率、流量、数据隔离、时延的管制)

Kakfa Monitor 劣势

1. 通过为每个 Partition 启动独自的生产工作,确保监控笼罩所有 Partition。

2. 在生产的音讯中蕴含了工夫戳、序列号,Kafka Monitor 能够根据这些数据对音讯的提早、失落率和反复率进行统计。

3. 通过设定音讯生成的频率,来达到管制流量的目标。

4. 生产的音讯在序列化时指定为一个可配置的大小(验证对不同大小数据的解决能力、雷同音讯大小的性能比拟)。

5. 通过设定独自的 Topic 和 Producer ID 来操作 Kafka 集群,可防止净化线上数据,做到肯定水平上的数据隔离。

基于 Kafka Monitor 的设计思维,能够针对业务特点引入对音讯的提早、错误率和反复率等性能的监控告警指标。

故障时解决

防微杜渐,遇到问题 / 故障时有残缺的应急预案,以疾速定位并解决问题。

(一)Kafka 音讯沉积紧急预案

问题形容:生产端产生音讯积压,导致依赖该音讯的服务不能及时感知业务变动,导致一些业务逻辑、数据处理呈现提早,容易产生业务阻塞和数据一致性问题。

计划:问题排查、扩容升配策略、音讯 Topic 转换策略、可配置多线程的生产策略。

  • 问题排查

遇到音讯积压时,具体能够从以下几个角度去定位问题起因:

1. 音讯生产端数据量是否存在陡升的状况。

2. 音讯生产端生产能力是否有降落。

3. 音讯积压是产生在所有的 partition 还是所有的 partition 都有积压状况。

对于第 1、2 点导致的音讯积压:为暂时性的音讯积压,通过扩分区、扩容升配、多线程生产、批量生产等形式进步生产速度能在肯定水平上解决这类问题。

对于第 3 点导致的音讯积压:能够采纳音讯 Topic 直达策略。

  • 扩容升配策略

1. 查看生产端生产发送状况(次要查看是否持续有音讯产生、是否存在逻辑缺点、是否有反复音讯发送);

2. 察看生产端的生产状况(预估下沉积音讯的解决清理以及是否有升高趋势);

3. 若为生产端问题,则评估是否能够通过减少分区数、调整偏移量、删除 topic(须要评估影响面)等解决;

4. 生产端新增机器及依赖资源,进步生产能力;

5. 如果波及数据一致性问题,须要通过数据比对、对账等性能进行校验。

  • 配置多线程的生产策略

简而言之,即线程池生产 + 动静线程池配置策略:将接管到的 kafka 数据进行 hash 取模(如果 kafka 分区承受音讯曾经是取模的了,这里肯定要对 id 做一次 hash 再取模)发送到不同的队列,而后开启多个线程去生产对应队列外面的数据。

设计思路:

1. 在利用启动时初始化对应业务的程序生产线程池(demo 中为订单生产线程池);

2. 订单监听类拉取音讯提交工作至线程池中对应的队列;

3. 线程池的线程解决绑定队列中的工作数据;

4. 每个线程解决完工作后减少待提交的 offsets 标识数;

5. 监听类中校验待提交的 offsets 数与拉取到的记录数是否相等,如果相等则;

6. 手动提交 offset(敞开 kafka 的主动提交,待本次拉取到的工作解决实现之后再提交位移)

另外,能够依据业务流量调整的线程配置与 pod 的配置,如高峰期设置一个绝对较高的并发级别数用来疾速解决音讯,平峰期设置一个较小的并发级别数来让出系统资源。这里,能够参考美团提供的一种配置核心批改配置动静设置线程池参数的思路,实现动静的扩容或者缩容。

实现了动静扩容与缩容

1. 通过配置核心刷新 OrderKafkaListener 监听类中的配置 concurrent 的值。

2. 通过 set 办法批改 concurrent 的值时,先批改 stopped 的值去进行以后正在执行的线程池。

3. 执行结束后通过新的并发级别数新建一个新的线程池,实现了动静扩容与缩容。

此外,还能够新增开关,它设置为 true 是能够中断启动中的线程池,故障时进行性能开关。

留神:如果波及数据一致性问题,须要通过数据比对、对账等性能进行校验。

  • Topic 直达策略

当音讯积压是产生在所有的 partition 还是所有的 partition 都有积压状况时,只能操作长期扩容,以更快的速度去生产数据了。

设计思路:

1. 长期建设好原先 10 倍或者 20 倍的 queue 数量(新建一个 topic,partition 是原来的 10 倍);

2. 而后写一个长期散发音讯的 consumer 程序,这个程序部署下来生产积压的音讯,生产之后不做耗时解决,间接平均轮询写入长期建好分 10 数量的 queue 外面;

3. 紧接着征用 10 倍的机器来部署 consumer,每一批 consumer 生产一个长期 queue 的音讯;

4. 这种做法相当于长期将 queue 资源和 consumer 资源扩充 10 倍,以失常速度的 10 倍来生产音讯。

5. 等疾速生产完了之后,复原原来的部署架构,从新用原来的 consumer 机器来生产音讯。

改良

1.consumer 程序能够写在服务外面;

2. 指定一个“预案 topic”,在服务中事后写好对“预案 topic”;

3. 采纳策略模式进行”业务 topic“->“预案 topic”的转换。

留神

1. 如果波及数据一致性问题,须要通过数据比对、对账等性能进行校验;

2. 须要有个独自的 topic 转换服务,或批改服务代码,或在事先将多线程逻辑写好。

(二)Kafka 生产异样导致生产阻塞

问题形容:某个音讯生产异样或者某个操作较为耗时,导致单个 pod 的生产能力降落,甚至产生阻塞。

计划:设置偏移量;开关多线程的生产策略。

  • 设置偏移量

1. 调整偏移量:分割运维,将 offset 后移一位;

2. 音讯补推:针对跳过的音讯或某个时间段内的数据进行音讯补推;

3. 如果波及数据一致性问题,须要通过数据比对、对账等性能进行校验。

  • 开关多线程的生产策略

参考下面的“可配置多线程的生产策略”,在产生阻塞时开启多线程生产开关。

注:须要批改代码或者在事先将多线程逻辑写好

(三)Kafka 音讯失落预案

问题形容:服务没有依照预期生产到 kafka 音讯,导致业务产生问题。

计划:根因剖析;音讯补推。

  • 根因剖析

1. 生产端是否胜利发送生产(源头失落)

Broker 失落音讯:Kafka 为了失去更高的性能和吞吐量,将数据异步批量的存储在磁盘中,异步刷盘有肯能造成源头数据失落;

Producer 失落音讯:发送逻辑存在 Bug,导致音讯为发送胜利。

解决:须要查看生产端与集群衰弱性;音讯补发。

2. 是否被胜利生产

Consumer 主动提交的机制是依据肯定的工夫距离,将收到的音讯进行 commit。commit 过程和生产音讯的过程是异步的。也就是说,可能存在生产过程未胜利(比方抛出异样),commit 音讯曾经提交了。

此外,如果生产逻辑有 bug,也导致音讯失落的假象。

解决:修复问题,视状况批改生产确认机制。

3. 是否有其余服务共用了同一个生产组

多服务误用同一个生产组会导致音讯肯定比率或规律性失落。

例如,创立用户的 kafka 音讯,可能价格核心和促销服务误用了一个生产组,导致每个服务都是生产了局部音讯,导致一些问题呈现偶现的状况。

解决:批改配置,重启服务,各种建设的生产组;事先须要有查看是否有多个服务共用一个生产的状况(检测 + 比对)。

  • 音讯补推

1. 通过业务影响查问影响的数据信息;

2. 构建 kafka 音讯,进行音讯弥补;

3. 如果波及数据一致性问题,须要通过数据比对、对账等性能进行校验。

针对每个对外发送的服务,生产端个别都须要有较为欠缺的音讯补推接口,并且生产端也须要保障音讯生产的幂等。

其余

(一)Kafka 老本管制

机器、存储和网络

  • 机器

须要从新评估你的实例类型决策:你的集群是否饱和?在什么状况下饱和?是否存在其余实例类型,可能比你第一次创立集群时抉择的类型更适合?EBS 优化实例与 GP2/3 或 IO2 驱动器的混合是否真的比 i3 或 i3en 机器(及其带来的劣势)有更好的性价比?

  • 存储与网络

压缩在 Kafka 中并不陈腐,大多数用户曾经晓得了本人能够在 GZIP、Snappy 和 LZ4 之间做出抉择。但自从 KIP-110 被合并进 Kafka,并增加了用于 Zstandard 压缩的压缩器后,它已实现了显著的性能改良,并且是升高网络老本的完满形式。

以生产者端略高的 CPU 使用率为代价,你将取得更高的压缩率并在线上“挤进”更多信息。

Amplitude 在他们的帖子中介绍,在切换到 Zstandard 后,他们的带宽使用量缩小了三分之二,仅在解决管道上就能够节俭每月数万美元的数据传输老本。

  • 集群

不均衡的集群可能会侵害集群性能,导致某些 borker 比其余 broker 的负载更大,让响应提早更高,并且在某些状况下会导致这些 broker 的资源饱和,从而导致不必要的扩容,进而会影响集群老本。

此外,不均衡集群还面临一个危险:在一个 broker 出故障后呈现更高的 MTTR(例如当该 broker 不必要地持有更多分区时),以及更高的数据失落危险(设想一个复制因子为 2 的主题,其中一个节点因为启动时要加载的 segment 过多,于是难以启动)。

(二)音讯生产的幂等

定义:

所谓幂等性,数学概念就是: f(f(x)) = f(x)。f 函数示意对音讯的解决。艰深点来讲就是,在消费者收到反复音讯进行反复解决时,也要保障最终后果的一致性。

比方,银行转账、下单等,不论重试多少次,都要保障最终后果肯定是统一的。

  • 利用数据库的惟一束缚

将数据库中的多个字段联结,创立一个惟一束缚,即便屡次操作也能保障表里至少存在一条记录(如创立订单、创立账单、创立流水等)。

此外,只有是反对相似“INSERT IF NOT EXIST”语义的存储类零碎(如 Redis 的 SETNX)都能够用于实现幂等生产。

  • 设置前置条件

1. 给数据变更设置一个前置条件(版本号 version、updateTime);

2. 如果满足条件就更新数据,否则回绝更新数据;

3. 在更新数据的时候,同时变更前置条件中的数据(版本号 +1、更新 updateTime)。

  • 记录并查看操作

1. 给每条音讯都记录一个全局惟一 ID;

2. 生产时,先依据这个全局惟一 ID 查看这条音讯是否有被生产过;

3. 如果没有生产过,则更新数据,并将生产状态置为“已生产”状态。

其中,在“查看生产状态,而后更新数据并且设置生产状态”中,三个操作必须作为一组操作保障原子性。

参考:

[1]https://iwiki.woa.com/pages/v…
[2]https://www.infoq.cn/article/…\_share
[3]https://blog.csdn.net/qq\_32179907/article/details/122599769
[4]https://blog.csdn.net/qq\_32179907/article/details/122599769
[5]https://zhuanlan.zhihu.com/p/…\_source=wechat\_session&utm\_medium=social&utm\_oi=689250073002930176&utm\_campaign=shareopn
[6]https://blog.csdn.net/philip5…\_medium=distribute.wap\_relevant.none-task-blog-2~default~baidujs\_baidulandingword~default-0-118997899-blog-125192952.wap\_relevant\_multi\_platform\_whitelistv1&spm=1001.2101.3001.4242.1&utm\_relevant\_index=1
[7]https://www.zhihu.com/questio…\_source=wechat\_session&utm\_medium=social&utm\_oi=689250073002930176&utm\_content=group3\_Answer&utm\_campaign=shareopn
[8]https://zhuanlan.zhihu.com/p/…\_source=wechat\_session&utm\_medium=social&utm\_oi=689250073002930176&utm\_campaign=shareopn
[9]https://www.infoq.cn/article/…\_share
[10]https://www.infoq.cn/article/…\_share
[11]https://www.infoq.cn/article/…\_share
[12]https://www.infoq.cn/article/Q0o*QzLQiay31MWiOBJH?source=app\_share

浏览原文

正文完
 0