作者:章剑锋(简锋)

去年Flink Forward在讲Flink on Zeppelin这个我的项目的将来时咱们谈到了对Application 模式的反对,明天就有一个好消息要通知大家,社区曾经实现了这一Feature,欢送大家下载最新版来应用这个Feature。

Application mode 是 Flink 1.11 之后引入的新的运行模式,所要解决的问题就是缩小客户端的压力,把用户的main函数运行在JobManager里而不是在用户客户端。这种模式是非常适合Flink on Zeppelin的,因为Flink on Zeppelin的客户端就是Flink interpreter过程,而Flink interpreter是一个long running的main函数,一直承受来自前端的命令,进行相应的操作(比方提交Job,进行Job 等等)。接下来咱们就要具体讲下Zeppelin如何实现了Yarn Application模式,以及如何应用这一模式。

架构

在讲Yarn Application模式架构的时候,咱们顺便来讲下 Flink on Zeppelin的架构演变过程。

一般的Flink on Yarn 运行模式

这种模式的客户端中,Flink Interpreter 运行在 Zeppelin这边,每个客户端对应一个Yarn上的Flink Cluster,如果Flink Interpreter过程很多,会对Zeppelin这台机器造成很大的压力。

参考文档:https://www.yuque.com/jeffzhangjianfeng/gldg8w/wt1g3h

参考视频:https://www.bilibili.com/video/BV1Te411W73b?p=6

Yarn Interpreter 模式

Yarn Interpreter 把客户端 (Flink Interpreter)移到了 Yarn集群,把资源压力转移到了Yarn集群,解决上下面一般Flink on Yarn 运行模式的一部分问题,这种模式会须要为每个Flink Cluster额定申请一个Yarn Container来运行这个Flink Interpreter,在资源利用方面并不是很高效。

参考文档:https://www.yuque.com/jeffzhangjianfeng/gldg8w/gcah8t

参考视频:https://www.bilibili.com/video/BV1Te411W73b?p=24

Yarn Application 模式

Yarn Application 模式彻底解决了后面2种模式的问题,把Flink interpreter 跑在了JobManager里,这样既不影响Zeppelin Server这台机器的资源压力,也不会对Yarn集群资源造成任何节约。

如何应用 Yarn Application 模式

配置Yarn Application 模式非常简单,只有把 flink.execution.mode 设为yarn_application 即可。其余所有配置与其余模式没有区别。上面的所有Flink on Zeppelin的个性在Yarn Application模式下都能够照常应用。咱们也借这个机会来Review下Flink on Zeppelin的所有性能。

多语言反对

在同一个Flink Cluster内反对以下3种语言,并且买通这3种语言(共享Catalog,共享ExecutionEnvironment)

  • Scala (%flink)
  • PyFlink (%flink.pyflink)
  • SQL (%flink.ssql, %flink.bsql)

参考文档:https://www.yuque.com/jeffzhangjianfeng/gldg8w/pg5s82

https://www.yuque.com/jeffzhangjianfeng/gldg8w/ggxz76

https://www.yuque.com/jeffzhangjianfeng/gldg8w/te2l1c

参考视频:https://www.bilibili.com/video/BV1Te411W73b?p=4

Hive整合

简略配置就能够启用Hive:

参考文档:https://www.yuque.com/jeffzhangjianfeng/gldg8w/agf94n

参考视频:https://www.bilibili.com/video/BV1Te411W73b?p=10

UDF反对

反对以下4种形式定义和应用Flink UDF

  • 在Zeppelin中间接写Scala UDF
  • 在Zeppelin中间接写PyFlink UDF
  • 用SQL 创立 UDF
  • 应用flink.udf.jars 来指定含有udf的jar

参考文档:https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2

参考视频:https://www.bilibili.com/video/BV1Te411W73b?p=17

https://www.bilibili.com/video/BV1Te411W73b?p=18

https://www.bilibili.com/video/BV1Te411W73b?p=19

第三方依赖

在Zeppelin里能够用以下2种形式来指定第三方依赖,具体

  • flink.excuetion.packages
  • flink.execution.jars (须要留神的是在Yarn Application 模式下,这里须要指定HDFS门路,因为Flink Interpreter运行在JobManager里,而JobManager是跑在yarn container, 在yarn container那台NodeManager机器上不肯定有你要指定的jar)

参考文档:https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s

参考视频:https://www.bilibili.com/video/BV1Te411W73b?p=15

Checkpoint & Savepoint

Checkpoint 和 Savepoint 照常应用,

参考文档:https://www.yuque.com/jeffzhangjianfeng/gldg8w/mlnswx

SQL高级性能

Zeppelin 对 Flink SQL做了一系列加强性能,这些加强性能都能够照常应用,比方:

  • 同时反对Batch SQL 和 Streaming SQL
  • 多语句反对
  • Comment反对
  • Job并行度反对
  • Multiple insert反对
  • JobName的设置
  • Stream SQL 流式数据可视化
具体参考文档:https://www.yuque.com/jeffzhangjianfeng/gldg8w/te2l1c