简介
披露一个大数据处理技术。
现在咱们构建很多的数据流水线(data pipeline)来把数据从一处挪动到另一处,并且能够在其中做一些转换解决。数据的挪动是有老本的,对此老本的优化能够为数据流水线的拥有者带来老本效益。
数据流水线个别至多蕴含一个Source组件和一个Sink组件,有时在Source和Sink两头还有一或多个顺次执行的两头计算组件(Flume称之为Channel,Flink称之为Transform),这些两头计算组件个别被建模为函数式“算子”(Operator)。
有一些算子能缩小传给下一个组件的数据量,例如过滤器(Filter)算子,这就是老本优化的要害。在分布式数据库畛域有一个查问优化技术叫做“谓词下推”(Predicate Push-down),咱们所做的与之相像,是把算子往上游Source的方向推。或者可称之为“算子上推”,这与“谓词下推”不矛盾,因为在分布式数据库查问的图示中,数据是从下向上流动的,Source在下,下推也就是往Source推。
实践上来说,算子越凑近Source越好,因为能缩小从Source一路传下来的数据量。然而实际上要思考算子的效率。Source可能是可伸缩性不强的资源(例如数据库),部署太多的计算在下面会使其变慢,因而不应该把低效率的算子推到Source。
咱们对算子效率的定义是:效率=抉择度/老本。算子的抉择度越高,老本越低,那么就认为其效率越高。抉择度的定义是“数据量的缩小率”:有的算子类型并不能缩小数据量(例如大多数Transformer算子),而即便Filter算子也不肯定能无效缩小数据量(若数据100%通过Filter,就没有缩小数据量)。若一个Filter只容许20%的数据通过,则它让数据缩小了5倍,它的抉择度是1/0.2=5。Filter不是惟一能缩小数据量的算子类型,咱们已知的还有Projector(通过去掉几个字段来缩小数据量)和Compressor(通过压缩来缩小数据量)。老本的定义是“计算所消耗的资源”,例如算子的执行所用的CPU工夫。基于对算子效率的监控,很容易想到能够配置一个效率阈值,只把效率高于阈值的算子推到Source。
这么做的实际效果还不够好,因为效率不是惟一的考量,Source的资源利用率也很重要。如果Source的资源不够(例如CPU使用率100%),即便是高效率算子也可能要被推到上游去,以加重Source的压力。如果Source的资源富余,那么即便是低效率算子也能够被放在上游以进步整体效率。
设计
依据以上的需要,能够设计一个算子调度框架来动静治理数据流水线中的算子散布了。我会用一些示例代码来展现相应设计。
监控
第一步是监控。
对于如下的算子API:
interface Operator<T, R> { R apply(T t);}
作如下批改,增加一个SelectivityCalculator工厂办法,每一种算子能够提供一个适宜本人的SelectivityCalculator实例:
interface Operator<T, R> { R apply(T t); // 若返回null,示意此算子不反对计算抉择度,也无奈被调度 default SelectivityCalculator<T, R> selectivityCalculator() { return null; }}interface SelectivityCalculator<T, R> { // 为每次调用的输入输出作记录 void record(T input, R output); // 此办法会被定时调用,返回依据已记录数据计算失去的抉择度 double selectivity();}
这个新办法怎么在算子上实现呢?例如Filter会这么定义SelectivityCalculator:
interface Filter<T> extends Operator<T,T> { FilterSelectivityCalculator<T> selectivityCalculator() { return new FilterSelectivityCalculator<>(); }}class FilterSelectivityCalculator<T> implements SelectivityCalculator<T, Boolean> { private long inputTotal; private long outputTotal; void record(T input, Boolean output) { inputTotal++; if (output == true) { outputTotal++; } } double selectivity() { if (outputTotal == 0) { return Double.MAX_VALUE; } return ((double) inputTotal) / outputTotal; }}
能够编写这样的监控程序,记录每一种算子的执行次数和累计执行工夫:
// 某个算子的指标记录器class MetricsRecorder { private long callCount; private long totalNanos; private SelectivityCalculator selectivityCalculator; MetricsRecorder(SelectivityCalculator selectivityCalculator) { this.selectivityCalculator = selectivityCalculator; } void record(long nano, Object input, Object output) { callCount++; totalNanos += nano; selectivityCalculator.record(input, output); } // 计算以后状态的快照 MetricsSnapshot snapshot() { return new MetricsSnapshot(callCount, totalNanos, selectivityCalculator.selectivity()); } // getters ......}class MetricsSnapshot { private long callCount; private long totalNanos; private double selectivity; // constructor and getters ......}// 每个线程领有这么一个算子执行器,它持有各种算子的指标记录器class OperatorExecutor { // Because each thread will have an executor instance, no need to make it thread-safe private Map<Class<?>, MetricsRecorder> operatorMetricsRecorders = new HashMap<>(); <T, R> R execute(Operator<T, R> operator, T t) { long startTime = System.nanoTime(); R r = operator.apply(t); long duration = System.nanoTime() - startTime; var recorder = operatorMetricsRecorders.computeIfAbsent(operator.getClass(), k -> new MetricsRecorder(operator.selectivityCalculator())); recorder.record(duration, t, r); } // 计算以后状态的快照 Map<Class<?>, MetricsSnapshot> snapshot() { Map<Class<?>, MetricsSnapshot> snapshotMap = new HashMap<>(); operatorMetricsRecorders.forEach((k, v) -> { snapshotMap.put(k, v.snapshot()); }); return snapshotMap; } // 被动把快照写入指定的队列 void offerSnapshot(SnapshotQueue queue) { queue.offer(snapshot()); }}
请留神这里并没有应用ConcurrentHashMap,也没有任何波及多线程同步的解决。因为大多数算子是很轻量级的,监控程序也要做到足够轻量级,不拖慢算子的性能。大家如果深刻用过Profiler就晓得,对程序性能的细粒度的Profiling可能会减慢性能,使得性能测算后果不精确。在这里,咱们的做法是不在记录数据时时做多线程同步,免得同步所致的锁定和缓存驱赶升高性能。会有多个线程来执行算子,咱们让每个线程领有一个独立的无需同步的OperatorExecutor实例。有一个专门的线程来定时告诉每个OperatorExecutor实例,令其生成一份以后状态的快照并写入指定的队列,每个实例在本人的线程里生成快照是不须要额定做同步的。这种基于队列的异步通信比拟像Go channel(Go社区的敌人很相熟这格调吧,其实Java也能够做到)。这种定时获取指标数据的格调比拟像Prometheus。
同时也定时获取CPU使用率,这个能够用JDK自带的OperatingSystemMXBean
来实现,就不多讲了。
调度
第二步就是实现调度策略,有这些要点:
- 所谓的“挪动”算子,其实是在两处都部署了算子,在另一处启用算子,同时在原处禁用算子,如果做不到“同时”,就先启用再禁用,算子在流水线上会”至多执行一次“,须要实现幂等性。
- 流水线上的算子大多是有执行程序的(一步接一步),不可能只把后面的算子推到上游,却把前面的算子留在上游。这给咱们的调度带来了束缚。(对于这个能够有一些进一步的优化技术。)
- 调度规定最好能反对动静更新,这样有助于依据理论状况灵便调整。
尽管不是必须的,但咱们倡议部署一个专门的调度器(Scheduler)服务。它近程对立操控整个流水线的算子散布,调度规定也只须要由这个服务来集中管理,它与Source和Sink等节点通过REST API通信就能够了。例如,当Source CPU使用率太高,会引起一个性能事件(performance event),Scheduler收到此事件,依据调度规定决定把一个算子推到上游,它会先在上游节点启用此算子,再在Source节点禁用此算子。如果算子不是幂等的,咱们须要给被传输的数据加一个标记,这个标记只须要一个序数值来阐明它已达到流水线的第几步,这样上游收到它就能够间接从下一步开始。
待续……