基本原理
- Kafka 是由 LinkedIn 开发的一个分布式的音讯零碎,应用 Scala 编写,它因能够程度扩大和高吞吐率而被宽泛应用。目前越来越多的开源分布式解决零碎如 Cloudera、Apache Storm、Spark 都反对与 Kafka 集成。
- Kafka 是一种分布式的,基于公布 / 订阅的音讯零碎。次要设计指标如下:
(1)以工夫复杂度为 O(1) 的形式提供音讯长久化能力,即便对 TB 级以上数据也能保障常数工夫复杂度的拜访性能
(2)高吞吐率。即便在十分便宜的商用机器上也能做到单机反对每秒 100K 条以上音讯的传输
(3)反对 Kafka Server 间的音讯分区,及分布式生产,同时保障每个 Partition 内的音讯程序传输
(4)同时反对离线数据处理和实时数据处理
(5)Scale out:反对在线程度扩大 - Kafka 中各个组件的性能:
(1)Broker:Kafka 集群蕴含一个或多个服务器,这种服务器被称为 broker
(2)Topic:每条公布到 Kafka 集群的音讯都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的音讯离开存储,逻辑上一个 Topic 的音讯尽管保留于一个或多个 broker 上,但用户只需指定音讯的 Topic 即可生产或生产数据,不用关怀数据存于何处)
(3)Partition:Parition 是物理上的概念,每个 Topic 蕴含一个或多个 Partition
(4)Producer:负责公布音讯到 Kafka broker
(5)Consumer:音讯消费者,向 Kafka broker 读取音讯的客户端
(6)Consumer Group:每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)
Producer 应用 Push 模式将音讯公布到 Broker,Consumer 应用 Pull 模式从 Broker 订阅并生产音讯。
零碎环境
Linux Ubuntu 20.04
OpenJDK-11.0.11
工作内容
Kafka 装置依赖 Scala、ZooKeeper,所以须要先装置 Scala 与 ZooKeeper。而后在已装置好 Scala 和 ZooKeeper 的环境根底上,装置部署 Kafka。
工作步骤
1. 首先在 Linux 本地,新建 /data/kafka1 目录,用于寄存实验所需文件。
mkdir -p /data/kafka1
切换目录到 /data/kafka1 下,应用 wget 命令,下载所需安装包 scala-2.13.5.tgz,kafka_2.13-2.8.0.tgz 以及 apache-zookeeper-3.6.3-bin.tar.gz。
cd /data/kafka1
**wget https://adoptopenjdk.net/**
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.13-2.8.0.tgz
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
2. 装置 Scala。
切换到 /data/kafka1 目录下,将 Scala 安装包 scala-2.13.5.tgz 解压到 /apps 目录下,并将解压后的目录,重命名为 scala。
cd /data/kafka1
tar -xzvf /data/kafka1/scala-2.13.5.tgz -C /apps/
cd /apps
mv /apps/scala-2.13.5/ /apps/scala
应用 vim 关上用户环境变量。
sudo vim ~/.bashrc
将以下 Scala 的门路信息,追加到用户环境变量中。
#scala
export SCALA_HOME=/apps/scala
export PATH=$SCALA_HOME/bin:$PATH
执行 source 命令,使环境变量失效。
source ~/.bashrc
3. 切换到 /data/kafka1 目录下,将 kafka 的压缩包 kafka_2.13-2.8.0.tgz 解压到 /apps 目录下,并将解压缩后的目录,重命名为 kafka。
cd /data/kafka1
tar -xzvf /data/kafka1/kafka_2.13-2.8.0.tgz -C /apps/
cd /apps
mv /apps/kafka_2.13-2.8.0/ /apps/kafka
应用 vim 关上用户环境变量。
sudo vim ~/.bashrc
将以下 Kafka 的门路信息,追加到用户环境变量中。
#kafka
export KAFKA_HOME=/apps/kafka
export PATH=$KAFKA_HOME/bin:$PATH
执行 source 命令,使环境变量失效。
source ~/.bashrc
4. 因为 Kafka 的局部数据须要存储到 ZooKeeper 中,所以必须额定装置 ZooKeeper,或应用 Kafka 安装包自带的 ZooKeeper 程序。
首先来演示应用外置的 ZooKeeper 程序。
将 /data/kafka1 目录下 apache-zookeeper-3.6.3-bin.tar.gz,解压缩到 /apps 目录下,并将解压缩的目录,重命名为 zookeeper。
cd /data/kafka1
tar -xzvf /data/kafka1/apache-zookeeper-3.6.3-bin.tar.gz -C /apps/
cd /apps
mv /apps/apache-zookeeper-3.6.3/ /apps/zookeeper
应用 vim 关上用户环境变量。
sudo vim ~/.bashrc
将以下 Zookeeper 的门路信息,追加到用户环境变量中。
#zookeeper
export ZOOKEEPER_HOME=/apps/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH
执行 source 命令,使环境变量失效。
source ~/.bashrc
批改 ZooKeeper 的配置文件,将 ZooKeeper 配置为单机模式。
切换到 ZooKeeper 的配置文件所在目录 /apps/zookeeper/conf 下,将 zoo_sample.cfg 重命名为 zoo.cfg
cd /apps/zookeeper/conf/
mv /apps/zookeeper/conf/zoo_sample.cfg /apps/zookeeper/conf/zoo.cfg
应用 vim 关上 zoo.cfg 文件,并批改 dataDir 项内容
vim zoo.cfg
由:
dataDir=/tmp/zookeeper
改为:
dataDir=/data/tmp/zookeeper-outkafka/data
这里的 /data/tmp/zookeeper-outkafka/data 目录须要提前创立。
mkdir -p /data/tmp/zookeeper-outkafka/data
启动 ZooKeeper,并查看 ZooKeeper 的运行状态。
cd /apps/zookeeper/bin
./zkServer.sh start
./zkServer.sh status
敞开 ZooKeeper。
cd /apps/zookeeper/bin
./zkServer.sh stop
5. 应用 Kafka 内置的 ZooKeeper,切换目录到 /apps/kafka/config 目录下。
cd /apps/kafka/config
这里搁置着与 ZooKeeper 的配置文件 zoo.cfg 性能类似的配置文件 zookeeper.properties,应用 vim 关上 zookeeper.properties 配置文件。
vim zookeeper.properties
将 dataDir 目录批改为 /data/tmp/zookeeper-inkafka/data 目录。
dataDir=/data/tmp/zookeeper-inkafka/data
这里的 /data/tmp/zookeeper-inkafka/data 目录,须提前创立。
mkdir -p /data/tmp/zookeeper-inkafka/data
上面启动 ZooKeeper 服务,切换目录到 /apps/kafka 目录下,在 kafka 的 bin 目录下放有 ZooKeeper 的启动脚本,按 Ctrl+ c 退出。
cd /apps/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties &
开端的 & 符号,会将 zookeeper-server-start.sh 放到后盾执行。输出 jps
jps
查看 ZooKeeper 的过程 QuorumPeerMain
上面敞开 ZooKeeper 过程
cd /apps/kafka
bin/zookeeper-server-stop.sh stop
6. 以上两种 ZooKeeper 的应用形式,默认应用外置的 ZooKeeper,对 Kafka 数据进行治理。
至此 Kafka 已装置结束。
接下来对 Kafka 进行测试,检测是否能够失常运行。
7. 切换到 /apps/zookeeper 目录下,启动 ZooKeeper 服务。
cd /apps/zookeeper
bin/zkServer.sh start
8. 切换到 /apps/kafka/config 目录下,这里搁置了 Kafka 的相干的配置文件。应用 vim 关上 Kafka 服务的配置文件 server.properties。
cd /apps/kafka/config
vim server.properties
server.properties 文件中的配置项包含:服务器根本配置,socket 服务设置,log 日志的配置,log 刷新策略,log 保留策略,ZooKeeper 配置。
服务器根本配置,次要包含以后节点的编号。
ZooKeeper 配置中,包含 ZooKeeper 服务的 IP 和端口号等。
批改 zookeeper.connect 项的值为:
zookeeper.connect=localhost:2181
这里的 IP 和端口,是 ZooKeeper 发送接管音讯应用的端口。IP 必须为 ZooKeeper 服务的 IP,咱们设置为 localhost,端口必须和 /apps/zookeeper/conf 下 zoo.cfg 中的 clientPort 端口统一。
9. 切换目录到 /apps/kafka 目录下,启动 Kafka 服务。启动 Kafka 服务时,会读取 Kafka 配置文件目录下的 server.properties 文件。
cd /apps/kafka
bin/kafka-server-start.sh config/server.properties &
这样启动了 Kafka 的 server,并在后端运行。
10. 另外开启一个窗口,调用 /apps/kafka/bin 目录下 kafka-topic.sh 脚本创立一个 topic。
cd /apps/kafka
bin/kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--topic sayaword \
--partitions 1
kafka-topic.sh 命令后,须要增加一些参数,比方 ZooKeeper 的配置,主题名称等。
上面查看 Kafka 中,都有哪些 topic
cd /apps/kafka
bin/kafka-topics.sh --list --zookeeper localhost:2181
11. 调用 /apps/kafka/bin 目录下 kafka-console-producer.sh,来生产一些音讯,producer 也就是生产者。
cd /apps/kafka
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sayaword
这里的 localhost 为 Kafka 的 IP,9092 为 broker 节点的端口。用户能够在 console 界面上,输出信息,交给 producer 进行解决,并发给 consumer。
12. 再令开启一个窗口,调用 bin 目录下 kafka-console-consumer.sh,启动 consumer,consumer 作为消费者,用来生产数据。
cd /apps/kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sayaword --from-beginning
kafka-console-consumer.sh 仍然须要加一些参数,比方 ZooKeeper 的 IP 及端口、主题名称、读取数据地位等。
13. 在执行 kafka-console-producer.sh 命令的界面中,轻易输出几行文字,按回车。能够看到在 consumer 端,会将同样的内容,输入进去。
producer 端:
consumer 端:
14. 退出测试。
在 kafka-console-consumer.sh、kafka-console-producer.sh 及 kafka-server-start.sh 在命令行界面,执行 Ctrl + c,别离退出 consumer,producer 及 server。
切换目录到 /apps/zookeeper/bin 目录下,进行 ZooKeeper。
cd /apps/zookeeper/bin
./zkServer.sh stop