1 背景

咱们在应用Flink开发实时工作时,都会用到框架自身提供的DataStream API,这使得用户不能不用Java或者Scala甚至Python来编写业务逻辑;这种形式尽管灵便且表白性强,但对用户具备肯定的开发门槛,并且随着版本的不断更新,DataStream API也有很多老版本不兼容的问题。所以Flink SQL就成了宽广开发用户的最佳抉择,之所以Flink推出SQL API,次要是因为SQL有如下几个重要个性:

  1. 申明式API:
    用户只关怀做什么,而不关怀怎么做;
  2. 主动优化:
    屏蔽底层API的复杂性,主动做优化;
  3. 简略易懂:
    SQL利用于不同行业和畛域,学习老本较低;
  4. 不易变动:
    语法遵循SQL标准规范,不易变动;
  5. 流批对立:
    同样的SQL代码,能够用流和批的形式执行。

尽管Flink提供了SQL能力,但还是有必要基于Flink SQL打造属于本人的平台,目前搭建SQL平台的形式有如下几种:

  1. Flink原生API:
    应用Flink提供的SQL API,封装一个通用的pipeline jar,利用flink shell脚本工具提交sql工作;
  2. Apache Zeppelin:
    一款开源产品,利用notebook形式治理sql工作,目前曾经与Flink集成,且提供了丰盛的SDK;
  3. Flink Sql Gateway:
    Flink官网出品的一个Sql网关,用Rest形式执行Flink Sql。

第一种形式不足灵活性,且大量提交工作时,有性能瓶颈;而Zeppelin尽管功能强大,但页面性能无限,如果要基于Zeppelin打造SQL平台,要么应用SDK,要么对Zeppelin做重度的二次开发;所以Flink Sql Gateway比拟适宜做平台化建设,因为它是一个独立的网关服务,不便与公司现有系统集成,齐全与其它零碎解耦,本文也次要论述Flink Sql Gateway的实际与摸索。

2 Flink Sql Gateway 简介

2.1 架构


如上图所示,Flink Sql Gateway的架构比较简单,次要组件是SqlGatewayEndpoint,它是基于Flink的RestServerEndpoint实现的一个Netty服务,通过自定义实现多种handler来实现sql工作的创立和部署,以及治理的能力。SqlGatewayEndpoint外部次要由SessionManager(会话治理)组成,SessionManager保护了一个session map,而session外部次要是一些上下文配置和环境信息。

  1. SqlGatewayEndpoint:
    基于RestServerEndpoint实现的Netty服务,对外提供Rest Api;
  2. SessionManager :
    会话管理器,治理session创立与删除;
  3. Session:
    一个会话,外面寄存着工作所须要的Flink配置和上下文环境信息,负责工作的执行;
  4. Classpath:
    Flink Sql Gateway启动时会加载flink装置目录的classpath,所以flink sql gateway 基本上没有除flink以外的相干依赖。

2.2 执行流程

sql gateway其实只是一个一般的NIO服务器,每个Handler都会持有SessionManager的援用,因而能够独特拜访同一个SessionManager对象。当申请达到时,Handler会获取申请中的参数,如SessionId等,去SessionManager中查问对应的Session,从而执行提交sql、查问工作状态等工作。申请流程如下图所示:

创立 session,这是应用sql gateway的第一步,SessionManager会把用户传入的工作执行模式、配置、planner引擎形式等参数封装成Session对象,放入map中,并返回sessionid给用户;

用户持有sessionid,发动sql request的申请,gateway依据sessionid找到对应的Session对象,开始部署sql job到yarn / kubernetes;

2.3 性能

2.3.1 工作部署

Flink Sql Gateway作为Flink的客户端,工作部署间接使用了Flink的能力,而 Flink目前反对三种部署模式:

  1. in Application Mode,
  2. in a Per-Job Mode,
  3. in Session Mode。

三种模式有如下两个区别:

  1. 集群生命周期和资源隔离:
    per-job mode的集群生命周期与job雷同,但有较强的资源隔离保障。
  2. 应用程序的main()办法是在客户端还是在集群上执行:
    session mode和per-job mode在客户端上执行,而application mode在集群上执行。

从以上能够看出,Application Mode为每个应用程序创立一个会话集群,并在集群上执行应用程序的 main() 办法,所以它是session mode和per-job的一个折中计划。

目前为止,Flink只反对jar包工作的application mode,所以想要实现sql工作的application mode,须要本人革新实现,前面会讲实现办法。

2.3.2 SQL 能力

Flink Sql Gateway反对的Sql语法如下:

Flink Sql Gateway反对所有Flink Sql语法,但自身也有一些限度:

  1. 不反对多条sql执行,多条insert into执行会产生多个工作;
  2. 不残缺的set反对,对于set语法反对存在bug;
  3. Sql Hit反对不是很敌对,写在sql里比拟容易出错。

3 平台化革新

3.1 SQL 的 application mode 实现

后面说到,flink不反对sql工作的application mode部署,只反对jar包工作。jar 包工作的application mode实现如下图所示:

  1. flink-clients解析出用户的配置和jar包信息;
  2. ApplicationConfiguration里指定了main办法的入口类名和入参;
  3. ApplicationDeployer负责把Jobmanager启动,并且启动时执行Flink Application的main办法。

通过以上流程能够看出,要实现sql的application mode,实现通用执行sql的pipeline jar是要害:

实现一个执行sql的通用pipeline jar包,并且事后传到yarn或者k8s,如下所示:

在ApplicationConfiguration中指定
pepeline jar的main办法入口和参数:

3.2 多 Yarn 集群反对

目前Flink只反对单Yarn环境的工作部署,对于领有多套Yarn环境的场景,须要部署多套Flink环境,每个Flink对应一个Yarn环境配置;尽管这种形式能解决问题,但并不是最优的解决方案。相熟Flink应该都晓得,Flink应用 ClusterClientFactory的SPI来生成与内部资源零碎(Yarn/kubernetes)的拜访介质(ClusterDescriptor),通过ClusterDescriptor能够实现与资源零碎的交互,比方YarnClusterDescriptor,它持有YarnClient对象,能够实现与Yarn的交互;所以对于多Yarn环境,咱们只有保障YarnClusterDescriptor 里持有的YarnClient对象与Yarn环境一一对应即可,代码如下图所示:

作者简介

Zheng OPPO高级数据平台工程师

次要负责基于Flink的实时计算平台开发, 对Flink有较丰盛的研发教训, 也曾参加过Flink社区的奉献。

获取更多精彩内容,扫码关注[OPPO数智技术]公众号