乐趣区

关于flink:有赞-Flink-实时任务资源优化探索与实践

作者|沈磊

随着 Flink K8s 化以及实时集群迁徙实现,有赞越来越多的 Flink 实时工作运行在 K8s 集群上,Flink K8s 化晋升了实时集群在大促时弹性扩缩容能力,更好的升高大促期间机器扩缩容的老本。同时,因为 K8s 在公司外部有专门的团队进行保护,Flink K8s 化也可能更好的减低公司的运维老本。

不过以后 Flink K8s 工作资源是用户在实时平台端进行配置,用户自身对于实时工作具体配置多少资源教训较少,所以存在用户资源配置较多,但理论应用不到的情景。比方一个 Flink 工作实际上 4 个并发可能满足业务解决需要,后果用户配置了 16 个并发,这种状况会导致实时计算资源的节约,从而对于实时集群资源水位以及底层机器老本,都有肯定影响。基于这样的背景,本文从 Flink 工作内存以及音讯能力解决方面,对 Flink 工作资源优化进行摸索与实际。

一、Flink 计算资源类型与优化思路

1.1 Flink 计算资源类型

一个 Flink 工作的运行,所须要的资源我认为可能分为 5 类:

  1. 内存资源
  2. 本地磁盘(或云盘)存储
  3. 依赖的内部存储资源。比方 HDFS、S3 等(工作状态 / 数据),HBase、MySQL、Redis 等(数据)
  4. CPU 资源
  5. 网卡资源

目前 Flink 工作应用最次要的还是内存和 CPU 资源,本地磁盘、依赖的内部存储资源以及网卡资源个别都不会是瓶颈,所以本文咱们是从 Flink 工作的内存和 CPU 资源,两个方面来对 Flink 实时工作资源进行优化。

1.2 Flink 实时工作资源优化思路

对于 Flink 实时工作资源剖析思路,咱们认为次要蕴含两点:

  • 一是从工作内存视角,从堆内存方面对实时工作进行剖析。
  • 另一方面则是从实时工作音讯解决能力动手,保障满足业务方数据处理需要的同时,尽可能正当应用 CPU 资源。

之后再联合实时工作内存剖析所得相干指标、实时工作并发度的合理性,得出一个实时工作资源预设值,在和业务方充沛沟通后,调整实时工作资源,最终达到实时工作资源配置合理化的目标,从而更好的升高机器应用老本。

■ 1.2.1 工作内存视角

那么如何剖析 Flink 工作的堆内存呢?这里咱们是联合 Flink 工作 GC 日志来进行剖析。GC 日志蕴含了每次 GC 堆内不同区域内存的变动和应用状况。同时依据 GC 日志,也可能获取到一个 Taskmanager 每次 Full GC 后,老年代残余空间大小。能够说,获取实时工作的 GC 日志,使咱们进行实时工作内存剖析的前提。

GC 日志内容分析,这里咱们借助开源的 GC Viewer 工具来进行具体分析,每次剖析完,咱们可能获取到 GC 相干指标,上面是通过 GC Viewer 剖析一次 GC 日志的局部后果:

下面通过 GC 日志剖析出单个 Flink Taskmanager 堆总大小、年老代、老年代调配的内存空间、Full GC 后老年代残余大小等,当然还有很多其余指标,相干指标定义能够去 Github 具体查看。

这里最重要的还是 Full GC 后老年代残余大小这个指标,依照《Java 性能优化权威指南》这本书 Java 堆大小计算法令,设 Full GC 后老年代残余大小空间为 M,那么堆的大小倡议 3 ~ 4 倍 M,新生代为 1 ~ 1.5 倍 M,老年代应为 2 ~ 3 倍 M,当然,实在对内存配置,你能够依照理论状况,将相应比例再调大些,用以避免流量暴涨情景。

所以通过 Flink 工作的 GC 日志,咱们能够计算出实时工作举荐的堆内存总大小,当发现举荐的堆内存和理论实时工作的堆内存大小相差过大时,咱们就认为可能去升高业务方实时工作的内存配置,从而升高机器内存资源的应用。

■ 1.2.2 工作音讯解决能力视角

对于 Flink 工作音讯解决能力剖析,咱们次要是看实时工作生产的数据源单位工夫的输出,和实时工作各个 Operator / Task 音讯解决能力是否匹配。Operator 是 Flink 工作的一个算子,Task 则是一个或者多个算子 Chain 起来后,一起执行的物理载体。

数据源咱们外部个别应用 Kafka,Kafka Topic 的单位工夫输出能够通过调用 Kafka Broker JMX 指标接口进行获取,当然你也能够调用 Flink Rest Monitoring 相干 API 获取实时工作所有 Kafka Source Task 单位工夫输出,而后相加即可。不过因为反压可能会对 Source 端的输出有影响,这里咱们是间接应用 Kafka Broker 指标 JMX 接口获取 Kafka Topic 单位工夫输出。

在获取到实时工作 Kafka Topic 单位工夫输出后,上面就是判断实时工作的音讯解决能力是否与数据源输出匹配。一个实时工作整体的音讯解决能力,会受到解决最慢的 Operator / Task 的影响。打个比方,Flink 工作生产的 Kafka Topic 输出为 20000 Record / S,然而有一个 Map 算子,其并发度为 10,Map 算子中业务方调用了 Dubbo,一个 Dubbo 接口从申请到返回为 10 ms,那么 Map 算子解决能力 1000 Record / S (1000 ms / 10 ms * 10),从而实时工作解决能力会降落为 1000 Record / S。

因为一条音讯记录的解决会在一个 Task 外部流转,所以咱们试图找出一个实时工作中,解决最慢的 Task 逻辑。如果 Source 端到 Sink 端全副 Chain 起来的话,咱们则是会找出解决最慢的 Operator 的逻辑。在源码层,咱们针对 Flink Task 以及 Operator 减少了单条记录解决工夫的自定义 Metric,之后该 Metric 能够通过 Flink Rest API 获取。咱们会遍历一个 Flink 工作中所有的 Task , 查询处理最慢的 Task 所在的 JobVertex(JobGraph 的点),而后获取到该 JobVertex 所有 Task 的总输入,最终会和 Kafka Topic 单位工夫输出进行比对,判断实时工作音讯解决能力是否正当。

设实时工作 Kafka Topic 单位工夫的输出为 S,解决最慢的 Task 代表的 JobVertex 的并发度为 P,解决最慢的 Task 所在的 JobVertex 单位工夫输入为 O,解决最慢的 Task 的最大音讯解决工夫为 T,那么通过上面逻辑进行剖析:

  1. 当 O 约等于 S,且 1 second / T * P 远大于 S 时,会思考减小工作并发度。
  2. 当 O 约等于 S,且 1 second / T * P 约等于 S 时,不思考调整工作并发度。
  3. 当 O 远小于 S,且 1 second / T * P 远小于 S 时,会思考减少工作并发度。

目前次要是 1 这种状况在 CPU 应用方面不合理,当然,因为不同时间段,实时工作的流量不同,所以咱们会有一个周期性检测的的工作,如果检测到某个实时工作间断屡次都合乎 1 这种状况时,会主动报警提醒平台管理员进行资源优化调整。
下图是从 Flink 工作的内存以及音讯解决能力两个视角剖析资源逻辑图:

二、从内存视角对 Flink 剖析实际

2.1 Flink 工作垃圾回收器抉择

Flink 工作实质还是一个 Java 工作,所以也就会波及到垃圾回收器的抉择。抉择垃圾回收器个别须要从两个角度进行参考:

  1. 吞吐量,即单位工夫内,工作执行工夫 / (工作执行工夫 + 垃圾回收工夫),当然并不是说升高 GC 进展工夫就能晋升吞吐量,因为升高 GC 进展工夫,你的 GC 次数也会回升。
  2. 提早。如果你的 Java 程序波及到与内部交互,提早会影响内部的申请应用体验。

Flink 工作我认为还是并重吞吐量的一类 Java 工作,所以会从吞吐量角度进行更多的考量。当然并不是说齐全不思考提早,毕竟 JobManager、TaskManager、ResourceManager 之间存在心跳,提早过大,可能会有心跳超时的可能性。

目前咱们 JDK 版本为外部 JDK 1.8 版本,新生代垃圾回收器应用 Parallel Scavenge,那么老年代垃圾回收器只能从 Serial Old 或者 Parallel Old 中抉择。因为咱们 Flink k8s 工作每个 Pod 的 CPU 限度为 0.6 – 1 core,最大也只能应用 1 个 core,所以老年代的垃圾回收器咱们应用的是 Serial Old,多线程垃圾回收在单 Core 之间,可能会有线程切换的耗费。

2.2 实时工作 GC 日志获取

设置完垃圾回收器后,下一步就是获取 Flink 工作的 GC 日志。Flink 工作形成个别是单个 JobManager + 多个 TaskManger,这里须要获取到 TaskManager 的 GC 日志进行剖析。那是不是要对所有 TaskManager 进行获取呢。这里咱们依照 TaskManager 的 Young GC 次数,依照次数大小进行排序,取排名前 16 的 TaskManager 进行剖析。YoungGC 次数能够通过 Flink Rest API 进行获取。

Flink on Yarn 实时工作的 GC 日志,间接点开 TaskManager 的日志链接就可能看到,而后通过 HTTP 拜访,就能下载到本地。Flink On k8s 工作的 GC 日志,会先写到 Pod 所挂载的云盘,基于 k8s hostpath volume 进行挂载。咱们外部应用 Filebeat 进行日志文件变更监听和采集,最终输入到上游的 Kafka Topic。咱们外部会有自定义日志服务端,它会生产 Kafka 的日志记录,主动进行落盘和治理,同时向外提供日志下载接口。通过日志下载的接口,便可能下载到须要剖析的 TaskManager 的 GC 日志。

2.3 基于 GC Viewer 剖析 Flink 工作内存

GC Viewer 是一个开源的 GC 日志剖析工具。应用 GC Viewer 之前,须要先把 GC Viewer 我的项目代码 clone 到本地,而后进行编译打包,就能够应用其性能。

在对一个实时工作堆内存进行剖析时,先把 Flink TaskManager 的日志下载到本地,而后通过 GC Viewer 对日志进行。如果你感觉多个 Taskmanager GC 日志剖析较慢时,能够应用多线程。下面所有这些操作,能够将其代码化,自动化产出剖析后果。上面是通过 GC Viewer 剖析的命令行:

java -jar gcviewer-1.37-SNAPSHOT.jar gc.log summary.csv

下面参数 gc.log 示意一个 Taskmanager 的 GC 日志文件名称,summary.csv 示意日志剖析的后果。上面是咱们平台对于某个实时工作内存剖析的后果:

上面是下面截图中,局部参数阐明:

  1. RunHours,Flink 工作运行小时数
  2. YGSize,一个 TaskManager 新生代堆内存最大调配量,单位兆
  3. YGUsePC,一个 TaskManager 新生代堆最大使用率
  4. OGSize,一个 TaskManager 老年代堆内存最大调配量,单位兆
  5. OGUsePC,一个 TaskManager 老生代堆最大使用率
  6. YGCoun,一个 TaskMnager Young GC 次数
  7. YGPerTime,一个 TaskMnager Young GC 每次进展工夫,单位秒
  8. FGCount,一个 TaskMnager Full GC 次数
  9. FGAllTime,一个 TaskMnager Full GC 总工夫,单位秒
  10. Throught,Task Manager 吞吐量
  11. AVG PT(剖析后果 avgPromotion 参数),均匀每次 Young GC 降职到老年代的对象大小
  12. Rec Heap,举荐的堆大小
  13. RecNewHeap,举荐的新生代堆大小
  14. RecOldHeap,举荐的老年代堆大小

上述大部分内存剖析后果,通过 GC Viewer 剖析都能失去,不过举荐堆大小、举荐新生代堆大小、举荐老年代堆大小则是依据 1.2.1 大节的内存优化规定来设置。

三、从音讯解决视角对 Flink 剖析实际

3.1 实时工作 Kafka Topic 单位工夫输出获取

想要对 Flink 工作的音讯解决能力进行剖析,第一步便是获取该实时工作的 Kafka 数据源 Topic,目前如果数据源不是 Kafka 的话,咱们不会进行剖析。Flink 工作总体分为两类:Flink Jar 工作和 Flink SQL 工作。Flink SQL 工作获取 Kafka 数据源比较简单,间接解析 Flink SQL 代码,而后获取到 With 前面的参数,在过滤掉 Sink 表之后,如果 SQLCreateTable 的 Conector 类型为 Kafka,就可能通过 SQLCreateTable with 后的参数,拿到具体 Kafka Topic。

Flink Jar 工作的 Kafka Topic 数据源获取绝对繁琐一些,咱们外部有一个实时工作血统解析服务,通过对 Flink Jar 工作主动构建其 PackagedProgram,PackagedProgram 是 Flink 外部的一个类,而后通过 PackagedProgram,咱们能够获取一个 Flink Jar 工作的 StreamGraph,StreamGraph 外面有 Source 和 Sink 的所有 StreamNode,通过反射,咱们能够获取 StreamNode 外面具体的 Source Function,如果是 Kafka Source Sunction,咱们就会获取其 Kafka Topic。上面是 StreamGraph 类截图:

获取到 Flink 工作的 Kafka Topic 数据源之后,下一步便是获取该 Topic 单位工夫输出的音讯记录数,这里能够通过 Kafka Broker JMX Metric 接口获取,咱们则是通过外部 Kafka 治理平台提供的内部接口进行获取。

3.2 自动化检测 Flink 音讯解决最慢 Task

首先,咱们在源码层减少了 Flink Task 单条记录解决工夫的 Metric,这个 Metric 能够通过 Flink Rest API 获取。接下来就是借助 Flink Rest API,遍历要剖析的 Flink 工作的所有的 Task。Flink Rest Api 有这样一个接口:

base_flink_web_ui_url/jobs/:jobid

这个接口可能获取一个工作的所有 Vertexs,一个 Vertex 能够简略了解为 Flink 工作 JobGraph 外面的一个 JobVertex。JobVertex 代表着实时工作中一段执行逻辑。

获取完 Flink 工作所有的 Vertex 之后,接下来就是获取每个 Vertex 具体 Task 解决单条记录的 metric,能够应用上面的接口:

须要在上述 Rest API 链接 metrics 之后增加 ?get=(具体 meitric),比方:metrics?get=0.Filter.numRecordsOut,0 示意该 Vertex Task 的 id,Filter.numRecordsOut 则示意具体的指标名称。咱们外部应用 taskOneRecordDealTime 示意 Task 解决单条记录时间 Metric,而后用 0.taskOneRecordDealTime 去获取某个 Task 的单条记录解决工夫的指标。下面接口反对多个指标查问,即 get 前面应用逗号隔开即可。

最终自动化检测 Flink 音讯解决最慢 Task 整体步骤如下:

  1. 获取一个实时工作所有的 Vertexs
  2. 遍历每个 Vertex,而后获取这个 Vertex 所有并发度 Task 的 taskOneRecordDealTime,并且记录其最大值
  3. 所有 Vertex 单条记录解决 Metric 最大值进行比照,找出解决工夫最慢的 Vertex。

上面是咱们实时平台对于一个 Flink 实时任务分析的后果:

四、有赞 Flink 实时工作资源优化实际

既然 Flink 工作的内存以及音讯解决能力剖析的形式曾经有了,那接下来就是在实时平台端进行具体实际。咱们实时平台每天会定时扫描所有正在运行的 Flink 工作,在工作内存方面,咱们可能联合 实时工作 GC 日志,同时依据内存优化规定,计算出 Flink 工作举荐的堆内存大小,并与理论调配的 Flink 工作的堆内存进行比拟,如果两者相差的倍数过大时,咱们认为 Flink 工作的内存配置存在节约的状况,接下来咱们会报警提醒到平台管理员进行优化。

平台管理员再收到报警提醒后,同时也会断定实时工作音讯能力是否正当,如果音讯解决最慢的 Vertex (某段实时逻辑),其所有 Task 单位工夫解决音讯记录数的总和约等于实时工作生产的 Kafka Topic 单位工夫的输出,但通过 Vertex 的并发度,以及单条音讯解决 Metric,算出该 Vertex 单位工夫解决的音讯记录数远大于 Kafka Topic 的单位输出时,则认为 Flink 工作能够适当调小并发度。具体调整多少,会和业务方沟通之后,在进行调整。整体 Flink 工作资源优化操作流程如下:

五、总结

目前有赞实时计算平台对于 Flink 工作资源优化摸索曾经走出第一步。通过自动化发现可能优化的实时工作,而后平台管理员染指剖析,最终判断是否可能调整 Flink 工作的资源。在整个实时工作资源优化的链路中,目前还是不够自动化,因为在后半段还须要人为因素。将来咱们打算 Flink 工作资源的优化全副自动化,会联合实时工作历史不同时段的资源应用状况,自动化揣测和调整实时工作的资源配置,从而达到晋升整个实时集群资源利用率的目标。

同时将来也会和元数据平台的同学进行单干,一起从更多方面来剖析实时工作是否存在资源优化的可能性,他们在原来离线工作资源方面积攒了很多优化教训,将来也能够参考和借鉴,利用到实时工作资源的优化中。

当然,最理想化就是实时工作的资源应用可能本人主动弹性扩缩容,之前听到过社区同学有这方面的声音,同时也欢送你可能和我一起探讨。

退出移动版