下载kafka,自带 zookeeper。搭建Zookeeper集群zookeeper 集群使用 Raft 选举模式,故至少要三个节点(生产中应部署在三个不同的服务器实例上,这里用于演示就不那么做了)。# 复制三分节点配置cp config/zookeeper.properties config/zookeeper.2181.propertiescp config/zookeeper.properties config/zookeeper.2182.propertiescp config/zookeeper.properties config/zookeeper.2183.properties修改配置config/zookeeper.2181.properties# the directory where the snapshot is stored.dataDir=/tmp/zookeeper/2181# the port at which the clients will connectclientPort=2181# disable the per-ip limit on the number of connections since this is a non-production configmaxClientCnxns=0tickTime=2000initLimit=10syncLimit=5server.1=localhost:12888:13888server.2=localhost:22888:23888server.3=localhost:32888:33888config/zookeeper.2182.properties 修改clientPort=2182 dataDir=/tmp/zookeeper/2182 其他一致config/zookeeper.2183.properties 修改clientPort=2183 dataDir=/tmp/zookeeper/2183 其他一致主要是修改服务端口clientPort和数据目录dataDir,其他参数表征如下:tickTime=2000为zk的基本时间单元,毫秒initLimit=10Leader-Follower初始通信时限(tickTime10)syncLimit=5Leader-Follower同步通信时限(tickTime5)server.实例集群标识=实例地址:数据通信端口:选举通信端口为实例添加集群标识echo 1 >> /tmp/zookeeper/2181/myidecho 2 >> /tmp/zookeeper/2182/myidecho 3 >> /tmp/zookeeper/2183/myid启动集群服务bin/zookeeper-server-start.sh config/zookeeper.2181.propertiesbin/zookeeper-server-start.sh config/zookeeper.2182.propertiesbin/zookeeper-server-start.sh config/zookeeper.2183.properties搭建Kafka集群Kafka集群节点>=2时便可对外提供高可用服务cp config/server.properties config/server.9092.propertiescp config/server.properties config/server.9093.properties修改节点标识、服务端口、数据目录和zk集群节点列表vi config/server.9092.propertiesbroker.id=1…listeners=PLAINTEXT://:9092…log.dirs=/tmp/kafka-logs/1…zookeeper.connect=localhost:2181,localhost:2182,localhost:2183vi config/server.9093.propertiesbroker.id=2…listeners=PLAINTEXT://:9093…log.dirs=/tmp/kafka-logs/2…zookeeper.connect=localhost:2181,localhost:2182,localhost:2183启动集群bin/kafka-server-start.sh config/server.9092.propertiesbin/kafka-server-start.sh config/server.9093.propertiesTopic管理创建topicbin/kafka-topics.sh –create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 2 --partition 4 --topic topic_1–replication-factor 2:副本集数量,不能大于 broker 节点数量,多了也没用,1个节点放>=2个副本挂了都完蛋。–partition 4:分区数查看topic列表bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 –listtopic_1topic_2查看Topic详情可以描述Topic分区数/副本数/副本Leader/副本ISR等信息:bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --describe –topic topic_1Topic:topic_1 PartitionCount:4 ReplicationFactor:2 Configs: Topic: topic_1 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: topic_1 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: topic_1 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: topic_1 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2删除Topic注意,只是删除Topic在zk的元数据,日志数据仍需手动删除。bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --delete –topic topic_2#Topic topic_2 is marked for deletion.#Note: This will have no impact if delete.topic.enable is not set to true.#再查看topic列表bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 –list#topic_1#topic_2 - marked for deletion生产者bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093 --topic topic_1# 进入 cli 输入消息回车发送# hello kafka [enter]# send message [enter]消费者新模式,offset存储在borker–new-consumer Use new consumer. This is the default.–bootstrap-server <server to connectto> REQUIRED (unless old consumer is used): The server to connect to.老消费模式,offset存储在zk–zookeeper <urls> REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.创建消费者bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092,localhost:9093 --from-beginning --topic topic_1可以尝试创建多个不同消费组的消费者(这里的sh脚本创建的都是不同消费组的),订阅同一个topic来实现发布订阅模式。查看消费组/消费者bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092,localhost:9093 --list#这里有两个消费组的消费者console-consumer-47566console-consumer-50875查看消费详情可以查看到消费的订阅的 topic,负责的 partition,消费进度 offset, 积压的消息LAG。bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092,localhost:9093 --group console-consumer-47566 --describeGROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNERconsole-consumer-47566 topic_1 0 2 2 0 consumer-1_/127.0.0.1console-consumer-47566 topic_1 1 3 3 0 consumer-1_/127.0.0.1console-consumer-47566 topic_1 2 2 3 1 consumer-1_/127.0.0.1console-consumer-47566 topic_1 3 0 3 3 consumer-1_/127.0.0.1