关于flink:Flink的八种分区策略源码解读

31次阅读

共计 9098 个字符,预计需要花费 23 分钟才能阅读完成。

Flink 蕴含 8 中分区策略,这 8 中分区策略 (分区器) 别离如上面所示,本文将从源码的角度一一解读每个分区器的实现形式。

  • GlobalPartitioner
  • ShufflePartitioner
  • RebalancePartitioner
  • RescalePartitioner
  • BroadcastPartitioner
  • ForwardPartitioner
  • KeyGroupStreamPartitioner
  • CustomPartitionerWrapper

继承关系图

接口

名称

ChannelSelector

实现

public interface ChannelSelector<T extends IOReadableWritable> {

    /**
     * 初始化 channels 数量,channel 能够了解为上游 Operator 的某个实例(并行算子的某个 subtask).
     */
    void setup(int numberOfChannels);

    /**
     * 依据以后的 record 以及 Channel 总数,* 决定应将 record 发送到上游哪个 Channel。* 不同的分区策略会实现不同的该办法。*/
    int selectChannel(T record);

    /**
    * 是否以播送的模式发送到上游所有的算子实例
     */
    boolean isBroadcast();}

抽象类

名称

StreamPartitioner

实现

public abstract class StreamPartitioner<T> implements
        ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
    private static final long serialVersionUID = 1L;

    protected int numberOfChannels;

    @Override
    public void setup(int numberOfChannels) {this.numberOfChannels = numberOfChannels;}

    @Override
    public boolean isBroadcast() {return false;}

    public abstract StreamPartitioner<T> copy();}

继承关系图

GlobalPartitioner

简介

该分区器会将所有的数据都发送到上游的某个算子实例(subtask id = 0)

源码解读

/**
 * 发送所有的数据到上游算子的第一个 task(ID = 0)
 * @param <T>
 */
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // 只返回 0,即只发送给上游算子的第一个 task
        return 0;
    }

    @Override
    public StreamPartitioner<T> copy() {return this;}

    @Override
    public String toString() {return "GLOBAL";}
}

图解

ShufflePartitioner

简介

随机抉择一个上游算子实例进行发送

源码解读

/**
 * 随机的抉择一个 channel 进行发送
 * @param <T>
 */
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private Random random = new Random();

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {// 产生 [0,numberOfChannels) 伪随机数,随机发送到上游的某个 task
        return random.nextInt(numberOfChannels);
    }

    @Override
    public StreamPartitioner<T> copy() {return new ShufflePartitioner<T>();
    }

    @Override
    public String toString() {return "SHUFFLE";}
}

图解

BroadcastPartitioner

简介

发送到上游所有的算子实例

源码解读

/**
 * 发送到所有的 channel
 */
@Internal
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;
    /**
     * Broadcast 模式是间接发送到上游的所有 task,所以不须要通过上面的办法抉择发送的通道
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
    }

    @Override
    public boolean isBroadcast() {return true;}

    @Override
    public StreamPartitioner<T> copy() {return this;}

    @Override
    public String toString() {return "BROADCAST";}
}

图解

RebalancePartitioner

简介

通过循环的形式顺次发送到上游的 task

源码解读

/**
 * 通过循环的形式顺次发送到上游的 task
 * @param <T>
 */
@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo;

    @Override
    public void setup(int numberOfChannels) {super.setup(numberOfChannels);
        // 初始化 channel 的 id,返回 [0,numberOfChannels) 的伪随机数
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {// 循环顺次发送到上游的 task,比方:nextChannelToSendTo 初始值为 0,numberOfChannels(上游算子的实例个数,并行度)值为 2
        // 则第一次发送到 ID = 1 的 task,第二次发送到 ID = 0 的 task,第三次发送到 ID = 1 的 task 上... 顺次类推
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    public StreamPartitioner<T> copy() {return this;}

    @Override
    public String toString() {return "REBALANCE";}
}

图解

RescalePartitioner

简介

基于上下游 Operator 的并行度,将记录以循环的形式输入到上游 Operator 的每个实例。
举例: 上游并行度是 2,上游是 4,则上游一个并行度以循环的形式将记录输入到上游的两个并行度上; 上游另一个并行度以循环的形式将记录输入到上游另两个并行度上。
若上游并行度是 4,上游并行度是 2,则上游两个并行度将记录输入到上游一个并行度上;上游另两个并行度将记录输入到上游另一个并行度上。

源码解读

@Internal
public class RescalePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo = -1;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {if (++nextChannelToSendTo >= numberOfChannels) {nextChannelToSendTo = 0;}
        return nextChannelToSendTo;
    }

    public StreamPartitioner<T> copy() {return this;}

    @Override
    public String toString() {return "RESCALE";}
}

图解

尖叫提醒

Flink 中的执行图能够分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。

StreamGraph:是依据用户通过 Stream API 编写的代码生成的最后的图。用来示意程序的拓扑构造。

JobGraph:StreamGraph 通过优化后生成了 JobGraph,提交给 JobManager 的数据结构。次要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样能够缩小数据在节点之间流动所须要的序列化 / 反序列化 / 传输耗费。

ExecutionGraph:JobManager 依据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最外围的数据结构。

物理执行图:JobManager 依据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后造成的“图”,并不是一个具体的数据结构。

而 StreamingJobGraphGenerator 就是 StreamGraph 转换为 JobGraph。在这个类中,把 ForwardPartitioner 和 RescalePartitioner 列为 POINTWISE 分配模式,其余的为 ALL_TO_ALL 分配模式。代码如下:

if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,

               // 上游算子 (生产端) 的实例 (subtask) 连贯上游算子 (生产端) 的一个或者多个实例(subtask)
                DistributionPattern.POINTWISE,
                resultPartitionType);
        } else {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                // 上游算子 (生产端) 的实例 (subtask) 连贯上游算子 (生产端) 的所有实例(subtask)
                DistributionPattern.ALL_TO_ALL,
                resultPartitionType);
        }

ForwardPartitioner

简介

发送到上游对应的第一个 task,保障上下游算子并行度统一,即上有算子与上游算子是 1:1 的关系

源码解读

/**
 * 发送到上游对应的第一个 task
 * @param <T>
 */
@Internal
public class ForwardPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {return 0;}

    public StreamPartitioner<T> copy() {return this;}

    @Override
    public String toString() {return "FORWARD";}
}

图解

尖叫提醒

在上下游的算子没有指定分区器的状况下,如果上下游的算子并行度统一,则应用 ForwardPartitioner,否则应用 RebalancePartitioner,对于 ForwardPartitioner,必须保障上下游算子并行度统一,否则会抛出异样

// 在上下游的算子没有指定分区器的状况下,如果上下游的算子并行度统一,则应用 ForwardPartitioner,否则应用 RebalancePartitioner
            if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) {partitioner = new RebalancePartitioner<Object>();
            }

            if (partitioner instanceof ForwardPartitioner) {
                // 如果上下游的并行度不统一,会抛出异样
                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                    throw new UnsupportedOperationException("Forward partitioning does not allow" +
                        "change of parallelism. Upstream operation:" + upstreamNode + "parallelism:" + upstreamNode.getParallelism() +
                        ", downstream operation:" + downstreamNode + "parallelism:" + downstreamNode.getParallelism() +
                        "You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                }
            }

KeyGroupStreamPartitioner

简介

依据 key 的分组索引抉择发送到绝对应的上游 subtask

源码解读

  • org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
/**
 * 依据 key 的分组索引抉择发送到绝对应的上游 subtask
 * @param <T>
 * @param <K>
 */
@Internal
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
...

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {throw new RuntimeException("Could not extract key from" + record.getInstance().getValue(), e);
        }
        // 调用 KeyGroupRangeAssignment 类的 assignKeyToParallelOperator 办法, 代码如下所示
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    }
...
}
  • org.apache.flink.runtime.state.KeyGroupRangeAssignment
public final class KeyGroupRangeAssignment {
...

    /**
     * 依据 key 调配一个并行算子实例的索引,该索引即为该 key 要发送的上游算子实例的路由信息,* 即该 key 发送到哪一个 task
     */
    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    /**
     * 依据 key 调配一个分组 id(keyGroupId)
     */
    public static int assignToKeyGroup(Object key, int maxParallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");
        // 获取 key 的 hashcode
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }
     
    /**
     * 依据 key 调配一个分组 id(keyGroupId),
     */
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {

        // 与 maxParallelism 取余,获取 keyGroupId
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

    // 计算分区 index,即该 key group 应该发送到上游的哪一个算子实例
    public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;}
...

图解

[外链图片转存失败, 源站可能有防盗链机制, 倡议将图片保留下来间接上传(img-uAnrg1Pe-1585574080128)(F:npmmywebsitesource_postsFlink 的八种分区策略源码解读 key.png)]

CustomPartitionerWrapper

简介

通过 Partitioner 实例的 partition 办法 (自定义的) 将记录输入到上游。

public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    Partitioner<K> partitioner;
    KeySelector<T, K> keySelector;

    public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
        this.partitioner = partitioner;
        this.keySelector = keySelector;
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {throw new RuntimeException("Could not extract key from" + record.getInstance(), e);
        }
// 实现 Partitioner 接口,重写 partition 办法
        return partitioner.partition(key, numberOfChannels);
    }

    @Override
    public StreamPartitioner<T> copy() {return this;}

    @Override
    public String toString() {return "CUSTOM";}
}

比方:

public class CustomPartitioner implements Partitioner<String> {
      // key: 依据 key 的值来分区
      // numPartitions: 上游算子并行度
      @Override
      public int partition(String key, int numPartitions) {return key.length() % numPartitions;// 在此处定义分区策略
      }
  }

小结

本文次要从源码层面对 Flink 的 8 中分区策略进行了一一剖析,并对每一种分区策略给出了绝对应的图示,不便疾速了解源码。如果你感觉本文对你有用,能够关注我的公众号,理解更多精彩内容。微信搜寻 大数据技术与数仓

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

正文完
 0