Curiosity after solving the problem
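If every directory produces its own Bucket, wouldn't any long-running streaming job hit the same problem sooner or later? A problem this obvious must have occurred to the community's experts long ago. Curiosity sent me looking for the answer, until I came across this code: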
```java
public void commitUpToCheckpoint(final long checkpointId) throws IOException {
    final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
            activeBuckets.entrySet().iterator();

    LOG.info(
            "Subtask {} received completion notification for checkpoint with id={}.",
            subtaskIndex,
            checkpointId);

    while (activeBucketIt.hasNext()) {
        final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
        bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

        if (!bucket.isActive()) {
            // We've dealt with all the pending files and the writer for this bucket is not
            // currently open.
            // Therefore this bucket is currently inactive and we can remove it from our state.
            activeBucketIt.remove();
            notifyBucketInactive(bucket);
        }
    }
}
```
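When committing after a checkpoint completes, this method checks every Bucket and uses its active/inactive state to decide whether to remove it from the in-memory data structure that tracks it (the `activeBuckets` map). A Bucket counts as inactive once all of its pending files have been handled and its writer is not currently open. Such a Bucket is removed from `activeBuckets` through the iterator, and `notifyBucketInactive` is called for it.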
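To make the cleanup logic concrete, here is a minimal, self-contained Java sketch of the same pattern. It does not use Flink. `SimpleBucket`, `pendingCheckpoints`, `writerOpen`, `getOrCreate`, and the directory-style bucket IDs are all hypothetical simplifications. The sketch assumes, as the comment in the Flink code suggests, that a bucket is inactive exactly when it has no pending files left and no open writer. Under that model, a job that keeps opening new directories does not keep every old Bucket in memory. After an old directory's writer is closed and a checkpoint commits its pending files, its entry is dropped from `activeBuckets`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified model of the bucket-cleanup loop in
 * Flink's Buckets#commitUpToCheckpoint. This is not the Flink API.
 */
public class BucketCleanupSketch {

    /** Hypothetical stand-in for a Flink Bucket (one per directory). */
    static final class SimpleBucket {
        final String bucketId;
        // IDs of checkpoints that still have pending (uncommitted) files in this bucket.
        final Set<Long> pendingCheckpoints = new HashSet<>();
        // Whether a part-file writer is currently open for this bucket.
        boolean writerOpen;

        SimpleBucket(String bucketId) {
            this.bucketId = bucketId;
        }

        /** Commits (here: forgets) every pending file staged at or before the checkpoint. */
        void onSuccessfulCompletionOfCheckpoint(long checkpointId) {
            pendingCheckpoints.removeIf(id -> id <= checkpointId);
        }

        /** Assumed rule: active while files are pending or a writer is open. */
        boolean isActive() {
            return writerOpen || !pendingCheckpoints.isEmpty();
        }
    }

    // In-memory state: bucket ID -> bucket, analogous to activeBuckets in Flink.
    final Map<String, SimpleBucket> activeBuckets = new HashMap<>();

    /** Returns the bucket for a directory, creating it on first use. */
    SimpleBucket getOrCreate(String bucketId) {
        return activeBuckets.computeIfAbsent(bucketId, SimpleBucket::new);
    }

    /** Same shape as the loop above: commit first, then drop buckets that became inactive. */
    void commitUpToCheckpoint(long checkpointId) {
        Iterator<Map.Entry<String, SimpleBucket>> it = activeBuckets.entrySet().iterator();
        while (it.hasNext()) {
            SimpleBucket bucket = it.next().getValue();
            bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
            if (!bucket.isActive()) {
                // Iterator.remove() deletes the entry without a ConcurrentModificationException.
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        BucketCleanupSketch sink = new BucketCleanupSketch();

        // Old directory: writer already closed, one file waiting for checkpoint 1.
        SimpleBucket old = sink.getOrCreate("dt=2020-01-01");
        old.pendingCheckpoints.add(1L);

        // Current directory: its writer is still open.
        SimpleBucket current = sink.getOrCreate("dt=2020-01-02");
        current.writerOpen = true;

        sink.commitUpToCheckpoint(1L);
        System.out.println(sink.activeBuckets.keySet()); // prints [dt=2020-01-02]
    }
}
```

Removing the entries through the iterator, rather than calling `activeBuckets.remove(...)` inside the loop, is what lets the cleanup happen in the same pass that commits the files. Flink's loop follows the same approach.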