共计 3032 个字符,预计需要花费 8 分钟才能阅读完成。
Kafka 的学习与应用
简介
起初是由 LinkedIn 公司采纳 Scala 语言开发的一个分布式、多分区、多正本且基于 zookeeper 协调的分布式音讯零碎,现已募捐给 Apache 基金会。它是一种高吞吐量的分布式公布订阅音讯零碎,以可程度扩大和高吞吐率而被宽泛应用。目前越来越多的开源分布式解决零碎如 Cloudera、Apache Storm、Spark、Flink 等都反对与 Kafka 集成。
kafka 是用 Scala 写的,一下两种框架都是应用 scala 语言写的:
- kafka(音讯队列): 分布式音讯队列 外部代码常常用来解决并发的问题 用 scala 能够大大简化其代码。
- spark(实时的大数据分析框架,和 flink 性能类似): 解决多线程场景不便 另外 spark 次要用作内存计算 常常要用来实现简单的算法 利用 scala 这种函数式编程语言 能够大大简化代码
Kafka 与 zk 的关系
zk 为 kafka 提供一下几点作用(须要留神,在 0.9 版本之前 offset 存储在 ZK0.9 版本及之后 offset 存储在 kafka 本地外部 topic 本人保留)
1.Broker 注册
Broker 是分布式部署并且相互之间互相独立,然而须要有一个注册零碎可能将整个集群中的 Broker 治理起来 ,此时就应用到了 Zookeeper。在 Zookeeper 上会有一个专门 用来进行 Broker 服务器列表记录 的节点:/brokers/ids
每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到 /brokers/ids 下创立属于本人的节点,如 /brokers/ids/[0…N]。
Kafka 应用了全局惟一的数字来指代每个 Broker 服务器,不同的 Broker 必须应用不同的 Broker ID 进行注册,创立完节点后,每个 Broker 就会将本人的 IP 地址和端口信息记录 到该节点中去。其中,Broker 创立的节点类型是长期节点,一旦 Broker 宕机,则对应的长期节点也会被主动删除。
2.Topic 注册
在 Kafka 中,同一个 Topic 的音讯会被分成多个分区 并将其散布在多个 Broker 上,这些分区信息及与 Broker 的对应关系 也都是由 Zookeeper 在保护,由专门的节点来记录,如:/borkers/topics
Kafka 中每个 Topic 都会以 /brokers/topics/[topic]的模式被记录,如 /brokers/topics/login 和 /brokers/topics/search 等。Broker 服务器启动后,会到对应 Topic 节点(/brokers/topics)上注册本人的 Broker ID 并写入针对该 Topic 的分区总数,如 /brokers/topics/login/3->2,这个节点示意 Broker ID 为 3 的一个 Broker 服务器,对于 ”login” 这个 Topic 的音讯,提供了 2 个分区进行音讯存储,同样,这个分区节点也是长期节点。
https://www.cnblogs.com/Lee-y…
Kafka 的架构图解析
Kafka 装置步骤
kafka 是依赖 zookeeper 的,所以在装置前你须要装置并启动 zookeeper。
- 上传文件
这里我应用的 kafka 版本比拟老,因为个别公司应用的都不会是最新的版本。在 linux 中的 /opt
文件夹中新增 kafka 文件夹,并把安装包上传,能够应用 xftp 进行上传。
- 解压
应用命令 tar -zxvf kafka_2.11-0.11.0.0.tgz
解压 kafka 的文件,解压后应用 mv 命令进行重命名,把解压后的文件夹重命名为 kafka,这样不便当前输出。
- 批改配置
进入 kafka 文件夹的 config 目录。能够看到目录构造如下
首先批改 server.properties
文件,命令vi server.properties
先在 kafka 文件夹下新建一个 logs 的文件夹,上面须要应用
这里次要批改到处,在高版本中曾经没有删除 topic 性能这个配置能够自行疏忽
#broker 的全局惟一编号,不能反复
broker.id=0
#删除 topic 性能使能
delete.topic.enable=true
#kafka 运行日志寄存的门路
log.dirs=/opt/kafka/logs
#zk 的地址, 这里改成本人的 zk 地址就好
zookeeper.connect=localhost:2181
这里须要留神的点是默认 kafka 音讯默认是存储 168 小时也就是 7 天,这个也是在这个文件中配置的。
而后配置环境变量
应用命令:vi /etc/profile
,在文件的最初配置 kafka
#KAFKA_HOME
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
最初如果是集群装置还要散发安装包,这里临时不须要。散发应用xsync kafka/
, 散发后记得批改 brokerid,在集群中须要惟一。
- 启动
启动的时候要带上咱们的配置文件,且以守护过程的形式启动
bin/kafka-server-start.sh -daemon config/server.properties
如果是集群能够本人写群起脚本。
Kafka 命令行操作
首先进入 kafka 装置目录的 bin 目录。
- 查看以后服务器中的所有 topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
- 创立 topic
kafka-topics.sh --zookeeper 127.0.0.1:12181 --create --replication-factor 3 --partitions 1 --topic first
选项阐明:
–topic 定义 topic 名
–replication-factor 定义正本数
–partitions 定义分区数 - 删除 topic
kafka-topics.sh –zookeeper 127.0.0.1:12181 –delete –topic first
须要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。
- 发送音讯
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first
输出命令后就能够进行发送音讯了
- 生产音讯(多种形式)
这是之前老版本的形式,连贯 zk,当初不举荐
kafka-console-consumer.sh --zookeeper 127.0.0.1:12181 --topic first
新版本的对于 offset 等数据曾经存储在 kafka 本地了,所以应用这个命令
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic first
–from-beginning:会把主题中以往所有的数据都读取进去。
- 查看某个 Topic 的详情
kafka-topics.sh --zookeeper 127.0.0.1:12181 --describe --topic first
- 批改分区数
kafka-topics.sh --zookeeper127.0.0.1:12181 --alter --topic first --partitions 6