download:破解JavaScript高级玩法
解决问题后的好奇
如果每个目录都会产生一个Bucket
,那如果运行一个流作业,岂不是迟早碰到雷同的问题。这么不言而喻的问题,社区的大神们必定早就想到了,好奇心驱使着我寻找答案——直到看到了这段代码:
public void commitUpToCheckpoint(final long checkpointId) throws IOException {
final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
activeBuckets.entrySet().iterator();
LOG.info(
"Subtask {} received completion notification for checkpoint with id={}.",
subtaskIndex,
checkpointId);
while (activeBucketIt.hasNext()) {
final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
if (!bucket.isActive()) {
// We've dealt with all the pending files and the writer for this bucket is not
// currently open.
// Therefore this bucket is currently inactive and we can remove it from our state.
activeBucketIt.remove();
notifyBucketInactive(bucket);
}
}
}
做Checkpoint
后的提交时,这里会依据Bucket
是否处于沉闷状态来决定是否移除在内存中保护的数据结构。
发表回复