Kafka 动静配置实现
Kafka 的动静配置基于 Zookeeper 实现,本文次要梳理了 Kafka(version:2.8)中动静配置的实现逻辑。
背景信息
在 Kafka 中,Zookeeper 客户端没有应用常见的客户端工具(如:Curator),而是间接基于原生的客户端实现了本人的 KafkaZkClient,将一些通用或特有的 Zookeeper 操作封装在内。因而,对于 Zookeeper 的应用及回调等逻辑也齐全是独立实现的。另外,因为 Zookeeper 中一个节点下的 Watcher 程序触发,如果同一个节点下有大量的 Watcher,将会产生性能瓶颈。上面将基于这些背景信息来介绍 Kafka 是如何基于 Zookeeper 实现高效的动静配置管理的。
Kafka 动静配置 Zookeeper 目录构造
在以后版本的 Kafka 中,动静配置类型有 5 种:topic, client, user, broker, ip。Kafka 动静配置的目录构造为:/config/entityType/entityName
,entityType 代表配置的类型,entityName 代表具体某个实体,比方:Topic 类型对应的实体就是具体某个 Topic,某个配置类型下所有实体的默认配置的 entityName 为 <deafult>。具体来说,所有 Topic 的默认动静配置会放在 /config/topic/<default>
的节点信息中,Topic AAA 的所有动静配置会放在 /config/topic/AAA
的节点信息中。
Listener 实现
上面介绍 Kafka 中 Zookeeper Listener 的实现。既然是基于 Zookeeper 实现,必然少不了 Zookeeper 的 Watcher 告诉机制,然而,如背景信息中所说,在 Watcher 数量过多的状况下,会存在性能瓶颈。以 Topic 配置变更为例,在生产环境中,一个 Topic 的 Partition 数量可能多达上千,如果每个 Partition Leader 都去监听这个 Topic 配置信息,那么在一个 Kafka 集群内,仅监听 Topic 配置的 Watcher 就会有上万个甚至更多。Kafka 通过独立的告诉机制来防止了这一问题,即:每次 AdminClient 进行配置变更时,会在 /config/changes/
目录下创立以 config_change_
为前缀的程序节点,Wather 只监听 /config/changes/
目录的孩子节点变动,所以对于动静配置来说,所有 Broker 只监听 /config/changes/
这一个目录,大大减少集群整体的 Watcher 数量。
Kafka 中动静配置的 Zookeeper Listener 的实现在 ZkNodeChangeNotificationListener
类中,该类监听指定目录下的程序节点增加动作,在收到子节点变动告诉后,ZkNodeChangeNotificationListener
一方面执行告诉动作,告诉对应的 Handler 解决配置变更,另一面会革除所有曾经解决过的配置变更。
上面对 ZkNodeChangeNotificationListener
类的实现进行介绍,次要分为以下几个局部:
a. 初始化:注册 zk 连贯状态变更的 Handler 和 zk 子节点变更的 Handler;调用一次 `addChangeNotification()` 触发一次配置变更的解决,用来初始化动静配置;启动用来解决配置变更事件的线程 `ChangeEventProcessThread`。b. 配置变更解决:每次 zk 状态变更或者动静配置变更都会向 queue 中放入一个处理事件,与此同时,`ChangeEventProcessThread` 会继续一直的从 queue 中取出事件,执行对应的解决动作,即:`processNotifications()`。c. 革除过期告诉:每次执行完 `processNotifications()`,都会调用 `purgeObsoleteNotifications` 执行过期告诉的清理动作,删除所有进行本次 `processNotifications()` 之前创立的所有变更告诉。
class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
private val seqNodeRoot: String,
private val seqNodePrefix: String,
private val notificationHandler: NotificationHandler,
private val changeExpirationMs: Long = 15 * 60 * 1000,
private val time: Time = Time.SYSTEM) extends Logging {
private var lastExecutedChange = -1L
private val queue = new LinkedBlockingQueue[ChangeNotification]
private val thread = new ChangeEventProcessThread(s"$seqNodeRoot-event-process-thread")
private val isClosed = new AtomicBoolean(false)
def init(): Unit = {// ZkStateChangeHandler 和 ChangeNotificationHandler 都是 addChangeNotification()
zkClient.registerStateChangeHandler(ZkStateChangeHandler)
zkClient.registerZNodeChildChangeHandler(ChangeNotificationHandler)
addChangeNotification()
thread.start()}
def close() = {···}
/**
* Process notifications
*/
private def processNotifications(): Unit = {
try {val notifications = zkClient.getChildren(seqNodeRoot).sorted
if (notifications.nonEmpty) {info(s"Processing notification(s) to $seqNodeRoot")
val now = time.milliseconds
for (notification <- notifications) {val changeId = changeNumber(notification)
// 只解决更新的变更信息
if (changeId > lastExecutedChange) {// 调用 notificationHandler.processNotification() 进行配置变更的解决
processNotification(notification)
lastExecutedChange = changeId
}
}
purgeObsoleteNotifications(now, notifications)
}
} catch {···}
}
···
···
private def addChangeNotification(): Unit = {if (!isClosed.get && queue.peek() == null)
queue.put(new ChangeNotification)
}
class ChangeNotification {def process(): Unit = processNotifications()}
private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]): Unit = {for (notification <- notifications.sorted) {
val notificationNode = seqNodeRoot + "/" + notification
val (data, stat) = zkClient.getDataAndStat(notificationNode)
if (data.isDefined) {if (now - stat.getCtime > changeExpirationMs) {debug(s"Purging change notification $notificationNode")
zkClient.deletePath(notificationNode)
}
}
}
}
/* get the change number from a change notification znode */
private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong
class ChangeEventProcessThread(name: String) extends ShutdownableThread(name = name) {override def doWork(): Unit = queue.take().process()
}
···
···
}
Handler 实现
下面介绍了配置变更告诉是如何接管的,理论的解决在 NotificationHandler.processNotification()
中进行,对于动静配置来说,NotificationHandler
接口的实现是类 ConfigChangedNotificationHandler
, ConfigChangedNotificationHandler
的 processNotification
会依据配置变更告诉版本对配置变更告诉内容进行解析,而后调用对应的类型的 ConfigHandler
进行配置更新。
- 在以后版本中的配置变更告诉分为 version1 和 version2 两个版本,不同版本格局不同,以某个 topic 下的某个 clientId 的动静配置为例,version1 的内容为
{"version" : 1, "entity_type":"topic/client", "entity_name" : "<topic_name>/<client_id>"}
, version2 的内容为{"version" : 2, "entity_path":"topic/<topic_name>/client/<client_id>"}
- 所有动静配置有默认的 entity_name:
<default>
,当 entity_name 为<default>
,示意所有 entity 的默认配置,例如:/config/topics/<default>
中的配置示意所有 topic 的默认动静配置。
上面对 ConfigHandler
的实现类进行简略介绍:
- TopicConfigHandler:
次要解决 3 类配置:
a.LogManager
中治理的 topic 配置
b. 正本限流配置
c. controller 中动静开关配置"unclean.leader.election.enable"
- ClientIdConfigHandler 和 UserConfigHandler:
都继承自QuotaConfigHandler
,用来更新客户端侧的限流配置,ClientIdConfigHandler
负责 client id 维度的限流配置更新,UserConfigHandler
用来负责用户维度的限流配置更新 - IpConfigHandler:
负责连贯维度ConnectionQuotas
的限流配置更新 - BrokerConfigHandler:
一方面负责 broker 相干的 quota 配置,另一方面负责 broker 动静配置的更新。broker 的动静配置逻辑在类DynamicBrokerConfig
中实现,次要逻辑是依据以下优先级程序进行 broker 配置的更新和笼罩:
a. DYNAMIC_BROKER_CONFIG:存储在 ZK 中的/configs/brokers/{brokerId}
b. DYNAMIC_DEFAULT_BROKER_CONFIG:存储在 ZK 中的/configs/brokers/<default>
c. STATIC_BROKER_CONFIG:broker 启动配置,通常来自 server.properties 文件
d. DEFAULT_CONFIG:KafkaConfig
中硬编码的默认配置
其它补充
在 broker 初始化 partition 的过程中,该 topic 的配置可能会发生变化,为了防止漏掉这部分配置的更新,会在 createLog
过程中记录配置变更的状况,在 createLog
完结后处理这部分配置的更新,具体能够参考:https://issues.apache.org/jir…