关于flink:基于-Flink-的典型-ETL-场景实现方案

12次阅读

共计 12271 个字符,预计需要花费 31 分钟才能阅读完成。

作者:买蓉 · 美团点评高级技术专家
整顿:赵阳(Flink 社区志愿者)
校对:苗浩冲(Flink 社区志愿者)

本文将从数仓诞生的背景、数仓架构、离线与实时数仓的比照着手,综述数仓倒退演进,而后分享基于 Flink 实现典型 ETL 场景的几个计划。

1. 实时数仓的相干概述

1.1 实时数仓产生背景

咱们先来回顾一下数据仓库的概念。

数据仓库的概念是于 90 年代由 Bill Inmon 提出, 过后的背景是传统的 OLTP 数据库无奈很好的反对长周期剖析决策场景,所以数据仓库概念的 4 个外围点,咱们要联合着 OLTP 数据库过后的状态来比照了解。

  1. 面向主题的:数据仓库的数据组织形式与 OLTP 面向事务处理不同。因为数据仓库是面向剖析决策的,所以数据常常按剖析场景或者是剖析对象等主题模式来组织。
  2. 集成的:对于数据仓库来说,常常须要去汇合多个扩散的、异构的数据源,做一些数据荡涤等 ETL 解决,整合成一块数据仓库,OLTP 则不须要做相似的集成操作。
  3. 绝对稳固的:OLTP 数据库个别都是面向业务的,它次要的作用是把以后的业务状态精准的反映进去,所以 OLTP 数据库须要反对大量的增、删、改的操作。然而对于数据仓库来说,只有是入仓存下来的数据,个别应用场景都是查问,因而数据是绝对稳固的。
  4. 反映历史变动:数据仓库是反映历史变动的数据汇合,能够了解成它会将历史的一些数据的快照存下来。而对于 OLTP 数据库来说,只有反映过后的最新的状态就能够了。

以上这 4 个点是数据仓库的一个外围的定义。咱们也能够看出对于实时数据仓库来说,传统数据仓库也就是离线数据仓库中的一些定义会被弱化,比如说在反映历史变动这一点。介绍完数据仓库的基本概念,简略说下数据仓库建模这块会用到一些经典的建模办法,次要有范式建模、维度建模和 Data Vault。在互联网大数据场景下,用的最多的是维度建模办法。

而后先看一下离线数仓的经典架构。如下图:

这个数仓架构次要是偏差互联网大数据的场景计划,由上图能够看出有三个外围环节。

  1. 第一个环节是数据源局部,个别互联网公司的数据源次要有两类:
  • 第 1 类是通过在客户端埋点上报,收集用户的行为日志,以及一些后端日志的日志类型数据源。对于埋点行为日志来说,个别会通过一个这样的流程,首先数据会上报到 Nginx 而后通过 Flume 收集,而后存储到 Kafka 这样的音讯队列,而后再由实时或者离线的一些拉取的工作,拉取到咱们的离线数据仓库 HDFS。
  • 第 2 类数据源是业务数据库,而对于业务数据库的话,个别会通过 Canal 收集它的 binlog,而后也是收集到音讯队列中,最终再由 Camus 拉取到 HDFS。

这两局部数据源最终都会落地到 HDFS 中的 ODS 层,也叫贴源数据层,这层数据和原始数据源是保持一致的。

  1. 第二个环节是离线数据仓库,是图中蓝色的框展现的局部。能够看到它是一个分层的构造,其中的模型设计是根据维度建模思路。
  • 最底层是 ODS 层,这一层将数据放弃无信息损失的寄存在 HDFS,根本放弃原始的日志数据不变。
  • 在 ODS 层之上,个别会进行对立的数据荡涤、归一,就失去了 DWD 明细数据层。这一层也蕴含对立的维度数据。
  • 而后基于 DWD 明细数据层,咱们会依照一些剖析场景、剖析实体等去组织咱们的数据,组织成一些分主题的汇总数据层 DWS。
  • 在 DWS 之上,咱们会面向利用场景去做一些更贴近利用的 APP 利用数据层,这些数据应该是高度汇总的,并且可能间接导入到咱们的应用服务去应用。

在两头的离线数据仓库的生产环节,个别都是采纳一些离线生产的架构引擎,比如说 MapReduce、Hive、Spark 等等,数据个别是存在 HDFS 上。

  1. 通过前两个环节后,咱们的一些应用层的数据会存储到数据服务里,比如说 HBase、Redis、Kylin 这样的一些 KV 的存储。并且会针对存在这些数据存储上的一些数据,封装对应的服务接口,对外提供服务。在最外层咱们会去产出一些面向业务的报表、面向剖析的数据产品,以及会反对线上的一些业务产品等等。这一层的话,称之为更贴近业务端的数据利用局部。

以上是一个根本的离线数仓经典架构的介绍。

大家都理解到当初随着挪动设施的遍及,咱们逐步的由制造业时代过渡到了互联网时代。在制造业的时代,传统的数仓,次要是为了去反对以前的一些传统行业的企业的业务决策者、管理者,去做一些业务决策。那个时代的业务决策周期是比拟长的,同时过后的数据量较小,Oracle、DB2 这一类数据库就曾经足够存了。

但随着分布式计算技术的倒退、智能化技术倒退、以及整体算力的晋升、互联网的倒退等等因素,咱们当初在互联网上收集的数据量,曾经呈指数级的增长。并且业务不再只依赖人做决策,做决策的主体很大部分已转变为计算机算法,比方一些智能举荐场景等等。所以这个时候决策的周期,就由原来的天级要求晋升到秒级,决策工夫是十分短的。在场景上的话,也会面对更多的须要实时数据处理的场景,例如实时的个性化举荐、广告的场景、甚至一些传统企业曾经开始实时监控加工的产品是否有品质问题,以及金融行业重度依赖的反作弊等等。因而在这样的一个背景下,实时数仓就必须被提出来了。

1.2 实时数仓架构

首先跟大家介绍一下实时数仓经典架构 – Lambda 架构:

这个架构是 Storm 的作者提出来的,其实 Lambda 架构的次要思路是在原来离线数仓架构的根底上叠加上实时数仓的局部,而后将离线的存量数据与咱们 t+0 的实时的数据做一个 merge,就能够产生数据状态实时更新的后果。

  • 和上述 1.1 离线数据仓库架构图比拟能够显著的看到,实时数仓减少的局部是上图黄色的这块区域。咱们个别会把实时数仓数据放在 Kafka 这样的音讯队列上,也会有维度建模的一些分层,然而在汇总数据的局部,咱们不会将 APP 层的一些数据放在实时数仓,而是更多的会移到数据服务侧去做一些计算。
  • 而后在实时计算的局部,咱们常常会应用 Flink、Spark-streaming 和 Storm 这样的计算引擎,时效性上,由原来的天级、小时级能够晋升到秒级、分钟级。

大家也能够看到这个架构图中,两头数据仓库环节有两个局部,一个是离线的数据仓库,一个是实时的数据仓库。咱们必须要运维两套(实时计算和离线计算)引擎,并且在代码层面,咱们也须要去实现实时和离线的业务代码。而后在合并的时候,咱们须要保障施行和离线的数据一致性,所以凡是咱们的代码做变更,咱们也须要去做大量的这种实时离线数据的比照和校验。其实这对于不论是资源还是运维老本来说都是比拟高的。这是 Lamda 架构上比拟显著和突出的一个问题。因而就产生了 Kappa 构造。

Kappa 架构的一个次要的思路就是在数仓局部移除了离线数仓,数仓的生产全副采纳实时数仓。从上图能够看到方才两头的局部,离线数仓模块曾经没有了。

对于 Kappa 架构,相熟实时数仓生产的同学,可能会有一个疑难。因为咱们常常会面临业务变更,所以很多业务逻辑是须要去迭代的。之前产出的一些数据,如果口径变更了,就须要重算,甚至重刷历史数据。对于实时数仓来说,怎么去解决数据重算问题?

Kappa 架构在这一块的思路是:首先要筹备好一个可能存储历史数据的音讯队列,比方 Kafka,并且这个音讯对列是能够反对你从某个历史的节点从新开始生产的。接着须要新起一个工作,从原来比拟早的一个工夫节点去生产 Kafka 上的数据,而后当这个新的工作运行的进度曾经可能和当初的正在跑的工作齐平的时候,你就能够把当初工作的上游切换到新的工作下面,旧的工作就能够停掉,并且原来产出的后果表也能够被删掉。

随着咱们当初实时 OLAP 技术的一些晋升,有一个新的实时架构被提了进去,这里暂且称为实时 OLAP 变体。

这个思路是把大量的聚合、剖析、计算由实时 OLAP 引擎来承当。在实时数仓计算的局部,咱们不须要做的特地重,尤其是聚合相干的一些逻辑,而后这样就能够保障咱们在数据应用层能灵便的面对各种业务剖析的需要变更,整个架构更加灵便。

最初咱们来整体比照一下,实时数仓的这几种架构:

这是整体三个对于实时数仓架构的一个比照:

  • 从计算引擎角度:Lamda 架构它须要去保护批流两套计算引擎,Kappa 架构和实时 OLAP 变体只须要保护流计算引擎就好了。
  • 开发成本:对 Lamda 架构来说,因为它须要保护实时离线两套代码,所以它的开发成本会高一些。Kappa 架构和实时 OLAP 变体只用保护一套代码就能够了。
  • 剖析灵活性:实时 OLAP 变体是绝对最灵便的。
  • 在实时 OLAP 引擎依赖上:实时 OLAP 变体是强依赖实时 OLAP 变体引擎的能力的,前两者则不强依赖。
  • 计算资源:Lamda 架构须要批流两套计算资源,Kappa 架构只须要流计算资源,实时 OLAP 变体须要额定的 OLAP 资源。
  • 逻辑变更重算:Lamda 架构是通过批处理来重算的,Kappa 架构须要依照后面介绍的形式去从新生产音讯队列重算,实时 OLAP 变体也须要从新生产音讯队列,并且这个数据还要从新导入到 OLAP 引擎里,去做计算。

1.3 传统数仓 vs 实时数仓

而后咱们来看一下传统数仓和实时数仓整体的差别。

  1. 首先从 时效性 来看:离线数仓是反对小时级和天级的,实时数仓到秒级分钟级,所以实时数仓时效性是十分高的。
  2. 数据存储形式 来看:离线数仓它须要存在 HDFS 和 RDS 下面,实时数仓个别是存在音讯队列,还有一些 kv 存储,像维度数据的话会更多的存在 kv 存储上。
  3. 生产加工过程 方面,离线数仓须要依赖离线计算引擎以及离线的调度。但对于实时数仓来说,次要是依赖实时计算引擎。

2. 基于 Flink 实现典型的 ETL 场景

这里咱们次要介绍两大实时 ETL 场景:维表 join 和双流 join。

  • 维表 join

    • 预加载维表
    • 热存储关联
    • 播送维表
    • Temporal table function join
  • 双流 join

    • 离线 join vs. 实时 join
    • Regular join
    • Interval join
    • Window join

2.1 维表 join

2.1.1 预加载维表

计划 1:

将维表全量预加载到内存里去做关联,具体的实现形式就是咱们定义一个类,去实现 RichFlatMapFunction,而后在 open 函数中读取维度数据库,再将数据全量的加载到内存,而后在 probe 流上应用算子,运行时与内存维度数据做关联。

这个计划的长处就是实现起来比较简单,毛病也比拟显著,因为咱们要把每个维度数据都加载到内存外面,所以它只反对大量的维度数据。同时如果咱们要去更新维表的话,还须要重启作业,所以它在维度数据的更新方面代价是有点高的,而且会造成一段时间的提早。对于预加载维表来说,它实用的场景就是小维表,变更频率诉求不是很高,且对于变更的及时性的要求也比拟低的这种场景。

接下来咱们看一个简略的代码的示例:

在这段代码截取的是要害的一个片段。这里定义了一个 DimFlatMapFunction 来实现 RichFlatMapFunction。其中有一个 Map 类型的 dim,其实就是为了之后在读取 DB 的维度数据当前,能够用于寄存咱们的维度数据,而后在 open 函数外面咱们须要去连贯咱们的 DB,进而获取 DB 里的数据。而后在上面代码能够看到咱们的场景是从一个商品表外面去取出商品的 ID、商品的名字。而后咱们在获取到 DB 外面的维度数据当前会把它寄存到 dim 外面。

接下来在 flatMap 函数外面咱们就会应用到 dim,咱们在获取了 probe 流的数据当前,咱们会去 dim 外面比拟。是否含有同样的商品 ID 的数据,如果有的话就把相干的商品名称 append 到数据元组,而后做一个输入。这就是一个根本的流程。

其实这是一个根本最后版的计划实现。但这个计划也有一个改良的形式,就是在 open 函数外面,能够新建一个线程,定时的去加载维表。这样就不须要人工的去重启 job 来让维度数据做更新,能够实现一个周期性的维度数据的更新。

计划 2:

通过 Distributed cash 的机制去散发本地的维度文件到 task manager 后再加载到内存做关联。实现形式能够分为三步:

  • 第 1 步是通过 env.registerCached 注册文件。
  • 第 2 步是实现 RichFunction,在 open 函数外面通过 RuntimeContext 来获取 cache 文件。
  • 第 3 步是解析和应用这部分文件数据。

这种形式的一个长处是你不须要去筹备或者依赖内部数据库,毛病就是因为数据也是要加载到内存中,所以反对的维表数据量也是比拟小。而且如果这个维度数据须要做更新,也须要重启作业。因而在正规的生产过程中不太倡议应用这个计划,因为其实从数仓角度,心愿所有的数据都可能通过 schema 化形式来治理。把数据放在文件外面去做这样一个操作,不利于咱们做整体数据的治理和规范化。所以这个形式的话,大家在做一些小的 demo 的时候,或者一些测试的时候能够去应用。

那么它实用的场景就是维度数据是文件模式的、数据量比拟小、并且更新的频率也比拟低的一些场景,比如说咱们读一个动态的码表、配置文件等等。

2.1.2 热存储关联

维表 join 里第二类大的实现思路是热存储关联。具体是咱们把维度数据导入到像 Redis、Tair、HBase 这样的一些热存储中,而后通过异步 IO 去查问,并且叠加应用 Cache 机制,还能够加一些淘汰的机制,最初将维度数据缓存在内存里,来加重整体对热存储的拜访压力。

如上图展现的这样的一个流程。在 Cache 这块的话,比拟举荐谷歌的 Guava Cache,它封装了一些对于 Cache 的一些异步的交互,还有 Cache 淘汰的一些机制,用起来是比拟不便的。

方才的试验计划外面有两个重要点,一个就是咱们须要用异步 IO 形式去拜访存储,这里也跟大家一起再回顾一下同步 IO 与异步 IO 的区别:

  • 对于同步 IO 来说,收回一个申请当前,必须期待申请返回当前能力持续去发新的 request。所以整体吞吐是比拟小的。因为实时数据处理对于提早特地关注,这种同步 IO 的形式,在很多场景是不太可能承受的。
  • 异步 IO 就是能够并行收回多个申请,整个吞吐是比拟高的,提早会绝对低很多。如果应用异步 IO 的话,它对于内部存储的吞吐量回升当前,会使得内部存储有比拟大的压力,有时也会成为咱们整个数据处理上提早的瓶颈。所以引入 Cache 机制是心愿通过 Cache 来去缩小咱们对外部存储的访问量。

方才提到的 Cuava Cache,它的应用是非常简单的,下图是一个定义 Cache 样例:

能够看到它的应用接口非常简单,大家能够去尝试应用。对于热存储关联计划来说,它的长处就是维度数据因为不必全量加载在内存里,所以就不受限于内存大小,维度数据量能够更多。在美团点评的流量场景外面,咱们的维度数据能够反对到 10 亿量级。另一方面该计划的毛病也是比拟显著的,咱们须要依赖热存储资源,而且维度的更新反馈到后果是有肯定提早的。因为咱们首先须要把数据导入到热存储,而后同时在 Cache 过期的工夫上也会有损失。

总体来说这个办法实用的场景是维度数据量比拟大,又可能承受维度更新有肯定提早的状况。

2.1.3 播送维表

第三个大的思路是播送维表,次要是利用 broadcast State 将维度数据流播送到上游 task 做 join。

实现形式:

  • 将维度数据发送到 Kafka 作为播送原始流 S1
  • 定义状态描述符 MapStateDescriptor。调用 S1.broadcast(),取得 broadCastStream S2
  • 调用非播送流 S3.connect(S2), 失去 BroadcastConnectedStream S4
  • 在 KeyedBroadcastProcessFunction/BroadcastProcessFunction 实现关联解决逻辑,并作为参数调用 S4.process()

这个计划,它的长处是维度的变更能够及时的更新到后果。而后毛病就是数据还是须要保留在内存中,因为它是存在 state 里的,所以反对维表数据量依然不是很大。实用的场景就是咱们须要时时的去感知维度的变更,且维度数据又能够转化为实时流。

上面是一个小的 demo:

咱们这外面用到的播送流 pageStream,它其实是定义了一个页面 ID 和页面的名称。对于非播送流 probeStream,它是一个 json 格局的 string,外面蕴含了设施 ID、页面的 ID、还有工夫戳,咱们能够了解成用户在设施上做 PV 拜访的行为记录。

整个实现来看,就是遵循上述 4 个步骤:

  • 第 1 步骤是要定义播送的状态描述符。
  • 第 2 步骤咱们这里去生成 broadCastStream。
  • 第 3 步骤的话咱们就须要去把两个 stream 做 connect。
  • 第 4 步最次要的一个环节就是须要实现 BroadcastProcessFunction。第 1 个参数是咱们的 probeStream,第 2 个参数是播送流的数据,第 3 个参数就是咱们的要输入的数据,能够看到次要的数据处理逻辑是在 processElement 外面。

在数据处理过程中,咱们首先通过 context 来获取咱们的 broadcastStateDesc,而后解析 probe 流的数据,最终获取到对应的一个 pageid。接着就在咱们方才拿到了 state 外面去查问是否有同样的 pageid,如果可能找到对应的 pageid 话,就把对应的 pagename 增加到咱们整个 json stream 去做输入。

2.1.4 Temporal table function join

介绍完了下面的办法当前,还有一种比拟重要的办法是用 Temporal table function join。首先阐明一下什么是 Temporal table?它其实是一个概念:就是可能返回继续变动表的某一时刻数据内容的视图,继续变动表也就是 changingtable,能够是一个实时的 changelog 的数据,也能够是放在内部存储上的一个物化的维表。

它的实现是通过 UDTF 去做 probe 流和 Temporal table 的 join,称之 Temporal table function join。这种 join 的形式,它实用的场景是维度数据为 changelog 流的模式,而且咱们有须要按工夫版本去关联的诉求。

首先来看一个例子,这里应用的是官网对于汇率和货币交易的一个例子。对于咱们的维度数据来说,也就是刚刚提到的 changelog stream,它是 RateHistory。它反映的是不同的货币绝对于日元来说,不同时刻的汇率。

第 1 个字段是工夫,第 2 个字段是 currency 货币。第 3 个字段是绝对日元的汇率,而后在咱们的 probe table 来看的话,它定义的是购买不同货币的订单的状况。比如说在 10:15 购买了两欧元,该表记录的是货币交易的一个状况。在这个例子外面,咱们要求的是购买货币的总的日元交易额,如何通 Temporal table function join 来去实现咱们这个指标呢?

  • 第 1 步首先咱们要在 changelog 流下面去定义 TemporalTableFunction,这外面有两个要害的参数是必要的。第 1 个参数就是可能帮咱们去辨认版本信息的一个 time attribute,第 2 个参数是须要去做关联的组件,这里的话咱们抉择的是 currency。
  • 接着的话咱们在 tableEnv 外面去注册 TemporalTableFunction 的名字。

而后咱们来看一下咱们注册的 TemporalTableFunction,它可能起到什么样的成果。

比如说如果咱们应用 rates 函数,去获取 11:50 的状态。能够看到对于美元来说,它在 11:50 的状态其实落在 11:49~11:56 这个区间的,所以选取的是 99。而后对于欧元来说,11:50 的时刻是落在 11:15 和 12:10 之间的,所以咱们会选取 119 这样的一条数据。它其实实现的是咱们在一刚开始定义的 TemporalTable 的概念,可能获取到 changelog 某一时刻无效数据。定义好 TemporalTableFunction 当前,咱们就要须要应用这个 Function,具体实现业务逻辑。

大家留神这里须要去指定咱们具体须要用到的 join key。比如说因为两个流都是在始终继续更新的,对于咱们的 order table 外面 11:00 的这一条记录来说,关联到的就是欧元在 10:45 这一条状态,而后它是 116,所以最初的后果就是 232。

方才介绍的就是 Temporal table function join 的用法。

2.1.5 维表 join 的比照

而后来整体回顾一下在维表 join 这块,各个维度 join 的一些差别,便于咱们更好的去了解各个办法实用的场景。

  • 在实现复杂度下面的:除了热存储关联略微简单一些,其它的实现形式基本上复杂度是比拟低的。
  • 在维表数据量上:热存储关联和 Temporal table function join 两种形式能够反对比拟多的数据量。其它的形式因为都要把维表加载到内存,所以就受限内存的大小。
  • 在维表更新频率下面:因为预加载 DB 数据到内存和 Distributed Cache 在从新更新维表数据的时候都须要重启,所以它们不适宜维表须要常常变更的场景。而对于播送维表和 Temporal table function join 来说,能够实时的更新维表数据并反映到后果,所以它们能够反对维表频繁更新的场景。
  • 对维表更新实时性来说:在播送维表和 Temporal table function join,它们能够达到比拟快的实时更新的成果。热存储关联在大部分场景也是能够满足业务需要的。
  • 在维表模式下面:能够看到第 1 种形式次要是反对拜访 DB 存储大量数据的模式,Distributed Cache 反对文件的模式,热存储关联须要拜访 HBase 和 Tair 等等这种热存储。播送维表和 Temporal table function join 都须要维度数据能转化成实时流的模式。
  • 在内部存储下面:第 1 种形式和热存储关联都是须要依赖内部存储的。

在维表 join 这一块,咱们就先介绍这几个根本办法。可能有的同学还有一些其余计划,之后能够反馈交换,这里次要提了一些比拟罕用的计划,但并不限于这些计划。

2.2 双流 join

首先咱们来回顾一下,批处理是怎么去解决两个表 join 的?个别批处理引擎实现的时候,会采纳两个思路。

一个是基于排序的 Sort-Merge join。另外一个是转化为 Hash table 加载到内存里做 Hash join。这两个思路对于双流 join 的场景是否还同样实用?在双流 join 场景外面要解决的对象不再是这种批数据、无限的数据,而是是无穷数据集,对于无穷数据集来说,咱们没有方法排序当前再做解决,同样也没有方法把无穷数据集全副转成 Cache 加载到内存去做解决。所以这两种形式根本是不可能实用的。同时在双流 join 场景外面,咱们的 join 对象是两个流,数据也是一直在进入的,所以咱们 join 的后果也是须要继续更新的。

那么咱们应该有什么样的计划去解决双流 join 的实现问题?Flink 的一个根本的思路是将两个流的数据持续性的存到 state 中,而后应用。因为须要一直的去更新 join 的后果,之前的数据实践上如果没有任何附加条件的话是不能抛弃的。然而从实现上来说 state 又不能永恒的保留所有的数据,所以须要通过一些形式将 join 的这种全局范畴部分化,就是说把一个有限的数据流,尽可能给它拆分切分成一段一段的有线数据集去做 join。

其实根本就是这样一个大的思路,接下来去看一下具体的实现形式。

2.2.1 离线 join vs. 实时 join

接下来咱们以 inner join 为例看一下,一个简略的实现的思路:

左流是彩色标出来的这一条,右流是蓝色标出来的,这条两流须要做 inner join。首先左流和右流在元素进入当前,须要把相干的元素存储到对应的 state 下面。除了存储到 state 下面以外,左流的数据元素到来当前须要去和左边的 Right State 去做比拟看能不能匹配到。同样左边的流元素到了当前,也须要和右边的 Left State 去做比拟看是否可能 match,可能 match 的话就会作为 inner join 的后果输入。这个图是比拟粗的展现进去一个 inner join 的大略细节。也是让大家大略的领会双流 join 的实现思路。

2.2.2 Regular join

咱们首先来看一下第 1 类双流 join 的形式,Regular join。这种 join 形式须要去保留两个流的状态,持续性地保留并且不会去做革除。两边的数据对于对方的流都是所有可见的,所以数据就须要持续性的存在 state 外面,那么 state 又不能存的过大,因而这个场景的只适宜有界数据流。它的语法能够看一下,比拟像离线批处理的 SQL:

在上图页面外面是当初 Flink 反对 Regular join 的一些写法,能够看到和咱们一般的 SQL 根本是统一的。

2.2.3 Interval join

在双流 join 外面 Flink 反对的第 2 类 join 就是 Interval join 也叫区间 join。它是什么意思呢?就是退出了一个工夫窗口的限定,要求在两个流做 join 的时候,其中一个流必须落在另一个流的工夫戳的肯定工夫范畴内,并且它们的 join key 雷同才可能实现 join。退出了工夫窗口的限定,就使得咱们能够对超出工夫范畴的数据做一个清理,这样的话就不须要去保留全量的 State。

Interval join 是同时反对 processing time 和 even time 去定义工夫的。如果应用的是 processing time,Flink 外部会应用零碎工夫去划分窗口,并且去做相干的 state 清理。如果应用 even time 就会利用 Watermark 的机制去划分窗口,并且做 State 清理。

上面咱们来看一些示例:

上图这个示例用的数据是两张表:一个是订单表,另外一个是配送表。这里定义的工夫限定是配送的工夫必须在下单后的 4 个小时内。

Flink 的作者之前有一个内容十分直观的分享,这里就间接复用了他这部分的一个示例:

咱们能够看到对于 Interval join 来说:它定义一个工夫的上限,就能够使得咱们对于在工夫上限之外的数据做清理。比方在方才的 SQL 外面,其实咱们就限定了 join 条件是 ordertime 必须要大于 shiptime 减去 4 个小时。对于 Shipments 流来说,如果接管到 12:00 点的 Watermark,就意味着对于 Orders 流的数据小于 8:00 点之前的数据工夫戳就能够去做抛弃,不再保留在 state 外面了。

同时对于 shiptime 来说,其实它也设定了一个工夫的上限,就是它必须要大于 ordertime。对于 Orders 流来说如果接管到了一个 10:15 点的 Watermark,那么 Shipments 的 state 10:15 之前的数据就能够摈弃掉。所以 Interval join 使得咱们能够对于一部分历史的 state 去做清理。

2.2.4 Window join

最初来说一下双流 join 的第 3 种 Window join:它的概念是将两个流中有雷同 key 和处在雷同 window 里的元素去做 join。它的执行的逻辑比拟像 Inner join,必须同时满足 join key 雷同,而且在同一个 window 里元素才可能在最终后果中输入。具体应用的形式是这样的:

目前 Window join 只反对 Datastream 的 API,所以这里应用形式也是 Datastream 的一个模式。能够看到咱们首先把两流去做 join,而后在 where 和 equalTo 外面去定义 join key 的条件,而后在 window 中须要去指定 window 划分的形式 WindowAssigner,最初要去定义 JoinFunction 或者是 FlatJoinFunction,来实现咱们匹配元素的具体解决逻辑。

因为 window 其实划分为三类,所以咱们的 Window join 这里也会分为三类:

  • 第 1 类 Tumbling Window join:它是依照工夫区间去做划分的 window。

能够看到这个图外面是两个流(绿色的流年和黄色的流)。在这个例子里咱们定义的是一个两毫秒的窗口,每一个圈是咱们每个流上一个单个元素,下面的工夫戳代表元素对应的工夫,所以咱们能够看到它是依照两毫秒的距离去做划分的,window 和 window 之间是不会重叠的。对于第 1 个窗口咱们能够看到绿色的流有两个元素合乎,而后黄色流也有两个元素合乎,它们会以 pair 的形式组合,最初输出到 JoinFunction 或者是 FlatJoinFunction 外面去做具体的解决。

  • 第 2 类 window 是 Sliding Window Join:这里用的是 Sliding Window。

sliding window 是首先定义一个窗口大小,而后再定义一个滑动工夫窗的大小。如果滑动工夫窗的大小小于定义的窗口大小,窗口和窗口之间会存在重叠的状况。就像这个图里显示进去的,红色的窗口和黄色窗口是有重叠的,其中绿色流的 0 元素同时处于红色的窗口和黄色窗口,阐明一个元素是能够同时处于两个窗口的。而后在具体的 Sliding Window Join 的时候,能够看到对于红色的窗口来说有两个元素,绿色 0 和黄色的 0,它们两个元素是合乎 window join 条件的,于是它们会组成一个 0,0 的 pair。而后对于黄色的窗口符合条件的是绿色的 0 与黄色 0 和 1 两位数,它们会去组合成 0,1、0,0 和 1,0 两个 pair,最初会进入到咱们定义的 JoinFunction 外面去做解决。

  • 第 3 类是 SessionWindow join:这外面用到的 window 是 session window。

session window 是定义一个工夫距离,如果一个流在这个工夫距离内没有元素达到的话,那么它就会从新开一个新的窗口。在上图外面咱们能够看到窗口和窗口之间是不会重叠的。咱们这里定义的 Gap 是 1,对于第 1 个窗口来说,能够看到有绿色的 0 元素和黄色的 1、2 元素都是在同一个窗口内,所以它会组成在 1 ,0 和 2,0 这样的一个 pair。残余的也是相似,符合条件的 pair 都会进入到最初 JoinFunction 外面去做解决。

整体咱们能够回顾一下,这一节次要是介绍了维表 join 和双流 join 两大类场景的 Flink ETL 实现办法。在维表 join 上次要介绍了预加载维表、热存储关联、播送维表、Temporal table function join 这 4 种形式。而后在双流 join 上咱们介绍了 Regular join、Interval join 和 Window join。

作者介绍:

买蓉,美团点评高级技术专家,用户全景数据建设负责人,负责过美团点评流量数据仓库、流量治理剖析零碎等流量数据体系的搭建,专一于海量数据处理、数据仓库建设、用户及流量剖析等畛域。

正文完
 0