最近在做大数据量的实时数据迁徙, 频繁应用到了keyby hash去平衡数据, 然而却发现subtask执行的数据量不是很平衡, 导致checkpoint频繁超时, 于是开始寻找解决办法.

问题背景

应用keyby进行分区, 自定义KeySelector, 进行hash%并行度来进行分区, 比方应用的并行度是8, 最初会失去分区key

0, 1, 2, 3, 4, 5, 6, 7

run起我的项目后, 关上监督后盾, 发现8个subtask中有一个task没有数据, 另一个task会有双倍的数据.

起因

具体参考官网文档

起因很简略, flink无奈预估你会有多少key, 所以会基于最大并行度(默认128)进行一个key分组, 在这个范畴内的才会调配到task中.

以下是相干代码

public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {    return MathUtils.murmurHash(keyHash) % maxParallelism;}

那么咱们对下面本人的key, 去运行这段代码, 会失去以下后果

0 861 542 273 334 45 796 197 115

咱们是8个并行, 相当于每个subtask占有16个key, 会失去以下分组:

0~15    416~31   19   2732~47   3348~63   5464~79   7980~95   8696~111  112~127 115

会发现有个分区的确取得了两个key, 而一个分区轮空.

将6换成murmurhash后在 96~111 中的key, 比方 6666(hash为106)

重启之后调配不均的问题解决.

終わり。