下载 kafka,自带 zookeeper。
搭建 Zookeeper 集群
zookeeper 集群使用 Raft 选举模式,故至少要三个节点(生产中应部署在三个不同的服务器实例上,这里用于演示就不那么做了)。
# 复制三分节点配置
cp config/zookeeper.properties config/zookeeper.2181.properties
cp config/zookeeper.properties config/zookeeper.2182.properties
cp config/zookeeper.properties config/zookeeper.2183.properties
修改配置 config/zookeeper.2181.properties
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper/2181
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
server.1=localhost:12888:13888
server.2=localhost:22888:23888
server.3=localhost:32888:33888
config/zookeeper.2182.properties 修改 clientPort=2182 dataDir=/tmp/zookeeper/2182 其他一致 config/zookeeper.2183.properties 修改 clientPort=2183 dataDir=/tmp/zookeeper/2183 其他一致
主要是修改服务端口 clientPort 和数据目录 dataDir,其他参数表征如下:tickTime=2000 为 zk 的基本时间单元,毫秒 initLimit=10Leader-Follower 初始通信时限(tickTime*10)syncLimit=5Leader-Follower 同步通信时限(tickTime*5)server. 实例集群标识 = 实例地址: 数据通信端口: 选举通信端口
为实例添加集群标识
echo 1 >> /tmp/zookeeper/2181/myid
echo 2 >> /tmp/zookeeper/2182/myid
echo 3 >> /tmp/zookeeper/2183/myid
启动集群服务
bin/zookeeper-server-start.sh config/zookeeper.2181.properties
bin/zookeeper-server-start.sh config/zookeeper.2182.properties
bin/zookeeper-server-start.sh config/zookeeper.2183.properties
搭建 Kafka 集群
Kafka 集群节点 >= 2 时便可对外提供高可用服务
cp config/server.properties config/server.9092.properties
cp config/server.properties config/server.9093.properties
修改节点标识、服务端口、数据目录和 zk 集群节点列表 vi config/server.9092.properties
broker.id=1
…
listeners=PLAINTEXT://:9092
…
log.dirs=/tmp/kafka-logs/1
…
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
vi config/server.9093.properties
broker.id=2
…
listeners=PLAINTEXT://:9093
…
log.dirs=/tmp/kafka-logs/2
…
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
启动集群
bin/kafka-server-start.sh config/server.9092.properties
bin/kafka-server-start.sh config/server.9093.properties
Topic 管理
创建 topic
bin/kafka-topics.sh –create \
–zookeeper localhost:2181,localhost:2182,localhost:2183 \
–replication-factor 2 \
–partition 4 \
–topic topic_1
–replication-factor 2:副本集数量,不能大于 broker 节点数量,多了也没用,1 个节点放 >= 2 个副本挂了都完蛋。–partition 4:分区数
查看 topic 列表
bin/kafka-topics.sh \
–zookeeper localhost:2181,localhost:2182,localhost:2183 –list
topic_1
topic_2
查看 Topic 详情
可以描述 Topic 分区数 / 副本数 / 副本 Leader/ 副本 ISR 等信息:
bin/kafka-topics.sh \
–zookeeper localhost:2181,localhost:2182,localhost:2183 \
–describe –topic topic_1
Topic:topic_1 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: topic_1 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic_1 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: topic_1 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic_1 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
删除 Topic
注意,只是删除 Topic 在 zk 的元数据,日志数据仍需手动删除。
bin/kafka-topics.sh \
–zookeeper localhost:2181,localhost:2182,localhost:2183 \
–delete –topic topic_2
#Topic topic_2 is marked for deletion.
#Note: This will have no impact if delete.topic.enable is not set to true.
# 再查看 topic 列表
bin/kafka-topics.sh \
–zookeeper localhost:2181,localhost:2182,localhost:2183 –list
#topic_1
#topic_2 – marked for deletion
生产者
bin/kafka-console-producer.sh \
–broker-list localhost:9092,localhost:9093 \
–topic topic_1
# 进入 cli 输入消息回车发送
# hello kafka [enter]
# send message [enter]
消费者
新模式,offset 存储在 borker–new-consumer Use new consumer. This is the default.–bootstrap-server <server to connectto> REQUIRED (unless old consumer is used): The server to connect to. 老消费模式,offset 存储在 zk–zookeeper <urls> REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
创建消费者
bin/kafka-console-consumer.sh \
–new-consumer \
–bootstrap-server localhost:9092,localhost:9093 \
–from-beginning \
–topic topic_1
可以尝试创建多个不同消费组的消费者(这里的 sh 脚本创建的都是不同消费组的),订阅同一个 topic 来实现发布订阅模式。
查看消费组 / 消费者
bin/kafka-consumer-groups.sh \
–new-consumer \
–bootstrap-server localhost:9092,localhost:9093 \
–list
#这里有两个消费组的消费者
console-consumer-47566
console-consumer-50875
查看消费详情
可以查看到消费的订阅的 topic,负责的 partition,消费进度 offset, 积压的消息 LAG。
bin/kafka-consumer-groups.sh \
–new-consumer \
–bootstrap-server localhost:9092,localhost:9093 \
–group console-consumer-47566 \
–describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
console-consumer-47566 topic_1 0 2 2 0 consumer-1_/127.0.0.1
console-consumer-47566 topic_1 1 3 3 0 consumer-1_/127.0.0.1
console-consumer-47566 topic_1 2 2 3 1 consumer-1_/127.0.0.1
console-consumer-47566 topic_1 3 0 3 3 consumer-1_/127.0.0.1