概述
上一节讲到Cluster主要用来存放Node节点信息,TopicPartition分区信息和PartitionInfo分区详细信息,本节要讲的Metadata是将Cluster封装,提供一个统一的查询入口,此外还提供了对集群信息的更新操作。
类注释
通过阅读Metadata注释可以得出以下几点结论:
- Metadata由客户端线程(指使用KafkaProducer的线程)和后台发送方线程(指Sender线程,Producer初始化后启动的线程)共享。
- Metadata仅针对Topic的子集进行维护,可以随时间添加。使用元数据时如果没有则会触发更新操作。
- 如果Metadata启用了topic expiry(主题到期),就会在更新后删除在到期时间间隔内未使用的任何主题。
- 因为消费者严格管理Topic,所以消费者禁用了topic expiry。(这块我还没看到)
- 生产者依赖topic expiry来限制元信息数据集的刷新。
线程安全
上面讲到了Metadata会在多线程环境下使用,因此Metadata通过synchronized修饰几乎所有方法来保证线程安全。
Metadata更新api解析
Metadata主要提供两个方法来实现集群信息的更新requestUpdate,awaitUpdate
public synchronized int requestUpdate()
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException;
requestUpdate
/**
* Request an update of the current cluster metadata info, return the current version before the update
*/
public synchronized int requestUpdate() {
//将 更新位 置为true
this.needUpdate = true;
//返回当前元信息的版本
return this.version;
}
needUpdate和version是Metadata的成员属性,needUpdate用来标记集群信息是否需要更新,
version用来表示元信息的当前版本。
awaitUpdate
/**
* Wait for metadata update until the current version is larger than the last version we know of
*/
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
//如果等待时间小于0,抛出非法参数异常
if (maxWaitMs < 0)
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");
//当前时间
long begin = System.currentTimeMillis();
//剩余等待时间
long remainingWaitMs = maxWaitMs;
//如果当前Metadata版本小于等于前一版本并且Metadata没有关闭,那么继续循环等待
//如果时间差大于剩余等待时间则抛出超时异常
//如果Metadata实例已经关闭,抛出自定义异常
while ((this.version <= lastVersion) && !isClosed()) {
AuthenticationException ex = getAndClearAuthenticationException();
if (ex != null)
throw ex;
if (remainingWaitMs != 0)
wait(remainingWaitMs);
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
if (isClosed())
throw new KafkaException("Requested metadata update after close");
}
KafkaProducer更新Metadata流程解析
/**
* Wait for cluster metadata including partitions for the given topic to be available.
* @param topic The topic we want metadata for
* @param partition A specific partition expected to exist in metadata, or null if there's no preference
* @param maxWaitMs The maximum time in ms for waiting on the metadata
* @return The cluster containing topic metadata and the amount of time we waited in ms
* @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
*/
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already and reset expiry
metadata.add(topic);
Cluster cluster = metadata.fetch();
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// Return cached metadata if we have it, and if the record's partition is either undefined
// or within the known partition range
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
// Issue metadata requests until we have metadata for the topic or maxWaitTimeMs is exceeded.
// In case we already have cached metadata for the topic, but the requested partition is greater
// than expected, issue an update request only once. This is necessary in case the metadata
// is stale and the number of partitions for this topic has increased in the meantime.
do {
log.trace("Requesting metadata update for topic {}.", topic);
metadata.add(topic);
int version = metadata.requestUpdate();
sender.wakeup();
try {
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
}
cluster = metadata.fetch();
elapsed = time.milliseconds() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
if (cluster.unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null);
if (partition != null && partition >= partitionsCount) {
throw new KafkaException(
String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
}
return new ClusterAndWaitTime(cluster, elapsed);
}
发表回复