Curiosity after solving the problem
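If every directory produces its own Bucket, wouldn't any long-running streaming job hit the same problem sooner or later? A problem this obvious must have been anticipated long ago by the experts in the community. Out of curiosity I went looking for the answer, until I came across this code: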
```java
public void commitUpToCheckpoint(final long checkpointId) throws IOException {
    final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
            activeBuckets.entrySet().iterator();

    LOG.info("Subtask {} received completion notification for checkpoint with id={}.",
            subtaskIndex,
            checkpointId);

    while (activeBucketIt.hasNext()) {
        final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
        // Commit this bucket's pending files for checkpoints up to checkpointId.
        bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

        if (!bucket.isActive()) {
            // We've dealt with all the pending files and the writer for this bucket is not
            // currently open.
            // Therefore this bucket is currently inactive and we can remove it from our state.
            activeBucketIt.remove();
            notifyBucketInactive(bucket);
        }
    }
}
```
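When the commit runs after a checkpoint completes, this code checks whether each Bucket is still active and, if it is not, removes the data structure maintained for it in memory. A bucket counts as inactive once all of its pending files have been committed and it has no open writer, so buckets for directories that no longer receive data are dropped from state instead of piling up.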
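To make this lifecycle concrete, here is a minimal, simplified sketch. It is a hypothetical model, not the real classes from the code above: `SimpleBucket`, `BucketRegistry`, `write` and `snapshotState` are illustrative names. It assumes, for simplicity, that every checkpoint closes a bucket's open file and turns it into a pending file, and it treats a bucket as active while it has an open writer or uncommitted pending files, mirroring the comment in the original code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified bucket (one per directory). Not a real library class.
class SimpleBucket {
    final String bucketId;
    boolean writerOpen;                                        // an in-progress file is open
    final List<Long> pendingCheckpoints = new ArrayList<>();   // files waiting for commit

    SimpleBucket(String bucketId) { this.bucketId = bucketId; }

    void write() { writerOpen = true; }

    // Simplifying assumption: each checkpoint closes the open file, making it pending.
    void snapshot(long checkpointId) {
        if (writerOpen) {
            writerOpen = false;
            pendingCheckpoints.add(checkpointId);
        }
    }

    // Commit (here: forget) every pending file from checkpoints <= checkpointId.
    void onSuccessfulCompletionOfCheckpoint(long checkpointId) {
        pendingCheckpoints.removeIf(id -> id <= checkpointId);
    }

    boolean isActive() { return writerOpen || !pendingCheckpoints.isEmpty(); }
}

// Hypothetical registry holding the in-memory map of active buckets.
class BucketRegistry {
    final Map<String, SimpleBucket> activeBuckets = new HashMap<>();

    // A record for a new directory creates a new bucket on demand.
    void write(String dir) {
        activeBuckets.computeIfAbsent(dir, SimpleBucket::new).write();
    }

    void snapshotState(long checkpointId) {
        for (SimpleBucket bucket : activeBuckets.values()) {
            bucket.snapshot(checkpointId);
        }
    }

    // Same pattern as the original: commit, then drop buckets that became inactive.
    void commitUpToCheckpoint(long checkpointId) {
        Iterator<Map.Entry<String, SimpleBucket>> it = activeBuckets.entrySet().iterator();
        while (it.hasNext()) {
            SimpleBucket bucket = it.next().getValue();
            bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
            if (!bucket.isActive()) {
                it.remove(); // safe removal while iterating
            }
        }
    }
}

class BucketLifecycleDemo {
    public static void main(String[] args) {
        BucketRegistry registry = new BucketRegistry();
        registry.write("dt=2022-03-13");
        registry.write("dt=2022-03-14");
        registry.snapshotState(1L);
        registry.write("dt=2022-03-14");   // only the newer directory keeps receiving data
        registry.commitUpToCheckpoint(1L);
        // The older bucket has nothing pending and no open writer, so it was removed.
        System.out.println(registry.activeBuckets.keySet());
    }
}
```

The key design point is removing entries through the iterator (`it.remove()`) rather than calling `activeBuckets.remove(...)` inside the loop, which would fail with a `ConcurrentModificationException` on a `HashMap`. In this model, a directory that stops receiving data leaves the map after the next completed checkpoint, so the map's size tracks the number of directories currently being written rather than every directory ever seen.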
Posted in: javascript
2022-03-14