一、Kafka 是什么?
有人说世界上有三个平凡的创造:火,轮子,以及 Kafka。
倒退到当初,Apache Kafka 无疑是很胜利的,Confluent 公司曾示意世界五百强中有三分之一的企业在应用 Kafka。在流式计算中,Kafka 个别用来缓存数据,例如 Flink 通过生产 Kafka 的数据进行计算。
对于 Kafka,咱们最先须要理解的是以下四点:
- Apache Kafka 是一个开源 「音讯」 零碎,由 Scala 写成。是由 Apache 软件基金会开发的 一个开源音讯零碎我的项目。
- Kafka 最后是由 LinkedIn 公司开发,用作 LinkedIn 的流动流(Activity Stream)和经营数据处理管道(Pipeline)的根底,当初它已被多家不同类型的公司作为多种类型的数据管道和音讯零碎应用。
- 「Kafka 是一个分布式音讯队列」。Kafka 对音讯保留时依据 Topic 进行归类,发送音讯 者称为 Producer,音讯接受者称为 Consumer,此外 kafka 集群有多个 kafka 实例组成,每个 实例 (server) 称为 broker。
- 无论是 kafka 集群,还是 consumer 都依赖于 「Zookeeper」 集群保留一些 meta 信息,来保证系统可用性。
二、为什么要有 Kafka?
「kafka」 之所以受到越来越多的青眼,与它所表演的三大角色是分不开的的:
- 「音讯零碎」:kafka 与传统的消息中间件都具备零碎解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等性能。与此同时,kafka 还提供了大多数音讯零碎难以实现的音讯程序性保障及回溯性生产的性能。
- 「存储系统」:kafka 把音讯长久化到磁盘,相比于其余基于内存存储的零碎而言,无效的升高了音讯失落的危险。这得益于其音讯长久化和多正本机制。也能够将 kafka 作为长期的存储系统来应用,只须要把对应的数据保留策略设置为“永恒”或启用主题日志压缩性能。
- 「流式解决平台」:kafka 为风行的流式解决框架提供了牢靠的数据起源,还提供了一个残缺的流式解决框架,比方窗口、连贯、变换和聚合等各类操作。
Kafka 个性分布式具备经济、疾速、牢靠、易裁减、数据共享、设施共享、通信不便、灵便等,分布式所具备的个性高吞吐量同时为数据生产者和消费者进步吞吐量高可靠性反对多个消费者,当某个消费者失败的时候,可能主动负载平衡离线能将音讯长久化,进行批量处了解耦作为各个系统连贯的桥梁,防止零碎之间的耦合
三、Kafka 基本概念
在深刻了解 Kafka 之前,能够先理解下 Kafka 的基本概念。
一个典型的 Kafka 蕴含若干 Producer、若干 Broker、若干 Consumer 以及一个 Zookeeper 集群。Zookeeper 是 Kafka 用来负责集群元数据管理、控制器选举等操作的。Producer 是负责将音讯发送到 Broker 的,Broker 负责将音讯长久化到磁盘,而 Consumer 是负责从 Broker 订阅并生产音讯。Kafka 体系结构如下所示:
概念一:生产者(Producer)与消费者(Consumer)
生产者和消费者
对于 Kafka 来说客户端有两种根本类型:「生产者」(Producer)和 「消费者」(Consumer)。除此之外,还有用来做数据集成的 Kafka Connect API 和流式解决的「Kafka Streams」 等高阶客户端,但这些高阶客户端底层依然是生产者和消费者 API,只不过是在下层做了封装。
- 「Producer」:音讯生产者,就是向 Kafka broker 发消息的客户端;
- 「Consumer」:音讯消费者,向 Kafka broker 取音讯的客户端;
概念二:Broker 和集群(Cluster)
一个 Kafka 服务器也称为 「Broker」,它承受生产者发送的音讯并存入磁盘;Broker 同时服务消费者拉取分区音讯的申请,返回目前曾经提交的音讯。应用特定的机器硬件,一个 Broker 每秒能够解决成千上万的分区和百万量级的音讯。
若干个 Broker 组成一个 「集群」(「Cluster」),其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包含调配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区能够被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时能够将其分区重新分配到其余 Broker 来负责。下图是一个样例:
Broker 和集群(Cluster)
概念三:主题(Topic)与分区(Partition)
主题(Topic)与分区(Partition)
在 Kafka 中,音讯以 「主题」(「Topic」)来分类,每一个主题都对应一个「「音讯队列」」,这有点儿相似于数据库中的表。然而如果咱们把所有同类的音讯都塞入到一个“核心”队列中,势必短少可伸缩性,无论是生产者 / 消费者数目的减少,还是音讯数量的减少,都可能耗尽零碎的性能或存储。
咱们应用一个生存中的例子来阐明:当初 A 城市生产的某商品须要运输到 B 城市,走的是公路,那么单通道的高速公路不论是在「A 城市商品增多」还是「当初 C 城市也要往 B 城市运输货色」这样的状况下都会呈现「吞吐量有余」的问题。所以咱们当初引入 「分区」(「Partition」)的概念,相似“容许多修几条道”的形式对咱们的主题实现了程度扩大。
四、Kafka 工作流程剖析
4.1 Kafka 生产过程剖析
4.1.1 写入形式
producer 采纳推(push)模式将音讯公布到 broker,每条音讯都被追加(append)到分区(patition)中,属于程序写磁盘(程序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)
4.1.2 分区(Partition)
音讯发送时都被发送到一个 topic,其本质就是一个目录,而 topic 是由一些 Partition Logs(分区日志)组成,其组织构造如下图所示:
咱们能够看到,每个 Partition 中的音讯都是 「有序」 的,生产的音讯被一直追加到 Partition log 上,其中的每一个音讯都被赋予了一个惟一的 「offset」 值。
「1)分区的起因」
- 不便在集群中扩大,每个 Partition 能够通过调整以适应它所在的机器,而一个 topic 又能够有多个 Partition 组成,因而整个集群就能够适应任意大小的数据了;
- 能够进步并发,因为能够以 Partition 为单位读写了。
「2)分区的准则」
- 指定了 patition,则间接应用;
- 未指定 patition 但指定 key,通过对 key 的 value 进行 hash 出一个 patition;
- patition 和 key 都未指定,应用轮询选出一个 patition。
DefaultPartitioner 类
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
4.1.3 正本(Replication)
同 一 个 partition 可 能 会 有 多 个 replication(对 应 server.properties 配 置 中 的 default.replication.factor=N)。没有 replication 的状况下,一旦 broker 宕机,其上所有 patition 的数据都不可被生产,同时 producer 也不能再将数据存于其上的 patition。引入 replication 之后,同一个 partition 可能会有多个 replication,而这时须要在这些 replication 之间选出一 个 leader,producer 和 consumer 只与这个 leader 交互,其它 replication 作为 follower 从 leader 中复制数据。
4.1.4 写入流程
producer 写入音讯流程如下:
1)producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader;
2)producer 将音讯发送给该 leader;
3)leader 将音讯写入本地 log;
4)followers 从 leader pull 音讯,写入本地 log 后向 leader 发送 ACK;
5)leader 收到所有 ISR 中的 replication 的 ACK 后,减少 HW(high watermark,最初 commit 的 offset)并向 producer 发送 ACK;
4.2 Broker 保留音讯
4.2.1 存储形式
物理上把 topic 分成一个或多个 patition(对应 server.properties 中的 num.partitions=3 配 置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有音讯和索引文 件),如下:
[root@hadoop102 logs]$ ll
drwxrwxr-x. 2 demo demo 4096 8 月 6 14:37 first-0
drwxrwxr-x. 2 demo demo 4096 8 月 6 14:35 first-1
drwxrwxr-x. 2 demo demo 4096 8 月 6 14:37 first-2
[root@hadoop102 logs]$ cd first-0
[root@hadoop102 first-0]$ ll
-rw-rw-r–. 1 demo demo 10485760 8 月 6 14:33 00000000000000000000.index
-rw-rw-r–. 1 demo demo 219 8 月 6 15:07 00000000000000000000.log
-rw-rw-r–. 1 demo demo 10485756 8 月 6 14:33 00000000000000000000.timeindex
-rw-rw-r–. 1 demo demo 8 8 月 6 14:37 leader-epoch-checkpoint
4.2.2 存储策略
无论音讯是否被生产,kafka 都会保留所有音讯。有两种策略能够删除旧数据:
- 基于工夫:log.retention.hours=168
- 基于大小:log.retention.bytes=1073741824
须要留神的是,因为 Kafka 读取特定音讯的工夫复杂度为 O(1),即与文件大小无关,所以这里删除过期文件与进步 Kafka 性能无关。
4.2.3 Zookeeper 存储构造
留神:producer 不在 zk 中注册,消费者在 zk 中注册。
4.3 Kafka 生产过程剖析
kafka 提供了两套 consumer API:高级 Consumer API 和低级 Consumer API。
4.3.1 高级 API
「1)高级 API 长处」
- 高级 API 写起来简略
- 不须要自行去治理 offset,零碎通过 zookeeper 自行治理。
- 不须要治理分区,正本等状况,零碎主动治理。
- 消费者断线会主动依据上一次记录在 zookeeper 中的 offset 去接着获取数据(默认设置 1 分钟更新一下 zookeeper 中存的 offset)
- 能够应用 group 来辨别对同一个 topic 的不同程序拜访拆散开来(不同的 group 记录不同的 offset,这样不同程序读取同一个 topic 才不会因为 offset 相互影响)
「2)高级 API 毛病」
- 不能自行管制 offset(对于某些非凡需要来说)
- 不能细化管制如分区、正本、zk 等
4.3.2 低级 API
「1)低级 API 长处」
- 可能让开发者本人管制 offset,想从哪里读取就从哪里读取。
- 自行管制连贯分区,对分区自定义进行负载平衡
- 对 zookeeper 的依赖性升高(如:offset 不肯定非要靠 zk 存储,自行存储 offset 即可,比方存在文件或者内存中)
「2)低级 API 毛病」
- 太过简单,须要自行管制 offset,连贯哪个分区,找到分区 leader 等。
4.3.3 消费者组
消费者是以 consumer group 消费者组的形式工作,由一个或者多个消费者组成一个组,独特生产一个 topic。每个分区在同一时间只能由 group 中的一个消费者读取,然而多个 group 能够同时生产这个 partition。在图中,有一个由三个消费者组成的 group,有一个消费者读取主题中的两个分区,另外两个别离读取一个分区。某个消费者读取某个分区,也能够叫做某个消费者是某个分区的拥有者。
在这种状况下,消费者能够通过程度扩大的形式同时读取大量的音讯。另外,如果一个消费者失败了,那么其余的 group 成员会主动负载平衡读取之前失败的消费者读取的分区。
4.3.4 生产形式
consumer 采纳 pull(拉)模式从 broker 中读取数据。
push(推)模式很难适应生产速率不同的消费者,因为音讯发送速率是由 broker 决定的。它的指标是尽可能以最快速度传递音讯,然而这样很容易造成 consumer 来不及解决音讯,典型的体现就是拒绝服务以及网络拥塞。而 pull 模式则能够依据 consumer 的生产能力以适当的速率生产音讯。
对于 Kafka 而言,pull 模式更适合,它可简化 broker 的设计,consumer 可自主管制生产 音讯的速率,同时 consumer 能够本人管制生产形式——即可批量生产也可逐条生产,同时还能抉择不同的提交形式从而实现不同的传输语义。
pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,始终期待数据 达到。为了防止这种状况,咱们在咱们的拉申请中有参数,容许消费者申请在期待数据达到 的“长轮询”中进行阻塞(并且可选地期待到给定的字节数,以确保大的传输大小)。
五、Kafka 装置
5.1 装置环境与前提条件
装置环境:Linux
前提条件:
Linux 零碎下装置好 jdk 1.8 以上版本,正确配置环境变量 Linux 零碎下装置好 scala 2.11 版本
装置 ZooKeeper(注:kafka 自带一个 Zookeeper 服务,如果不独自装置,也能够应用自带的 ZK)
5.2 装置步骤
Apache 基金会开源的这些软件基本上装置都比拟不便,只须要下载、解压、配置环境变量三步即可实现,kafka 也一样,官网抉择对应版本下载后间接解压到一个装置目录下就能够应用了,如果为了不便能够在~/.bashrc 里配置一下环境变量,这样应用的时候就不须要每次都切换到装置目录了。
具体可参考:Kafka 集群装置与环境测试
5.3 测试
接下来能够通过简略的 console 窗口来测试 kafka 是否装置正确。
「(1)首先启动 ZooKeeper 服务」
如果启动本人装置的 ZooKeeper,应用命令 zkServer.sh start 即可。
如果应用 kafka 自带的 ZK 服务,启动命令如下(启动之后 shell 不会返回,后续其余命令须要另开一个 Terminal):
$ cd /opt/tools/kafka #进入装置目录
$ bin/zookeeper-server-start.sh config/zookeeper.properties
「(2)第二步启动 kafka 服务」
启动 Kafka 服务的命令如下所示:
$ cd /opt/tools/kafka #进入装置目录
$ bin/kafka-server-start.sh config/server.properties
「(3)第三步创立一个 topic,假如为“test”」
创立 topic 的命令如下所示,其参数也都比拟好了解,顺次指定了依赖的 ZooKeeper,正本数量,分区数量,topic 的名字:
$ cd /opt/tools/kafka #进入装置目录
$ bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test1
创立实现后,能够通过如下所示的命令查看 topic 列表:
$ bin/kafka-topics.sh –list –zookeeper localhost:2181
「(4)开启 Producer 和 Consumer 服务」
kafka 提供了生产者和消费者对应的 console 窗口程序,能够先通过这两个 console 程序来进行验证。
首先启动 Producer:
$ cd /opt/tools/kafka #进入装置目录
$ bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
而后启动 Consumer:
$ cd /opt/tools/kafka #进入装置目录
$ bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning
在关上生产者服务的终端输出一些数据,回车后,在关上消费者服务的终端能看到生产者终端输出的数据,即阐明 kafka 装置胜利。
六、Apache Kafka 简略示例
6.1 创立音讯队列
kafka-topics.sh –create –zookeeper 192.168.56.137:2181 –topic test –replication-factor 1 –partitions 1
6.2 pom.xml
<!– https://mvnrepository.com/art… –>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
6.3 生产者
package com.njbdqn.services;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
- @author:Tokgo J
- @date:2020/9/11
- @aim:生产者:往 test 音讯队列写入音讯
*/
public class MyProducer {
public static void main(String[] args) {
// 定义配置信息
Properties prop = new Properties();
// kafka 地址,多个地址用逗号宰割 "192.168.23.76:9092,192.168.23.77:9092"
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.137:9092");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
KafkaProducer<String,String> prod = new KafkaProducer<String, String>(prop);
// 发送音讯
try {for(int i=0;i<10;i++) {
// 生产者记录音讯
ProducerRecord<String, String> pr = new ProducerRecord<String, String>("test", "hello world"+i);
prod.send(pr);
Thread.sleep(500);
}
} catch (InterruptedException e) {e.printStackTrace();
} finally {prod.close();
}
}
}
留神:
- kafka 如果是集群,多个地址用逗号宰割(,);
- Properties 的 put 办法,第一个参数能够是字符串,如:p.put(“bootstrap.servers”,”192.168.23.76:9092″);
- kafkaProducer.send(record)能够通过返回的 Future 来判断是否曾经发送到 kafka,加强音讯的可靠性。同时也能够应用 send 的第二个参数来回调,通过回调判断是否发送胜利;
- p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 设置序列化类,能够写类的全门路。
6.4 消费者
package com.njbdqn.services;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
/**
- @author:Tokgo J
- @date:2020/9/11
- @aim:消费者:读取 kafka 数据
*/
public class MyConsumer {
public static void main(String[] args) {Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.137:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put("session.timeout.ms", "30000");
// 消费者是否主动提交偏移量,默认是 true 避免出现反复数据 设为 false
prop.put("enable.auto.commit", "false");
prop.put("auto.commit.interval.ms", "1000");
//auto.offset.reset 消费者在读取一个没有偏移量的分区或者偏移量有效的状况下的解决
//earliest 在偏移量有效的状况下 消费者将从起始地位读取分区的记录
//latest 在偏移量有效的状况下 消费者将从最新地位读取分区的记录
prop.put("auto.offset.reset", "earliest");
// 设置组名
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
KafkaConsumer<String, String> con = new KafkaConsumer<String, String>(prop);
con.subscribe(Collections.singletonList("test"));
while (true) {ConsumerRecords<String, String> records = con.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> rec : records) {System.out.println(String.format("offset:%d,key:%s,value:%s", rec.offset(), rec.key(), rec.value()));
}
}
}
}
留神:
- 订阅音讯能够订阅多个主题;
- ConsumerConfig.GROUP_ID_CONFIG 示意消费者的分组,kafka 依据分组名称判断是不是同一组消费者,同一组消费者去生产一个主题的数据的时候,数据将在这一组消费者下面轮询;
- 主题波及到分区的概念,同一组消费者的个数不能大于分区数。因为:一个分区只能被同一群组的一个消费者生产。呈现分区小于消费者个数的时候,能够动静减少分区;
- 留神和生产者的比照,Properties 中的 key 和 value 是反序列化,而生产者是序列化。
七、参考
朱小厮:《深刻了解 Kafka: 外围设计与实际原理》
宇宙湾:《Apache Kafka 分布式音讯队列框架》
须要上述参考资料或者是想更多 kafka 相干参考资料的读者能够关注公众号【Java 斗帝】回复 666 即可收费获取
八、举荐观看:
视频解说:对于【暴力递归算法】你所不晓得的思路
价值 2W 多的程序员职业规划,真香
看完三件事❤️
========
如果你感觉这篇内容对你还蛮有帮忙,我想邀请你帮我三个小忙:
点赞,转发,有你们的『点赞和评论』,才是我发明的能源。
关注公众号『Java 斗帝』,不定期分享原创常识。
同时能够期待后续文章 ing????