一、简介
Apache Kafka 是一个分布式的流解决平台(分布式的基于公布/订阅模式的音讯队列【Message Queue】)。
流解决平台有以下3个个性:
- 能够让你公布和订阅流式的记录。这一方面与音讯队列或者企业音讯零碎相似。
- 能够贮存流式的记录,并且有较好的容错性。
- 能够在流式记录产生时就进行解决。
1.1 音讯队列的两种模式
1.1.1 点对点模式
生产者将音讯发送到queue中,而后消费者从queue中取出并且生产音讯。音讯被生产当前,queue中不再存储,所以消费者不可能生产到曾经被生产的音讯。Queue反对存在多个消费者,然而对一个音讯而言,只能被一个消费者生产。
1.1.2 公布/订阅模式
生产者将音讯公布到topic中,同时能够有多个消费者订阅该音讯。和点对点形式不同,公布到topic的音讯会被所有订阅者生产。
1.2 Kafka 适宜什么样的场景
它能够用于两大类别的利用:
- 结构实时流数据管道,它能够在零碎或利用之间牢靠地获取数据。(相当于message queue)。
- 构建实时流式应用程序,对这些流数据进行转换或者影响。(就是流解决,通过kafka stream topic和topic之间外部进行变动)。
为了了解Kafka是如何做到以上所说的性能,从上面开始,咱们将深刻摸索Kafka的个性。
首先是一些概念:
- Kafka作为一个集群,运行在一台或者多台服务器上。
- Kafka 通过 topic 对存储的流数据进行分类。
- 每条记录中蕴含一个key,一个value和一个timestamp(工夫戳)。
1.3 主题和分区
Kafka的音讯通过主题(Topic)进行分类,就好比是数据库的表,或者是文件系统里的文件夹。主题能够被分为若干个分区(Partition),一个分区就是一个提交日志。音讯以追加的形式写入分区,而后以先进先出的程序读取。留神,因为一个主题个别蕴含几个分区,因而无奈在整个主题范畴内保障音讯的程序,但能够保障音讯在单个分区内的程序。主题是逻辑上的概念,在物理上,一个主题是横跨多个服务器的。
Kafka 集群保留所有公布的记录(无论他们是否已被生产),并通过一个可配置的参数——保留期限来管制(能够同时配置工夫和音讯大小,以较小的那个为准)。举个例子, 如果保留策略设置为2天,一条记录公布后两天内,能够随时被生产,两天过后这条记录会被摈弃并开释磁盘空间。
有时候咱们须要减少分区的数量,比方为了扩大主题的容量、升高单个分区的吞吐量或者要在单个消费者组内运行更多的消费者(因为一个分区只能由消费者组里的一个消费者读取)。从消费者的角度来看,基于键的主题增加分区是很艰难的,因为分区数量扭转,键到分区的映射也会变动,所以对于基于键的主题来说,倡议在一开始就设置好分区,防止当前对其进行调整。
(留神:不能缩小分区的数量,因为如果删除了分区,分区外面的数据也一并删除了,导致数据不统一。如果肯定要缩小分区的数量,只能删除topic重建)
1.4 生产者和消费者
生产者(发布者)创立音讯,个别状况下,一个音讯会被公布到一个特定的主题上。生产者在默认状况下把音讯平衡的散布到主题的所有分区上,而并不关怀特定音讯会被写入哪个分区。不过,生产者也能够把音讯间接写到指定的分区。这通常通过音讯键和分区器来实现,分区器为键生成一个散列值,并将其映射到指定的分区上。生产者也能够自定义分区器,依据不同的业务规定将音讯映射到分区。
消费者(订阅者)读取音讯,消费者能够订阅一个或者多个主题,并依照音讯生成的程序读取它们。消费者通过查看音讯的偏移量来辨别曾经读取过的音讯。偏移量是一种元数据,它是一个一直递增的整数值,在创立音讯时,kafka会把它增加到音讯里。在给定的分区里,每个音讯的偏移量都是惟一的。消费者把每个分区最初读取的音讯偏移量保留在zookeeper或者kafka上,如果消费者敞开或者重启,它的读取状态不会失落。
消费者是消费者组的一部分,也就是说,会有一个或者多个生产独特读取一个主题。消费者组保障每个分区只能被同一个组内的一个消费者应用。如果一个消费者生效,群组里的其余消费者能够接管生效消费者的工作。
1.5 broker和集群
broker:一个独立的kafka服务器被称为broker。broker接管来自生产者的音讯,为音讯设置偏移量,并提交音讯到磁盘保留。broker为消费者提供服务,对读取分区的申请作出相应,返回曾经提交到磁盘上的音讯。
集群:交给同一个zookeeper集群来治理的broker节点就组成了kafka的集群。
broker是集群的组成部分,每个集群都有一个broker同时充当集群控制器的角色。控制器负责管理工作,包含将分区调配给broker和监控broker。在broker中,一个分区从属于一个broker,该broker被称为分区的领袖。一个分区能够调配给多个broker(Topic设置了多个正本的时候),这时会产生分区复制。如下图:
broker如何解决申请:broker会在它所监听的每个端口上运行一个Acceptor线程,这个线程会创立一个连贯并把它交给Processor线程去解决。Processor线程(也叫网络线程)的数量是可配的,Processor线程负责从客户端获取申请信息,把它们放进申请队列,而后从响应队列获取响应信息,并发送给客户端。如下图所示:
生产申请和获取申请都必须发送给分区的领袖正本(分区Leader)。如果broker收到一个针对特定分区的申请,而该分区的领袖在另外一个broker上,那么发送申请的客户端会收到一个“非分区领袖”的谬误响应。Kafka客户端要本人负责把生产申请和获取申请发送到正确的broker上。
客户端如何晓得该往哪里发送申请呢?客户端应用了另外一种申请类型——元数据申请。这种申请蕴含了客户端感兴趣的主题列表,服务器的响应音讯里指明了这些主题所蕴含的分区、每个分区都有哪些正本,以及哪个正本是领袖。元数据申请能够发给任意一个broker,因为所有的broker都缓存了这些信息。客户端缓存这些元数据,并且会定时从broker申请刷新这些信息。此外如果客户端收到“非领袖”谬误,它会在尝试从新发送申请之前,先刷新元数据。
1.6 Kafka 基础架构
二、Kafka架构深刻
2.1 Kafka工作流程及文件存储机制
2.1.1 工作流程
Kafka中音讯是以topic进行分类的,生产者生产音讯,消费者生产音讯,都是面向topic的。
Topic是逻辑上的概念,而partition(分区)是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。Producer生产的数据会被一直追加到该log文件末端,且每条数据都有本人的offset。消费者组中的每个消费者,都会实时记录本人生产到哪个offset,以便出错复原时,从上次的地位持续生产。
2.1.2 文件存储机制
因为生产者生产的音讯会一直追加到log文件开端,为避免log文件过大导致数据定位效率低下,Kafka采取了分片和索引的机制,将每个partition分为多个segment。(由log.segment.bytes决定,管制每个segment的大小,也可通过log.segment.ms管制,指定多长时间后日志片段会被敞开)每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规定为:topic名称+分区序号。例如:bing这个topic有3个分区,则其对应的文件夹为:bing-0、bing-1和bing-2。
索引文件和日志文件命名规定:每个 LogSegment 都有一个基准偏移量,用来示意以后 LogSegment 中第一条音讯的 offset。偏移量是一个 64位的长整形数,固定是20位数字,长度未达到,用 0 进行填补。如下图所示:
index和log文件以以后segment的第一条音讯的offset命名。index文件记录的是数据文件的offset和对应的物理地位,正是有了这个index文件,能力对任一数据写入和查看领有O(1)的复杂度,index文件的粒度能够通过参数log.index.interval.bytes来管制,默认是是每过4096字节记录一条index。下图为index文件和log文件的构造示意图:
查找message的流程(比方要查找offset为170417的message):
- 首先用二分查找确定它是在哪个Segment文件中,其中0000000000000000000.index为最开始的文件,第二个文件为0000000000000170410.index(起始偏移为170410+1 = 170411),而第三个文件为0000000000000239430.index(起始偏移为239430+1 = 239431)。所以这个offset = 170417就落在第二个文件中。其余后续文件能够依此类推,以起始偏移量命名并排列这些文件,而后依据二分查找法就能够疾速定位到具体文件地位。
- 用该offset减去索引文件的编号,即170417 - 170410 = 7,也用二分查找法找到索引文件中等于或者小于7的最大的那个编号。能够看出咱们可能找到[4,476]这组数据,476即offset=170410 + 4 = 170414的音讯在log文件中的偏移量。
- 关上数据文件(0000000000000170410.log),从地位为476的那个中央开始程序扫描直到找到offset为170417的那条Message。
2.1.3 数据过期机制
当日志片段大小达到log.segment.bytes指定的下限(默认是1GB)或者日志片段关上时长达到log.segment.ms时,以后日志片段就会被敞开,一个新的日志片段被关上。如果一个日志片段被敞开,就开始期待过期。以后正在写入的片段叫做沉闷片段,沉闷片段永远不会被删除,所以如果你要保留数据1天,然而片段蕴含5天的数据,那么这些数据就会被保留5天,因为片段被敞开之前,这些数据无奈被删除。
2.2 Kafka生产者
2.2.1 分区策略
- 多Partition分布式存储,利于集群数据的平衡。
- 并发读写,放慢读写速度。
- 放慢数据恢复的速率:当某台机器挂了,每个Topic仅需复原一部分的数据,多机器并发。
分区的准则
- 指明partition的状况下,应用指定的partition;
- 没有指明partition,然而有key的状况下,将key的hash值与topic的partition数进行取余失去partition值;
- 既没有指定partition,也没有key的状况下,第一次调用时随机生成一个整数(前面每次调用在这个整数上自增),将这个值与topic可用的partition数取余失去partition值,也就是常说的round-robin算法。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { //key为空时,获取一个自增的计数,而后对分区做取模失去分区编号 int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition // key不为空时,通过key的hash对分区取模(疑难:为什么这里不像下面那样,应用availablePartitions呢?) // 依据《Kafka权威指南》Page45了解:为了保障雷同的键,总是能路由到固定的分区,如果应用可用分区,那么因为分区数变动,会导致雷同的key,路由到不同分区 // 所以如果要应用key来映射分区,最好在创立主题的时候就把分区规划好 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }} private int nextValue(String topic) { //为每个topic保护了一个AtomicInteger对象,每次获取时+1 AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement();}
2.2.2 数据可靠性保障
kafka提供了哪些方面的保障
- kafka能够保障分区音讯的程序。如果应用同一个生产者往同一个分区写入音讯,而且音讯B在音讯A之后写入,那么kafka能够保障音讯B的偏移量比音讯A的偏移量大,而且消费者会先读取到音讯A再读取音讯B。
- 只有当音讯被写入分区的所有正本时,它才被认为是“已提交”的。生产者能够抉择接管不同类型的确认,比方在音讯被齐全提交时的确认、在音讯被写入分区领袖时的确认,或者在音讯被发送到网络时的确认。
- 只有还有一个正本是沉闷的,那么曾经提交的信息就不会失落。
- 消费者只能读取到曾经提交的音讯。
复制
Kafka的复制机制和分区的多正本架构是kafka可靠性保障的外围。把音讯写入多个正本能够使kafka在产生奔溃时仍能保障音讯的持久性。
kafka的topic被分成多个分区,分区是根本的数据块。每个分区能够有多个正本,其中一个是领袖。所有事件都是发给领袖正本,或者间接从领袖正本读取事件。其余正本只须要与领袖正本放弃同步,并及时复制最新的事件。
Leader保护了一个动静的in-sync replica set(ISR),意为和leader放弃同步的follower汇合。当ISR中的follower实现数据同步后,leader就会发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该工夫阈值由replica.lag.time.max.ms参数设定。Leader不可用时,将会从ISR中选举新的leader。满足以下条件能力被认为是同步的:
- 与zookeeper之间有一个沉闷的会话,也就是说,它在过来的6s(可配置)外向zookeeper发送过心跳。
- 在过来的10s(可配置)内从领袖那里获取过最新的数据。
影响Kafka音讯存储可靠性的配置
ack应答机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,可能容忍数据的大量失落,所以没有必要等ISR中的follower全副接管胜利。所以Kafka提供了三种可靠性级别,用户能够依据对可靠性和提早的要求进行衡量。acks:
- 0: producer不期待broker的ack,这一操作提供了一个最低的提早,broker一接管到还没写入磁盘就曾经返回,当broker故障时可能失落数据;
- 1: producer期待leader的ack,partition的leader落盘胜利后返回ack,如果在follower同步胜利之前leader故障,那么将会失落数据;
- -1(all):producer期待broker的ack,partition的leader和ISR里的follower全副落盘胜利后才返回ack。然而如果在follower同步实现后,broker发送ack之前,leader产生故障,那么会造成反复数据。(极其状况下也有可能丢数据:ISR中只有一个Leader时,相当于1的状况)。
生产一致性保障
(1)follower故障
follower产生故障后会被长期踢出ISR,待该follower复原后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的局部截取掉,从HW开始向leader进行同步。
等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就能够重新加入ISR了。
(2)leader故障
leader产生故障后,会从ISR中选出一个新的leader,之后为了保障多个正本之间的数据一致性,其余的follower会先将各自的log文件高于HW的局部截掉,而后从新的leader同步数据。
留神:这只能保障正本之间的数据一致性,并不能保证数据不失落或者不反复。
2.2.3 音讯发送流程
Kafka 的producer 发送音讯采纳的是异步发送的形式。在音讯发送过程中,波及到了两个线程——main线程和sender线程,以及一个线程共享变量——RecordAccumulator。main线程将音讯发送给RecordAccumulator,sender线程一直从RecordAccumulator中拉取音讯发送到Kafka broker。
为了提高效率,音讯被分批次写入kafka。批次就是一组音讯,这些音讯属于同一个主题和分区。(如果每一个音讯都独自穿行于网络,会导致大量的网络开销,把音讯分成批次传输能够缩小网络开销。不过要在时间延迟和吞吐量之间做出衡量:批次越大,单位工夫内解决的音讯就越多,单个音讯的传输工夫就越长)。批次数据会被压缩,这样能够晋升数据的传输和存储能力,但要做更多的计算解决。
相干参数:
- batch.size:只有数据积攒到batch.size后,sender才会发送数据。(单位:字节,留神:不是音讯个数)。
- linger.ms:如果数据迟迟未达到batch.size,sender期待 linger.ms之后也会发送数据。(单位:毫秒)。
- client.id:该参数能够是任意字符串,服务器会用它来辨认音讯的起源,还可用用在日志和配额指标里。
- max.in.flight.requests.per.connection:该参数指定了生产者在收到服务器响应之前能够发送多少个音讯。它的值越高,就会占用越多的内存,不过也会晋升吞吐量。把它设置为1能够保障音讯时按发送的程序写入服务器的,即便产生了重试。
2.3 Kafka消费者
2.3.1 生产形式
consumer采纳pull(拉)的模式从broker中读取数据。
push(推)模式很难适应生产速率不同的消费者,因为音讯发送速率是由broker决定的。它的指标是尽可能以最快的速度传递音讯,然而这样容易造成consumer来不及解决音讯,典型的体现就是拒绝服务以及网络拥塞。而pull模式能够依据consumer的生产能力以适当的速率生产音讯。
pull模式的不足之处是,如果kafka没有数据,消费者可能会陷入循环中,始终返回空数据。针对这一点,kafka的消费者在生产数据时会传入一个时长参数timeout,如果以后没有数据可生产,consumer会期待一段时间后再返回。
2.3.2 分区调配策略
一个consumer group中有多个consumer,一个topic有多个partition,所以必然会波及到partition的调配问题,即确定哪个partition由哪个consumer来生产。Kafka提供了3种消费者分区调配策略:RangeAssigor、RoundRobinAssignor、StickyAssignor。
PartitionAssignor接口用于用户定义实现分区调配算法,以实现Consumer之间的分区调配。生产组的成员订阅它们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker。协调者抉择其中的一个消费者来执行这个生产组的分区调配并将调配后果转发给生产组内所有的消费者。Kafka默认采纳RangeAssignor的调配算法。
2.3.2.1 RangeAssignor
RangeAssignor对每个Topic进行独立的分区调配。对于每一个Topic,首先对分区依照分区ID进行排序,而后订阅这个Topic的生产组的消费者再进行排序,之后尽量平衡的将分区调配给消费者。这里只能是尽量平衡,因为分区数可能无奈被消费者数量整除,那么有一些消费者就会多调配到一些分区。调配示意图如下:
分区调配的算法如下:
@Overridepublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) { Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions); Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); //for循环对订阅的多个topic别离进行解决 for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) { String topic = topicEntry.getKey(); List<String> consumersForTopic = topicEntry.getValue(); Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null) continue; //对消费者进行排序 Collections.sort(consumersForTopic); //计算均匀每个消费者调配的分区数 int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); //计算平均分配后多出的分区数 int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); for (int i = 0, n = consumersForTopic.size(); i < n; i++) { //计算第i个消费者,调配分区的起始地位 int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); //计算第i个消费者,调配到的分区数量 int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment;}
这种调配形式显著的一个问题是随着消费者订阅的Topic的数量的减少,不平衡的问题会越来越重大,比方上图中4个分区3个消费者的场景,C0会多调配一个分区。如果此时再订阅一个分区数为4的Topic,那么C0又会比C1、C2多调配一个分区,这样C0总共就比C1、C2多调配两个分区了,而且随着Topic的减少,这个状况会越来越重大。调配后果:
订阅2个Topic,每个Topic4个分区,共3个Consumer
- C0:[T0P0,T0P1,T1P0,T1P1]
- C1:[T0P2,T1P2]
- C2:[T0P3,T1P3]
2.3.2.2 RoundRobinAssignor
RoundRobinAssignor的调配策略是将生产组内订阅的所有Topic的分区及所有消费者进行排序后尽量平衡的调配(RangeAssignor是针对单个Topic的分区进行排序调配的)。如果生产组内,消费者订阅的Topic列表是雷同的(每个消费者都订阅了雷同的Topic),那么调配后果是尽量平衡的(消费者之间调配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么调配后果是不保障“尽量平衡”的,因为某些消费者不参加一些Topic的调配。
以上两个topic的状况,相比于之前RangeAssignor的调配策略,能够使分区调配的更平衡。不过思考这种状况,假如有三个消费者别离为C0、C1、C2,有3个Topic T0、T1、T2,别离领有1、2、3个分区,并且C0订阅T0,C1订阅T0和T1,C2订阅T0、T1、T2,那么RoundRobinAssignor的调配后果如下:
看上去调配曾经尽量的保障平衡了,不过能够发现C2承当了4个分区的生产而C1订阅了T1,是不是把T1P1交给C1生产能更加的平衡呢?
2.3.2.3 StickyAssignor
StickyAssignor分区调配算法,目标是在执行一次新的调配时,能在上一次调配的后果的根底上,尽量少的调整分区调配的变动,节俭因分区调配变动带来的开销。Sticky是“粘性的”,能够了解为调配后果是带“粘性的”——每一次调配变更绝对上一次调配做起码的变动。其指标有两点:
- 分区的调配尽量的平衡。
- 每一次重调配的后果尽量与上一次调配后果保持一致。
当这两个指标发生冲突时,优先保障第一个指标。第一个指标是每个调配算法都尽量尝试去实现的,而第二个指标才真正体现出StickyAssignor个性的。
StickyAssignor算法比较复杂,上面举例来说明调配的成果(比照RoundRobinAssignor),前提条件:
- 有4个Topic:T0、T1、T2、T3,每个Topic有2个分区。
- 有3个Consumer:C0、C1、C2,所有Consumer都订阅了这4个分区。
下面红色的箭头代表的是有变动的分区调配,能够看出,StickyAssignor的调配策略,变动较小。
2.3.3 offset的保护
因为Consumer在生产过程中可能会呈现断电宕机等故障,Consumer复原后,须要从故障前的地位持续生产,所以Consumer须要实时记录本人生产到哪个地位,以便故障复原后持续生产。Kafka0.9版本之前,Consumer默认将offset保留在zookeeper中,从0.9版本开始,Consumer默认将offset保留在Kafka一个内置的名字叫_consumeroffsets的topic中。默认是无奈读取的,能够通过设置consumer.properties中的exclude.internal.topics=false来读取。
2.3.4 kafka高效读写数据(理解)
程序写磁盘
Kafka 的 producer生产数据,要写入到log文件中,写的过程是始终追加到文件末端,为程序写。数据表明,同样的磁盘,程序写能到600M/s,而随机写只有100K/s。这与磁盘的机械构造无关,程序写之所以快,是因为其省去了大量磁头寻址的工夫。
零拷贝技术
零拷贝次要的工作就是防止CPU将数据从一块存储拷贝到另外一块存储,次要就是利用各种零拷贝技术,防止让CPU做大量的数据拷贝工作,缩小不必要的拷贝,或者让别的组件来做这一类简略的数据传输工作,让CPU解脱进去专一于别的工作。这样就能够让系统资源的利用更加无效。
参考文献
- Kafka中文文档
- [Kafka系列]之指定了一个offset,怎么查找到对应的音讯?
- 尚硅谷 Kafka 教程( Kafka 框架疾速入门)
- Kafka分区调配策略剖析——重点:StickyAssignor
- Kafka 日志存储
- 浅析Linux中的零拷贝技术
- 《Kafka权威指南》
作者:Li Xiaobing,来自vivo互联网技术团队