乐趣区

关于flink:数仓实时化改造Hudi-on-Flink-在顺丰的实践应用

作者 | 蔡适择(顺丰大数据平台负责人)
整顿 | 赵阳(Flink 社区志愿者)

本文次要介绍顺丰在数据仓库的数据实时化、数据库 CDC、Hudi on Flink 上的实际利用及产品化教训。文章次要分为以下几局部:

● 顺丰业务介绍
● Hudi on Flink
● 产品化反对
● 后续打算

1、顺丰业务

1.1 顺丰大数据的利用

先来看一下顺丰大数据业务的全景图。

大数据平台,两头的根底局部是大数据平台,这块是顺丰联合开源组件自行搭建的。与之相干的是大数据分析与人工智能,顺丰有一个十分强的地面部队,就是线下的快递小哥以及运输车辆,须要应用 AI 以及大数据分析来辅助治理,晋升整体效率。

区块链,顺丰对接了很多客户与商家,对于商家来说,首先须要确保快件是可信的可能做货物的交易与替换。这块波及的基本上都是品牌商家,溯源与存证的业务顺丰也有波及。

IoT,就像之前提及到的,因为顺丰地面部队较多,相应须要采集的数据也会比拟多。咱们的局部包裹中是有传感器的,车辆也有相干的传感器,如车辆的摄像头,以及快递小哥的手环(蕴含地理位置、员工的衰弱状态,对应做一些关心的行动)。同时,还有一些工作场景既有叉车,也有分拣设施,这些就须要大数据平台来做一些联动,因而 IoT 的利用绝对较多。

智慧供应链和智慧物流,这两块更多的是指如何用大数据的伎俩辅助业务做一些经营上的决策。比方咱们有很多 B 端客户,对于他们来说如何在每个仓库里备货,如何协调以及相互调拨,这部分就由智慧物流来实现。

上面这块就是 IOT 实际中的一部分:

从下面能够看出物流自身的环节是十分多的,下单、小哥收件、分拣、陆运直达等整个过程,红色解释局部是指咱们会做的一些 IoT 与大数据联合的利用,这里其实大部分都是基于 Flink 来实现的。

1.2 顺丰大数据技术矩阵

上面这张图是顺丰目前大数据整体的架构概览:

1、数据集成层:最上面为数据集成层,因为顺丰的历史起因,所以蕴含了很多数据存储引擎,如 Oracle、MySQL、MongoDB 等,并且局部引擎仍会持续反对。右下物联网设施绝对较新,次要是进行蕴含一般文本、网络数据库、图像、音频、视频等的数据采集。

2、数据存储计算:实时这块顺丰目前用的最多的还是 Flink,Storm 没有标示进去,目前咱们在做迁徙。消息中间件解决目前次要应用 Kafka。而后左边存储构造的品种就绝对丰盛,因为不同的场景有不同的解决形式,比方数据分析须要性能比拟强的 Clickhouse;数仓和离线计算这块还是比拟传统,以 Hive 为主联合 Spark,目前咱们是联合 Flink 与 Hudi 去实现离线实时化。

3、数据产品,咱们偏向的还是首先降门槛,让外部开发与用户更容易上手。外部同学如果要把握如此多的组件,老本是十分高的,再加上规范化会导致沟通、保护以及运维的高额老本,所以咱们肯定要去做一些产品化、规范化的事件。

1.3 顺丰科技数据采集组成

上图就是咱们大数据整体数据采集的概览,数据采集以后包含微服务的利用,局部数据直发到 Kafka,还有些会落成日志,而后咱们本人做了一个日志采集工具,相似于 Flume,更加的轻量化,达到不丢、不重、以及近程的更新、限速。另外咱们也会将 Kafka 中的数据通过 Flink 放到 HDFS,以 Hudi 的模式去做。上面会具体介绍。

1.4 顺丰数据利用架构

上图是一个简略的利用架构,方才所说的大数据平台数据咱们会按需推送到 OLAP 剖析引擎、数据库,这部分数据推送过来之后,达到数据服务平台。该数据服务平台次要是思考到用户或研发对接数据库更便捷,以往在应用时,外部用户首先须要理解大数据组件的应用,而当初通过咱们的数据服务产品以配置化的形式配置查问条件、聚合条件即可,最终把后果生成一个 restful 接口,业务零碎可间接调用。比方研发用户须要做搜寻,只须要关注入参、出参,两头的过程不须要理解,这样的话就可能最大化的把技术门槛降下来,应用时也会更高效简便。

两头局部咱们是基于 Kong 做的网关,在 Kong 外面能够加很多种通用的能力,包含监控、限流、缓存等都能够在外面实现。

左边的 Graphql,是 Facebook 开源的一个组件。前端用户常常会呈现需要的变更,后盾接口须要相应地进行调整,这种状况就能够应用 Graphql 来反对。这里其实是有两个货色:apollo、graphql_Java,两条线,apollo 实用于前端的研发用户,用 node_js 来实现管制层的内容;graphql_Java 实用于后端的用户,次要提供一些接口。

2、Hudi on Flink

2.1 Hudi 介绍

接下来咱们次要介绍 Hudi on Flink 在顺丰的利用实际。Hudi 的外围劣势次要分为两局部:

● 首先,Hudi 提供了一个在 Hadoop 中更新删除的解决方案,所以它的外围在于可能增量更新,同时增量删除。增量更新的益处是国内与国内当初对隐衷数据的爱护要求比拟高,比方在 Hive 中清理删除某一个用户的数据是比拟艰难的,相当于从新荡涤一遍数据。应用 Hudi 能够依据主键疾速抓取,并将其删除掉。

● 另外,工夫漫游。之前咱们有很多利用须要做准实时计算。如果要找出半个小时内的增量到底是什么,变动点在哪,必须要把一天的数据全捞进去,过滤一遍能力找进去。Hudi 提供工夫漫游能力,只须要相似 SQL 的语法就能疾速地把全副增量捞进去,而后后盾利用应用时,就可能间接依据外面的数据做业务的更新,这是 Hudi 工夫漫游里最重要的能力。

Hudi 有两种的写的办法:

● copy on write。
◎ copy on write 这种模式更多是在每次写的时候,可能重写历史中对于更新记录所在的文件,把它重写并且把增量局部再从新记录下来,相当于把历史状态也给记录下来。惟一的不足之处在于,写的时候性能会稍强劲,然而读的性能是很强的,和失常应用 Hive 没有什么区别。这个也是 Hudi 自身的长处。实时性略低,这部分取决于写的文件合并的频率。不过批量的话,写也不会影响到多少性能,所以自身也是批量的去写。比方每隔几分钟写一次,这个其实也不会产生很高的性能损耗,这就是 copy on write。

● merge on read
◎ merge on read 就是写的时候实时会把 log 以 append 形式写到 HDFS 中并写成文件,而后在读的时候将曾经生成的文本,再加上增量的局部合并,做一个 merge 操作。益处在于查问的时候数据都是实时的,然而因为查问工作的确较多,相当于是说每次查的时候,都要把两局部数据取出来并做一个合并,因而也会造成损耗。

以上是 Hudi 状况的简略介绍。

2.2 Hudi on Flink 组成部分 – 数据库实时化

上图是咱们将数据实时化 CDC 的过程。数据库的 CDC,基本上都是只能到库级别、库粒度。后面的 source 撑持必定也还是库粒度,两头会通过两个过程:

● 一部分是 DML,它会有过滤,当库外面有 100 张表时,很多时候有些表是不须要的,这部分咱们会间接过滤掉,过滤就次要是通过产品化来买通它。

● 另一部分是 DDl,可能实时更新 schema。比方库表字段的减少或者变更,再或者可能加了个表或者改了一个表,这部分会在实时程序中买通数据直通车,只有有任何变更,就会生成一个新的版本,而后将元数据信息记录到直通车里,同时也会包装到 binlog kafka sink 里记录,每一行会打上相应的版本号。这样的话就对于前面的应用就可能间接对应该条记录,应用十分不便,不会有出错的状况。

2.3 Hudi on Flink 组成部分 – 数仓实时化

这部分次要分享咱们数仓实时化的过程,咱们的指标是实现 Kafka 里的数据在以后离线数仓中也能真正用起来,包含很多做准实时计算的用户也可能真正用起来。Hudi on Flink 就是咱们尝试的计划。以前 Hudi 这块也做了 Hudi on Spark 计划,是官网举荐应用的计划,其实相当于多保护一个组件,然而咱们大方向上还是心愿所有实时的货色都可能让 Flink 去实现,另外也心愿是 Flink 的利用生态可能做得更加全面,在这部分就真正去把它落地下来,并且在生产中利用起来。

其实整个过程,比方做表数据实时化的时候,它是分为两部份,一部分数据初始化,在启动的时候,会把数据从新做批量的拉取,这个是用 Flink batch 来做的,其实社区自身也有提供这种能力。另外 Hudi 自身也具备把存量的 Hive 表 Hudi 化的能力,这是 Hudi 最新才进去的性能。这部分咱们会用 Flink batch 的形式从新抽一遍,当然也有存量,对于存量的一些表,能够间接用存量表来转化,而后用 Flink batch 做初始化。

另外一部分是增量更新,增量更新是指有个 DB connect 对接 Kafka,从 Kafka 的 source 拿到数据库增量 CDC 的 binlog,而后把 binlog 进行加工,同时再利用 Flink 自身的 checkpoint 机制(Flink 自身的 checkpoint 整体频率能够管制)进行 snapshot 的过程。其中所做的内容也咱们本人能够管制的,所以采纳 checkpoint 的模式能够把 Hudi 所须要做的 upsert 的操作全副在 checkpoint 中更新到线上,最终造成 Hudi 外面的实时数据。

2.4 Hudi 数仓宽表计划

间接将 Kafka 数据扔到 Hudi 里绝对容易,真正艰难的点在于宽表。对于整个 Hudi 来说,宽表是波及到很多维表,当很多维表或者事实表更新的时候,会由多个事实表做一个关联。但不是每个事实表都能抓到宽表的真正主键,因而 Hudi 没法做这种更新。所以如何把宽表做数据实时化是一个难题。

上图是顺丰的宽表计划。

● 第一层,对于 ODS,能够间接连贯 Kafka,用 Hudi on Flink 的框架就可能实现。

● 第二层,DWD,这里也有两种方法:
一种是用 Flink SQL 先把实时的 Kafka 宽表做完,不过这种方法老本会高一点,相当于再次引入了 Kafka,整个数据链路变长,如果真正须要去用实时宽表能够小局部去推,但如果不存在纯实时数据的需要,就没有必要去做 DWD 的实时 Kafka 宽表。
另外,在没有 DWD 的实时 Kafka 宽表的状况下,如何实现上述离线层的 DWD 实时化?这里有几个步骤,首先创立一个维表的 UDF 做表关联,也是最不便的形式。其次,能够思考间接用 join 的形式,用两个实时表来做关联,但可能存在关联不到的状况。

当然,做维表关联,就波及到外键主键的映射。外键主键映射是为了让咱们可能在另一个事实表更新时,疾速找到主键在哪,即外键主键的映射。另外主键索引,主键索引其实也是跟外键主键的映射相干。至于外键主键的映射,相当于把它建成一个新的表主键索引获取,这样增量更新 Hudi 跟原来的 ODS 层就基本上统一了,这就是宽表实时加工的过程。下图为运单的宽表举例。

3、产品化反对

上述从技术层面剖析了顺丰当下业务架构的相干状况,以下将分享咱们在产品化上所做的一些反对工作。

3.1 数据直通车

上图是咱们的数据直通车,可能做到让用户本人在产品中操作,不须要写代码即可实现,能够实现低门槛的疾速简便的利用。比方配置数据接入仅需 1 分钟左右,整个过程就是在产品上以配置化的伎俩就可能将数据最终落在数据库,咱们的离线表、数仓、做数据分析都可能间接疾速的使用起来。

另外,数据接入进来之后,须要有数据管理的能力。上图是数据管理能力测试环境的简略状况,咱们须要让用户可能治理相干的数据,首先谁用它了,其次它波及什么字段,有哪些具体的内容,同时它外面的血缘关系又是怎么样的,这个就是咱们数据资产管理所具备的性能。

3.2 实时数据应用

上图是咱们 binlog 的 SDK,其实像 binlog 这种 avro 的格局,对用户来说应用有肯定门槛。但还是有一些编码的用户,对于这些用户咱们提供具体的 SDK,所以在 SDK 里真正应用时都做到简便。右边看起来是 json,实际上是 avro 格局。左边的内容就是在 Java 上的应用状况,这个是在代码层面辅助研发疾速利用的工具。

咱们在平台上也做一些简化的内容,首先有一部分是对于拖拽的,拖拽是指封装一些组件,用户能够通过拖拽来疾速实现其需要。这个产品上线后,很多之前没有任何实时计算的教训,甚至连离线开发的教训也没有的用户都可能做实时的数据开发。

上图为实时指标采集,产品上线之后有很多监控的需要,Flink 自身提供很多 Metric,用户也有很多 Metric,咱们心愿为用户提供一个高效的解决方案,把 Metric 全副采集进去,让用户可能疾速利用。

这里在监控外面也做了几个工作,一个是爬虫计划,实现一个 akka 的客户端,Flink 自身是 akka 的框架,每个 jobmannager 都有 akka 的服务、接口,这样只有实现一个 akka 的客户端,就可能以 akka 的 API 模式获取具体的 Metric 状况。这部分采集完之后发到 Kafka,最终存到 TDengine 再到 Grafana,提供给用户。Grafana 也会整合到咱们的实时计算平台产品外面来,在面对存量的状况时,不须要重启用户的工作,就可能间接做数据采集。

但在面对增量状况时,就须要补充一些 Metric,比方 CPU 使用率、内存的使用率等。这部分咱们以 Reporter 计划来满足,Reporter 计划也是社区以后主推的计划。Reporte r 计划的原理其实是在 Flink 的 Metrics Reporter 里进行插件开发,而后发到 gateway,这个 gateway 其实就是为了防止 Kafka 客户端过多的问题,所以这里两头做一个网关,前面还是和下面的统一,这个就是 Flink 的工作监控状况。

4、后续打算

上述曾经分享了咱们在外部曾经落地、理论利用的过程,后续咱们还会做什么?

4.1 弹性计算

首先,弹性计算。目前像监控工作,用户申请的资源远远超过理论须要应用的资源,会造成重大的资源节约,内存也一样。解决相似状况时,咱们应用了 Flink 延长的框架 Metrics monitor,联合采集的 Metrics,可能做到当整个使用率过低或过高的时候,及时调整达到资源扩缩容或者并发扩容。

4.2 Flink 替换 Hive 演进

下面提到咱们存量是有十分多的 Hive 工作,包含 Spark 工作须要进行替换,但怎么去做呢?

首先咱们用 Flink 来替换,因为强制或平台主动举荐都有难度,所以咱们做了一些折中计划。比方埋点,当须要把数据写到 Hive 的某个表,它会通过 Hiveserver,SQL 解析之后,此时将表进行替换,执行两个路线:一个是失常的 table 这样执行会写到 Hive 外面去。另外也会埋点把写的表替换成另一个表,而后同时再以 Flink 的模式去执行一遍,不过会产生额定的资源耗费,执行大略生成两个表,须要主动计算两者是否统一。如统一测试稳固后就能以计算框架来去替换它。

大部分工作是兼容的可替换的,但也有小局部不兼容的状况,这部分能够采取人工解决,以尽量实现整个技术上的对立,这部分是后续须要实现的。

4.3 批流一体化

上图是咱们做批流一体化的过程,批流一体化在元数据管理与权限治理局部都曾经有一些落地。

除此之外咱们联合刚刚所说替换的过程,上图就是 SQL 的兼容测试。因为这几者都做完之后,其实批流一体化能够同步去做,相当于同一个接口,加一个参数,即可实现流批处理底层引擎的疾速切换,有助于整个数据开发可能保持一致,所以批流一体化也是前面须要尝试的。

上图实际上是咱们一体化整个框架的最终模式。首先下面有一层 IDE 可能让所有的用户应用。而后上面各种根底性能反对,包含主动补全的 SQL 语法解析性能的反对,再往下就是一些资源管理、调度治理和常识治理,这些也是为了辅助开发而用的。再上面一层是计算引擎,要把这些计算引擎跟用户做一个大的隔离,让用户不必再关注底层技术的实现和应用,这是咱们前面的要继续去做的事件。

退出移动版