关于大数据:数据流水线的成本自适应算子

6次阅读

共计 4374 个字符,预计需要花费 11 分钟才能阅读完成。

简介

披露一个大数据处理技术。

现在咱们构建很多的数据流水线 (data pipeline) 来把数据从一处挪动到另一处,并且能够在其中做一些转换解决。数据的挪动是有老本的,对此老本的优化能够为数据流水线的拥有者带来老本效益。

数据流水线个别至多蕴含一个 Source 组件和一个 Sink 组件,有时在 Source 和 Sink 两头还有一或多个顺次执行的两头计算组件(Flume 称之为 Channel,Flink 称之为 Transform),这些两头计算组件个别被建模为函数式“算子”(Operator)。

有一些算子能缩小传给下一个组件的数据量,例如过滤器(Filter)算子,这就是老本优化的要害。在分布式数据库畛域有一个查问优化技术叫做“谓词下推”(Predicate Push-down),咱们所做的与之相像,是把算子往上游 Source 的方向推。或者可称之为“算子上推”,这与“谓词下推”不矛盾,因为在分布式数据库查问的图示中,数据是从下向上流动的,Source 在下,下推也就是往 Source 推。

实践上来说,算子越凑近 Source 越好,因为能缩小从 Source 一路传下来的数据量。然而实际上要思考算子的效率。Source 可能是可伸缩性不强的资源(例如数据库),部署太多的计算在下面会使其变慢,因而不应该把低效率的算子推到 Source。

咱们对算子效率的定义是:效率 = 抉择度 / 老本。算子的抉择度越高,老本越低,那么就认为其效率越高。抉择度的定义是“数据量的缩小率”:有的算子类型并不能缩小数据量(例如大多数 Transformer 算子),而即便 Filter 算子也不肯定能无效缩小数据量(若数据 100% 通过 Filter,就没有缩小数据量)。若一个 Filter 只容许 20% 的数据通过,则它让数据缩小了 5 倍,它的抉择度是 1 /0.2=5。Filter 不是惟一能缩小数据量的算子类型,咱们已知的还有 Projector(通过去掉几个字段来缩小数据量)和 Compressor(通过压缩来缩小数据量)。老本的定义是“计算所消耗的资源”,例如算子的执行所用的 CPU 工夫。基于对算子效率的监控,很容易想到能够配置一个效率阈值,只把效率高于阈值的算子推到 Source。

这么做的实际效果还不够好,因为效率不是惟一的考量,Source 的资源利用率也很重要。如果 Source 的资源不够(例如 CPU 使用率 100%),即便是高效率算子也可能要被推到上游去,以加重 Source 的压力。如果 Source 的资源富余,那么即便是低效率算子也能够被放在上游以进步整体效率。

设计

依据以上的需要,能够设计一个算子调度框架来动静治理数据流水线中的算子散布了。我会用一些示例代码来展现相应设计。

监控

第一步是监控。

对于如下的算子 API:

interface Operator<T, R> {R apply(T t);
}

作如下批改,增加一个 SelectivityCalculator 工厂办法,每一种算子能够提供一个适宜本人的 SelectivityCalculator 实例:

interface Operator<T, R> {R apply(T t);
    // 若返回 null,示意此算子不反对计算抉择度,也无奈被调度
    default SelectivityCalculator<T, R> selectivityCalculator() {return null;}
}

interface SelectivityCalculator<T, R> {
    // 为每次调用的输入输出作记录
    void record(T input, R output);
    // 此办法会被定时调用,返回依据已记录数据计算失去的抉择度
    double selectivity();}

这个新办法怎么在算子上实现呢?例如 Filter 会这么定义 SelectivityCalculator:

interface Filter<T> extends Operator<T,T> {FilterSelectivityCalculator<T> selectivityCalculator() {return new FilterSelectivityCalculator<>();
    }
}

class FilterSelectivityCalculator<T> implements SelectivityCalculator<T, Boolean> {
    private long inputTotal;
    private long outputTotal;

    void record(T input, Boolean output) {
      inputTotal++;
      if (output == true) {outputTotal++;}
    }
  
    double selectivity() {if (outputTotal == 0) {return Double.MAX_VALUE;}
      return ((double) inputTotal) / outputTotal;
    }
}

能够编写这样的监控程序,记录每一种算子的执行次数和累计执行工夫:

// 某个算子的指标记录器
class MetricsRecorder {
    private long callCount;
    private long totalNanos;
    private SelectivityCalculator selectivityCalculator;
  
    MetricsRecorder(SelectivityCalculator selectivityCalculator) {this.selectivityCalculator = selectivityCalculator;}

    void record(long nano, Object input, Object output) {
        callCount++;
        totalNanos += nano;
        selectivityCalculator.record(input, output);
    }
  
    // 计算以后状态的快照
    MetricsSnapshot snapshot() {return new MetricsSnapshot(callCount, totalNanos, selectivityCalculator.selectivity());
    }

    // getters ......
}

class MetricsSnapshot {
    private long callCount;
    private long totalNanos;
    private double selectivity;
    // constructor and getters ......
}

// 每个线程领有这么一个算子执行器,它持有各种算子的指标记录器
class OperatorExecutor {
    // Because each thread will have an executor instance, no need to make it thread-safe
    private Map<Class<?>, MetricsRecorder> operatorMetricsRecorders = new HashMap<>();

    <T, R> R execute(Operator<T, R> operator, T t) {long startTime = System.nanoTime();
        R r = operator.apply(t);
        long duration = System.nanoTime() - startTime;
        var recorder = operatorMetricsRecorders.computeIfAbsent(operator.getClass(), k -> new MetricsRecorder(operator.selectivityCalculator()));
        recorder.record(duration, t, r);
    }

    // 计算以后状态的快照
    Map<Class<?>, MetricsSnapshot> snapshot() {Map<Class<?>, MetricsSnapshot> snapshotMap = new HashMap<>();
      operatorMetricsRecorders.forEach((k, v) -> {snapshotMap.put(k, v.snapshot());
      });
      return snapshotMap;
    }
  
    // 被动把快照写入指定的队列
    void offerSnapshot(SnapshotQueue queue) {queue.offer(snapshot());
    }
}

请留神这里并没有应用 ConcurrentHashMap,也没有任何波及多线程同步的解决。因为大多数算子是很轻量级的,监控程序也要做到足够轻量级,不拖慢算子的性能。大家如果深刻用过 Profiler 就晓得,对程序性能的细粒度的 Profiling 可能会减慢性能,使得性能测算后果不精确。在这里,咱们的做法是不在记录数据时时做多线程同步,免得同步所致的锁定和缓存驱赶升高性能。会有多个线程来执行算子,咱们让每个线程领有一个独立的无需同步的 OperatorExecutor 实例。有一个专门的线程来定时告诉每个 OperatorExecutor 实例,令其生成一份以后状态的快照并写入指定的队列,每个实例在本人的线程里生成快照是不须要额定做同步的。这种基于队列的异步通信比拟像 Go channel(Go 社区的敌人很相熟这格调吧,其实 Java 也能够做到)。这种定时获取指标数据的格调比拟像 Prometheus。

同时也定时获取 CPU 使用率,这个能够用 JDK 自带的 OperatingSystemMXBean 来实现,就不多讲了。

调度

第二步就是实现调度策略,有这些要点:

  1. 所谓的“挪动”算子,其实是在两处都部署了算子,在另一处启用算子,同时在原处禁用算子,如果做不到“同时”,就先启用再禁用,算子在流水线上会”至多执行一次“,须要实现幂等性。
  2. 流水线上的算子大多是有执行程序的(一步接一步),不可能只把后面的算子推到上游,却把前面的算子留在上游。这给咱们的调度带来了束缚。(对于这个能够有一些进一步的优化技术。)
  3. 调度规定最好能反对动静更新,这样有助于依据理论状况灵便调整。

尽管不是必须的,但咱们倡议部署一个专门的调度器(Scheduler)服务。它近程对立操控整个流水线的算子散布,调度规定也只须要由这个服务来集中管理,它与 Source 和 Sink 等节点通过 REST API 通信就能够了。例如,当 Source CPU 使用率太高,会引起一个性能事件(performance event),Scheduler 收到此事件,依据调度规定决定把一个算子推到上游,它会先在上游节点启用此算子,再在 Source 节点禁用此算子。如果算子不是幂等的,咱们须要给被传输的数据加一个标记,这个标记只须要一个序数值来阐明它已达到流水线的第几步,这样上游收到它就能够间接从下一步开始。

待续……

正文完
 0