download:破解JavaScript高级玩法

解决问题后的好奇

如果每个目录都会产生一个Bucket,那如果运行一个流作业,岂不是迟早碰到雷同的问题。这么不言而喻的问题,社区的大神们必定早就想到了,好奇心驱使着我寻找答案——直到看到了这段代码:

public void commitUpToCheckpoint(final long checkpointId) throws IOException {        final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =                activeBuckets.entrySet().iterator();        LOG.info(                "Subtask {} received completion notification for checkpoint with id={}.",                subtaskIndex,                checkpointId);        while (activeBucketIt.hasNext()) {            final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();            bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);            if (!bucket.isActive()) {                // We've dealt with all the pending files and the writer for this bucket is not                // currently open.                // Therefore this bucket is currently inactive and we can remove it from our state.                activeBucketIt.remove();                notifyBucketInactive(bucket);            }        }    }

Checkpoint后的提交时,这里会依据Bucket是否处于沉闷状态来决定是否移除在内存中保护的数据结构。