应用背景:
- 目前 Kafka 在我所在部门做的是数据同步,如沈阳生成的数据要同步到北京集群,就需要这样的分布式消息队列。
- 也有用 Kafka 做数据缓存
- 同步完数据之后就可以用 Stream、Storm、FLink 等处理流式数据
Kafka 特点
- 高吞吐率是第一需求、低延迟(实时性),每秒处理几十万消息,延迟最低几毫秒
- 可扩展性,支持动态扩展节点数据
- 持久性与可靠性,数据被持久化磁盘,支持数据多副本防止数据丢失
- 高容错,允许节点失败
- 高并发,支持上千个客户端同时读写
Kafka 架构
- kafka 运行在一群 broker 上,这里 broker 指的是各个节点 / 服务器
- 每个 broker 上,消息的组织是按照 topic 来的,有 topic1, topic2, …, topicm
- 每个 topic 对应着 partition1,partition2,…,partitionn,把数据分片的目的:为了并行计算时增加并行度,计算时并行处理各个分片
- Kafka 的 容错机制,是因为它会有消息副本(存放在其他位置),在使用 Kafka 备份时,要指定分区、指定备份因子(备几份)
Kafka 基于消息订阅消费
- Producer 生产者身份,制作生产数据
- Consumer 消费者身份,消费即读数据。Consumer 有 Consumer Group 的概念,也就是一个 topic 的 n 个分片会被 Consumer Group 的 consumer1,consumer2,…,cnsumerk 消费
- 需要注意的是:同一 Group 的 consumer 不可重复消费,每个 Consumer 不同重复消费数据,这是为了保障并行计算的效率。那么每次消费,就需要记录消费到哪儿了,这里使用的是 Offset,用来记录读到哪个位置
- Offset(即位置信息)保存在 zooKeeper 集群上,以目录的形式存储,如 broker1/topic1/consumer1/47/。这种机制类似 Spark 的 Checkpoint 检查点文件,用于做信息回溯。
Kafka 的整体架构和消费机制如下图:
列出各个 broker 上的 topic:
./kafka-topics.sh --zookeeper ip1:port1,ip2:port2,ip3:port3 --list
创建 topic:
./kafka-topics.sh --zookeeper 192.168.152.122:2181,192.168.152.222:2181,192.168.152.131:2181,192.168.152.231:2181,192.168.152.241:2181 --create --topic sunjingru --partitions 2 --replication-factor 2
console 生产者输入数据:
./kafka-console-producer.sh --broker-list ip1:port1,ip2:port2,ip3:port3 --topic sjr
console 消费者消费数据:
./kafka-console-consumer.sh --zookeeper ip1:port1,ip2:port2,ip3:port3 --topic sjr --from-beginning