关于kafka:Kafka-负载均衡在-vivo-的落地实践

​vivo 互联网服务器团队-You Shuo

正本迁徙是Kafka最高频的操作,对于一个领有几十万个正本的集群,通过人工去实现正本迁徙是一件很艰难的事件。Cruise Control作为Kafka的运维工具,它蕴含了Kafka 服务高低线、集群内负载平衡、正本扩缩容、正本缺失修复以及节点降级等性能。显然,Cruise Control的呈现,使得咱们可能更容易的运维大规模Kafka集群。
备注:本文基于 Kafka 2.1.1发展。

一、 Kafka 负载平衡

1.1 生产者负载平衡

Kafka 客户端能够应用分区器根据音讯的key计算分区,如果在发送音讯时未指定key,则默认分区器会基于round robin算法为每条音讯调配分区;

否则会基于murmur2哈希算法计算key的哈希值,并与分区数取模的到最初的分区编号。

很显然,这并不是咱们要探讨的Kafka负载平衡,因为生产者负载平衡看起来并不是那么的简单。

1.2 消费者负载平衡

思考到消费者高低线、topic分区数变更等状况,KafkaConsumer还须要负责与服务端交互执行分区再调配操作,以保障消费者可能更加平衡的生产topic分区,从而晋升生产的性能;

Kafka目前支流的分区调配策略有2种(默认是range,能够通过partition.assignment.strategy参数指定):

  • range: 在保障平衡的前提下,将间断的分区调配给消费者,对应的实现是RangeAssignor;
  • round-robin:在保障平衡的前提下,轮询调配,对应的实现是RoundRobinAssignor;
  • 0.11.0.0版本引入了一种新的分区调配策略StickyAssignor,其劣势在于可能保障分区平衡的前提下尽量放弃原有的分区调配后果,从而防止许多冗余的分区调配操作,缩小分区再调配的执行工夫。

无论是生产者还是消费者,Kafka 客户端外部曾经帮咱们做了负载平衡了,那咱们还有探讨负载平衡的必要吗?答案是必定的,因为Kafka负载不均的次要问题存在于服务端而不是客户端。

二、 Kafka 服务端为什么要做负载平衡

咱们先来看一下Kafka集群的流量散布(图1)以及新上线机器后集群的流量散布(图2):

从图1能够看出资源组内各broker的流量散布并不是很平衡,而且因为局部topic分区集中散布在某几个broker上,当topic流量突增的时候,会呈现只有局部broker流量突增。

这种状况下,咱们就须要扩容topic分区或手动执行迁挪动操作。

图2是咱们Kafka集群的一个资源组扩容后的流量散布状况,流量无奈主动的摊派到新扩容的节点上。此时,就须要咱们手动的触发数据迁徙,从而能力把流量引到新扩容的节点上。

2.1 Kafka 存储构造

为什么会呈现上述的问题呢?这个就须要从Kafka的存储机制说起。

下图是Kafka topic的存储构造,其具体层级构造形容如下:

  1. 每个broker节点能够通过logDirs配置项指定多个log目录,咱们线上机器共有12块盘,每块盘都对应一个log目录。
  2. 每个log目录下会有若干个[topic]-[x]字样的目录,该目录用于存储指定topic指定分区的数据,对应的如果该topic是3正本,那在集群的其余broker节点上会有两个和该目录同名的目录。
  3. 客户端写入kafka的数据最终会依照工夫程序成对的生成.index、.timeindex、.snapshot以及.log文件,这些文件保留在对应的topic分区目录下。
  4. 为了实现高可用目标,咱们线上的topic个别都是2正本/3正本,topic分区的每个正本都散布在不同的broker节点上,有时为了升高机架故障带来的危险,topic分区的不同正本也会被要求调配在不同机架的broker节点上。

理解完Kafka存储机制之后,咱们能够清晰的理解到,客户端写入Kafka的数据会依照topic分区被路由到broker的不同log目录下,只有咱们不人工干预,那每次路由的后果都不会扭转。因为每次路由后果都不会扭转,那么问题来了

随着topic数量一直增多,每个topic的分区数量又不统一,最终就会呈现topic分区在Kafka集群内调配不均的状况。

比方:topic1是10个分区、topic2是15个分区、topic3是3个分区,咱们集群有6台机器。那6台broker上总会有4台broker有两个topic1的分区,有3台broke上有3个topic3分区等等。

这样的问题就会导致分区多的broker上的出入流量可能要比其余broker上要高,如果要思考同一topic不同分区流量不统一、不同topic流量又不统一,再加上咱们线上有7000个topic、13万个分区、27万个正本等等这些。

这么简单的状况下,集群内总会有broker负载特地高,有的broker负载特地低,当broker负载高到肯定的时候,此时就须要咱们的运维同学染指进来了,咱们须要帮这些broker减减压,从而间接的晋升集群总体的负载能力。

集群整体负载都很高,业务流量会持续增长的时候,咱们会往集群内扩机器。有些同学想扩机器是坏事呀,这会有什么问题呢?问题和下面是一样的,因为发往topic分区的数据,其路由后果不会扭转,如果没有人工干预的话,那新扩进来机器的流量就始终是0,集群内原来的broker负载仍然得不到加重。

三、如何对 Kafka 做负载平衡

3.1 人工生成迁徙打算和迁徙

如下图所示,咱们模仿一个简略的场景,其中的T0-P0-R0示意topic-分区-正本,假如topic各分区流量雷同,假如每个分区R0正本是leader。

咱们能够看到,有两个topic T0和T1,T0是5分区2正本(出入流量为10和5),T1是3分区2正本(出入流量为5和1),如果严格思考机架的话,那topic正本的散布可能如下:

假如咱们当初新扩入一台broker3(Rack2),如下图所示:因为之前思考了topic在机架上的散布,所以从整体上看,broker2的负载要高一些。

咱们当初想把broker2上的一些分区迁徙到新扩进来的broker3上,综合思考机架、流量、正本个数等因素,咱们将T0-P2-R0、T0-P3-R1、T0-P4-R0、T1-P0-R1四个分区迁徙到broker3上。

看起来还不是很平衡,咱们再将T1-P2分区切换一下leader:

经验一番折腾后,整个集群就平衡许多了,对于下面迁徙正本和leader切换的命令参考如下:

Kafka 正本迁徙脚本

# 正本迁徙脚本:kafka-reassign-partitions.sh
# 1. 配置迁徙文件
$ vi topic-reassignment.json
{"version":1,"partitions":[
{"topic":"T0","partition":2,"replicas":[broker3,broker1]},
{"topic":"T0","partition":3,"replicas":[broker0,broker3]},
{"topic":"T0","partition":4,"replicas":[broker3,broker1]},
{"topic":"T1","partition":0,"replicas":[broker2,broker3]},
{"topic":"T1","partition":2,"replicas":[broker2,broker0]}
]}
# 2. 执行迁徙命令
bin/kafka-reassign-partitions.sh --throttle 73400320 --zookeeper zkurl --execute --reassignment-json-file topic-reassignment.json
# 3. 查看迁徙状态/革除限速配置
bin/kafka-reassign-partitions.sh --zookeeper zkurl --verify --reassignment-json-file topic-reassignment.json

3.2 应用负载平衡工具-cruise control

通过对Kafka存储构造、人工干预topic分区散布等的理解后,咱们能够看到Kafka运维起来是十分繁琐的,那有没有一些工具能够帮忙咱们解决这些问题呢?

答案是必定的。

cruise control是LinkedIn针对Kafka集群运维艰难问题而开发的一个我的项目,cruise control可能对Kafka集群各种资源进行动静负载平衡,这些资源包含:CPU、磁盘使用率、入流量、出流量、正本散布等,同时cruise control也具备首选leader切换和topic配置变更等性能。

3.2.1 cruise cotnrol 架构

咱们先简略介绍下cruise control的架构。

如下图所示,其次要由Monitor、Analyzer、Executor和Anomaly Detector 四局部组成:

(1)Monitor

Monitor分为客户端Metrics Reporter和服务端Metrics Sampler:

  • Metrics Reporter实现了Kafka的指标上报接口MetricsReporter,以特定的格局将原生的Kafka指标上报到topic \_\_CruiseControlMetrics中。
  • Metrics Sampler从\_\_CruiseControlMetrics中获取原生指标后依照broker和分区级指标别离进行聚合,聚合后的指标蕴含了broker、分区负载的均值、最大值等统计值,这些两头后果将被发送topic \_\_KafkaCruiseControlModelTrainingSamples和\_\_KafkaCruiseControlPartitionMetricSamples中;

(2)Analyzer

Analyzer作为cruise control的外围局部,它依据用户提供的优化指标和基于Monitor生成的集群负载模型生成迁徙打算。

在cruise control中,“用户提供的优化指标”包含硬性指标和软性指标两大类,硬性指标是Analyzer在做预迁徙的时候必须满足的一类指标(例如:正本在迁徙后必须满足机架分散性准则),软性指标则是尽可能要达到的指标,如果某一副本在迁徙后只能同时满足硬性指标和软性指标中的一类,则以硬性指标为主,如果存在硬性指标无奈满足的状况则本次剖析失败。

Analyzer可能须要改良的中央:

  1. 因为Monitor生成的是整个集群的负载模型,咱们的Kafka平台将Kafka集群划分为多个资源组,不同资源组的资源利用率存在很大差异,所以原生的集群负载模型不再实用于咱们的利用场景。
  2. 大多数业务没有指定key进行生产,所以各分区的负载偏差不大。如果topic分区正本均匀分布在资源组内,则资源组也随之变得平衡。
  3. 原生的cruise control会从集群维度来开展平衡工作,指定资源组后能够从资源组维度开展平衡工作,但无奈满足跨资源组迁徙的场景。

(3)Executor

Executor作为一个执行者,它执行Analyzer剖析失去的迁徙打算。它会将迁徙打算以接口的模式分批提交到Kafka集群上,后续Kafka会依照提交上来的迁徙脚本执行正本迁徙。

Executor可能须要改良的中央:

cruise control 在执行正本迁徙类的性能时,不能触发集群首选leader切换:有时在集群平衡过程中呈现了宕机重启,以问题机器作为首选leader的分区,其leader不能主动切换回来,造成集群内其余节点压力陡增,此时往往会产生连锁反应。

(4)Anomaly Detector

Anomaly Detector是一个定时工作,它会定期检测Kafka集群是否不平衡或者是否有正本缺失这些异常情况,当Kafka集群呈现这些状况后,Anomaly Detector会主动触发一次集群内的负载平衡。

在前面的次要性能形容中,我会次要介绍MonitorAnalyzer的解决逻辑。

3.2.2 平衡 broker 出入流量 / 机器高低线平衡

对于Kafka集群内各broker之间流量负载不均的起因、示意图以及解决方案,咱们在下面曾经介绍过了,那么cruise control是如何解决这个问题的

其实cruise control平衡集群的思路和咱们手动去平衡集群的思路大体一致,只不过它须要Kafka集群具体的指标数据,以这些指标为根底,去计算各broker之间的负载差距,并依据咱们关注的资源去做剖析,从而得出最终的迁徙打算。

以topic分区leader正本这类资源为例:

服务端在接管到平衡申请后,Monitor会先依据缓存的集群指标数据构建一个可能形容整个集群负载散布的模型。

下图简略形容了整个集群负载信息的生成过程,smaple fetcher线程会将获取到的原生指标加载成可读性更好的Metric Sample,并对其进行进一步的加工,失去带有brokerid、partition分区等信息的统计指标,这些指标保留在对应broker、replica的load属性中,所以broker和repilca会蕴含流量负载、存储大小、以后正本是否是leader等信息。

Analyzer 会遍历咱们指定的broker(默认是集群所有的broker),因为每台broker及其上面的topic分区正本都有具体的指标信息,剖析算法间接依据这些指标和指定资源对broker进行排序。

本例子的资源就是topic分区leader正本数量,接着Analyzer会依据咱们提前设置的最大/最小阈值、离散因子等来判断以后broker上某topic的leader正本数量是否须要减少或缩减,如果是减少,则变更clustermodel将负载比拟高的broker上对应的topic leader正本迁徙到以后broker上,反之亦然,在前面的革新点中,咱们会对Analyzer的工作过程做简略的形容。

遍历过所有broker,并且针对咱们指定的所有资源都进行剖析之后,就得出了最终版的clustermodel,再与咱们最后生成的clustermodel比照,就生成了topic迁徙打算。

cruise control会依据咱们指定的迁徙策略分批次的将topic迁徙打算提交给kafka集群执行。

迁徙打算示意图如下:

3.2.3 首选 leader 切换

切换非首选leader正本,迁徙打算示意图如下:

3.2.4 topic配置变更

扭转topic正本个数,迁徙打算示意图如下:

3.3 革新 cruise control

3.3.1 指定资源组进行平衡

当集群规模十分宏大的时候,咱们想要平衡整个集群就变得十分艰难,往往平衡一次就须要半个月甚至更长时间,这在无形之中也加大了咱们运维同学的压力。

针对这个场景,咱们对cruise control也进行了革新,咱们从逻辑上将Kafka集群划分成多个资源组,使得业务领有本人的资源组,当某个业务呈现流量稳定的时候,不会影响到其余的业务。

通过指定资源组,咱们每次只须要对集群的一小部分或多个局部进行平衡即可,大大缩短了平衡的工夫,使得平衡的过程更加可控。

革新后的cruise control能够做到如下几点:

  1. 通过平衡参数,咱们能够只平衡某个或多个资源组的broker。
  2. 更改topic配置,比方减少topic正本时,新扩的正本须要和topic原先的正本在同一个资源组内。
  3. 在资源组内剖析broker上的资源是迁入还是迁出。对于每一类资源指标,cruise control是计算资源组范畴内的统计指标,而后联合阈值和离散因子来剖析broker是迁出资源还是迁入资源。

如下图所示,咱们将集群、资源组、以及资源组下的topic这些元数据保留在数据库中,Analyzer得以在指定的资源组范畴内,对每个broker依照资源散布指标做平衡剖析。

例如:当对broker-0做平衡剖析的时候,Analyzer会遍历goals列表,每个goals负责一类资源负载指标(cpu、入流量等),当平衡剖析达到goal-0的时候,goal-0会判断broker-0的负载是否超出下限阈值,如果超出,则须要将broker-0的一些topic正本迁徙到负载较低的broker上;反之,则须要将其余broker上的正本迁徙到broker-0上。

其中,上面的recheck goals是排在前面的goal在做平衡剖析的时候,在更新cluster model之前会判断本次迁徙会不会与之前的goal抵触,如果抵触,那就不更新cluster model,以后的goal会持续尝试往其余broker上迁徙,直到找到适宜的迁徙指标,而后更新cluster model。

3.3.2 topic/topic分区往指定broker上迁徙

思考这些场景:

  1. 一个我的项目下会有几个资源组,因为业务变更,业务想要把A资源组下的topic迁徙到B资源组。
  2. 业务想要把公共资源组的topic迁徙到C资源组。
  3. 平衡实现之后,发现总有几个topic/分区散布不是很平均。

面对这些场景,咱们下面指定资源组进行平衡的性能就满足不了咱们的需要了。所以,咱们针对上述场景革新后的cruise control能够做到如下几点:

  1. 只平衡指定的topic或topic分区;
  2. 平衡的topic或topic分区只往指定的broker上迁徙。

3.3.3 新增指标剖析——topic分区leader正本分散性

业务方大多都是没有指定key进行发送数据的,所以同一topic每个分区的流量、存储都是靠近的,即每一个topic的各个分区的leader正本尽可能平均的散布在集群的broker上时,那集群的负载就会很平均。

有同学会问了,topic分区数并不总是可能整除broker数量,那最初各broker的负载不还是不统一嘛?

答案是必定的,只通过分区的leader正本还不能做到最终的平衡。

针对上述场景革新后的cruise control能够做到如下几点:

  1. 新增一类资源剖析:topic分区leader正本分散性。
  2. 首先保障每个topic的leader正本和follower正本尽可能的均匀分布在资源组的broker上。
  3. 在2的根底上,正本会尽可能的往负载较低的broker上散布。

如下图所示,针对每一个topic的正本,Analyzer会顺次计算以后broker的topic leader数是否超过阈值下限,如果超过,则Analyzer会依照topic的leader正本数量、topic的follower正本数量、broker的出流量负载等来选出AR中的follower正本作为新的leader进行切换,如果AR正本中也没有符合要求的broker,则会抉择AR列表以外的broker。

3.3.4 最终平衡成果

下图是某个资源组平衡后的流量散布,各节点间流量偏差十分小,这种状况下,既能够加强集群扛住流量异样突增的能力又能够晋升集群整体资源利用率和服务稳定性,降低成本。

3.4 装置/部署cruise control

3.4.1 客户端部署:指标采集

【步骤1】:创立Kafka账号,用于前面生产和生产指标数据

【步骤2】:创立3个Kafka外部topic:a是用来存储Kafka服务原生jmx指标;b和c别离是用来存储cruise control解决过后的分区和模型指标;

【步骤3】:给步骤1创立的账号授予读/写以及集群的操作权限,用于读/写步骤2创立的topic;

【步骤4】:批改kafka的server.properties,减少如下配置:

在Kafka服务上配置采集程序

# 批改kafka的server.properties
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
cruise.control.metrics.reporter.bootstrap.servers=域名:9092
 
cruise.control.metrics.reporter.security.protocol=SASL_PLAINTEXT
cruise.control.metrics.reporter.sasl.mechanism=SCRAM-SHA-256
cruise.control.metrics.reporter.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=\"ys\" password=\"ys\";

【步骤5】:增加cruise-control-metrics-reporter的jar包到Kafka的lib目录下:mv cruise-control-metrics-reporter-2.0.104-SNAPSHOT.jar kafka\_dir/lib/;

【步骤6】:重启Kafka服务。

3.4.2 服务端部署:指标聚合/平衡剖析

(1)到https://github.com/linkedin/cruise-control 下载zip文件并解压;

(2)将本人本地cruise control子模块下生成的jar包替换cruise control的:mv cruise-control-2.0.xxx-SNAPSHOT.jar cruise-control/build/libs;

(3)批改cruise control配置文件,次要关注如下配置:

# 批改cruise control配置文件
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=\"ys\" password=\"ys\";
bootstrap.servers=域名:9092
zookeeper.connect=zkURL

(4)批改数据库连贯配置:

# 集群id
cluster_id=xxx  
db_url=jdbc:mysql://hostxxxx:3306/databasexxx
db_user=xxx
db_pwd=xxx

四、总结

通过以上的介绍,咱们能够看出Kafka存在比拟显著的两个缺点:

  1. Kafka每个partition replica与机器的磁盘绑定,partition replica由一系列的Segment组成,所以往往单分区存储会占用比拟大的磁盘空间,对于磁盘会有很大压力。
  2. 在集群扩容broker时必须做Rebalance,须要broker有良好的执行流程,保障没有任何故障的状况下使得各broker负载平衡。

cruise control就是针对Kafka集群运维艰难问题而诞生的,它可能很好的解决kafka运维艰难的问题。

参考文章:

  1. linkedIn/cruise-control
  2. Introduction to Kafka Cruise Control
  3. Cloudera Cruise Control REST API Reference
  4. http://dockone.io/article/2434664
  5. . https://www.zhenchao.org/2019/06/22/kafka/kafka-log-manage/

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理