共计 21577 个字符,预计需要花费 54 分钟才能阅读完成。
首发公众号: 石臻臻的杂货铺 ID:jjdlmn
集体 wx: jjdlmn_
以下大部分运维操作, 都能够应用 LogI-Kafka-Manager 在平台上可视化操作;
@[TOC]
1.TopicCommand
1.1.Topic 创立
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
相干可选参数
参数 | 形容 | 例子 |
---|---|---|
--bootstrap-server 指定 kafka 服务 |
指定连贯到的 kafka 服务; 如果有这个参数, 则 --zookeeper 能够不须要 |
–bootstrap-server localhost:9092 |
--zookeeper |
弃用, 通过 zk 的连贯形式连贯到 kafka 集群; | –zookeeper localhost:2181 或者 localhost:2181/kafka |
--replication-factor |
正本数量, 留神不能大于 broker 数量; 如果不提供, 则会用集群中默认配置 | –replication-factor 3 |
--partitions |
分区数量, 当创立或者批改 topic 的时候, 用这个来指定分区数; 如果创立的时候没有提供参数, 则用集群中默认值; 留神如果是批改的时候, 分区比之前小会有问题 | –partitions 3 |
--replica-assignment |
正本分区调配形式; 创立 topic 的时候能够本人指定正本分配情况; | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个正本, 对应调配的 Broker; 逗号隔开标识分区; 冒号隔开示意正本 |
--config <String: name=value> |
用来设置 topic 级别的配置以笼罩默认配置;只在 –create 和 –bootstrap-server 同时应用时候失效; 能够配置的参数列表请看文末附件 | 例如笼罩两个配置 --config retention.bytes=123455 --config retention.ms=600001 |
--command-config <String: command 文件门路 > |
用来配置客户端 Admin Client 启动配置,只在 –bootstrap-server 同时应用时候失效; | 例如: 设置申请的超时工夫 --command-config config/producer.proterties ; 而后在文件中配置 request.timeout.ms=300000 |
1.2. 删除 Topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
反对正则表达式匹配 Topic 来进行删除, 只须要将 topic 用双引号包裹起来
例如: 删除以 create_topic_byhand_zk
为结尾的 topic;
bin/kafka-topics.sh –bootstrap-server localhost:9092 –delete –topic “create_topic_byhand_zk.*”
.
示意任意匹配除换行符 \n 之外的任何单字符。要匹配 .,请应用 .。·*·
:匹配后面的子表达式零次或屡次。要匹配 * 字符,请应用 *。.*
: 任意字符
删除任意 Topic (慎用)
bin/kafka-topics.sh –bootstrap-server localhost:9092 –delete –topic “.*?”
更多的用法请参考正则表达式
1.3.Topic 分区扩容
zk 形式(不举荐)
>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
kafka 版本 >= 2.2 反对上面形式(举荐)
单个 Topic 扩容
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4
批量扩容 (将所有正则表达式匹配到的 Topic 分区扩容到 4 个)
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4
".*?"
正则表达式的意思是匹配所有; 您可按需匹配
PS: 当某个 Topic 的分区少于指定的分区数时候, 他会抛出异样; 然而不会影响其余 Topic 失常进行;
相干可选参数
参数 | 形容 | 例子 |
---|---|---|
--replica-assignment |
正本分区调配形式; 创立 topic 的时候能够本人指定正本分配情况; | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个正本, 对应调配的 Broker; 逗号隔开标识分区; 冒号隔开示意正本 |
PS: 尽管这里配置的是全副的分区正本调配配置, 然而正在失效的是新增的分区;
比方: 以前 3 分区 1 正本是这样的
Broker-1 | Broker-2 | Broker-3 | Broker-4 |
---|---|---|---|
0 | 1 | 2 |
当初新增一个分区,--replica-assignment
2,1,3,4 ; 看这个意思如同是把 0,1 号分区相互换个 Broker
Broker-1 | Broker-2 | Broker-3 | Broker-4 | |
---|---|---|---|---|
1 | 0 | 2 | 3 |
然而实际上不会这样做,Controller 在解决的时候会把后面 3 个截掉; 只取新增的分区调配形式, 原来的还是不会变
Broker-1 | Broker-2 | Broker-3 | Broker-4 | |
---|---|---|---|---|
0 | 1 | 2 | 3 |
1.4. 查问 Topic 形容
1. 查问单个 Topic
sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal
2. 批量查问 Topic(正则表达式匹配, 上面是查问所有 Topic)
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server xxxx:9092 --describe --exclude-internal
反对正则表达式匹配 Topic, 只须要将 topic 用双引号包裹起来
相干可选参数
参数 | 形容 | 例子 | |
---|---|---|---|
--bootstrap-server 指定 kafka 服务 |
指定连贯到的 kafka 服务; 如果有这个参数, 则 --zookeeper 能够不须要 |
–bootstrap-server localhost:9092 | |
--at-min-isr-partitions |
查问的时候省略一些计数和配置信息 | --at-min-isr-partitions |
|
--exclude-internal |
排除 kafka 外部 topic, 比方__consumer_offsets-* |
--exclude-internal |
|
--topics-with-overrides |
仅显示已笼罩配置的主题, 也就是独自针对 Topic 设置的配置笼罩默认配置;不展现分区信息 | --topics-with-overrides |
5. 查问 Topic 列表
1. 查问所有 Topic 列表
sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal
2. 查问匹配 Topic 列表(正则表达式)
查问
test_create_
结尾的所有 Topic 列表sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"
相干可选参数
参数 | 形容 | 例子 |
---|---|---|
--exclude-internal |
排除 kafka 外部 topic, 比方__consumer_offsets-* |
--exclude-internal |
--topic |
能够正则表达式进行匹配, 展现 topic 名称 | --topic |
2.ConfigCommand
Config 相干操作; 动静配置能够笼罩默认的动态配置;
2.1 查问配置
Topic 配置查问
展现对于 Topic 的动动态配置
1. 查问单个 Topic 配置(只列举动静配置)
sh bin/kafka-configs.sh --describe --bootstrap-server xxxxx:9092 --topic test_create_topic
或者sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics --entity-name test_create_topic
2. 查问所有 Topic 配置(包含外部 Topic)(只列举动静配置)
sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics
3. 查问 Topic 的具体配置(动静 + 动态)
只须要加上一个参数
--all
其余配置 /clients/users/brokers/broker-loggers 的查问
同理;只须要将
--entity-type
改成对应的类型就行了 (topics/clients/users/brokers/broker-loggers)
查问 kafka 版本信息
sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --version
<font color=red> 所有可配置的动静配置 请看最初面的 附件 局部 </font>
2.2 增删改 配置 --alter
–alter
删除配置 : --delete-config
k1=v1,k2=v2
增加 / 批改配 置: --add-config
k1,k2
抉择类型: --entity-type
(topics/clients/users/brokers/broker-
loggers)
类型名称: --entity-name
Topic 增加 / 批改动静配置
--add-config
sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999
Topic 删除动静配置
--delete-config
sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms
其余配置同理, 只须要类型改下--entity-type
类型有: (topics/clients/users/brokers/broker- loggers)
<font color=red> 哪些配置能够批改 请看最初面的附件:ConfigCommand 的一些可选配置 </font>
3. 正本扩缩、分区迁徙、跨门路迁徙 kafka-reassign-partitions
请戳 [【kafka 运维】正本扩缩容、数据迁徙、正本重调配、正本跨门路迁徙]() (如果点不进去, 示意文章暂未发表, 请急躁期待)
4.Topic 的发送 kafka-console-producer.sh
4.1 生产无 key 音讯
## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties
4.2 生产有 key 音讯
加上属性--property parse.key=true
## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties --property parse.key=true
<font color=red> 默认音讯 key 与音讯 value 间应用“Tab 键”进行分隔,所以音讯 key 以及 value 中切勿应用转义字符(\t)</font>
可选参数
参数 | 值类型 | 阐明 | 有效值 |
---|---|---|---|
–bootstrap-server | String | 要连贯的服务器必须(除非指定 –broker-list) | 如:host1:prot1,host2:prot2 |
–topic | String | (必须)接管音讯的主题名称 | |
–batch-size | Integer | 单个批处理中发送的音讯数 | 200(默认值) |
–compression-codec | String | 压缩编解码器 | none、gzip(默认值)snappy、lz4、zstd |
–max-block-ms | Long | 在发送申请期间,生产者将阻止的最长工夫 | 60000(默认值) |
–max-memory-bytes | Long | 生产者用来缓冲期待发送到服务器的总内存 | 33554432(默认值) |
–max-partition-memory-bytes | Long | 为分区调配的缓冲区大小 | 16384 |
–message-send-max-retries | Integer | 最大的重试发送次数 | 3 |
–metadata-expiry-ms | Long | 强制更新元数据的工夫阈值(ms) | 300000 |
–producer-property | String | 将自定义属性传递给生成器的机制 | 如:key=value |
–producer.config | String | 生产者配置属性文件 [–producer-property] 优先于此配置 配置文件残缺门路 | |
–property | String | 自定义音讯读取器 | parse.key=true/false key.separator=<key.separator>ignore.error=true/false |
–request-required-acks | String | 生产者申请的确认形式 | 0、1(默认值)、all |
–request-timeout-ms | Integer | 生产者申请的确认超时工夫 | 1500(默认值) |
–retry-backoff-ms | Integer | 生产者重试前,刷新元数据的等待时间阈值 | 100(默认值) |
–socket-buffer-size | Integer | TCP 接管缓冲大小 | 102400(默认值) |
–timeout | Integer | 音讯排队异步期待解决的工夫阈值 | 1000(默认值) |
–sync | 同步发送音讯 | ||
–version | 显示 Kafka 版本 | 不配合其余参数时,显示为本地 Kafka 版本 | |
–help | 打印帮忙信息 |
5. Topic 的生产 kafka-console-consumer.sh
1. 新客户端从头生产--from-beginning
(留神这里是新客户端, 如果之前曾经生产过了是不会从头生产的)
上面没有指定客户端名称, 所以每次执行都是新客户端都会从头生产
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning
2. 正则表达式匹配 topic 进行生产 --whitelist
生产所有的 topic
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –whitelist ‘.*’
生产所有的 topic,并且还从头生产
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –whitelist ‘.*’ –from-beginning
3. 显示 key 进行生产--property print.key=true
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –property print.key=true
4. 指定分区生产--partition
指定起始偏移量生产--offset
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –partition 0 –offset 100
5. 给客户端命名--group
留神给客户端命名之后, 如果之前有过生产,那么 --from-beginning
就不会再从头生产了
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –group test-group
6. 增加客户端属性--consumer-property
这个参数也能够给客户端增加属性, 然而留神 不能多个中央配置同一个属性, 他们是互斥的; 比方在上面的根底上还加上属性--group test-group
那必定不行
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test
--consumer-property group.id=test-consumer-group
7. 增加客户端属性--consumer.config
跟--consumer-property
一样的性质, 都是增加客户端的属性, 不过这里是指定一个文件, 把属性写在文件外面, --consumer-property
的优先级大于 --consumer.config
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –consumer.config config/consumer.properties
参数 | 形容 | 例子 | |
---|---|---|---|
--group |
指定消费者所属组的 ID | ||
--topic |
被生产的 topic | ||
--partition |
指定分区;除非指定 –offset ,否则从分区完结(latest) 开始生产 |
--partition 0 |
|
--offset |
执行生产的起始 offset 地位 ; 默认值: latest; /latest /earliest / 偏移量 | --offset 10 |
|
--whitelist |
正则表达式匹配 topic;--topic 就不必指定了; 匹配到的所有 topic 都会生产; 当然用了这个参数,--partition --offset 等就不能应用了 |
||
--consumer-property |
将用户定义的属性以 key=value 的模式传递给使用者 | --consumer-property group.id=test-consumer-group |
|
--consumer.config |
消费者配置属性文件请留神,[consumer-property ]优先于此配置 |
--consumer.config config/consumer.properties |
|
--property |
初始化音讯格式化程序的属性 | print.timestamp=true,false、print.key=true,false、print.value=true,false、key.separator=<key.separator>、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer> | |
--from-beginning |
从存在的最早音讯开始,而不是从最新消息开始, 留神如果配置了客户端名称并且之前生产过,那就不会从头生产了 | ||
--max-messages |
生产的最大数据量,若不指定,则继续生产上来 | --max-messages 100 |
|
--skip-message-on-error |
如果解决音讯时出错,请跳过它而不是暂停 | ||
--isolation-level |
设置为 read_committed 以过滤掉未提交的事务性音讯, 设置为 read_uncommitted 以读取所有音讯, 默认值:read_uncommitted | ||
--formatter |
kafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter |
6.kafka-leader-election Leader 从新选举
6.1 指定 Topic 指定分区用从新PREFERRED:优先正本策略
进行 Leader 重选举
> sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0
6.2 所有 Topic 所有分区用从新PREFERRED:优先正本策略
进行 Leader 重选举
sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred --all-topic-partitions
6.3 设置配置文件批量指定 topic 和分区进行 Leader 重选举
先配置 leader-election.json 文件
{
"partitions": [
{
"topic": "test_create_topic4",
"partition": 1
},
{
"topic": "test_create_topic4",
"partition": 2
}
]
}
sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred --path-to-json-file config/leader-election.json
相干可选参数
参数 | 形容 | 例子 |
---|---|---|
--bootstrap-server 指定 kafka 服务 |
指定连贯到的 kafka 服务 | –bootstrap-server localhost:9092 |
--topic |
指定 Topic,此参数跟 --all-topic-partitions 和path-to-json-file 三者互斥 |
|
--partition |
指定分区, 跟 --topic 搭配应用 |
|
--election-type |
两个选举策略 (PREFERRED: 优先正本选举, 如果第一个正本不在线的话会失败;UNCLEAN : 策略) |
|
--all-topic-partitions |
所有 topic 所有分区执行 Leader 重选举; 此参数跟 --topic 和path-to-json-file 三者互斥 |
|
--path-to-json-file |
配置文件批量选举,此参数跟 --topic 和all-topic-partitions 三者互斥 |
7. 继续批量推送音讯 kafka-verifiable-producer.sh
单次发送 100 条音讯--max-messages 100
一共要推送多少条,默认为 -1,- 1 示意始终推送到过程敞开地位
sh bin/kafka-verifiable-producer.sh –topic test_create_topic4 –bootstrap-server localhost:9092
--max-messages 100
每秒发送最大吞吐量不超过音讯 --throughput 100
推送音讯时的吞吐量,单位 messages/sec。默认为 -1,示意没有限度
sh bin/kafka-verifiable-producer.sh –topic test_create_topic4 –bootstrap-server localhost:9092
--throughput 100
发送的音讯体带前缀--value-prefix
sh bin/kafka-verifiable-producer.sh –topic test_create_topic4 –bootstrap-server localhost:9092
--value-prefix 666
留神
--value-prefix 666
必须是整数, 发送的音讯体的格局是加上一个 点号.
例如:666.
其余参数: --producer.config CONFIG_FILE
指定 producer 的配置文件--acks ACKS
每次推送音讯的 ack 值,默认是 -1
8. 继续批量拉取音讯 kafka-verifiable-consumer
继续生产
sh bin/kafka-verifiable-consumer.sh –group-id test_consumer –bootstrap-server localhost:9092 –topic test_create_topic4
单次最大生产 10 条音讯--max-messages 10
sh bin/kafka-verifiable-consumer.sh –group-id test_consumer –bootstrap-server localhost:9092 –topic test_create_topic4
--max-messages 10
相干可选参数
参数 | 形容 | 例子 |
---|---|---|
--bootstrap-server 指定 kafka 服务 |
指定连贯到的 kafka 服务; | –bootstrap-server localhost:9092 |
--topic |
指定生产的 topic | |
--group-id |
消费者 id;不指定的话每次都是新的组 id | |
group-instance-id |
生产组实例 ID, 惟一值 | |
--max-messages |
单次最大生产的音讯数量 | |
--enable-autocommit |
是否开启 offset 主动提交;默认为 false | |
--reset-policy |
当以前没有生产记录时,抉择要拉取 offset 的策略,能够是earliest , latest ,none 。默认是 earliest |
|
--assignment-strategy |
consumer 调配分区策略,默认是org.apache.kafka.clients.consumer.RangeAssignor |
|
--consumer.config |
指定 consumer 的配置文件 |
9. 生产者压力测试 kafka-producer-perf-test.sh
1. 发送 1024 条音讯 --num-records 100
并且每条音讯大小为 1KB--record-size 1024
最大吞吐量每秒 10000 条--throughput 100
sh bin/kafka-producer-perf-test.sh –topic test_create_topic4 –num-records 100 –throughput 100000 –producer-props bootstrap.servers=localhost:9092 –record-size 1024
你能够通过 LogIKM 查看分区是否减少了对应的数据大小
从 LogIKM 能够看到发送了 1024 条音讯; 并且总数据量 =1M; 1024 条 *1024byte = 1M;
2. 用指定音讯文件 --payload-file
发送 100 条音讯最大吞吐量每秒 100 条--throughput 100
- 先配置好消息文件
batchmessage.txt
-
而后执行命令
发送的音讯会从batchmessage.txt
外面随机抉择; 留神这里咱们没有用参数--payload-delimeter
指定分隔符,默认分隔符是 \n 换行;bin/kafka-producer-perf-test.sh –topic test_create_topic4 –num-records 100 –throughput 100 –producer-props bootstrap.servers=localhost:9090 –payload-file config/batchmessage.txt
-
验证音讯,能够通过 LogIKM 查看发送的音讯
相干可选参数
参数 | 形容 | 例子 |
---|---|---|
--topic |
指定生产的 topic | |
--num-records |
发送多少条音讯 | |
--throughput |
每秒音讯最大吞吐量 | |
--producer-props |
生产者配置, k1=v1,k2=v2 | --producer-props bootstrap.servers= localhost:9092,client.id=test_client |
--producer.config |
生产者配置文件 | --producer.config config/producer.propeties |
--print-metrics |
在 test 完结的时候打印监控信息, 默认 false | --print-metrics true |
--transactional-id |
指定事务 ID,测试并发事务的性能时须要,只有在 –transaction-duration-ms > 0 时失效,默认值为 performance-producer-default-transactional-id | |
--transaction-duration-ms |
指定事务继续的最长工夫,超过这段时间后就会调用 commitTransaction 来提交事务,只有指定了 > 0 的值才会开启事务,默认值为 0 | |
--record-size |
一条音讯的大小 byte; 和 –payload-file 两个中必须指定一个,但不能同时指定 | |
--payload-file |
指定音讯的起源文件,只反对 UTF-8 编码的文本文件,文件的音讯分隔符通过 --payload-delimeter 指定, 默认是用换行 \nl 来宰割的,和 –record-size 两个中必须指定一个,但不能同时指定 ; 如果提供的音讯 |
|
--payload-delimeter |
如果通过 --payload-file 指定了从文件中获取音讯内容,那么这个参数的意义是指定文件的音讯分隔符,默认值为 \n,即文件的每一行视为一条音讯;如果未指定 --payload-file 则此参数不失效;发送音讯的时候是随机送文件外面抉择音讯发送的; |
10. 消费者压力测试 kafka-consumer-perf-test.sh
生产 100 条音讯 --messages 100
sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 –bootstrap-server localhost:9090 –messages 100
相干可选参数
参数 | 形容 | 例子 | |
---|---|---|---|
--bootstrap-server |
|||
--consumer.config |
消费者配置文件 | ||
--date-format |
后果打印进去的工夫格式化 | 默认:yyyy-MM-dd HH:mm:ss:SSS | |
--fetch-size |
单次申请获取数据的大小 | 默认 1048576 | |
--topic |
指定生产的 topic | ||
--from-latest |
|||
--group |
生产组 ID | ||
--hide-header |
如果设置了, 则不打印 header 信息 | ||
--messages |
须要生产的数量 | ||
--num-fetch-threads |
feth 数据的线程数 | 默认:1 | |
--print-metrics |
完结的时候打印监控数据 | ||
--show-detailed-stats |
|||
--threads |
生产线程数; | 默认 10 |
11. 删除指定分区的音讯 kafka-delete-records.sh
删除指定 topic 的某个分区的音讯删除至 offset 为 1024
先配置 json 文件offset-json-file.json
{"partitions":
[{"topic": "test1", "partition": 0,
"offset": 1024}],
"version":1
}
在执行命令
sh bin/kafka-delete-records.sh –bootstrap-server 172.23.250.249:9090 –offset-json-file config/offset-json-file.json
验证 通过 LogIKM 查看发送的音讯
从这里能够看进去, 配置"offset": 1024
的意思是从最开始的中央删除音讯到 1024 的 offset; 是从最后面开始删除的
12. 查看 Broker 磁盘信息
查问指定 topic 磁盘信息 --topic-list topic1,topic2
sh bin/kafka-log-dirs.sh –bootstrap-server xxxx:9090 –describe –topic-list test2
查问指定 Broker 磁盘信息--broker-list 0 broker1,broker2
sh bin/kafka-log-dirs.sh –bootstrap-server xxxxx:9090 –describe –topic-list test2 –broker-list 0
例如我一个 3 分区 3 正本的 Topic 的查出来的信息logDir
Broker 中配置的log.dir
{
"version": 1,
"brokers": [{
"broker": 0,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 1,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 2,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 3,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3",
"error": null,
"partitions": []}]
}]
}
如果你感觉通过命令查问磁盘信息比拟麻烦,你也能够通过 LogIKM 查看
12. 消费者组治理 kafka-consumer-groups.sh
1. 查看消费者列表--list
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list
先调用 MetadataRequest
拿到所有在线 Broker 列表
再给每个 Broker 发送 ListGroupsRequest
申请获取 消费者组数据
2. 查看消费者组详情--describe
DescribeGroupsRequest
查看生产组详情--group
或 --all-groups
查看指定生产组详情
--group
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group
查看所有生产组详情
--all-groups
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups
查看该生产组 生产的所有 Topic、及所在分区、最新生产 offset、Log 最新数据 offset、Lag 还未生产数量、消费者 ID 等等信息
查问消费者成员信息--members
所有生产组成员信息
sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090
指定生产组成员信息sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090
查问消费者状态信息--state
所有生产组状态信息
sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090
指定生产组状态信息sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090
3. 删除消费者组--delete
DeleteGroupsRequest
删除生产组 –delete
删除指定生产组
--group
sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090
删除所有生产组--all-groups
sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090
PS: 想要删除生产组前提是这个生产组的所有客户端都进行生产 / 不在线才可能胜利删除; 否则会报上面异样
Error: Deletion of some consumer groups failed:
* Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
4. 重置生产组的偏移量 --reset-offsets
<font color=red> 可能执行胜利的一个前提是 生产组这会是不可用状态;</font>
上面的示例应用的参数是: --dry-run
; 这个参数示意预执行, 会打印进去将要解决的后果;
等你想真正执行的时候请换成参数--excute
;
上面示例 重置模式都是 --to-earliest
重置到最早的;
请依据须要参考上面 相干重置 Offset 的模式 换成其余模式;
重置指定生产组的偏移量 --group
重置指定生产组的所有 Topic 的偏移量
--all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置指定生产组的指定 Topic 的偏移量--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2
重置所有生产组的偏移量 --all-group
重置所有生产组的所有 Topic 的偏移量
--all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置所有生产组中指定 Topic 的偏移量--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2
--reset-offsets
前面须要接 重置的模式
相干重置 Offset 的模式
参数 | 形容 | 例子 |
---|---|---|
--to-earliest : |
重置 offset 到最开始的那条 offset(找到还未被删除最早的那个 offset) | |
--to-current : |
间接重置 offset 到以后的 offset,也就是 LOE | |
--to-latest : |
重置到最初一个 offset | |
--to-datetime : |
重置到指定工夫的 offset; 格局为:YYYY-MM-DDTHH:mm:SS.sss ; |
--to-datetime "2021-6-26T00:00:00.000" |
--to-offset |
重置到指定的 offset, 然而通常状况下, 匹配到多个分区, 这里是将匹配到的所有分区都重置到这一个值; 如果 1. 指标最大 offset<--to-offset , 这个时候重置为指标最大 offset;2. 指标最小 offset>--to-offset ,则重置为最小; 3. 否则的话才会重置为 --to-offset 的目标值; 个别不必这个 |
--to-offset 3465 |
--shift-by |
依照偏移量减少或者缩小多少个 offset;正的为往前减少; 负的往后退;当然这里也是匹配所有的; | --shift-by 100 、--shift-by -100 |
--from-file |
依据 CVS 文档来重置; 这里上面独自解说 |
--from-file
着重解说一下
下面其余的一些模式重置的都是匹配到的所有分区; 不可能每个分区重置到不同的 offset;不过
--from-file
能够让咱们更灵便一点;
-
先配置 cvs 文档
格局为: Topic: 分区号: 重置指标偏移量test2,0,100 test2,1,200 test2,2,300
-
执行命令
sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv
5. 删除偏移量delete-offsets
<font color=red> 可能执行胜利的一个前提是 生产组这会是不可用状态;</font>
偏移量被删除了之后,Consumer Group 下次启动的时候, 会从头生产;
sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2
相干可选参数
参数 | 形容 | 例子 |
---|---|---|
--bootstrap-server |
指定连贯到的 kafka 服务; | –bootstrap-server localhost:9092 |
--list |
列出所有生产组名称 | --list |
--describe |
查问消费者形容信息 | --describe |
--group |
指定生产组 | |
--all-groups |
指定所有生产组 | |
--members |
查问生产组的成员信息 | |
--state |
查问消费者的状态信息 | |
--offsets |
在查问生产组形容信息的时候, 这个参数会列出音讯的偏移量信息; 默认就会有这个参数的; | |
dry-run |
重置偏移量的时候, 应用这个参数能够让你事后看到重置状况,这个时候还没有真正的执行, 真正执行换成--excute ; 默认为dry-run |
|
--excute |
真正的执行重置偏移量的操作; | |
--to-earliest |
将 offset 重置到最早 | |
to-latest |
将 offset 重置到最近 |
附件
ConfigCommand 的一些可选配置
Topic 相干可选配置
key | value | 示例 |
---|---|---|
cleanup.policy | 清理策略 | |
compression.type | 压缩类型(通常倡议在 produce 端管制) | |
delete.retention.ms | 压缩日志的保留工夫 | |
file.delete.delay.ms | ||
flush.messages | 长久化 message 限度 | |
flush.ms | 长久化频率 | |
follower.replication.throttled.replicas | flowwer 正本限流 格局:分区号: 正本 follower 号, 分区号: 正本 follower 号 | 0:1,1:1 |
index.interval.bytes | ||
leader.replication.throttled.replicas | leader 正本限流 格局:分区号: 正本 Leader 号 | 0:0 |
max.compaction.lag.ms | ||
max.message.bytes | 最大的 batch 的 message 大小 | |
message.downconversion.enable | message 是否向下兼容 | |
message.format.version | message 格局版本 | |
message.timestamp.difference.max.ms | ||
message.timestamp.type | ||
min.cleanable.dirty.ratio | ||
min.compaction.lag.ms | ||
min.insync.replicas | 最小的 ISR | |
preallocate | ||
retention.bytes | 日志保留大小(通常依照工夫限度) | |
retention.ms | 日志保留工夫 | |
segment.bytes | segment 的大小限度 | |
segment.index.bytes | ||
segment.jitter.ms | ||
segment.ms | segment 的切割工夫 | |
unclean.leader.election.enable | 是否容许非同步正本选主 |
Broker 相干可选配置
key | value | 示例 |
---|---|---|
advertised.listeners | ||
background.threads | ||
compression.type | ||
follower.replication.throttled.rate | ||
leader.replication.throttled.rate | ||
listener.security.protocol.map | ||
listeners | ||
log.cleaner.backoff.ms | ||
log.cleaner.dedupe.buffer.size | ||
log.cleaner.delete.retention.ms | ||
log.cleaner.io.buffer.load.factor | ||
log.cleaner.io.buffer.size | ||
log.cleaner.io.max.bytes.per.second | ||
log.cleaner.max.compaction.lag.ms | ||
log.cleaner.min.cleanable.ratio | ||
log.cleaner.min.compaction.lag.ms | ||
log.cleaner.threads | ||
log.cleanup.policy | ||
log.flush.interval.messages | ||
log.flush.interval.ms | ||
log.index.interval.bytes | ||
log.index.size.max.bytes | ||
log.message.downconversion.enable | ||
log.message.timestamp.difference.max.ms | ||
log.message.timestamp.type | ||
log.preallocate | ||
log.retention.bytes | ||
log.retention.ms | ||
log.roll.jitter.ms | ||
log.roll.ms | ||
log.segment.bytes | ||
log.segment.delete.delay.ms | ||
max.connections | ||
max.connections.per.ip | ||
max.connections.per.ip.overrides | ||
message.max.bytes | ||
metric.reporters | ||
min.insync.replicas | ||
num.io.threads | ||
num.network.threads | ||
num.recovery.threads.per.data.dir | ||
num.replica.fetchers | ||
principal.builder.class | ||
replica.alter.log.dirs.io.max.bytes.per.second | ||
sasl.enabled.mechanisms | ||
sasl.jaas.config | ||
sasl.kerberos.kinit.cmd | ||
sasl.kerberos.min.time.before.relogin | ||
sasl.kerberos.principal.to.local.rules | ||
sasl.kerberos.service.name | ||
sasl.kerberos.ticket.renew.jitter | ||
sasl.kerberos.ticket.renew.window.factor | ||
sasl.login.refresh.buffer.seconds | ||
sasl.login.refresh.min.period.seconds | ||
sasl.login.refresh.window.factor | ||
sasl.login.refresh.window.jitter | ||
sasl.mechanism.inter.broker.protocol | ||
ssl.cipher.suites | ||
ssl.client.auth | ||
ssl.enabled.protocols | ||
ssl.endpoint.identification.algorithm | ||
ssl.key.password | ||
ssl.keymanager.algorithm | ||
ssl.keystore.location | ||
ssl.keystore.password | ||
ssl.keystore.type | ||
ssl.protocol | ||
ssl.provider | ||
ssl.secure.random.implementation | ||
ssl.trustmanager.algorithm | ||
ssl.truststore.location | ||
ssl.truststore.password | ||
ssl.truststore.type | ||
unclean.leader.election.enable |
Users 相干可选配置
key | value | 示例 |
---|---|---|
SCRAM-SHA-256 | ||
SCRAM-SHA-512 | ||
consumer_byte_rate | 针对消费者 user 进行限流 | |
producer_byte_rate | 针对生产者进行限流 | |
request_percentage | 申请百分比 |
clients 相干可选配置
key | value | 示例 |
---|---|---|
consumer_byte_rate | ||
producer_byte_rate | ||
request_percentage |
以上大部分运维操作, 都能够应用 LogI-Kafka-Manager 在平台上可视化操作;