关于kafka:聊聊-KafkaProducer-Metadata-读取与更新机制

42次阅读

共计 5511 个字符,预计需要花费 14 分钟才能阅读完成。

一、前言

咱们上一篇说了 聊聊 Kafka:Producer 源码解析,这一篇咱们来说下 Producer Metadata 的读取与更新机制。上一篇从宏观上介绍了 Producer 的宏观模型,其中通过 waitOnMetadata() 办法获取 topic 的 metadata 信息这一块货色很多,所以独自拎一篇进去讲。

二、Metadata

2.1 什么是 Metadata

Metadata 是指 Kafka 集群的元数据,蕴含了 Kafka 集群的各种信息,间接看源码便可知:

public class Metadata implements Closeable {
    private final Logger log;
    // retry.backoff.ms: 默认值为 100ms,它用来设定两次重试之间的工夫距离,防止有效的频繁重试。private final long refreshBackoffMs;
    // metadata.max.age.ms: 默认值为 300000,如果在这个工夫内元数据没有更新的话会被强制更新。private final long metadataExpireMs;
    // 更新版本号,每更新胜利 1 次,version 自增 1, 次要是用于判断 metadata 是否更新
    private int updateVersion;
    // 申请版本号,每发送一次申请,version 自增 1
    private int requestVersion;
    // 上一次更新的工夫(蕴含更新失败)private long lastRefreshMs;
    // 上一次更新胜利的工夫
    private long lastSuccessfulRefreshMs;
    private KafkaException fatalException;
    // 非法的 topics
    private Set<String> invalidTopics;
    // 未认证的 topics
    private Set<String> unauthorizedTopics;
    // 元数据信息的 Cache 缓存
    private MetadataCache cache = MetadataCache.empty();
    private boolean needFullUpdate;
    private boolean needPartialUpdate;
    // 会收到 metadata updates 的 Listener 列表
    private final ClusterResourceListeners clusterResourceListeners;
    private boolean isClosed;
    // 存储 Partition 最近一次的 leaderEpoch
    private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
}

MetadataCache:Kafka 集群中对于 node、topic 和 partition 的信息。(是只读的)

public class MetadataCache {
    private final String clusterId;
    private final Map<Integer, Node> nodes;
    private final Set<String> unauthorizedTopics;
    private final Set<String> invalidTopics;
    private final Set<String> internalTopics;
    private final Node controller;
    private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
    private Cluster clusterInstance;
}

对于 topic 的详细信息(leader 所在节点、replica 所在节点、isr 列表)都是在 Cluster 实例中保留的。

// 保留了 Kafka 集群中局部 nodes、topics 和 partitions 的信息
public final class Cluster {
    private final boolean isBootstrapConfigured;
    // node 列表
    private final List<Node> nodes;
    // 未认证的 topics
    private final Set<String> unauthorizedTopics;
    // 非法的 topics
    private final Set<String> invalidTopics;
    // kafka 内置的 topics
    private final Set<String> internalTopics;
    private final Node controller;
    // partition 对应的信息,如:leader 所在节点、所有的正本、ISR 中的正本、offline 的正本
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    // topic 和 partition 信息的对应关系
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    // topic 和可用 partition(leader 不为 null)的对应关系
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    // node 和 partition 信息的对应关系
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    // 节点 id 与节点的对应关系
    private final Map<Integer, Node> nodesById;
    // 集群信息,外面只有一个 clusterId
    private final ClusterResource clusterResource;
}
// topic-partition: 蕴含 topic、partition、leader、replicas、isr
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;}

看源码不难理解 Metadata 的次要数据结构,咱们大略总结下蕴含哪些信息:

  • 集群中有哪些节点;
  • 集群中有哪些 topic,这些 topic 有哪些 partition;
  • 每个 partition 的 leader 正本调配在哪个节点上,follower 正本调配在哪些节点上;
  • 每个 partition 的 AR 有哪些正本,ISR 有哪些正本;

2.2 Metadata 的利用场景

Metadata 在 Kafka 中十分重要,很多场景中都须要从 Metadata 中获取数据或更新数据,例如:

  • KafkaProducer 发送一条音讯到指定的 topic 中,须要晓得分区的数量,要发送的指标分区,指标分区的 leader,leader 所在的节点地址等,这些信息都要从 Metadata 中获取。
  • 当 Kafka 集群中产生了 leader 选举,节点中 partition 或正本产生了变动等,这些场景都须要更新 Metadata 中的数据。

三、Producer 的 Metadata 更新流程

Producer 在调用 doSend() 办法时,第一步就是通过 waitOnMetadata 办法获取该 topic 的 metadata 信息。


总结一下以上代码:

  • 首先会从缓存中获取 cluster 信息,并从中获取 partition 信息,如果能够取到则返回以后的 cluster 信息,如果不含有所须要的 partition 信息时就会更新 metadata;
  • 更新 metadata 的操作会在一个 do ….while 循环中进行,直到 metadata 中含有所需 partition 的信息,该循环中次要做了以下事件:

    • 调用 metadata.requestUpdateForTopic() 办法来获取 updateVersion,即上一次更新胜利时的 version,并将 needUpdate 设为 true,强制更新;
    • 调用 sender.wakeup() 办法来唤醒 Sender 线程,Sender 线程中又会唤醒 NetworkClient 线程,在 NetworkClient 中会对 UpdateMetadataRequest 申请进行操作,待会上面会具体介绍;
    • 调用 metadata.awaitUpdate(version, remainingWaitMs) 办法来期待 metadata 的更新,通过比拟以后的 updateVersion 与步骤 1 中获取的 updateVersion 来判断是否更新胜利;

3.1 org.apache.kafka.clients.NetworkClient#poll

下面提到调用 sender.wakeup() 办法来唤醒 Sender 线程,Sender 线程中又会唤醒 NetworkClient 线程,在 NetworkClient 中会对 UpdateMetadataRequest 申请进行操作。在 NetworkClient 中真正解决申请的是 NetworkClient.poll() 办法,接下来让咱们通过剖析源码来看下 NetworkClient 是如何解决申请的。


3.2 org.apache.kafka.clients.NetworkClient.DefaultMetadataUpdater#maybeUpdate(long)

咱们来看下 metadata 是如何更新的

这里你可能会问,老周啊,最小负载节点是啥呀?

别急,咱们来看上面这张图,你就了解了。


LeastLoadedNode 指 Kafka 集群中所有 Node 中负载最小的那一个 Node,它是由每个 Node 在 InFlightRequests 中还未确定的申请数决定的,未确定的申请越少则负载越小。如上图所示,Node1 即为 LeastLoadedNode。

3.3 org.apache.kafka.clients.Metadata#updateRequested

下次更新元数据信息的工夫:以后 metadata 信息行将到期的工夫 即 timeToExpire 和 间隔容许更新 metadata 信息的工夫 即 timeToAllowUpdate 中的最大值。

timeToExpire:needUpdate 为 true,示意强制更新,此时该值为 0;否则的话,就依照定时更新工夫,即元数据信息过期工夫(默认是 300000 ms 即 5 分钟)进行周期性更新。

timeToAllowUpdate:默认就是 refreshBackoffMs 的默认值,即 100 ms。

3.4 org.apache.kafka.clients.NetworkClient.DefaultMetadataUpdater#maybeUpdate(long, org.apache.kafka.common.Node)

咱们持续跟一下 maybeUpdate 办法:


因而,每次 producer 申请更新 metadata 时,会有以下几种状况:

  • 通道曾经 ready,node 能够发送申请,那么就间接发送申请。
  • 如果该 node 正在建设连贯,则间接返回。
  • 如果该 node 还没建设连贯,则向 broker 初始化连贯。

而 KafkaProducer 线程始终是阻塞在两个 while 循环中的,直到 metadata 更新:

  • sender 线程第一次调用 poll,初始化与 node 的连贯。
  • sender 线程第二次调用 poll,发送 metadata 申请。
  • sender 线程第三次调用 poll,获取 metadataResponse,并更新 metadata。

3.5 接管 Server 端的响应,更新 Metadata 信息

handleCompletedReceives 是如何解决任何已实现的接管响应,如下:

之后进一步调用 handleSuccessfulResponse。

四、总结

Metadata 会在上面两种状况下进行更新:

  • 强制更新:调用 Metadata.requestUpdate() 将 needFullUpdate 置为 true 来强制更新。
  • 周期性更新:通过 Metadata 的 lastSuccessfulRefreshMs 和 metadataExpireMs 来实现,个别状况下,默认周期时间就是 metadataExpireMs,5 分钟时长。

在 NetworkClient 的 poll() 办法调用时,会去查看两种更新机制,只有达到一种,就会触发更新操作。

Metadata 的强制更新会在以下几种状况下进行:

  • initConnect 办法调用时,初始化连贯;
  • poll() 办法中对 handleDisconnections() 办法调用来解决连贯断开的状况,这时会触发强制更新;
  • poll() 办法中对 handleTimedOutRequests() 来解决申请超时时;
  • 发送音讯时,如果无奈找到 partition 的 leader;
  • 解决 Producer 响应(handleProduceResponse),如果返回对于 Metadata 过期的异样,比方:没有 topic-partition 的相干 meta 或者 client 没有权限获取其 metadata。

强制更新次要是用于解决各种异常情况。

好了,Producer Metadata 读取与更新机制就说到这,咱们下一期再见。

正文完
 0