1. 排查思路
这个问题报到我这边的时候,有同学曾经排查过一轮了。依据网上搜寻,会告知你可能是 yarn 的压力过大、网络短暂不稳固等,能够调大 heartbeat.timeout
来缓解这个问题,经调整改问题并未解决。
另外一个说法会告知你是 GC 频繁
的起因。倡议 调整内存,调整后,确实有肯定的成果(使出问题的工夫变慢)。那很显然和代码有关系了。
因为之前一个版本同步数据都没有出问题,因而开始寻找最近代码的改变,找了几圈下来并没有找到可疑的代码。登时感觉有点头皮发麻。于是让现场的同学切换到上个版本持续做全量,景象依旧会产生。
这时我就有点狐疑生产环境的个性了——比方数据个性,但现场的同学告知我数据并没有什么非凡之处。于是我要了一份现场的 HeapDump
,丢到了剖析软件上进行查看,发现org.apache.flink.streaming.api.functions.sink.filesystem.Bucket 的对象特地多。
于是看了一下 Bucket 对象的定义:
/**
* A bucket is the directory organization of the output of the {@link StreamingFileSink}.
*
* <p>For each incoming element in the {@code StreamingFileSink}, the user-specified {@link
* BucketAssigner} is queried to see in which bucket this element should be written to.
*/
@Internal
public class Bucket<IN, BucketID> {
好家伙。一个目录一个对象,此时此刻我曾经对现场的同学告知我的“数据没有什么非凡之处
”产生了狐疑,不过为了实锤,我还是跟了一遍代码:
|-- HiveTableSink
\-- createStreamSink
|-- StreamingFileSink
\-- initializeState
|-- StreamingFileSinkHelper
\-- constructor
|-- HadoopPathBasedBulkFormatBuilder
\-- createBuckets
|-- Buckets
\-- onElement
\-- getOrCreateBucketForBucketId
过了一遍代码当前,心里便有了数。问了下现场,同步的数据时间跨度是不是特地大,现场同学确认后,时间跨度为 3 年多。于是倡议升高时间跨度,或者升高分区工夫。最终将全量批次进行切分后解决了这个问题。
2. 解决问题后的好奇
如果每个目录都会产生一个Bucket
,那如果运行一个流作业,岂不是迟早碰到雷同的问题。这么不言而喻的问题,社区的大神们必定早就想到了,好奇心驱使着我寻找答案——直到看到了这段代码:
public void commitUpToCheckpoint(final long checkpointId) throws IOException {
final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
activeBuckets.entrySet().iterator();
LOG.info("Subtask {} received completion notification for checkpoint with id={}.",
subtaskIndex,
checkpointId);
while (activeBucketIt.hasNext()) {final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
if (!bucket.isActive()) {
// We've dealt with all the pending files and the writer for this bucket is not
// currently open.
// Therefore this bucket is currently inactive and we can remove it from our state.
activeBucketIt.remove();
notifyBucketInactive(bucket);
}
}
}