乐趣区

flink

官方:https://flink.apache.org

逻辑架构

部署架构

基于 yarn 的部署


HA:job manager 单点
Standalone:Zookeeper
对于 Yarn Cluaster 模式来说,Flink 就要依靠 Yarn 本身来对 JobManager 做 HA 了。其实这里完全是 Yarn 的机制。对于 Yarn Cluster 模式来说,JobManager 和 TaskManager 都是被 Yarn 启动在 Yarn 的 Container 中。此时的 JobManager,其实应该称之为 Flink Application Master。也就说它的故障恢复,就完全依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一样)。

运行架构


client: 当用户提交一个 Flink 程序时,会首先创建一个 Client,该 Client 首先会对用户提交的 Flink 程序进行预处理,并提交到 Flink 集群中处理,所以 Client 需要从用户提交的 Flink 程序配置中获取 JobManager 的地址,并建立到 JobManager 的连接,将 Flink Job 提交给 JobManager。Flink 程序 =》JobGraph(Flink Dataflow: 多个 JobVertex 组成的 DAG,一个 JobGraph 包含了一个 Flink 程序的如下信息:JobID、Job 名称、配置信息、一组 JobVertex 等)。
jobmanager: 它负责接收 Flink Job,调度组成 Job 的多个 Task 的执行。同时,JobManager 还负责收集 Job 的状态信息,并管理 Flink 集群中从节点 TaskManager。JobManager 所负责的各项管理功能,它接收到并处理的事件主要包括:
RegisterTaskManager,SubmitJob,CancelJob,UpdateTaskExecutionState,RequestNextInputSplit,JobStatusChanged
worker JVM 进程多线程,task slot 内存隔离资源单位,一个 job 的的多讴歌 subtask 可以共享 slot,

计算模式

在 Hadoop 中 Map 和 Reduce 是两个独立调度的 Task,并且都会去占用计算资源。对 Flink 来说 MapReduce 是一个 Pipeline 的 Task,只占用一个计算资源


https://ci.apache.org/project…
以上有 4 个源 4 个 map3 个 reduce。在 2 个 TM(每个 3 个 slots)的并行执行方式如下

其中每个可并行的有一个 JV 和并行和 EV. 比如 source 会在一个 JV 中保含 4 个 EV,ExecutionGraph 还包含 IntermediateResult 和 IntermediateResultPartition。前者跟踪 IntermediateDataSet 的状态,后者是每个分区的状态。

窗口与时间

倾斜窗口(Tumbling Windows,记录没有重叠)、滑动窗口(Slide Windows,记录有重叠)、会话窗口(Session Windows)
基于时间、数据
基于事件时间(事件创建时间)的水位线 watermark 算法:

当 1、watermark 时间 >= window_end_time(对于 out-of-order 以及正常的数据而言)
&& 2、在 [window_start_time,window_end_time) 中有数据存在 时窗口关闭开始计算
如下图:设定的 maxOutOfOrderness=10000L(10s),窗口 3s

  • 定期水位线
    用户定义 maxOutOfOrderness,两次水位线之间的数据可以用来调用方法生成下一次的时间,再往后推迟 maxOutOfOrderness 的时间即可。比如

    class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
        val maxOutOfOrderness = 3500L; // 3.5 seconds
        var currentMaxTimestamp: Long;
        override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {val timestamp = element.getCreationTime()
            currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
            timestamp;
        }
        override def getCurrentWatermark(): Watermark = {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
    
  • 标点水位线
    数据流中有标记事件才调用 extractTimestamp 生成新的 wartermark
  • 对于 map 等,是输入流事件时间的最小时间
  • 迟到事件:
    重新激活已经关闭的窗口并重新计算以修正结果。要保存上次结果重新计算,可能每个迟到事件都要触发。
    将迟到事件收集起来另外处理。直接返回收集结果
    将迟到事件视为错误消息并丢弃。

容错

快照:https://arxiv.org/pdf/1506.08…
源于 Chandy-Lamport 算法 https://lamport.azurewebsites…
https://ci.apache.org/project…
流障碍被注入流源的并行数据流中,它会将快照 n 的屏障发送到其所有输出流中。一旦接收器操作员(流式 DAG 的末端)从其所有输入流接收到障碍 n,它就向快照 n 确认检查点协调器。在所有接收器确认快照后,它被视为已完成。一旦完成了快照 n,作业将永远不再向源请求来自 Sn 之前的记录,因为此时这些记录(及其后代记录)将通过整个数据流拓扑。
完全一次调的保证:对其 (google 的 millwheel 用的每个数据生成唯一编号,dedup 去重实现 exactly-once(milwheel)) 接收到一个流的 n 后,这个流的数据暂存,直到其他流也到 n,对其发出快照
状态也要存储(转换函数,系统窗口数据缓冲区等等),信息很大,单独 state backend 存储,可存储在 HDFS 中(选项有内存,rocksdb 等)

内部优化

避免特定情况下 Shuffle、排序等昂贵操作,中间结果有必要进行缓存

迭代计算

机器学习和图计算使用
https://ci.apache.org/project…
普通迭代 + 增量迭代

退出移动版