关于kafka:kafka运维Kafka全网最全最详细运维命令合集精品强烈建议收藏

85次阅读

共计 21577 个字符,预计需要花费 54 分钟才能阅读完成。

首发公众号: 石臻臻的杂货铺 ID:jjdlmn
集体 wx: jjdlmn_

以下大部分运维操作, 都能够应用 LogI-Kafka-Manager 在平台上可视化操作;

@[TOC]

1.TopicCommand

1.1.Topic 创立

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test


相干可选参数

参数 形容 例子
--bootstrap-server 指定 kafka 服务 指定连贯到的 kafka 服务; 如果有这个参数, 则 --zookeeper能够不须要 –bootstrap-server localhost:9092
--zookeeper 弃用, 通过 zk 的连贯形式连贯到 kafka 集群; –zookeeper localhost:2181 或者 localhost:2181/kafka
--replication-factor 正本数量, 留神不能大于 broker 数量; 如果不提供, 则会用集群中默认配置 –replication-factor 3
--partitions 分区数量, 当创立或者批改 topic 的时候, 用这个来指定分区数; 如果创立的时候没有提供参数, 则用集群中默认值; 留神如果是批改的时候, 分区比之前小会有问题 –partitions 3
--replica-assignment 正本分区调配形式; 创立 topic 的时候能够本人指定正本分配情况; --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个正本, 对应调配的 Broker; 逗号隔开标识分区; 冒号隔开示意正本
--config <String: name=value> 用来设置 topic 级别的配置以笼罩默认配置;只在 –create 和 –bootstrap-server 同时应用时候失效; 能够配置的参数列表请看文末附件 例如笼罩两个配置 --config retention.bytes=123455 --config retention.ms=600001
--command-config <String: command 文件门路 > 用来配置客户端 Admin Client 启动配置,只在 –bootstrap-server 同时应用时候失效; 例如: 设置申请的超时工夫 --command-config config/producer.proterties ; 而后在文件中配置 request.timeout.ms=300000

1.2. 删除 Topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test


反对正则表达式匹配 Topic 来进行删除, 只须要将 topic 用双引号包裹起来
例如: 删除以 create_topic_byhand_zk 为结尾的 topic;

bin/kafka-topics.sh –bootstrap-server localhost:9092 –delete –topic “create_topic_byhand_zk.*”
.示意任意匹配除换行符 \n 之外的任何单字符。要匹配 .,请应用 .。
·*·:匹配后面的子表达式零次或屡次。要匹配 * 字符,请应用 *。
.* : 任意字符

删除任意 Topic (慎用)

bin/kafka-topics.sh –bootstrap-server localhost:9092 –delete –topic “.*?”

更多的用法请参考正则表达式

1.3.Topic 分区扩容

zk 形式(不举荐)

>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

kafka 版本 >= 2.2 反对上面形式(举荐)

单个 Topic 扩容

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4

批量扩容 (将所有正则表达式匹配到的 Topic 分区扩容到 4 个)

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4

".*?" 正则表达式的意思是匹配所有; 您可按需匹配

PS: 当某个 Topic 的分区少于指定的分区数时候, 他会抛出异样; 然而不会影响其余 Topic 失常进行;


相干可选参数

参数 形容 例子
--replica-assignment 正本分区调配形式; 创立 topic 的时候能够本人指定正本分配情况; --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个正本, 对应调配的 Broker; 逗号隔开标识分区; 冒号隔开示意正本

PS: 尽管这里配置的是全副的分区正本调配配置, 然而正在失效的是新增的分区;
比方: 以前 3 分区 1 正本是这样的

Broker-1 Broker-2 Broker-3 Broker-4
0 1 2

当初新增一个分区,--replica-assignment 2,1,3,4 ; 看这个意思如同是把 0,1 号分区相互换个 Broker

Broker-1 Broker-2 Broker-3 Broker-4
1 0 2 3

然而实际上不会这样做,Controller 在解决的时候会把后面 3 个截掉; 只取新增的分区调配形式, 原来的还是不会变

Broker-1 Broker-2 Broker-3 Broker-4
0 1 2 3

1.4. 查问 Topic 形容

1. 查问单个 Topic

sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal

2. 批量查问 Topic(正则表达式匹配, 上面是查问所有 Topic)
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server xxxx:9092 --describe --exclude-internal

反对正则表达式匹配 Topic, 只须要将 topic 用双引号包裹起来


相干可选参数

参数 形容 例子
--bootstrap-server 指定 kafka 服务 指定连贯到的 kafka 服务; 如果有这个参数, 则 --zookeeper能够不须要 –bootstrap-server localhost:9092
--at-min-isr-partitions 查问的时候省略一些计数和配置信息 --at-min-isr-partitions
--exclude-internal 排除 kafka 外部 topic, 比方__consumer_offsets-* --exclude-internal
--topics-with-overrides 仅显示已笼罩配置的主题, 也就是独自针对 Topic 设置的配置笼罩默认配置;不展现分区信息 --topics-with-overrides

5. 查问 Topic 列表

1. 查问所有 Topic 列表

sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal

2. 查问匹配 Topic 列表(正则表达式)

查问 test_create_ 结尾的所有 Topic 列表
sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"


相干可选参数

参数 形容 例子
--exclude-internal 排除 kafka 外部 topic, 比方__consumer_offsets-* --exclude-internal
--topic 能够正则表达式进行匹配, 展现 topic 名称 --topic

2.ConfigCommand

Config 相干操作; 动静配置能够笼罩默认的动态配置;

2.1 查问配置

Topic 配置查问

展现对于 Topic 的动动态配置

1. 查问单个 Topic 配置(只列举动静配置)

sh bin/kafka-configs.sh --describe --bootstrap-server xxxxx:9092 --topic test_create_topic
或者
sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics --entity-name test_create_topic

2. 查问所有 Topic 配置(包含外部 Topic)(只列举动静配置)
sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics

3. 查问 Topic 的具体配置(动静 + 动态)

只须要加上一个参数--all

其余配置 /clients/users/brokers/broker-loggers 的查问

同理;只须要将--entity-type 改成对应的类型就行了 (topics/clients/users/brokers/broker-loggers)

查问 kafka 版本信息

sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --version

<font color=red> 所有可配置的动静配置 请看最初面的 附件 局部 </font>

2.2 增删改 配置 --alter

–alter

删除配置 : --delete-config k1=v1,k2=v2
增加 / 批改配 置: --add-config k1,k2
抉择类型: --entity-type (topics/clients/users/brokers/broker-

                                     loggers)

类型名称: --entity-name

Topic 增加 / 批改动静配置

--add-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999

Topic 删除动静配置

--delete-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms

其余配置同理, 只须要类型改下--entity-type

类型有: (topics/clients/users/brokers/broker- loggers)

<font color=red> 哪些配置能够批改 请看最初面的附件:ConfigCommand 的一些可选配置 </font>

3. 正本扩缩、分区迁徙、跨门路迁徙 kafka-reassign-partitions

请戳 [【kafka 运维】正本扩缩容、数据迁徙、正本重调配、正本跨门路迁徙]() (如果点不进去, 示意文章暂未发表, 请急躁期待)

4.Topic 的发送 kafka-console-producer.sh

4.1 生产无 key 音讯

## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

4.2 生产有 key 音讯
加上属性--property parse.key=true

## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties  --property parse.key=true

<font color=red> 默认音讯 key 与音讯 value 间应用“Tab 键”进行分隔,所以音讯 key 以及 value 中切勿应用转义字符(\t)</font>


可选参数

参数 值类型 阐明 有效值
–bootstrap-server String 要连贯的服务器必须(除非指定 –broker-list) 如:host1:prot1,host2:prot2
–topic String (必须)接管音讯的主题名称
–batch-size Integer 单个批处理中发送的音讯数 200(默认值)
–compression-codec String 压缩编解码器 none、gzip(默认值)snappy、lz4、zstd
–max-block-ms Long 在发送申请期间,生产者将阻止的最长工夫 60000(默认值)
–max-memory-bytes Long 生产者用来缓冲期待发送到服务器的总内存 33554432(默认值)
–max-partition-memory-bytes Long 为分区调配的缓冲区大小 16384
–message-send-max-retries Integer 最大的重试发送次数 3
–metadata-expiry-ms Long 强制更新元数据的工夫阈值(ms) 300000
–producer-property String 将自定义属性传递给生成器的机制 如:key=value
–producer.config String 生产者配置属性文件 [–producer-property] 优先于此配置 配置文件残缺门路
–property String 自定义音讯读取器 parse.key=true/false key.separator=<key.separator>ignore.error=true/false
–request-required-acks String 生产者申请的确认形式 0、1(默认值)、all
–request-timeout-ms Integer 生产者申请的确认超时工夫 1500(默认值)
–retry-backoff-ms Integer 生产者重试前,刷新元数据的等待时间阈值 100(默认值)
–socket-buffer-size Integer TCP 接管缓冲大小 102400(默认值)
–timeout Integer 音讯排队异步期待解决的工夫阈值 1000(默认值)
–sync 同步发送音讯
–version 显示 Kafka 版本 不配合其余参数时,显示为本地 Kafka 版本
–help 打印帮忙信息

5. Topic 的生产 kafka-console-consumer.sh

1. 新客户端从头生产--from-beginning (留神这里是新客户端, 如果之前曾经生产过了是不会从头生产的)
上面没有指定客户端名称, 所以每次执行都是新客户端都会从头生产

sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning

2. 正则表达式匹配 topic 进行生产 --whitelist
生产所有的 topic

sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –whitelist ‘.*’

生产所有的 topic,并且还从头生产
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –whitelist ‘.*’ –from-beginning

3. 显示 key 进行生产--property print.key=true

sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –property print.key=true

4. 指定分区生产--partition 指定起始偏移量生产--offset

sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –partition 0 –offset 100

5. 给客户端命名--group

留神给客户端命名之后, 如果之前有过生产,那么 --from-beginning 就不会再从头生产了

sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –group test-group

6. 增加客户端属性--consumer-property

这个参数也能够给客户端增加属性, 然而留神 不能多个中央配置同一个属性, 他们是互斥的; 比方在上面的根底上还加上属性--group test-group 那必定不行

sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test --consumer-property group.id=test-consumer-group

7. 增加客户端属性--consumer.config

--consumer-property 一样的性质, 都是增加客户端的属性, 不过这里是指定一个文件, 把属性写在文件外面, --consumer-property 的优先级大于 --consumer.config

sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –consumer.config config/consumer.properties


参数 形容 例子
--group 指定消费者所属组的 ID
--topic 被生产的 topic
--partition 指定分区;除非指定 –offset,否则从分区完结(latest) 开始生产 --partition 0
--offset 执行生产的起始 offset 地位 ; 默认值: latest; /latest /earliest / 偏移量 --offset 10
--whitelist 正则表达式匹配 topic;--topic就不必指定了; 匹配到的所有 topic 都会生产; 当然用了这个参数,--partition --offset等就不能应用了
--consumer-property 将用户定义的属性以 key=value 的模式传递给使用者 --consumer-property group.id=test-consumer-group
--consumer.config 消费者配置属性文件请留神,[consumer-property]优先于此配置 --consumer.config config/consumer.properties
--property 初始化音讯格式化程序的属性 print.timestamp=true,false、print.key=true,false、print.value=true,false、key.separator=<key.separator>、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer>
--from-beginning 从存在的最早音讯开始,而不是从最新消息开始, 留神如果配置了客户端名称并且之前生产过,那就不会从头生产了
--max-messages 生产的最大数据量,若不指定,则继续生产上来 --max-messages 100
--skip-message-on-error 如果解决音讯时出错,请跳过它而不是暂停
--isolation-level 设置为 read_committed 以过滤掉未提交的事务性音讯, 设置为 read_uncommitted 以读取所有音讯, 默认值:read_uncommitted
--formatter kafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter

6.kafka-leader-election Leader 从新选举

6.1 指定 Topic 指定分区用从新PREFERRED:优先正本策略 进行 Leader 重选举


> sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0

6.2 所有 Topic 所有分区用从新PREFERRED:优先正本策略 进行 Leader 重选举

sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred  --all-topic-partitions

6.3 设置配置文件批量指定 topic 和分区进行 Leader 重选举

先配置 leader-election.json 文件


{
  "partitions": [
    {
      "topic": "test_create_topic4",
      "partition": 1
    },
    {
      "topic": "test_create_topic4",
      "partition": 2
    }
  ]
}

 sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred  --path-to-json-file config/leader-election.json
 

相干可选参数

参数 形容 例子
--bootstrap-server 指定 kafka 服务 指定连贯到的 kafka 服务 –bootstrap-server localhost:9092
--topic 指定 Topic,此参数跟 --all-topic-partitionspath-to-json-file 三者互斥
--partition 指定分区, 跟 --topic 搭配应用
--election-type 两个选举策略 (PREFERRED: 优先正本选举, 如果第一个正本不在线的话会失败;UNCLEAN: 策略)
--all-topic-partitions 所有 topic 所有分区执行 Leader 重选举; 此参数跟 --topicpath-to-json-file 三者互斥
--path-to-json-file 配置文件批量选举,此参数跟 --topicall-topic-partitions 三者互斥

7. 继续批量推送音讯 kafka-verifiable-producer.sh

单次发送 100 条音讯--max-messages 100

一共要推送多少条,默认为 -1,- 1 示意始终推送到过程敞开地位

sh bin/kafka-verifiable-producer.sh –topic test_create_topic4 –bootstrap-server localhost:9092 --max-messages 100

每秒发送最大吞吐量不超过音讯 --throughput 100

推送音讯时的吞吐量,单位 messages/sec。默认为 -1,示意没有限度

sh bin/kafka-verifiable-producer.sh –topic test_create_topic4 –bootstrap-server localhost:9092 --throughput 100

发送的音讯体带前缀--value-prefix

sh bin/kafka-verifiable-producer.sh –topic test_create_topic4 –bootstrap-server localhost:9092 --value-prefix 666

留神 --value-prefix 666 必须是整数, 发送的音讯体的格局是加上一个 点号. 例如:666.

其余参数:
--producer.config CONFIG_FILE 指定 producer 的配置文件
--acks ACKS 每次推送音讯的 ack 值,默认是 -1

8. 继续批量拉取音讯 kafka-verifiable-consumer

继续生产

sh bin/kafka-verifiable-consumer.sh –group-id test_consumer –bootstrap-server localhost:9092 –topic test_create_topic4

单次最大生产 10 条音讯--max-messages 10

sh bin/kafka-verifiable-consumer.sh –group-id test_consumer –bootstrap-server localhost:9092 –topic test_create_topic4 --max-messages 10


相干可选参数

参数 形容 例子
--bootstrap-server 指定 kafka 服务 指定连贯到的 kafka 服务; –bootstrap-server localhost:9092
--topic 指定生产的 topic
--group-id 消费者 id;不指定的话每次都是新的组 id
group-instance-id 生产组实例 ID, 惟一值
--max-messages 单次最大生产的音讯数量
--enable-autocommit 是否开启 offset 主动提交;默认为 false
--reset-policy 当以前没有生产记录时,抉择要拉取 offset 的策略,能够是earliest, latest,none。默认是 earliest
--assignment-strategy consumer 调配分区策略,默认是org.apache.kafka.clients.consumer.RangeAssignor
--consumer.config 指定 consumer 的配置文件

9. 生产者压力测试 kafka-producer-perf-test.sh

1. 发送 1024 条音讯 --num-records 100 并且每条音讯大小为 1KB--record-size 1024 最大吞吐量每秒 10000 条--throughput 100

sh bin/kafka-producer-perf-test.sh –topic test_create_topic4 –num-records 100 –throughput 100000 –producer-props bootstrap.servers=localhost:9092 –record-size 1024

你能够通过 LogIKM 查看分区是否减少了对应的数据大小

从 LogIKM 能够看到发送了 1024 条音讯; 并且总数据量 =1M; 1024 条 *1024byte = 1M;

2. 用指定音讯文件 --payload-file 发送 100 条音讯最大吞吐量每秒 100 条--throughput 100

  1. 先配置好消息文件batchmessage.txt
  2. 而后执行命令
    发送的音讯会从batchmessage.txt 外面随机抉择; 留神这里咱们没有用参数 --payload-delimeter 指定分隔符,默认分隔符是 \n 换行;

    bin/kafka-producer-perf-test.sh –topic test_create_topic4 –num-records 100 –throughput 100 –producer-props bootstrap.servers=localhost:9090 –payload-file config/batchmessage.txt

  3. 验证音讯,能够通过 LogIKM 查看发送的音讯


相干可选参数

参数 形容 例子
--topic 指定生产的 topic
--num-records 发送多少条音讯
--throughput 每秒音讯最大吞吐量
--producer-props 生产者配置, k1=v1,k2=v2 --producer-props bootstrap.servers= localhost:9092,client.id=test_client
--producer.config 生产者配置文件 --producer.config config/producer.propeties
--print-metrics 在 test 完结的时候打印监控信息, 默认 false --print-metrics true
--transactional-id 指定事务 ID,测试并发事务的性能时须要,只有在 –transaction-duration-ms > 0 时失效,默认值为 performance-producer-default-transactional-id
--transaction-duration-ms 指定事务继续的最长工夫,超过这段时间后就会调用 commitTransaction 来提交事务,只有指定了 > 0 的值才会开启事务,默认值为 0
--record-size 一条音讯的大小 byte; 和 –payload-file 两个中必须指定一个,但不能同时指定
--payload-file 指定音讯的起源文件,只反对 UTF-8 编码的文本文件,文件的音讯分隔符通过 --payload-delimeter 指定, 默认是用换行 \nl 来宰割的,和 –record-size 两个中必须指定一个,但不能同时指定 ; 如果提供的音讯
--payload-delimeter 如果通过 --payload-file 指定了从文件中获取音讯内容,那么这个参数的意义是指定文件的音讯分隔符,默认值为 \n,即文件的每一行视为一条音讯;如果未指定 --payload-file 则此参数不失效;发送音讯的时候是随机送文件外面抉择音讯发送的;

10. 消费者压力测试 kafka-consumer-perf-test.sh

生产 100 条音讯 --messages 100

sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 –bootstrap-server localhost:9090 –messages 100


相干可选参数

参数 形容 例子
--bootstrap-server
--consumer.config 消费者配置文件
--date-format 后果打印进去的工夫格式化 默认:yyyy-MM-dd HH:mm:ss:SSS
--fetch-size 单次申请获取数据的大小 默认 1048576
--topic 指定生产的 topic
--from-latest
--group 生产组 ID
--hide-header 如果设置了, 则不打印 header 信息
--messages 须要生产的数量
--num-fetch-threads feth 数据的线程数 默认:1
--print-metrics 完结的时候打印监控数据
--show-detailed-stats
--threads 生产线程数; 默认 10

11. 删除指定分区的音讯 kafka-delete-records.sh

删除指定 topic 的某个分区的音讯删除至 offset 为 1024

先配置 json 文件offset-json-file.json

{"partitions":
[{"topic": "test1", "partition": 0,
  "offset": 1024}],
  "version":1
}

在执行命令

sh bin/kafka-delete-records.sh –bootstrap-server 172.23.250.249:9090 –offset-json-file config/offset-json-file.json

验证 通过 LogIKM 查看发送的音讯


从这里能够看进去, 配置"offset": 1024 的意思是从最开始的中央删除音讯到 1024 的 offset; 是从最后面开始删除的

12. 查看 Broker 磁盘信息

查问指定 topic 磁盘信息 --topic-list topic1,topic2

sh bin/kafka-log-dirs.sh –bootstrap-server xxxx:9090 –describe –topic-list test2

查问指定 Broker 磁盘信息--broker-list 0 broker1,broker2

sh bin/kafka-log-dirs.sh –bootstrap-server xxxxx:9090 –describe –topic-list test2 –broker-list 0

例如我一个 3 分区 3 正本的 Topic 的查出来的信息
logDir Broker 中配置的log.dir

{
    "version": 1,
    "brokers": [{
        "broker": 0,
        "logDirs": [{
            "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0",
            "error": null,
            "partitions": [{
                "partition": "test2-1",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "test2-0",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "test2-2",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }]
        }]
    }, {
        "broker": 1,
        "logDirs": [{
            "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1",
            "error": null,
            "partitions": [{
                "partition": "test2-1",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "test2-0",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "test2-2",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }]
        }]
    }, {
        "broker": 2,
        "logDirs": [{
            "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2",
            "error": null,
            "partitions": [{
                "partition": "test2-1",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "test2-0",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "test2-2",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }]
        }]
    }, {
        "broker": 3,
        "logDirs": [{
            "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3",
            "error": null,
            "partitions": []}]
    }]
}

如果你感觉通过命令查问磁盘信息比拟麻烦,你也能够通过 LogIKM 查看

12. 消费者组治理 kafka-consumer-groups.sh

1. 查看消费者列表--list

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list

先调用 MetadataRequest 拿到所有在线 Broker 列表
再给每个 Broker 发送 ListGroupsRequest 申请获取 消费者组数据

2. 查看消费者组详情--describe

DescribeGroupsRequest

查看生产组详情--group--all-groups

查看指定生产组详情--group
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group


查看所有生产组详情 --all-groups
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups
查看该生产组 生产的所有 Topic、及所在分区、最新生产 offset、Log 最新数据 offset、Lag 还未生产数量、消费者 ID 等等信息

查问消费者成员信息--members

所有生产组成员信息
sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090
指定生产组成员信息
sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090

查问消费者状态信息--state

所有生产组状态信息
sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090
指定生产组状态信息
sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090

3. 删除消费者组--delete

DeleteGroupsRequest

删除生产组 –delete

删除指定生产组 --group
sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090
删除所有生产组--all-groups
sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090

PS: 想要删除生产组前提是这个生产组的所有客户端都进行生产 / 不在线才可能胜利删除; 否则会报上面异样

Error: Deletion of some consumer groups failed:
* Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

4. 重置生产组的偏移量 --reset-offsets

<font color=red> 可能执行胜利的一个前提是 生产组这会是不可用状态;</font>

上面的示例应用的参数是: --dry-run ; 这个参数示意预执行, 会打印进去将要解决的后果;
等你想真正执行的时候请换成参数--excute ;

上面示例 重置模式都是 --to-earliest 重置到最早的;

请依据须要参考上面 相干重置 Offset 的模式 换成其余模式;

重置指定生产组的偏移量 --group

重置指定生产组的所有 Topic 的偏移量 --all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置指定生产组的指定 Topic 的偏移量--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2

重置所有生产组的偏移量 --all-group

重置所有生产组的所有 Topic 的偏移量 --all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置所有生产组中指定 Topic 的偏移量--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2

--reset-offsets 前面须要接 重置的模式

相干重置 Offset 的模式

参数 形容 例子
--to-earliest : 重置 offset 到最开始的那条 offset(找到还未被删除最早的那个 offset)
--to-current: 间接重置 offset 到以后的 offset,也就是 LOE
--to-latest 重置到最初一个 offset
--to-datetime: 重置到指定工夫的 offset; 格局为:YYYY-MM-DDTHH:mm:SS.sss; --to-datetime "2021-6-26T00:00:00.000"
--to-offset 重置到指定的 offset, 然而通常状况下, 匹配到多个分区, 这里是将匹配到的所有分区都重置到这一个值; 如果 1. 指标最大 offset<--to-offset, 这个时候重置为指标最大 offset;2. 指标最小 offset>--to-offset,则重置为最小; 3. 否则的话才会重置为 --to-offset 的目标值; 个别不必这个 --to-offset 3465
--shift-by 依照偏移量减少或者缩小多少个 offset;正的为往前减少; 负的往后退;当然这里也是匹配所有的; --shift-by 100--shift-by -100
--from-file 依据 CVS 文档来重置; 这里上面独自解说

--from-file着重解说一下

下面其余的一些模式重置的都是匹配到的所有分区; 不可能每个分区重置到不同的 offset;不过 --from-file 能够让咱们更灵便一点;

  1. 先配置 cvs 文档
    格局为: Topic: 分区号: 重置指标偏移量

    test2,0,100
    test2,1,200
    test2,2,300
  2. 执行命令

    sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv

5. 删除偏移量delete-offsets

<font color=red> 可能执行胜利的一个前提是 生产组这会是不可用状态;</font>

偏移量被删除了之后,Consumer Group 下次启动的时候, 会从头生产;

sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2


相干可选参数

参数 形容 例子
--bootstrap-server 指定连贯到的 kafka 服务; –bootstrap-server localhost:9092
--list 列出所有生产组名称 --list
--describe 查问消费者形容信息 --describe
--group 指定生产组
--all-groups 指定所有生产组
--members 查问生产组的成员信息
--state 查问消费者的状态信息
--offsets 在查问生产组形容信息的时候, 这个参数会列出音讯的偏移量信息; 默认就会有这个参数的;
dry-run 重置偏移量的时候, 应用这个参数能够让你事后看到重置状况,这个时候还没有真正的执行, 真正执行换成--excute; 默认为dry-run
--excute 真正的执行重置偏移量的操作;
--to-earliest 将 offset 重置到最早
to-latest 将 offset 重置到最近

附件

ConfigCommand 的一些可选配置


Topic 相干可选配置

key value 示例
cleanup.policy 清理策略
compression.type 压缩类型(通常倡议在 produce 端管制)
delete.retention.ms 压缩日志的保留工夫
file.delete.delay.ms
flush.messages 长久化 message 限度
flush.ms 长久化频率
follower.replication.throttled.replicas flowwer 正本限流 格局:分区号: 正本 follower 号, 分区号: 正本 follower 号 0:1,1:1
index.interval.bytes
leader.replication.throttled.replicas leader 正本限流 格局:分区号: 正本 Leader 号 0:0
max.compaction.lag.ms
max.message.bytes 最大的 batch 的 message 大小
message.downconversion.enable message 是否向下兼容
message.format.version message 格局版本
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas 最小的 ISR
preallocate
retention.bytes 日志保留大小(通常依照工夫限度)
retention.ms 日志保留工夫
segment.bytes segment 的大小限度
segment.index.bytes
segment.jitter.ms
segment.ms segment 的切割工夫
unclean.leader.election.enable 是否容许非同步正本选主

Broker 相干可选配置

key value 示例
advertised.listeners
background.threads
compression.type
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.message.downconversion.enable
log.message.timestamp.difference.max.ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
replica.alter.log.dirs.io.max.bytes.per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
unclean.leader.election.enable

Users 相干可选配置

key value 示例
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate 针对消费者 user 进行限流
producer_byte_rate 针对生产者进行限流
request_percentage 申请百分比

clients 相干可选配置

key value 示例
consumer_byte_rate
producer_byte_rate
request_percentage

以上大部分运维操作, 都能够应用 LogI-Kafka-Manager 在平台上可视化操作;


正文完
 0