基本原理
- Kafka是由LinkedIn开发的一个分布式的音讯零碎,应用Scala编写,它因能够程度扩大和高吞吐率而被宽泛应用。目前越来越多的开源分布式解决零碎如Cloudera、Apache Storm、Spark都反对与Kafka集成。
- Kafka是一种分布式的,基于公布/订阅的音讯零碎。次要设计指标如下:
(1)以工夫复杂度为O(1)的形式提供音讯长久化能力,即便对TB级以上数据也能保障常数工夫复杂度的拜访性能
(2)高吞吐率。即便在十分便宜的商用机器上也能做到单机反对每秒100K条以上音讯的传输
(3)反对Kafka Server间的音讯分区,及分布式生产,同时保障每个Partition内的音讯程序传输
(4)同时反对离线数据处理和实时数据处理
(5)Scale out:反对在线程度扩大 - Kafka中各个组件的性能:
(1)Broker: Kafka集群蕴含一个或多个服务器,这种服务器被称为broker
(2)Topic:每条公布到Kafka集群的音讯都有一个类别,这个类别被称为Topic。(物理上不同Topic的音讯离开存储,逻辑上一个Topic的音讯尽管保留于一个或多个broker上,但用户只需指定音讯的Topic即可生产或生产数据,不用关怀数据存于何处)
(3)Partition:Parition是物理上的概念,每个Topic蕴含一个或多个Partition
(4)Producer:负责公布音讯到Kafka broker
(5)Consumer:音讯消费者,向Kafka broker读取音讯的客户端
(6)Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
Producer应用Push模式将音讯公布到Broker,Consumer应用Pull模式从Broker订阅并生产音讯。
零碎环境
Linux Ubuntu 20.04
OpenJDK-11.0.11
工作内容
Kafka装置依赖Scala、ZooKeeper,所以须要先装置Scala与ZooKeeper。而后在已装置好Scala和ZooKeeper的环境根底上,装置部署Kafka。
工作步骤
1.首先在Linux本地,新建/data/kafka1目录,用于寄存实验所需文件。
mkdir -p /data/kafka1
切换目录到/data/kafka1下,应用wget命令,下载所需安装包scala-2.13.5.tgz,kafka_2.13-2.8.0.tgz以及apache-zookeeper-3.6.3-bin.tar.gz。
cd /data/kafka1 **wget https://adoptopenjdk.net/**wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.13-2.8.0.tgz wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
2.装置Scala。
切换到/data/kafka1目录下,将Scala安装包scala-2.13.5.tgz解压到/apps目录下,并将解压后的目录,重命名为scala。
cd /data/kafka1 tar -xzvf /data/kafka1/scala-2.13.5.tgz -C /apps/ cd /apps mv /apps/scala-2.13.5/ /apps/scala
应用vim关上用户环境变量。
sudo vim ~/.bashrc
将以下Scala的门路信息,追加到用户环境变量中。
#scala export SCALA_HOME=/apps/scala export PATH=$SCALA_HOME/bin:$PATH
执行source命令,使环境变量失效。
source ~/.bashrc
3.切换到/data/kafka1目录下,将kafka的压缩包kafka_2.13-2.8.0.tgz解压到/apps目录下,并将解压缩后的目录,重命名为kafka。
cd /data/kafka1 tar -xzvf /data/kafka1/kafka_2.13-2.8.0.tgz -C /apps/ cd /apps mv /apps/kafka_2.13-2.8.0/ /apps/kafka
应用vim关上用户环境变量。
sudo vim ~/.bashrc
将以下Kafka的门路信息,追加到用户环境变量中。
#kafka export KAFKA_HOME=/apps/kafka export PATH=$KAFKA_HOME/bin:$PATH
执行source命令,使环境变量失效。
source ~/.bashrc
4.因为Kafka的局部数据须要存储到ZooKeeper中,所以必须额定装置ZooKeeper,或应用Kafka安装包自带的ZooKeeper程序。
首先来演示应用外置的ZooKeeper程序。
将/data/kafka1目录下apache-zookeeper-3.6.3-bin.tar.gz,解压缩到/apps目录下,并将解压缩的目录,重命名为zookeeper。
cd /data/kafka1 tar -xzvf /data/kafka1/apache-zookeeper-3.6.3-bin.tar.gz -C /apps/ cd /apps mv /apps/apache-zookeeper-3.6.3/ /apps/zookeeper
应用vim关上用户环境变量。
sudo vim ~/.bashrc
将以下Zookeeper的门路信息,追加到用户环境变量中。
#zookeeper export ZOOKEEPER_HOME=/apps/zookeeper export PATH=$ZOOKEEPER_HOME/bin:$PATH
执行source命令,使环境变量失效。
source ~/.bashrc
批改ZooKeeper的配置文件,将ZooKeeper配置为单机模式。
切换到ZooKeeper的配置文件所在目录/apps/zookeeper/conf下,将zoo_sample.cfg重命名为zoo.cfg
cd /apps/zookeeper/conf/ mv /apps/zookeeper/conf/zoo_sample.cfg /apps/zookeeper/conf/zoo.cfg
应用vim关上zoo.cfg文件,并批改dataDir项内容
vim zoo.cfg
由:
dataDir=/tmp/zookeeper
改为:
dataDir=/data/tmp/zookeeper-outkafka/data
这里的/data/tmp/zookeeper-outkafka/data目录须要提前创立。
mkdir -p /data/tmp/zookeeper-outkafka/data
启动ZooKeeper,并查看ZooKeeper的运行状态。
cd /apps/zookeeper/bin ./zkServer.sh start ./zkServer.sh status
敞开ZooKeeper。
cd /apps/zookeeper/bin ./zkServer.sh stop
5.应用Kafka内置的ZooKeeper,切换目录到/apps/kafka/config目录下。
cd /apps/kafka/config
这里搁置着与ZooKeeper的配置文件zoo.cfg性能类似的配置文件zookeeper.properties,应用vim关上zookeeper.properties配置文件。
vim zookeeper.properties
将dataDir目录批改为/data/tmp/zookeeper-inkafka/data目录。
dataDir=/data/tmp/zookeeper-inkafka/data
这里的/data/tmp/zookeeper-inkafka/data目录,须提前创立。
mkdir -p /data/tmp/zookeeper-inkafka/data
上面启动ZooKeeper服务,切换目录到/apps/kafka目录下,在kafka的bin目录下放有ZooKeeper的启动脚本,按Ctrl+c退出。
cd /apps/kafka bin/zookeeper-server-start.sh config/zookeeper.properties &
开端的&符号,会将zookeeper-server-start.sh放到后盾执行。输出jps
jps
查看ZooKeeper的过程QuorumPeerMain
上面敞开ZooKeeper过程
cd /apps/kafka bin/zookeeper-server-stop.sh stop
6.以上两种ZooKeeper的应用形式,默认应用外置的ZooKeeper,对Kafka数据进行治理。
至此Kafka已装置结束。
接下来对Kafka进行测试,检测是否能够失常运行。
7.切换到/apps/zookeeper目录下,启动ZooKeeper服务。
cd /apps/zookeeper bin/zkServer.sh start
8.切换到/apps/kafka/config目录下,这里搁置了Kafka的相干的配置文件。应用vim关上Kafka服务的配置文件server.properties。
cd /apps/kafka/config vim server.properties
server.properties文件中的配置项包含:服务器根本配置,socket服务设置,log日志的配置,log刷新策略,log保留策略,ZooKeeper配置。
服务器根本配置,次要包含以后节点的编号。
ZooKeeper配置中,包含ZooKeeper服务的IP和端口号等。
批改zookeeper.connect项的值为:
zookeeper.connect=localhost:2181
这里的IP和端口,是ZooKeeper发送接管音讯应用的端口。IP必须为ZooKeeper服务的IP,咱们设置为localhost,端口必须和/apps/zookeeper/conf下zoo.cfg中的clientPort端口统一。
9.切换目录到/apps/kafka目录下,启动Kafka服务。启动Kafka服务时,会读取Kafka配置文件目录下的server.properties文件。
cd /apps/kafka bin/kafka-server-start.sh config/server.properties &
这样启动了Kafka的server,并在后端运行。
10.另外开启一个窗口,调用/apps/kafka/bin目录下kafka-topic.sh脚本创立一个topic。
cd /apps/kafka bin/kafka-topics.sh \ --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --topic sayaword \ --partitions 1
kafka-topic.sh命令后,须要增加一些参数,比方ZooKeeper的配置,主题名称等。
上面查看Kafka中,都有哪些topic
cd /apps/kafkabin/kafka-topics.sh --list --zookeeper localhost:2181
11.调用/apps/kafka/bin目录下kafka-console-producer.sh,来生产一些音讯,producer也就是生产者。
cd /apps/kafkabin/kafka-console-producer.sh --broker-list localhost:9092 --topic sayaword
这里的localhost为Kafka的IP,9092为broker节点的端口。用户能够在console界面上,输出信息,交给producer进行解决,并发给consumer。
12.再令开启一个窗口,调用bin目录下kafka-console-consumer.sh,启动consumer,consumer作为消费者,用来生产数据。
cd /apps/kafka bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sayaword --from-beginning
kafka-console-consumer.sh仍然须要加一些参数,比方ZooKeeper的IP及端口、主题名称、读取数据地位等。
13.在执行kafka-console-producer.sh命令的界面中,轻易输出几行文字,按回车。能够看到在consumer端,会将同样的内容,输入进去。
producer端:
consumer端:
14.退出测试。
在kafka-console-consumer.sh、kafka-console-producer.sh及kafka-server-start.sh在命令行界面,执行Ctrl + c,别离退出consumer,producer及server。
切换目录到/apps/zookeeper/bin目录下,进行ZooKeeper。
cd /apps/zookeeper/bin ./zkServer.sh stop