1.SocketServer
SocketServer作为Broker对外提供Socket服务的模块,次要用于接管socket连贯的申请,而后产生相应为之服务的SocketChannel对象。
外部次要包含三个模块:
Acceptor次要用于监听Socket连贯;
Processor次要用于转发Socket的申请和响应。
RequestChannel次要用于缓存Socket的申请和响应。
1.1Acceptor对象次要性能
(1)开启socket服务
(2)注册Accept事件
(3)监听此ServerChannel上的ACCEPT事件,当其产生时,将其以轮询的形式把对应的 SocketChannel转交给Processor解决线程。
1.2Processor对象次要性能
(1)当有新的SocketChannel对象进来的时候,注册其上的OP_READ事件以便接管客户端的申请。
(2)从RequestChannel中的响应队列获取对应客户端的申请,而后产生OP_WRITE事件。
(3)监听selector上的事件。如果是读事件,阐明有新的request到来,须要转移给 RequestChannel的申请队列;如果是写事件,阐明之前的request曾经处理完毕,须要从 RequestChannel的响应队列获取响应并发送回客户端;如果是敞开事件,阐明客户端曾经敞开了 该Socket连贯,此时服务端也应该开释相干资源。
1.3RequestChannel
实质上就是为理解耦SocketServer和KafkaApis两个模块,外部蕴含Request的阻塞队列和Response的阻塞队列。
注:SocketServer为了避免闲暇连贯大量存在,采纳了LRU算法,即最近起码应用算法,会将长时间没有交互的SocketChannel对象敞开,及时开释资源。因而Processor仅仅是起到了接管Request,发送Response的作用,其解决Request的具体业务逻辑是由KafkaApis层负责的,并且两者之间是通过RequestChannel互相分割起来的。
总结可得,SocketServer负责上面三个方面:
(1)建设Socket,放弃和客户端的通信;
(2)转发客户端的Request;
(3)返回Response给客户端。最初通过RequestChannel与其余模块解耦。
2.KafkaRequestHandlerPool
KafkaRequestHandlerPool实质上就是一个线程池,外面蕴含了num.io.threads 个IO解决线程,默认 为8个。KafkaRequestHandlerPool在外部启动了若干个KafkaRequestHandler解决线程,并将RequestChannel对象和KafkaApis对象传递给了KafkaRequestHandler解决线程,因为KafkaRequestHandler须要从前者的requestQueue中取出Request,并且利用后者来实现具体的业务逻辑。
3.KafkaApis
KafkaApis负责具体的业务逻辑,它次要和Producer、Consumer、Broker Server交互。 KafkaApis次要依赖以下四个组件来实现具体的业务逻辑:
LogManager提供针对Kafka的topic日志的读取和写入性能。
ReplicaManager提供针对topic分区正本数据的同步性能。
OffsetManager提供针对提交至Kafka偏移量的治理性能。
KafkaSchedule为其余模块提供定时的调度和治理性能。
3.1LogManager
LogManager负责提供Broker Server上topic的分区数据读取和写入性能,负责读取和写入位于Broker Server上的所有分区正本数据;如果Partition有多个Replica,则每个Broker Server不会存在雷同Partition的Replica;如果存在的话,一旦遇到Broker Server下线,则会立即失落Partition的多份正本,失去 了肯定的可靠性。
Topic、Partition和Replica三者之间的关联关系:
3.2ReplicaManager
ReplicaManager负责提供针对topic的分区正本数据的同步性能,须要针对不同的变动做出及时响应,例如Partition的Replicas发送Leader切换时,Partition的Replicas所在的Broker Server离线的时候,Partition的Replicas产生Follower同步Leader数据异样的时候,等等。
分区两个名词:AR和ISR
AR是Assign Replicas的缩写,代表曾经调配给Partition的正本。
ISR是In-Sync Replicas的缩写,代表处于同步状态的正本。
并不是所有的AR都是ISR,尤其是当Broker Server离线的时候会导致对应TopicAndPartition的Replica没有及时同步Leader状态的Replica,从而该Replica不是ISR。
a.ReplicaManager是如何实现Replica数据的同步?
次要利用ReplicaFetcherThread(正本数据拉取线程)和Height Watermark Mechanism(高水位线机制)来实现数据的同步治理。
b.什么是高水位?
实质上代表的是ISR中的所有replicas的last commited message的最小起始偏移量,即在这偏移之前的数据都被ISR所有的replicas所接管,然而在这偏移之后的数据被ISR中的局部replicas所接管。
其中RecoverPoint代表的是recover-point-offset-checkpoint文件中记录的偏移量,LogEndOffset代表的是以后TopicAndPartition的replica所接管到音讯的最大偏移量,HeightWatermark代表的是曾经同步给所有ISR的最小偏移量。Replica的HeightWatermark产生更新在以下两种状况:
(1)Leader状态的Replica接管到其余Follower状态的Replica的FetchRequest申请时,会选择性得更新HeightWatermark。
(2)Follower状态的Replica接管到来自Leader状态的Replica的FetchResponse时,会选择性更新HeightWatermark,即ReplicaFetcherThread外部的processPartitionData流程。
4.OffsetManager
4.1Kafka提供两种保留Consumer偏移量的办法:
(1)将偏移量保留到Zookeeper中。
(2)将偏移量保留至Kafka外部一个名为_consumer_offsets的Topic外面。
将偏移量保留至Zookeeper中是kafka始终就反对的,然而思考到zookeeper并不太适宜大批量的频繁写入操作,大数据培训因而kafka开始反对将Consumer的偏移量保留再Kafka外部的topic中,即_consumer_offsets Topic。当用户配置offsets.storage=kafka时,高级消费者会将偏移量保留至Topic外面,同时通过OffsetManager提供对这些偏移量的治理。
4.2 OffsetManager次要性能
缓存最新的偏移量。
提供对偏移量的查问。
Compact,保留最新的偏移量,以此来管制Topic日志的大小。
Kafka如何将Consumer Group 产生的偏移量信息保留在_consumer_offsets的不同分区?
实质是通过计算不同Consumer Group的hash值和_consumer_offsets的分区数的模数,其后果作为指定分区的索引。
5.KafkaScheduler
KafkaScheduler为其余模块提供定时工作的调度和治理,例如LogManager外部的cleanupLogs定时工作,flushDirtyLogs定时工作和checkpointRecoverPointOffsets定时工作;ReplicaManager模块外部的maybeShrinkIsr定时工作;OffsetManager外部的offsets-cache-compactor定时工作等等。KafkaScheduler外部是基于ScheduledThreadPoolExecutor实现的,对外封装了任务调度的接口schedule,线程个数由参数background.threads决定,默认值为10。
6.KafkaHealthcheck
KafkaHealthcheck次要提供Broker Server衰弱状态的上报。Broker Server衰弱状态实质上就是指Broker Server是否在线,如果Broker Server在线,阐明处于衰弱状态,如果Broker Server离线,阐明处于死亡状态。
Broker Server如何上报衰弱状态?
BrokerChangeListener通过监听目录为/brokers/ids的zookeeper门路,当产生有数据变动时,则获取当前目录下的数据,从而获取以后集群的在线Broker Server列表。而KafkaHealthcheck正是提供了在目录为/brokers/ids的Zookeeper门路上注册节点的能力,该节点所在门路为EphemeralPath(非永恒门路),当Broker Server因为异常情况导致下线时,此EphemeralPath随着Broker Server和zookeeper链接的断开而隐没。
7.TopicConfigManager
kafka提供对topic配置参数的在线批改能力,批改实现之后无需重新启动kafka集群,在线失效。Topic配置参数包含:数据文件的大小,索引文件的大小,索引项的大小,索引项的粒度,日志文件保留的策略等等;
Topic的配置参数位于门路为/config/topics/[topic]的zookeeper上,Broker Server外部为了防止针对每个Topic都在相干门路上建设监听器,对外提供了一个被告诉的门路,其位于/brokers/config_changes,如果检测到该门路 上发生变化,则读取该门路上的数据,获取配置文件待更新的Topic,而后再从/config/topics/[topic]上加载最新的配置文件。