1.SocketServer
SocketServer 作为 Broker 对外提供 Socket 服务的模块,次要用于接管 socket 连贯的申请,而后产生相应为之服务的 SocketChannel 对象。
外部次要包含三个模块:
Acceptor 次要用于监听 Socket 连贯;
Processor 次要用于转发 Socket 的申请和响应。
RequestChannel 次要用于缓存 Socket 的申请和响应。
1.1Acceptor 对象次要性能
(1)开启 socket 服务
(2)注册 Accept 事件
(3)监听此 ServerChannel 上的 ACCEPT 事件,当其产生时,将其以轮询的形式把对应的 SocketChannel 转交给 Processor 解决线程。
1.2Processor 对象次要性能
(1)当有新的 SocketChannel 对象进来的时候,注册其上的 OP_READ 事件以便接管客户端的申请。
(2)从 RequestChannel 中的响应队列获取对应客户端的申请,而后产生 OP_WRITE 事件。
(3)监听 selector 上的事件。如果是读事件,阐明有新的 request 到来,须要转移给 RequestChannel 的申请队列;如果是写事件,阐明之前的 request 曾经处理完毕,须要从 RequestChannel 的响应队列获取响应并发送回客户端;如果是敞开事件,阐明客户端曾经敞开了 该 Socket 连贯,此时服务端也应该开释相干资源。
1.3RequestChannel
实质上就是为理解耦 SocketServer 和 KafkaApis 两个模块,外部蕴含 Request 的阻塞队列和 Response 的阻塞队列。
注:SocketServer 为了避免闲暇连贯大量存在,采纳了 LRU 算法,即最近起码应用算法,会将长时间没有交互的 SocketChannel 对象敞开,及时开释资源。因而 Processor 仅仅是起到了接管 Request,发送 Response 的作用,其解决 Request 的具体业务逻辑是由 KafkaApis 层负责的,并且两者之间是通过 RequestChannel 互相分割起来的。
总结可得,SocketServer 负责上面三个方面:
(1)建设 Socket,放弃和客户端的通信;
(2)转发客户端的 Request;
(3)返回 Response 给客户端。最初通过 RequestChannel 与其余模块解耦。
2.KafkaRequestHandlerPool
KafkaRequestHandlerPool 实质上就是一个线程池,外面蕴含了 num.io.threads 个 IO 解决线程,默认 为 8 个。KafkaRequestHandlerPool 在外部启动了若干个 KafkaRequestHandler 解决线程,并将 RequestChannel 对象和 KafkaApis 对象传递给了 KafkaRequestHandler 解决线程,因为 KafkaRequestHandler 须要从前者的 requestQueue 中取出 Request,并且利用后者来实现具体的业务逻辑。
3.KafkaApis
KafkaApis 负责具体的业务逻辑,它次要和 Producer、Consumer、Broker Server 交互。KafkaApis 次要依赖以下四个组件来实现具体的业务逻辑:
LogManager 提供针对 Kafka 的 topic 日志的读取和写入性能。
ReplicaManager 提供针对 topic 分区正本数据的同步性能。
OffsetManager 提供针对提交至 Kafka 偏移量的治理性能。
KafkaSchedule 为其余模块提供定时的调度和治理性能。
3.1LogManager
LogManager 负责提供 Broker Server 上 topic 的分区数据读取和写入性能,负责读取和写入位于 Broker Server 上的所有分区正本数据;如果 Partition 有多个 Replica,则每个 Broker Server 不会存在雷同 Partition 的 Replica;如果存在的话,一旦遇到 Broker Server 下线,则会立即失落 Partition 的多份正本,失去 了肯定的可靠性。
Topic、Partition 和 Replica 三者之间的关联关系:
3.2ReplicaManager
ReplicaManager 负责提供针对 topic 的分区正本数据的同步性能,须要针对不同的变动做出及时响应,例如 Partition 的 Replicas 发送 Leader 切换时,Partition 的 Replicas 所在的 Broker Server 离线的时候,Partition 的 Replicas 产生 Follower 同步 Leader 数据异样的时候,等等。
分区两个名词:AR 和 ISR
AR 是 Assign Replicas 的缩写,代表曾经调配给 Partition 的正本。
ISR 是 In-Sync Replicas 的缩写,代表处于同步状态的正本。
并不是所有的 AR 都是 ISR,尤其是当 Broker Server 离线的时候会导致对应 TopicAndPartition 的 Replica 没有及时同步 Leader 状态的 Replica,从而该 Replica 不是 ISR。
a.ReplicaManager 是如何实现 Replica 数据的同步?
次要利用 ReplicaFetcherThread(正本数据拉取线程)和 Height Watermark Mechanism(高水位线机制)来实现数据的同步治理。
b. 什么是高水位?
实质上代表的是 ISR 中的所有 replicas 的 last commited message 的最小起始偏移量,即在这偏移之前的数据都被 ISR 所有的 replicas 所接管,然而在这偏移之后的数据被 ISR 中的局部 replicas 所接管。
其中 RecoverPoint 代表的是 recover-point-offset-checkpoint 文件中记录的偏移量,LogEndOffset 代表的是以后 TopicAndPartition 的 replica 所接管到音讯的最大偏移量,HeightWatermark 代表的是曾经同步给所有 ISR 的最小偏移量。Replica 的 HeightWatermark 产生更新在以下两种状况:
(1)Leader 状态的 Replica 接管到其余 Follower 状态的 Replica 的 FetchRequest 申请时,会选择性得更新 HeightWatermark。
(2)Follower 状态的 Replica 接管到来自 Leader 状态的 Replica 的 FetchResponse 时,会选择性更新 HeightWatermark,即 ReplicaFetcherThread 外部的 processPartitionData 流程。
4.OffsetManager
4.1Kafka 提供两种保留 Consumer 偏移量的办法:
(1)将偏移量保留到 Zookeeper 中。
(2)将偏移量保留至 Kafka 外部一个名为_consumer_offsets 的 Topic 外面。
将偏移量保留至 Zookeeper 中是 kafka 始终就反对的,然而思考到 zookeeper 并不太适宜大批量的频繁写入操作,大数据培训因而 kafka 开始反对将 Consumer 的偏移量保留再 Kafka 外部的 topic 中,即_consumer_offsets Topic。当用户配置 offsets.storage=kafka 时,高级消费者会将偏移量保留至 Topic 外面,同时通过 OffsetManager 提供对这些偏移量的治理。
4.2 OffsetManager 次要性能
缓存最新的偏移量。
提供对偏移量的查问。
Compact,保留最新的偏移量,以此来管制 Topic 日志的大小。
Kafka 如何将 Consumer Group 产生的偏移量信息保留在_consumer_offsets 的不同分区?
实质是通过计算不同 Consumer Group 的 hash 值和_consumer_offsets 的分区数的模数,其后果作为指定分区的索引。
5.KafkaScheduler
KafkaScheduler 为其余模块提供定时工作的调度和治理,例如 LogManager 外部的 cleanupLogs 定时工作,flushDirtyLogs 定时工作和 checkpointRecoverPointOffsets 定时工作;ReplicaManager 模块外部的 maybeShrinkIsr 定时工作;OffsetManager 外部的 offsets-cache-compactor 定时工作等等。KafkaScheduler 外部是基于 ScheduledThreadPoolExecutor 实现的,对外封装了任务调度的接口 schedule,线程个数由参数 background.threads 决定,默认值为 10。
6.KafkaHealthcheck
KafkaHealthcheck 次要提供 Broker Server 衰弱状态的上报。Broker Server 衰弱状态实质上就是指 Broker Server 是否在线,如果 Broker Server 在线,阐明处于衰弱状态,如果 Broker Server 离线,阐明处于死亡状态。
Broker Server 如何上报衰弱状态?
BrokerChangeListener 通过监听目录为 /brokers/ids 的 zookeeper 门路,当产生有数据变动时,则获取当前目录下的数据,从而获取以后集群的在线 Broker Server 列表。而 KafkaHealthcheck 正是提供了在目录为 /brokers/ids 的 Zookeeper 门路上注册节点的能力,该节点所在门路为 EphemeralPath(非永恒门路), 当 Broker Server 因为异常情况导致下线时,此 EphemeralPath 随着 Broker Server 和 zookeeper 链接的断开而隐没。
7.TopicConfigManager
kafka 提供对 topic 配置参数的在线批改能力,批改实现之后无需重新启动 kafka 集群,在线失效。Topic 配置参数包含:数据文件的大小,索引文件的大小,索引项的大小,索引项的粒度,日志文件保留的策略等等;
Topic 的配置参数位于门路为 /config/topics/[topic]的 zookeeper 上,Broker Server 外部为了防止针对每个 Topic 都在相干门路上建设监听器,对外提供了一个被告诉的门路,其位于 /brokers/config_changes,如果检测到该门路 上发生变化,则读取该门路上的数据,获取配置文件待更新的 Topic,而后再从 /config/topics/[topic]上加载最新的配置文件。