共计 9380 个字符,预计需要花费 24 分钟才能阅读完成。
摘要:本文整顿自京东高级技术专家韩飞在 Flink Forward Asia 2021 流批一体专场的分享。次要内容包含:
- 整体思考
- 技术计划及优化
- 落地案例
- 将来瞻望
点击查看直播回放 & 演讲 PDF
一、整体思考
提到流批一体,不得不提传统的大数据平台 —— Lambda 架构。它可能无效地撑持离线和实时的数据开发需要,但它流和批两条数据链路割裂所导致的高开发保护老本以及数据口径不统一是无奈漠视的缺点。
通过一套数据链路来同时满足流和批的数据处理需要是最现实的状况,即流批一体。此外咱们认为流批一体还存在一些两头阶段,比方只实现计算的对立或者只实现存储的对立也是有重大意义的。
以只实现计算对立为例,有一些数据利用的实时性要求比拟高,比方心愿端到端的数据处理延时不超过一秒钟,这对目前开源的、适宜作为流批对立的存储来说是一个很大的挑战。以数据湖为例,它的数据可见性与 commit 的距离相干,进而与 Flink 做 checkpoint 的工夫距离相干,此个性联合数据处理链路的长度,可见做到端到端一秒钟的解决并不容易。因而对于这类需要,只实现计算对立也是可行的。通过计算对立去升高用户的开发及保护老本,解决数据口径不统一的问题。
在流批一体技术落地的过程中,面临的挑战能够总结为以下 4 个方面:
- 首先是 数据实时性。如何把端到端的数据时延升高到秒级别是一个很大的挑战,因为它同时波及到计算引擎及存储技术。它实质上属于性能问题,也是一个长期指标。
- 第二个挑战是 如何兼容好在数据处理畛域曾经广泛应用的离线批处理能力。此处波及开发和调度两个层面的问题,开发层面次要是复用的问题,比方如何复用曾经存在的离线表的数据模型,如何复用用户曾经在应用的自定义开发的 Hive UDF 等。调度层面的问题次要是如何正当地与调度零碎进行集成。
- 第三个挑战是 资源及部署问题。比方通过不同类型的流、批利用的混合部署来进步资源利用率,以及如何基于 metrics 来构建弹性伸缩能力,进一步提高资源利用率。
- 最初一个挑战也是最艰难的一个:用户观点。大多数用户对于比拟新的技术理念通常仅限于技术交换或者验证,即便验证之后感觉能够解决理论问题,也须要期待适合的业务来试水。这个问题也催生了一些思考,平台侧肯定要多站在用户的视角对待问题,正当地评估对用户的现有技术架构的改变老本以及用户收益、业务迁徙的潜在危险等。
上图是京东实时计算平台的全景图,也是咱们实现流批一体能力的载体。两头的 Flink 基于开源社区版本深度定制。基于该版本构建的集群,内部依赖蕴含三个局部,JDOS、HDFS/CFS 和 Zookeeper。
- JDOS 是京东的 Kubernetes 平台,目前咱们所有 Flink 计算工作容器化的,都运行在这套平台之上;
- Flink 的状态后端有 HDFS 和 CFS 两种抉择,其中 CFS 是京东自研的对象存储;
- Flink 集群的高可用是基于 Zookeeper 构建的。
在利用开发方式方面,平台提供 SQL 和 Jar 包两种形式,其中 Jar 的形式反对用户间接上传 Flink 利用 Jar 包或者提供 Git 地址由平台来负责打包。除此之外咱们平台化的性能也绝对比较完善,比方根底的元数据服务、SQL 调试性能,产品端反对所有的参数配置,以及基于 metrics 的监控、工作日志查问等。
连贯数据源方面,平台通过 connector 反对了丰盛的数据源类型,其中 JDQ 基于开源 Kafka 定制,次要利用于大数据场景的音讯队列;JMQ 是京东自研,次要利用于在线零碎的音讯队列;JimDB 是京东自研的分布式 KV 存储。
在以后 Lambda 架构中,假如实时链路的数据存储在 JDQ,离线链路的数据存在 Hive 表中,即使计算的是同一业务模型,元数据的定义也经常是存在差别的,因而咱们引入对立的逻辑模型来兼容实时离线两边的元数据。
在计算环节,通过 FlinkSQL 联合 UDF 的形式来实现业务逻辑的流批对立计算,此外平台会提供大量的专用 UDF,同时也反对用户上传自定义 UDF。针对计算结果的输入,咱们同样引入对立的逻辑模型来屏蔽流批两端的差别。对于只实现计算对立的场景,能够将计算结果别离写入流批各自对应的存储,以保证数据的实时性与先前保持一致。
对于同时实现计算对立和存储对立的场景,咱们能够将计算的后果间接写入到流批对立的存储。咱们抉择了 Iceberg 作为流批对立的存储,因为它领有良好的架构设计,比方不会绑定到某一个特定的引擎等。
在兼容批处理能力方面,咱们次要进行了以下三个方面的工作:
第一,复用离线数仓中的 Hive 表。
以数据源端为例,为了屏蔽上图左侧图中流、批两端元数据的差别,咱们定义了逻辑模型 gdm_order_m 表,并且须要用户显示地指定 Hive 表和 Topic 中的字段与这张逻辑表中字段的映射关系。这里映射关系的定义十分重要,因为基于 FlinkSQL 的计算只需面向这张逻辑表,而无需关怀理论的 Hive 表与 Topic 中的字段信息。在运行时通过 connector 创立流表和批表的时候,逻辑表中的字段会通过映射关系被替换成理论的字段。
在产品端,咱们能够给逻辑表别离绑定流表和批表,通过拖拽的形式来指定字段之间的映射关系。这种模式使得咱们的开发方式与之前有所差别,之前的形式是先新建一个工作并指定是流工作还是批工作,而后进行 SQL 开发,再去指定工作相干的配置,最初公布工作。而在流批一体模式下,开发模式变为了首先实现 SQL 的开发,其中包含逻辑的、物理的 DDL 的定义,以及它们之间的字段映射关系的指定,DML 的编写等,而后别离指定流批工作相干的配置,最初公布成流批两个工作。
第二,与调度零碎买通。
离线数仓的数据加工根本是以 Hive/Spark 联合调度的模式,以上图中居中的图为例,数据的加工被分为 4 个阶段,别离对应数仓的 BDM、FDM、GDM 和 ADM 层。随着 Flink 能力的加强,用户心愿把 GDM 层的数据加工工作替换为 FlinkSQL 的批工作,这就须要把 FlinkSQL 批工作嵌入到以后的数据加工过程中,作为两头的一个环节。
为了解决这个问题,除了工作自身反对配置调度规定,咱们还买通了调度零碎,从中继承了父工作的依赖关系,并将工作本身的信息同步到调度零碎中,反对作为上游工作的父工作,从而实现了将 FlinkSQL 的批工作作为原数据加工的其中一个环节。
第三,对用户自定义的 Hive UDF、UDAF 及 UDTF 的复用。
对于现存的基于 Hive 的离线加工工作,如果用户曾经开发了 UDF 函数,那么最现实的形式是在迁徙 Flink 时对这些 UDF 进行间接复用,而不是依照 Flink UDF 定义从新实现。
在 UDF 的兼容问题上,针对应用 Hive 内置函数的场景,社区提供了 load hive modules 计划。如果用户心愿应用本人开发的 Hive UDF,能够通过应用 create catalog、use catalog、create function,最初在 DML 中调用的形式来实现, 这个过程会将 Function 的信息注册到 Hive 的 Metastore 中。从平台治理的角度,咱们心愿用户的 UDF 具备肯定的隔离性,限度用户 Job 的粒度,缩小与 Hive Metastore 交互以及产生脏函数元数据的危险。
此外,当元信息曾经被注册过,心愿下次能在 Flink 平台端失常应用,如果不应用 if not exist 语法,通常须要先 drop function,再进行 create 操作。然而这种形式不够优雅,同时也对用户的应用形式有限度。另一种解决办法是用户能够注册长期的 Hive UDF,在 Flink1.12 中注册长期 UDF 的形式是 create temporary function,然而该 Function 须要实现 UserDefinedFunction 接口后能力通过前面的校验,否则会注册失败。
所以咱们并没有应用 create temporary function,而是对 create function 做了一些调整,扩大了 ExtFunctionModule,将解析进去的 FunctionDefinition 注册到 ExtFunctionModule 中,做了一次 Job 级别的长期注册。这样的益处就是不会净化 Hive Metastore,提供了良好的隔离性,同时也没有对用户的应用习惯产生限度,提供了良好的体验。
不过这个问题在社区 1.13 的版本曾经失去了综合的解决。通过引入 Hive 解析器等扩大,曾经能够把实现 UDF、GenericUDF 接口的自定义 Hive 函数通过 create temporary function 语法进行注册和应用。
资源占用方面,流解决和批处理是人造错峰的。对于批处理,离线数仓每天 0 点开始计算过来一整天的数据,所有的离线报表的数据加工会在第二天下班前全副实现,所以通常 00:00 到 8:00 是批计算工作大量占用资源的时间段,而这个时间段通常在线的流量都比拟低。流解决的负载与在线的流量是正相干的,所以这个时间段流解决的资源需要是比拟低的。上午 8 点到早晨 0 点,在线的流量比拟高,而这个时间段批处理的工作大部分都不会被触发执行。
基于这种人造的错峰,咱们能够通过在专属的 JDOS Zone 中进行不同类型的流批利用的混部来晋升资源的使用率,并且如果对立应用 Flink 引擎来解决流批利用,资源的使用率会更高。
同时为了使利用能够基于流量进行动静调整,咱们还开发了主动弹性伸缩的服务 (Auto-Scaling Service)。它的工作原理如下:运行在平台上的 Flink 工作上报 metrics 信息到 metrics 零碎,Auto-Scaling Service 会基于 metrics 零碎中的一些要害指标,比方 TaskManager 的 CPU 使用率、工作的背压状况等来断定工作是否须要增减计算资源,并把调整的后果反馈给 JRC 平台,JRC 平台通过内嵌的 fabric 客户端将调整的后果同步到 JDOS 平台,从而实现对 TaskManager Pod 个数的调整。此外,用户能够在 JRC 平台上通过配置来决定是否为工作开启此性能。
上图右侧图表是咱们在 JDOS Zone 中进行流批混部并联合弹性伸缩服务试点测试时的 CPU 应用状况。能够看到 0 点流工作进行了缩容,将资源开释给批工作。咱们设置的新工作在 2 点开始执行,所以从 2 点开始直到早上批工作完结这段时间,CPU 的使用率都比拟高,最高到 80% 以上。批工作运行完结后,在线流量开始增长时,流工作进行了扩容,CPU 的使用率也随之回升。
二、技术计划及优化
流批一体是以 FlinkSQL 为外围载体,所以咱们对于 FlinkSQL 的底层能力也做了一些优化,次要分为维表优化、join 优化、window 优化和 Iceberg connector 优化几个方面。
首先是维表相干的几个优化。目前社区版本的 FlinkSQL 只反对局部数据源 sink 算子并行度的批改,并不反对 source 以及两头解决算子的并行度批改。
假如一个 FlinkSQL 工作生产的 topic 有 5 个分区,那么上游算子的理论并行度是 5,算子之间是 forward 的关系。对于数据量比拟大的维表 join 场景,为了提高效率,咱们心愿并行度高一些,心愿能够灵便设置它的并行度而不与上游的分区数绑定。
基于此,咱们开发了预览拓扑的性能,不论是 Jar 包、SQL 工作都能够解析并生成 StreamGraph 进行预览,进一步还能反对批改分组、算子 chain 的策略、并行度、设置 uid 等。
借助这个性能,咱们还能够调整维表 join 算子的并行度,并且将分区策略由 forward 调整为 rebalance,而后把这些调整后的信息更新到 StreamGraph。此外咱们还实现了动静 rebalance 策略,能够基于 backLog 去判断上游分区中的负载状况,从而抉择最优的分区进行数据散发。
为了晋升维表 join 的性能,咱们对所有平台反对的维表数据源类型都实现了异步 IO 并反对在内存中做缓存。不论是原生的 forward 形式还是 rebalance 形式,都存在缓存生效和替换的问题。那么,如何进步维表缓存的命中率以及如何升高维表缓存淘汰的操作?
以原生的 forward 形式为例,forward 意味着每个 subtask 缓存着随机的维表数据,与 joinkey 的值无关。对维表的 joinkey 做哈希,就能保障上游每一个算子缓存着与 joinkey 相干的、不同的维表数据,从而无效地晋升缓存的命中率。
在实现层面咱们新增了一条叫 StreamExecLookupHashJoinRule 的优化规定,并且把它增加到物理 rewrite 的阶段。在最底层的扫描数据 StreamExecTableSourceScan 和维表 join StreamExecLookupJoin 之间减少了一个 StreamExecChange 节点,由它来实现对维表数据的哈希操作。能够通过在定义维表 DDL 时指定 lookup.hash.enable=true 来开启这个性能。
咱们对于 forward、rebalance、哈希三种形式开启缓存,进行了雷同场景的性能测试。主表一亿条数据去 join 维表的 1 万条数据,在不同的计算资源下,rebalance 相较于原生的 forward 形式有数倍的性能晋升,而哈希相较于 rebalance 的形式又有数倍的性能晋升,整体成果是比拟可观的。
针对维表 join 单条查问效率比拟低的问题,解决思路也很简略,就是攒批,依照微批的形式去拜访 (mini-batch)。能够在 DDL 的定义中通过设置 lookup.async.batch.size 的值来指定批次的大小。除此之外,咱们还在工夫维度上引入了 Linger 机制来做限度,避免极其场景呈现迟迟无奈攒够一批数据而导致时延比拟高的状况,能够通过在 DDL 的定义中设置 lookup.async.batch.linger 的值来指定等待时间。
通过测试,mini-batch 的形式可能带来 15% ~ 50% 的性能晋升。
Interval join 也是生产上一个应用比拟频繁的场景,这类业务的特点是流量十分大,比方 10 分钟百 GB 级别。Interval join 两条流的数据都会缓存在外部 state 中,任意一边的数据达到都会获取对面流相应工夫范畴的数据去执行 join function,所以这种大流量的工作会有十分大的状态。
对此咱们选用了 RocksDB 来做状态后端,然而进行了调参优化后成果仍不现实,工作运行一段时间之后会呈现背压,导致 RocksDB 的性能降落,CPU 的使用率也比拟高。
通过剖析咱们发现,根本原因与 Flink 底层扫描 RocksDB 是基于前缀的扫描形式无关。因而解决思路也很简略,依据查问条件,准确地构建查问的上下界,把前缀查问变为范畴查问。查问条件依赖的具体上下界的 key 变为了 keyGroup+joinKey+namespace+timestamp[lower,upper],能够准确地只查问某些 timestamp 之间的数据,工作的背压问题也失去了解决。而且数据量越大,这种优化带来的性能晋升越显著。
Regular join 应用状态来保留所有历史数据,所以如果流量大也会导致状态数据比拟大。而它保留状态是依赖 table.exec.state.ttl 参数,这个参数值比拟大也会导致状态大。
针对这种场景,咱们改为应用内部存储 JimDB 存储状态数据。目前只做了 inner join 的实现,实现机制如下:两边的流对 join 到的数据进行下发的同时,将所有数据以 mini-batch 的形式写入到 JimDB,join 时会同时扫描内存中以及 JimDB 中对应的数据。此外,能够通过 JimDB ttl 的机制来实现 table.exec.state.ttl 性能,从而实现对过期数据的清理。
上述实现形式优缺点都比拟显著,长处是能够反对十分大的状态,毛病是目前无奈被 Flink checkpoint 笼罩到。
对于 window 的优化,首先是窗口偏移量。需要最早来源于一个线上场景,比方咱们想统计某个指标 2021 年 12 月 4 日 0 点 ~ 2021 年 12 月 5 日 0 点的后果,但因为线上集群是东 8 区工夫,所以理论统计的后果是 2021 年 12 月 4 日早上 8 点 ~ 2021 年 12 月 5 日早上 8 点的后果,这显然不合乎预期。因而这个性能最早是为了修复非本地时区跨天级别的窗口统计谬误的问题。
在咱们减少了窗口偏移量参数后,能够非常灵活地设置窗口的起始工夫,可能反对的需要也更宽泛。
其次,还存在另外一个场景:尽管用户设定了窗口大小,然而他心愿更早看到窗口以后的计算结果,便于更早地去做决策。因而咱们新增了增量窗口的性能,它能够依据设置的增量距离,触发执行输入窗口的以后计算结果。
对于端到端实时性要求不高的利用,能够抉择 Iceberg 作为上游的对立存储。然而鉴于计算自身的个性、用户 checkpoint 距离的配置等起因,可能导致产生大量的小文件。Iceberg 的底层咱们选用 HDFS 作为存储,大量的小文件会对 Namenode 产生较大的压力,所以就有了合并小文件的需要。
Flink 社区自身提供了基于 Flink batch job 的合并小文件的工具能够解决这个问题,但这种形式有点重,所以咱们开发了算子级别的小文件合并的实现。思路是这样的,在原生的 global commit 之后,咱们新增了三个算子 compactCoordinator、compactOperator 和 compactCommitter,其中 compactCoordinator 负责获取待合并的 snapshot 并下发,compactOperator 负责 snapshot 的合并操作的执行,并且能够多个 compactOperator 并发执行,compactCommitter 负责合并后 datafiles 的提交。
咱们在 DDL 的定义中新增了两个参数,auto-compact 指定是否开启合并文件的性能,compact.delta.commits 指定每提交多少次 commit 来触发一次 compaction。
在理论的业务需要中,用户可能会从 Iceberg 中读取嵌套数据,尽管能够在 SQL 中指定读取嵌套字段外部的数据,然而在理论读取数据时是会将蕴含以后嵌套字段的所有字段都读取到,再去获取用户须要的字段,而这会间接导致 CPU 和网络带宽负载的增高,所以就产生了如下需要:如何只读取到用户真正须要的字段?
解决这个问题,要满足两个条件,第一个条件是读取 Iceberg 的数据结构 schema 只蕴含用户须要的字段,第二个条件是 Iceberg 反对按列名去读取数据,而这个自身曾经满足了,所以咱们只须要实现第一个条件即可。
如上图右侧所示,联合之前的 tableSchema 和 projectFields 信息重构,生成了一个只蕴含用户须要字段的新的数据结构 PruningTableSchema,并且作为 Iceberg schema 的输出,通过这样的操作实现了依据用户的理论应用状况对嵌套构造进行列裁剪。图中左下部的示例展现了用户优化前后读取嵌套字段的比照,能够看到基于 PruningTablesSchema 可能对无用的字段进行无效的裁剪。
通过上述优化,CPU 使用率升高了 20%~30%。而且,在雷同的数据量下,批工作的执行工夫缩短了 20%~30%。
此外,咱们还实现了一些其余优化,比方修复了 interval outer join 数据晚于 watermark 下发、且上游有工夫算子时会导致的数据失落问题,UDF 的复用问题,FlinkSQL 扩大 KeyBy 语法,维表数据预加载以及 Iceberg connector 从指定的 snapshot 去读取等性能。
三、落地案例
京东目前 FlinkSQL 线上工作 700+,占 Flink 总任务数的 15% 左右,FlinkSQL 工作累计峰值解决能力超过 1.1 亿条 / 秒。目前次要基于社区的 1.12 版本进行了一些定制优化。
3.1 案例一
实时通用数据层 RDDM 流批一体化的建设。RDDM 全称是 real-time detail data model – 实时明细数据模型,它波及订单、流量、商品、用户等,是京东实时数仓的重要一环,服务了十分多的外围业务,例如黄金眼 / 商智、JDV、广告算法、搜推算法等。
RDDM 层的实时业务模型与离线数据中 ADM 和 GDM 层的业务加工逻辑统一。基于此,咱们心愿通过 FlinkSQL 来实现业务模型的流批计算对立。同时这些业务也具备十分显明的特点,比方订单相干的业务模型都波及大状态的解决,流量相干的业务模型对于端到端的实时性要求比拟高。此外,某些非凡场景也须要一些定制化的开发来反对。
RDDM 的实现次要有两个外围诉求:首先它的计算须要关联的数据比拟多的,大量的维度数据都存储在 HBase 中;此外局部维度数据的查问存在二级索引,须要先查问索引表,从中取出符合条件的 key 再去维度表中获取真正的数据。
针对上述需要,咱们通过联合维表数据预加载的性能与维表 keyby 的性能来晋升 join 的效率。针对二级索引的查问需要,咱们定制了 connector 来实现。
维表数据预加载的性能指在初始化的阶段就将维表数据加载到内存中,这个性能联合 keyby 应用能够十分无效地缩小缓存的数量,进步命中率。
局部业务模型关联的历史数据比拟多,导致状态数据比拟大,目前咱们是依据场景进行定制的优化。咱们认为基本的解决方案是实现一套高效的基于 KV 的 statebackend,对于此性能的实现正在布局中。
3.2 案例二
流量交易黑产的舆情剖析。它的次要流程如下:源端通过爬虫获取相干信息并写入到 JMQ,数据同步到 JDQ 当前,通过 Flink 解决而后持续写上游的 JDQ。与此同时,通过 DTS 数据传输服务,将上游 JDQ 的数据同步到 HDFS,而后通过 Hive 表进行离线的数据加工。
此业务有两个特点:首先,端到端的实时性要求不高,能够承受分钟级别的延时;第二,离线和实时的加工逻辑统一。因而,能够间接把中间环节的存储从 JDQ 换成 Iceberg,而后通过 Flink 去增量读取,并通过 FlinkSQL 实现业务逻辑加工,即实现了流批两套链路的齐全对立。其中 Iceberg 表中的数据也能够供 OLAP 查问或离线做进一步的加工。
上述链路端到端的时延在一分钟左右,基于算子的小文件合并性能无效地晋升了性能,存储计算成本有了显著的升高,综合评估开发保护老本升高了 30% 以上。
四、将来布局
将来布局次要分为以下两个方面:
首先,业务拓展 方面。咱们会加大 FlinkSQL 工作的推广,摸索更多流批一体的业务场景,同时对产品状态进行打磨,减速用户向 SQL 的转型。同时,将平台元数据与离线元数据做更深度的交融,提供更好的元数据服务。
其次,平台能力 方面。咱们会持续深挖 join 场景和大状态场景,同时摸索高效 KV 类型的状态后端实现,并在对立计算和对立存储的框架下一直优化设计,以升高端到端时延。
点击查看直播回放 & 演讲 PDF
更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~
流动举荐
阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…