本文由云 + 社区发表
一、困难点
建立 topic 的时候,可以通过指定参数 –replication-factor 设置备份数量。但是,一旦完成建立 topic,则无法通过 kafka-topic.sh 或者 命令修改 replica 数量。
二、解决办法
实际上,我们可以考虑一种“另类”的办法:可以利用 kafka-reassign-partitions.sh 命令对所有分区进行重新分布,在做分区重新分布的时候,通过增加每个分区的 replica 备份数量来达到目的。
本文将介绍如何利用 kafka-reassign-partitions.sh 命令增加 topic 的备份数量。
注意:以下命令使用到的 topic 名称、zookeeper 的 ip 和 port,需要读者替换成为实际集群的参数。
(假设 kafka 集群有 4 个 broker,id 分别为:1001,1002,1003,1004)
2.1、获取当前 topic 的所有分区分布在 broker 的情况
[root@tbds bin]# ./kafka-topics.sh –zookeeper 172.16.32.13:2181 –topic ranger_audits –describe
Topic:ranger_audits PartitionCount:10 ReplicationFactor:1 Configs:
Topic: ranger_audits Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: ranger_audits Partition: 1 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: ranger_audits Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: ranger_audits Partition: 3 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: ranger_audits Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: ranger_audits Partition: 5 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: ranger_audits Partition: 6 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: ranger_audits Partition: 7 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: ranger_audits Partition: 8 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: ranger_audits Partition: 9 Leader: 1002 Replicas: 1002 Isr: 1002
可以看出,ranger_audits 这个 topic 有 10 个分区,每个分区只有一个 feplica 备份,分布在 1001 和 1002 两台 broker 上面。
下面我们需要将 ranger_audits 的每个分区数据都增加到 2 个 replica 备份,且分布到 4 个 broker 上面。
2.2、创建增加 replica 备份数量的配置文件
(注意:尽量保持 topic 的原有每个分区的主备份不变化。因此,配置文件的每个分区的第一个 broker 保持不变。)
[root@tbds bin]# vim ../config/increase-replication-factor.json
{“version”:1,
“partitions”:[
{“topic”:”ranger_audits”,”partition”:0,”replicas”:[1001,1003]},
{“topic”:”ranger_audits”,”partition”:1,”replicas”:[1002,1004]},
{“topic”:”ranger_audits”,”partition”:2,”replicas”:[1001,1003]},
{“topic”:”ranger_audits”,”partition”:3,”replicas”:[1002,1004]},
{“topic”:”ranger_audits”,”partition”:4,”replicas”:[1001,1003]},
{“topic”:”ranger_audits”,”partition”:5,”replicas”:[1002,1004]},
{“topic”:”ranger_audits”,”partition”:6,”replicas”:[1001,1003]},
{“topic”:”ranger_audits”,”partition”:7,”replicas”:[1002,1004]},
{“topic”:”ranger_audits”,”partition”:8,”replicas”:[1001,1003]},
{“topic”:”ranger_audits”,”partition”:9,”replicas”:[1002,1004]}
]}
上面的配置文件说明,我们将 topic 的每个分区都增加了一个 replica,且保持每个分区原有的主备份所在 broker 不变化,将每个分区新增的 replica 备份数据放到到 1003 和 1004 两个 broker 上面。
2.3、开始执行增加分区
[root@tbds bin]# ./kafka-reassign-partitions.sh -zookeeper 172.16.32.13:2181 –reassignment-json-file ../config/increase-replication-factor.json –execute
Current partition replica assignment
{“version”:1,”partitions”:[{“topic”:”ranger_audits”,”partition”:3,”replicas”:[1002]},{“topic”:”ranger_audits”,”partition”:9,”replicas”:[1002]},{“topic”:”ranger_audits”,”partition”:8,”replicas”:[1001]},{“topic”:”ranger_audits”,”partition”:1,”replicas”:[1002]},{“topic”:”ranger_audits”,”partition”:4,”replicas”:[1001]},{“topic”:”ranger_audits”,”partition”:2,”replicas”:[1001]},{“topic”:”ranger_audits”,”partition”:5,”replicas”:[1002]},{“topic”:”ranger_audits”,”partition”:0,”replicas”:[1001]},{“topic”:”ranger_audits”,”partition”:6,”replicas”:[1001]},{“topic”:”ranger_audits”,”partition”:7,”replicas”:[1002]}]}
Save this to use as the –reassignment-json-file option during rollback
Successfully started reassignment of partitions
{“version”:1,”partitions”:[{“topic”:”ranger_audits”,”partition”:0,”replicas”:[1001,1003]},{“topic”:”ranger_audits”,”partition”:8,”replicas”:[1001,1003]},{“topic”:”ranger_audits”,”partition”:5,”replicas”:[1002,1004]},{“topic”:”ranger_audits”,”partition”:2,”replicas”:[1001,1003]},{“topic”:”ranger_audits”,”partition”:9,”replicas”:[1002,1004]},{“topic”:”ranger_audits”,”partition”:1,”replicas”:[1002,1004]},{“topic”:”ranger_audits”,”partition”:3,”replicas”:[1002,1004]},{“topic”:”ranger_audits”,”partition”:4,”replicas”:[1001,1003]},{“topic”:”ranger_audits”,”partition”:7,”replicas”:[1002,1004]},{“topic”:”ranger_audits”,”partition”:6,”replicas”:[1001,1003]}]}
2.4、查看执行进度
[root@tbds bin]# ./kafka-reassign-partitions.sh -zookeeper 172.16.32.13:2181 –reassignment-json-file ../config/increase-replication-factor.json –verify
Status of partition reassignment:
Reassignment of partition [ranger_audits,0] completed successfully
Reassignment of partition [ranger_audits,8] completed successfully
Reassignment of partition [ranger_audits,5] completed successfully
Reassignment of partition [ranger_audits,2] completed successfully
Reassignment of partition [ranger_audits,9] completed successfully
Reassignment of partition [ranger_audits,1] completed successfully
Reassignment of partition [ranger_audits,3] completed successfully
Reassignment of partition [ranger_audits,4] completed successfully
Reassignment of partition [ranger_audits,7] completed successfully
Reassignment of partition [ranger_audits,6] completed successfully
上面显示增加分区操作成功
2.5、再次查看 topic 的情况
[root@tbds bin]# ./kafka-topics.sh –zookeeper 172.16.32.13:2181 –topic ranger_audits –describe
Topic:ranger_audits PartitionCount:10 ReplicationFactor:2 Configs:
Topic: ranger_audits Partition: 0 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: ranger_audits Partition: 1 Leader: 1002 Replicas: 1002,1004 Isr: 1002,1004
Topic: ranger_audits Partition: 2 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: ranger_audits Partition: 3 Leader: 1002 Replicas: 1002,1004 Isr: 1002,1004
Topic: ranger_audits Partition: 4 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: ranger_audits Partition: 5 Leader: 1002 Replicas: 1002,1004 Isr: 1002,1004
Topic: ranger_audits Partition: 6 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: ranger_audits Partition: 7 Leader: 1002 Replicas: 1002,1004 Isr: 1002,1004
Topic: ranger_audits Partition: 8 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: ranger_audits Partition: 9 Leader: 1002 Replicas: 1002,1004 Isr: 1002,1004
从上面可以看出,备份数量增加成功
三、进一步思考
利用上述介绍的办法,除了可以用来增加 topic 的备份数量之外,还能够处理以下几个场景:
1、对 topic 的所有分区数据进行整体迁移。怎么理解呢?假如集群有 N 个 broker,后来新扩容 M 个 broker。由于新扩容的 broker 磁盘都是空的,原有的 broker 磁盘占用都很满。那么我们可以利用上述方法,将存储在原有 N 个 broker 的某些 topic 整体搬迁到新扩容的 M 个 broker,进而实现 kafka 集群的整体数据均衡。
具体使用方法就是:通过编写 2.2 章节的配置文件,将 topic 的所有分区都配置到新的 M 个 broker 上面去,再执行 excute,即可完成 topic 的所有分区数据整体迁移到新扩容的 M 个 broker 节点。
*2、broker 坏掉的情况。* 导致某些 topic 的某些分区的 replica 数量减少,可以利用 kafka-reassign-partitions.sh 增加 replica;
*3、kafka 某些 broker 磁盘占用很满,某些磁盘占用又很少。* 可以利用 kafka-reassign-partitions.sh 迁移某些 topic 的分区数据到磁盘占用少的 broker,实现数据均衡;
*4、kafka 集群扩容。* 需要把原来 broker 的 topic 数据整体迁移到新的 broker,合理利用新扩容的 broker,实现负载均衡。
此文已由作者授权腾讯云 + 社区在各渠道发布
获取更多新鲜技术干货,可以关注我们腾讯云技术社区 - 云加社区官方号及知乎机构号