关于后端:kafka入门介绍详细教程

47次阅读

共计 10813 个字符,预计需要花费 28 分钟才能阅读完成。

什么是 Kafka

Kafka 是一个分布式流式平台,它有三个要害能力

  1. 订阅公布记录流,它相似于企业中的 音讯队列 企业消息传递零碎
  2. 以容错的形式存储记录流
  3. 实时记录流

Kafka 的利用

  1. 作为音讯零碎
  2. 作为存储系统
  3. 作为流处理器

Kafka 能够建设流数据管道,可靠性的在零碎或利用之间获取数据。

建设流式利用传输和响应数据。

Kafka 作为音讯零碎

Kafka 作为音讯零碎,它有三个根本组件

  • Producer : 公布音讯的客户端
  • Broker:一个从生产者承受并存储音讯的客户端
  • Consumer : 消费者从 Broker 中读取音讯

在大型零碎中,会须要和很多子系统做交互,也须要消息传递,在诸如此类零碎中,你会找到源零碎(音讯发送方)和 目标零碎(音讯接管方)。为了在这样的音讯零碎中传输数据,你须要有适合的数据管道

这种数据的交互看起来就很凌乱,如果咱们应用消息传递零碎,那么零碎就会变得更加简略和整洁

  • Kafka 运行在一个或多个数据中心的服务器上作为集群运行
  • Kafka 集群存储音讯记录的目录被称为 topics
  • 每一条音讯记录蕴含三个因素:键(key)、值(value)、工夫戳(Timestamp)

外围 API

Kafka 有四个外围 API,它们别离是

  • Producer API,它容许应用程序向一个或多个 topics 上发送音讯记录
  • Consumer API,容许应用程序订阅一个或多个 topics 并解决为其生成的记录流
  • Streams API,它容许应用程序作为流处理器,从一个或多个主题中生产输出流并为其生成输入流,无效的将输出流转换为输入流。
  • Connector API,它容许构建和运行将 Kafka 主题连贯到现有应用程序或数据系统的可用生产者和消费者。例如,关系数据库的连接器可能会捕捉对表的所有更改

Kafka 基本概念

Kafka 作为一个高度可扩大可容错的音讯零碎,它有很多基本概念,上面就来认识一下这些 Kafka 专属的概念

topic

Topic 被称为主题,在 kafka 中,应用一个类别属性来划分音讯的所属类,划分音讯的这个类称为 topic。topic 相当于音讯的调配标签,是一个逻辑概念。主题好比是数据库的表,或者文件系统中的文件夹。

partition

partition 译为分区,topic 中的音讯被宰割为一个或多个的 partition,它是一个物理概念,对应到零碎上的就是一个或若干个目录,一个分区就是一个 提交日志。音讯以追加的模式写入分区,先后以程序的形式读取。

留神:因为一个主题蕴含无数个分区,因而无奈保障在整个 topic 中有序,然而单个 Partition 分区能够保障有序。音讯被迫加写入每个分区的尾部。Kafka 通过分区来实现数据冗余和伸缩性

分区能够散布在不同的服务器上,也就是说,一个主题能够逾越多个服务器,以此来提供比单个服务器更弱小的性能。

segment

Segment 被译为段,将 Partition 进一步细分为若干个 segment,每个 segment 文件的大小相等。

broker

Kafka 集群蕴含一个或多个服务器,每个 Kafka 中服务器被称为 broker。broker 接管来自生产者的音讯,为音讯设置偏移量,并提交音讯到磁盘保留。broker 为消费者提供服务,对读取分区的申请作出响应,返回曾经提交到磁盘上的音讯。

broker 是集群的组成部分,每个集群中都会有一个 broker 同时充当了 集群控制器 (Leader) 的角色,它是由集群中的沉闷成员选举进去的。每个集群中的成员都有可能充当 Leader,Leader 负责管理工作,包含将分区调配给 broker 和监控 broker。集群中,一个分区从属于一个 Leader,然而一个分区能够调配给多个 broker(非 Leader),这时候会产生分区复制。这种复制的机制为分区提供了音讯冗余,如果一个 broker 生效,那么其余沉闷用户会从新选举一个 Leader 接管。

producer

生产者,即音讯的发布者,其会将某 topic 的音讯公布到相应的 partition 中。生产者在默认状况下把音讯平衡地散布到主题的所有分区上,而并不关怀特定音讯会被写到哪个分区。不过,在某些状况下,生产者会把音讯间接写到指定的分区。

consumer

消费者,即音讯的使用者,一个消费者能够生产多个 topic 的音讯,对于某一个 topic 的音讯,其只会生产同一个 partition 中的音讯

在理解完 Kafka 的基本概念之后,咱们通过搭建 Kafka 集群来进一步粗浅认识一下 Kafka。

确保装置环境

装置 Java 环境

在装置 Kafka 之前,先确保 Linux 环境上是否有 Java 环境,应用 java -version 命令查看 Java 版本,举荐应用 Jdk 1.8,如果没有装置 Java 环境的话,能够依照这篇文章进行装置(https://www.cnblogs.com/zs-no…)

装置 Zookeeper 环境

Kafka 的底层应用 Zookeeper 贮存元数据,确保一致性,所以装置 Kafka 前须要先装置 Zookeeper,Kafka 的发行版自带了 Zookeeper,能够间接应用脚本来启动,不过装置一个 Zookeeper 也不吃力

Zookeeper 单机搭建

Zookeeper 单机搭建比较简单,间接从

https://www.apache.org/dyn/cl…

官网下载一个稳固版本的 Zookeeper,这里我应用的是 3.4.10,下载实现后,在 Linux 零碎中的 /usr/local 目录下创立 zookeeper 文件夹,应用xftp 工具(xftp 和 xshell 工具都能够在官网

www.netsarang.com/zh/xshell/

申请收费的家庭版)把下载好的 zookeeper 压缩包放到 /usr/local/zookeeper 目录下。

如果下载的是一个 tar.gz 包的话,间接应用 tar -zxvf zookeeper-3.4.10.tar.gz解压即可

如果下载的是 zip 包的话,还要检查一下 Linux 中是否有 unzip 工具,如果没有的话,应用 yum install unzip 装置 zip 解压工具,实现后应用 unzip zookeeper-3.4.10.zip 解压即可。

解压实现后,cd 到 /usr/local/zookeeper/zookeeper-3.4.10,创立一个 data 文件夹,而后进入到 conf 文件夹下,应用 mv zoo_sample.cfg zoo.cfg 进行重命名操作

而后应用 vi 关上 zoo.cfg,更改一下dataDir = /usr/local/zookeeper/zookeeper-3.4.10/data,保留。

进入 bin 目录,启动服务输出命令 ./zkServer.sh start 输入上面内容示意搭建胜利

敞开服务输出命令,./zkServer.sh stop

应用 ./zkServer.sh status 能够查看状态信息。

Zookeeper 集群搭建

筹备条件

筹备条件:须要三个服务器,这里我应用了 CentOS7 并装置了三个虚拟机,并为各自的虚拟机调配了 1GB 的内存,在每个 /usr/local/ 上面新建 zookeeper 文件夹,把 zookeeper 的压缩包挪过去,解压,实现后会有 zookeeper-3.4.10 文件夹,进入到文件夹,新建两个文件夹,别离是 datalog 文件夹

注:上一节单机搭建中曾经创立了一个 data 文件夹,就不须要从新创立了,间接新建一个 log 文件夹,对另外两个新增的服务须要新建这两个文件夹。

设置集群

新建实现后,须要编辑 conf/zoo.cfg 文件,三个文件的内容如下

作者:程序员 cxuan
链接:https://juejin.cn/post/691713…
起源:掘金
著作权归作者所有。商业转载请分割作者取得受权,非商业转载请注明出处。

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/zookeeper-3.4.10/data
dataLogDir=/usr/local/zookeeper/zookeeper-3.4.10/log
clientPort=12181
server.1=192.168.1.7:12888:13888
server.2=192.168.1.8:12888:13888
server.3=192.168.1.9:12888:13888

server.1 中的这个 1 示意的是服务器的标识也能够是其余数字,示意这是第几号服务器,这个标识要和上面咱们配置的 myid 的标识统一能够。

192.168.1.7:12888:13888 为集群中的 ip 地址,第一个端口示意的是 master 与 slave 之间的通信接口,默认是 2888,第二个端口是 leader 选举的端口,集群刚启动的时候选举或者 leader 挂掉之后进行新的选举的端口,默认是 3888

当初对下面的配置文件进行解释

tickTime: 这个工夫是作为 Zookeeper 服务器之间或客户端与服务器之间 维持心跳 的工夫距离,也就是每个 tickTime 工夫就会发送一个心跳。

initLimit:这个配置项是用来配置 Zookeeper 承受客户端(这里所说的客户端不是用户连贯 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连贯到 Leader 的 Follower 服务器)初始化连贯时最长能忍耐多少个心跳工夫距离数。当曾经超过 5 个心跳的工夫(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连贯失败。总的工夫长度就是 5*2000=10 秒

syncLimit: 这个配置项标识 Leader 与 Follower 之间发送音讯,申请和应答工夫长度,最长不能超过多少个 tickTime 的工夫长度,总的工夫长度就是 5 *2000=10 秒

dataDir: 快照日志的存储门路

dataLogDir: 事务日志的存储门路,如果不配置这个那么事务日志会默认存储到 dataDir 指定的目录,这样会重大影响 zk 的性能,当 zk 吞吐量较大的时候,产生的事务日志、快照日志太多

clientPort: 这个端口就是客户端连贯 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,承受客户端的拜访申请。

创立 myid 文件

在理解完其配置文件后,当初来创立每个集群节点的 myid,咱们下面说过,这个 myid 就是 server.1 的这个 1,相似的,须要为集群中的每个服务都指定标识,应用 echo 命令进行创立

server.1

echo “1” > /usr/local/zookeeper/zookeeper-3.4.10/data/myid

server.2

echo “2” > /usr/local/zookeeper/zookeeper-3.4.10/data/myid

server.3

echo “3” > /usr/local/zookeeper/zookeeper-3.4.10/data/myid

启动服务并测试

配置实现,为每个 zk 服务启动并测试,我在 windows 电脑的测试后果如下

启动服务(每台都须要执行)

cd /usr/local/zookeeper/zookeeper-3.4.10/bin
./zkServer.sh start

查看服务状态

应用 ./zkServer.sh status 命令查看服务状态

192.168.1.7 — follower

192.168.1.8 — leader

192.168.1.9 — follower

zk 集群个别只有一个 leader,多个 follower,主个别是相应客户端的读写申请,而从主同步数据,当主挂掉之后就会从 follower 里投票选举一个 leader 进去。

Kafka 集群搭建

筹备条件

  • 搭建好的 Zookeeper 集群
  • Kafka 压缩包(https://www.apache.org/dyn/cl…)

/usr/local 下新建 kafka 文件夹,而后把下载实现的 tar.gz 包移到 /usr/local/kafka 目录下,应用 tar -zxvf 压缩包 进行解压,解压实现后,进入到 kafka_2.12-2.3.0 目录下,新建 log 文件夹,进入到 config 目录下

咱们能够看到有很多 properties 配置文件,这里次要关注 server.properties 这个文件即可。

kafka 启动形式有两种,一种是应用 kafka 自带的 zookeeper 配置文件来启动(能够依照官网来进行启动,并应用单个服务多个节点来模仿集群 http://kafka.apache.org/quick…),一种是通过应用独立的 zk 集群来启动,这里举荐应用第二种形式,应用 zk 集群来启动

批改配置项

须要为 每个服务 都批改一下配置项,也就是server.properties,须要更新和增加的内容有

broker.id=0 // 初始是 0,每个 server 的 broker.id 都应该设置为不一样的,就和 myid 一样 我的三个服务别离设置的是 1,2,3
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log

在 log.retention.hours=168 上面新增上面三项

message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

设置 zookeeper 的连贯端口

zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181

配置项的含意

broker.id=0 #以后机器在集群中的惟一标识,和 zookeeper 的 myid 性质一样
port=9092 #以后 kafka 对外提供服务的端口默认是 9092
host.name=192.168.1.7 #这个参数默认是敞开的,在 0.8.1 有个 bug,DNS 解析问题,失败率的问题。
num.network.threads=3 #这个是 borker 进行网络解决的线程数
num.io.threads=8 #这个是 borker 进行 I / O 解决的线程数
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #音讯寄存的目录,这个目录能够配置为“,”逗号宰割的表达式,下面的 num.io.threads 要大于这个目录的个数这个目录,如果配置多个目录,新创建的 topic 他把音讯长久化的中央是,以后以逗号宰割的目录中,那个分区数起码就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区 buffer 大小,数据不是一下子就发送的,先回存储到缓冲区了达到肯定的大小后在发送,能进步性能
socket.receive.buffer.bytes=102400 #kafka 接收缓冲区大小,当数据达到肯定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向 kafka 申请音讯或者向 kafka 发送音讯的请申请的最大数,这个值不能超过 java 的堆栈大小
num.partitions=1 #默认的分区数,一个 topic 默认 1 个分区数
log.retention.hours=168 #默认音讯的最大长久化工夫,168 小时,7 天
message.max.byte=5242880 #音讯保留的最大值 5M
default.replication.factor=2 #kafka 保留音讯的正本数,如果一个正本生效了,另一个还能够持续提供服务
replica.fetch.max.bytes=5242880 #取音讯的最大间接数
log.segment.bytes=1073741824 #这个参数是:因为 kafka 的音讯是以追加的模式落地到文件,当超过这个值的时候,kafka 会新起一个文件
log.retention.check.interval.ms=300000 #每隔 300000 毫秒去查看下面配置的 log 生效工夫(log.retention.hours=168),到目录查看是否有过期的音讯如果有,删除
log.cleaner.enable=false #是否启用 log 压缩,个别不必启用,启用的话能够进步性能
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #设置 zookeeper 的连贯端口

 启动 Kafka 集群并测试

启动服务,进入到 /usr/local/kafka/kafka_2.12-2.3.0/bin 目录下

启动后盾过程

./kafka-server-start.sh -daemon ../config/server.properties

查看服务是否启动

执行命令 jps

6201 QuorumPeerMain
7035 Jps
6972 Kafka

  • kafka 曾经启动
  • 创立 Topic 来验证是否创立胜利

    cd .. 往回退一层 到 /usr/local/kafka/kafka_2.12-2.3.0 目录下

    bin/kafka-topics.sh –create –zookeeper 192.168.1.7:2181 –replication-factor 2 –partitions 1 –topic cxuan

    对下面的解释

    –replication-factor 2 复制两份

    –partitions 1 创立 1 个分区

    –topic 创立主题

    查看咱们的主题是否出创立胜利

    bin/kafka-topics.sh –list –zookeeper 192.168.1.7:2181

    启动一个服务就能把集群启动起来

    在一台机器上创立一个发布者

创立一个 broker,发布者

./kafka-console-producer.sh –broker-list 192.168.1.7:9092 –topic cxuantopic

在一台服务器上创立一个订阅者

创立一个 consumer,消费者

bin/kafka-console-consumer.sh –bootstrap-server 192.168.1.7:9092 –topic cxuantopic –from-beginning

留神:这里应用 –zookeeper 的话可能呈现 zookeeper is not a recognized option 的谬误,这是因为 kafka 版本太高,须要应用 --bootstrap-server 指令

测试后果

公布

生产

其余命令

显示 topic

作者:程序员 cxuan
链接:https://juejin.cn/post/691713…
起源:掘金
著作权归作者所有。商业转载请分割作者取得受权,非商业转载请注明出处。

bin/kafka-topics.sh –list –zookeeper 192.168.1.7:2181

显示

cxuantopic

查看 topic 状态

bin/kafka-topics.sh –describe –zookeeper 192.168.1.7:2181 –topic cxuantopic

上面是显示的详细信息

Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:
Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2

分区为为 1 复制因子为 2 主题 cxuantopic 的分区为 0

Replicas: 0,1 复制的为 1,2

Leader 负责给定分区的所有读取和写入的节点,每个节点都会通过随机抉择成为 leader。

Replicas 是为该分区复制日志的节点列表,无论它们是 Leader 还是以后处于活动状态。

Isr 是同步正本的汇合。它是正本列表的子集,以后仍处于活动状态并追寻 Leader。

至此,kafka 集群搭建结束。

验证多节点接收数据

刚刚咱们都应用的是 雷同的 ip 服务,上面应用其余集群中的节点,验证是否可能承受到服务

在另外两个节点上应用

bin/kafka-console-consumer.sh –bootstrap-server 192.168.1.7:9092 –topic cxuantopic –from-beginning

 而后再应用 broker 进行音讯发送,经测试三个节点都能够承受到音讯。

配置详解

在搭建 Kafka 的时候咱们简略介绍了一下 server.properties 中配置的含意,当初咱们来具体介绍一下参数的配置和概念

惯例配置

这些参数是 kafka 中最根本的配置

  • broker.id

每个 broker 都须要有一个标识符,应用 broker.id 来示意。它的默认值是 0,它能够被设置成其余任意整数,在集群中须要保障每个节点的 broker.id 都是惟一的。

  • port

如果应用配置样本来启动 kafka,它会监听 9092 端口,批改 port 配置参数能够把它设置成其余任意可用的端口。

  • zookeeper.connect

用于保留 broker 元数据的地址是通过 zookeeper.connect 来指定。localhost:2181 示意运行在本地 2181 端口。该配置参数是用逗号分隔的一组 hostname:port/path 列表,每一部分含意如下:

hostname 是 zookeeper 服务器的服务名或 IP 地址

port 是 zookeeper 连贯的端口

/path 是可选的 zookeeper 门路,作为 Kafka 集群的 chroot 环境。如果不指定,默认应用跟门路

  • log.dirs

Kafka 把音讯都保留在磁盘上,寄存这些日志片段的目录都是通过 log.dirs 来指定的。它是一组用逗号分隔的本地文件系统门路。如果指定了多个门路,那么 broker 会依据 “ 起码应用 ” 准则,把同一分区的日志片段保留到同一门路下。要留神,broker 会向领有起码数目分区的门路新增分区,而不是向领有最小磁盘空间的门路新增分区。

  • num.recovery.threads.per.data.dir

对于如下 3 种状况,Kafka 会应用可配置的线程池来解决日志片段

服务器失常启动,用于关上每个分区的日志片段;

服务器解体后启动,用于检查和截断每个分区的日志片段;

服务器失常敞开,用于敞开日志片段

默认状况下,每个日志目录只应用一个线程。因为这些线程只是在服务器启动和敞开时会用到,所以齐全能够设置大量的线程来达到井行操作的目标。特地是对于蕴含大量分区的服务器来说,一旦产生崩愤,在进行复原时应用井行操作可能会省下数小时的工夫。设置此参数时须要留神,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个门路,那么总共须要 24 个线程。

  • auto.create.topics.enable

默认状况下,Kafka 会在如下 3 种状况下创立主题

当一个生产者开始往主题写入音讯时

当一个消费者开始从主题读取音讯时

当任意一个客户向主题发送元数据申请时

  • delete.topic.enable

如果你想要删除一个主题,你能够应用主题管理工具。默认状况下,是不容许删除主题的,delete.topic.enable 的默认值是 false 因而你不能随便删除主题。这是对生产环境的合理性爱护,然而在开发环境和测试环境,是能够容许你删除主题的,所以,如果你想要删除主题,须要把 delete.topic.enable 设为 true。

主题默认配置

Kafka 为新创建的主题提供了很多默认配置参数,上面就来一起认识一下这些参数

  • num.partitions

num.partitions 参数指定了新创建的主题须要蕴含多少个分区。如果启用了主题主动创立性能(该性能是默认启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要留神,咱们能够减少主题分区的个数,但不能缩小分区的个数。

  • default.replication.factor

这个参数比较简单,它示意 kafka 保留音讯的正本数,如果一个正本生效了,另一个还能够持续提供服务 default.replication.factor 的默认值为 1,这个参数在你启用了主题主动创立性能后无效。

  • log.retention.ms

Kafka 通常依据工夫来决定数据能够保留多久。默认应用 log.retention.hours 参数来配置工夫,默认是 168 个小时,也就是一周。除此之外,还有两个参数 log.retention.minutes 和 log.retentiion.ms。这三个参数作用是一样的,都是决定音讯多久当前被删除,举荐应用 log.retention.ms。

  • log.retention.bytes

另一种保留音讯的形式是判断音讯是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。也就是说,如果有一个蕴含 8 个分区的主题,并且 log.retention.bytes 被设置为 1GB,那么这个主题最多能够保留 8GB 数据。所以,当主题的分区个数减少时,整个主题能够保留的数据也随之减少。

  • log.segment.bytes

上述的日志都是作用在日志片段上,而不是作用在单个音讯上。当音讯达到 broker 时,它们被追加到分区的以后日志片段上,当日志片段大小达到 log.segment.bytes 指定下限(默认为 1GB)时,以后日志片段就会被敞开,一个新的日志片段被关上。如果一个日志片段被敞开,就开始期待过期。这个参数的值越小,就越会频繁的敞开和调配新文件,从而升高磁盘写入的整体效率。

  • log.segment.ms

下面提到日志片段经敞开后需期待过期,那么 log.segment.ms 这个参数就是指定日志多长时间被敞开的参数和,log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片段会在大小或工夫达到下限时被敞开,就看哪个条件先失去满足。

  • message.max.bytes

broker 通过设置 message.max.bytes 参数来限度单个音讯的大小,默认是 1000 000,也就是 1MB,如果生产者尝试发送的音讯超过这个大小,不仅音讯不会被接管,还会收到 broker 返回的谬误音讯。跟其余与字节相干的配置参数一样,该参数指的是压缩后的音讯大小,也就是说,只有压缩后的音讯小于 mesage.max.bytes,那么音讯的理论大小能够大于这个值

这个值对性能有显著的影响。值越大,那么负责解决网络连接和申请的线程就须要花越多的工夫来解决这些申请。它还会减少磁盘写入块的大小,从而影响 IO 吞吐量。

作者:程序员 cxuan

正文完
 0