简介

披露一个大数据处理技术。

现在咱们构建很多的数据流水线(data pipeline)来把数据从一处挪动到另一处,并且能够在其中做一些转换解决。数据的挪动是有老本的,对此老本的优化能够为数据流水线的拥有者带来老本效益。

数据流水线个别至多蕴含一个Source组件和一个Sink组件,有时在Source和Sink两头还有一或多个顺次执行的两头计算组件(Flume称之为Channel,Flink称之为Transform),这些两头计算组件个别被建模为函数式“算子”(Operator)。


有一些算子能缩小传给下一个组件的数据量,例如过滤器(Filter)算子,这就是老本优化的要害。在分布式数据库畛域有一个查问优化技术叫做“谓词下推”(Predicate Push-down),咱们所做的与之相像,是把算子往上游Source的方向推。或者可称之为“算子上推”,这与“谓词下推”不矛盾,因为在分布式数据库查问的图示中,数据是从下向上流动的,Source在下,下推也就是往Source推。

实践上来说,算子越凑近Source越好,因为能缩小从Source一路传下来的数据量。然而实际上要思考算子的效率。Source可能是可伸缩性不强的资源(例如数据库),部署太多的计算在下面会使其变慢,因而不应该把低效率的算子推到Source。

咱们对算子效率的定义是:效率=抉择度/老本。算子的抉择度越高,老本越低,那么就认为其效率越高。抉择度的定义是“数据量的缩小率”:有的算子类型并不能缩小数据量(例如大多数Transformer算子),而即便Filter算子也不肯定能无效缩小数据量(若数据100%通过Filter,就没有缩小数据量)。若一个Filter只容许20%的数据通过,则它让数据缩小了5倍,它的抉择度是1/0.2=5。Filter不是惟一能缩小数据量的算子类型,咱们已知的还有Projector(通过去掉几个字段来缩小数据量)和Compressor(通过压缩来缩小数据量)。老本的定义是“计算所消耗的资源”,例如算子的执行所用的CPU工夫。基于对算子效率的监控,很容易想到能够配置一个效率阈值,只把效率高于阈值的算子推到Source。

这么做的实际效果还不够好,因为效率不是惟一的考量,Source的资源利用率也很重要。如果Source的资源不够(例如CPU使用率100%),即便是高效率算子也可能要被推到上游去,以加重Source的压力。如果Source的资源富余,那么即便是低效率算子也能够被放在上游以进步整体效率。

设计

依据以上的需要,能够设计一个算子调度框架来动静治理数据流水线中的算子散布了。我会用一些示例代码来展现相应设计。

监控

第一步是监控。

对于如下的算子API:

interface Operator<T, R> {    R apply(T t);}

作如下批改,增加一个SelectivityCalculator工厂办法,每一种算子能够提供一个适宜本人的SelectivityCalculator实例:

interface Operator<T, R> {    R apply(T t);    // 若返回null,示意此算子不反对计算抉择度,也无奈被调度    default SelectivityCalculator<T, R> selectivityCalculator() {        return null;    }}interface SelectivityCalculator<T, R> {    // 为每次调用的输入输出作记录    void record(T input, R output);    // 此办法会被定时调用,返回依据已记录数据计算失去的抉择度    double selectivity();}

这个新办法怎么在算子上实现呢?例如Filter会这么定义SelectivityCalculator:

interface Filter<T> extends Operator<T,T> {    FilterSelectivityCalculator<T> selectivityCalculator() {        return new FilterSelectivityCalculator<>();    }}class FilterSelectivityCalculator<T> implements SelectivityCalculator<T, Boolean> {    private long inputTotal;    private long outputTotal;    void record(T input, Boolean output) {      inputTotal++;      if (output == true) {        outputTotal++;      }    }      double selectivity() {      if (outputTotal == 0) {        return Double.MAX_VALUE;      }      return ((double) inputTotal) / outputTotal;    }}

能够编写这样的监控程序,记录每一种算子的执行次数和累计执行工夫:

// 某个算子的指标记录器class MetricsRecorder {    private long callCount;    private long totalNanos;    private SelectivityCalculator selectivityCalculator;      MetricsRecorder(SelectivityCalculator selectivityCalculator) {      this.selectivityCalculator = selectivityCalculator;    }    void record(long nano, Object input, Object output) {        callCount++;        totalNanos += nano;        selectivityCalculator.record(input, output);    }      // 计算以后状态的快照    MetricsSnapshot snapshot() {      return new MetricsSnapshot(callCount, totalNanos, selectivityCalculator.selectivity());    }    // getters ......}class MetricsSnapshot {    private long callCount;    private long totalNanos;    private double selectivity;    // constructor and getters ......}// 每个线程领有这么一个算子执行器,它持有各种算子的指标记录器class OperatorExecutor {    // Because each thread will have an executor instance, no need to make it thread-safe    private Map<Class<?>, MetricsRecorder> operatorMetricsRecorders = new HashMap<>();    <T, R> R execute(Operator<T, R> operator, T t) {        long startTime = System.nanoTime();        R r = operator.apply(t);        long duration = System.nanoTime() - startTime;        var recorder = operatorMetricsRecorders.computeIfAbsent(operator.getClass(), k -> new MetricsRecorder(operator.selectivityCalculator()));        recorder.record(duration, t, r);    }    // 计算以后状态的快照    Map<Class<?>, MetricsSnapshot> snapshot() {      Map<Class<?>, MetricsSnapshot> snapshotMap = new HashMap<>();      operatorMetricsRecorders.forEach((k, v) -> {        snapshotMap.put(k, v.snapshot());      });      return snapshotMap;    }      // 被动把快照写入指定的队列    void offerSnapshot(SnapshotQueue queue) {      queue.offer(snapshot());    }}

请留神这里并没有应用ConcurrentHashMap,也没有任何波及多线程同步的解决。因为大多数算子是很轻量级的,监控程序也要做到足够轻量级,不拖慢算子的性能。大家如果深刻用过Profiler就晓得,对程序性能的细粒度的Profiling可能会减慢性能,使得性能测算后果不精确。在这里,咱们的做法是不在记录数据时时做多线程同步,免得同步所致的锁定和缓存驱赶升高性能。会有多个线程来执行算子,咱们让每个线程领有一个独立的无需同步的OperatorExecutor实例。有一个专门的线程来定时告诉每个OperatorExecutor实例,令其生成一份以后状态的快照并写入指定的队列,每个实例在本人的线程里生成快照是不须要额定做同步的。这种基于队列的异步通信比拟像Go channel(Go社区的敌人很相熟这格调吧,其实Java也能够做到)。这种定时获取指标数据的格调比拟像Prometheus。

同时也定时获取CPU使用率,这个能够用JDK自带的OperatingSystemMXBean来实现,就不多讲了。

调度

第二步就是实现调度策略,有这些要点:

  1. 所谓的“挪动”算子,其实是在两处都部署了算子,在另一处启用算子,同时在原处禁用算子,如果做不到“同时”,就先启用再禁用,算子在流水线上会”至多执行一次“,须要实现幂等性。
  2. 流水线上的算子大多是有执行程序的(一步接一步),不可能只把后面的算子推到上游,却把前面的算子留在上游。这给咱们的调度带来了束缚。(对于这个能够有一些进一步的优化技术。)
  3. 调度规定最好能反对动静更新,这样有助于依据理论状况灵便调整。

尽管不是必须的,但咱们倡议部署一个专门的调度器(Scheduler)服务。它近程对立操控整个流水线的算子散布,调度规定也只须要由这个服务来集中管理,它与Source和Sink等节点通过REST API通信就能够了。例如,当Source CPU使用率太高,会引起一个性能事件(performance event),Scheduler收到此事件,依据调度规定决定把一个算子推到上游,它会先在上游节点启用此算子,再在Source节点禁用此算子。如果算子不是幂等的,咱们须要给被传输的数据加一个标记,这个标记只须要一个序数值来阐明它已达到流水线的第几步,这样上游收到它就能够间接从下一步开始。

待续……