Kafka Source Code Analysis, Part 2: Metadata (Cluster Metadata)

Overview

The previous article showed that Cluster mainly stores Node information, TopicPartition identifiers, and PartitionInfo partition details. Metadata, the subject of this article, wraps Cluster to provide a unified query entry point and additionally offers operations to update the cluster information.
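
To make that "unified query entry point" concrete, here is a minimal sketch (assuming the pre-2.3 Metadata/Cluster API, where fetch() returns the current immutable Cluster snapshot; the metadata variable and the topic name are made up for illustration):

    Cluster cluster = metadata.fetch();                                      // current immutable Cluster snapshot
    List<PartitionInfo> partitions = cluster.partitionsForTopic("demo-topic");
    Integer partitionCount = cluster.partitionCountForTopic("demo-topic");   // null if the topic is unknown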

Class Comment

Reading the Metadata class comment, we can draw the following conclusions:

  1. Metadata is shared between the client thread (the thread that uses KafkaProducer) and the background sender thread (the Sender thread started when the producer is initialized).
  2. Metadata is maintained only for a subset of topics, which can grow over time. Requesting metadata for a topic that is not present triggers an update.
  3. If topic expiry is enabled, any topic that has not been used within the expiry interval is removed after an update (see the sketch after this list).
  4. The consumer disables topic expiry because it manages topics explicitly. (I have not read that part yet.)
  5. The producer relies on topic expiry to limit the set of metadata it has to refresh.
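
The sketch below is not the Kafka source; it is only a minimal illustration of what "remove any topic not used within the expiry interval" amounts to. The class name, field names, and the 5-minute window are all invented for illustration.

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    // Not the Kafka source: a minimal sketch of topic expiry.
    public class TopicExpirySketch {
        private static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;  // assumed expiry interval
        private final Map<String, Long> topics = new HashMap<>();   // topic -> expire-at timestamp
        private final boolean topicExpiryEnabled;                   // producer: true, consumer: false

        public TopicExpirySketch(boolean topicExpiryEnabled) {
            this.topicExpiryEnabled = topicExpiryEnabled;
        }

        // Called every time a topic is used (e.g. a record is sent to it): push back its deadline.
        public synchronized void add(String topic, long nowMs) {
            topics.put(topic, nowMs + TOPIC_EXPIRY_MS);
        }

        // Called on each metadata update: drop topics not used within the expiry interval.
        public synchronized void expireStaleTopics(long nowMs) {
            if (!topicExpiryEnabled)
                return;
            Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator();
            while (it.hasNext()) {
                if (it.next().getValue() <= nowMs)
                    it.remove();
            }
        }
    }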

Thread Safety

As noted above, Metadata is used from multiple threads, so it guarantees thread safety by marking almost every method as synchronized.
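
The fragment below sketches that sharing pattern (not the literal source; metadata and sender stand for the producer's existing fields, and the update call on the Sender side is shown only as a comment because its exact signature varies across versions). Both threads go through the same Metadata monitor: synchronized gives mutual exclusion, and wait()/notifyAll() on that monitor provides the signalling between them.

    // User (KafkaProducer) thread:
    int version = metadata.requestUpdate();   // mark the metadata as stale and remember its current version
    sender.wakeup();                          // wake the Sender's network thread so it actually sends the request
    metadata.awaitUpdate(version, 5000);      // wait() on the Metadata monitor until the version moves past `version`

    // Sender / network thread, once a metadata response arrives (simplified):
    // metadata.update(newCluster, ...);      // bumps the version and notifyAll()s the waiters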

Metadata Update API

Metadata mainly exposes two methods for updating the cluster information: requestUpdate and awaitUpdate.

public synchronized int requestUpdate();
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException;

requestUpdate

    /**
     * Request an update of the current cluster metadata info, return the current version before the update
     */
    public synchronized int requestUpdate() {
        // set the update flag to true
        this.needUpdate = true;
        // return the current metadata version
        return this.version;
    }

needUpdate and version are fields of Metadata: needUpdate flags whether the cluster information needs to be refreshed, and version identifies the current version of the metadata.
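
Their counterpart is Metadata#update, which the Sender's network thread invokes when a metadata response arrives. A simplified sketch (not the literal source; the real method takes more parameters and updates more state) of what it does to these two fields:

    public synchronized void update(Cluster newCluster, long now) {
        this.needUpdate = false;    // the pending update request has been satisfied
        this.version += 1;          // every successful update advances the version
        this.cluster = newCluster;  // swap in the fresh Cluster snapshot
        notifyAll();                // wake the threads blocked in awaitUpdate()
    }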

awaitUpdate

    /**
     * Wait for metadata update until the current version is larger than the last version we know of
     */
    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
        // a negative wait time is an illegal argument
        if (maxWaitMs < 0)
            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");
        // start time
        long begin = System.currentTimeMillis();
        // remaining wait time
        long remainingWaitMs = maxWaitMs;
        // keep looping while the current version is not newer than lastVersion and Metadata is not closed;
        // if the elapsed time reaches maxWaitMs, throw a TimeoutException;
        // if the Metadata instance has been closed, throw a KafkaException after the loop
        while ((this.version <= lastVersion) && !isClosed()) {
            AuthenticationException ex = getAndClearAuthenticationException();
            if (ex != null)
                throw ex;
            if (remainingWaitMs != 0)
                wait(remainingWaitMs);
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
        if (isClosed())
            throw new KafkaException("Requested metadata update after close");
    }

How KafkaProducer Updates Metadata

The entry point on the producer side is KafkaProducer#waitOnMetadata, which blocks until metadata for the target topic (and, if given, partition) becomes available:

    /**
     * Wait for cluster metadata including partitions for the given topic to be available.
     * @param topic The topic we want metadata for
     * @param partition A specific partition expected to exist in metadata, or null if there's no preference
     * @param maxWaitMs The maximum time in ms for waiting on the metadata
     * @return The cluster containing topic metadata and the amount of time we waited in ms
     * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
     */
    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        metadata.add(topic);
        Cluster cluster = metadata.fetch();
        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return cached metadata if we have it, and if the record's partition is either undefined
        // or within the known partition range
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        // Issue metadata requests until we have metadata for the topic or maxWaitTimeMs is exceeded.
        // In case we already have cached metadata for the topic, but the requested partition is greater
        // than expected, issue an update request only once. This is necessary in case the metadata
        // is stale and the number of partitions for this topic has increased in the meantime.
        do {
            log.trace("Requesting metadata update for topic {}.", topic);
            metadata.add(topic);
            int version = metadata.requestUpdate();
            sender.wakeup();
            try {
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            if (cluster.unauthorizedTopics().contains(topic))
                throw new TopicAuthorizationException(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null);

        if (partition != null && partition >= partitionsCount) {
            throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
        }

        return new ClusterAndWaitTime(cluster, elapsed);
    }
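
For context, this is roughly how KafkaProducer#doSend consumes waitOnMetadata (a hedged sketch based on pre-2.x sources; maxBlockTimeMs comes from the max.block.ms setting, and the ClusterAndWaitTime field names are as I recall them):

    ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
    // time already spent waiting on metadata is deducted from the max.block.ms budget
    long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
    Cluster cluster = clusterAndWaitTime.cluster;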
