乐趣区

关于实时计算:流批一体的近实时数仓的思考与设计

摘要:基于对数据工夫旅行的思考,引出了对目前三种数仓状态和两种数仓架构的思考。联合数据湖在 Flink 的利用和数据湖元数据类型的思考,摸索了基于数据湖的 Flink SQL 流批一体的实际,在流批一体 SQL 表白统一、后果一致性、流批工作拆散、混合调度依赖等进行了设计和摸索。同时,欢送大家多分享具体实际,一起共筑新的数据实际形式。

一、数据的工夫旅行和业务对数据的实质要求

大规模的数据处理衰亡于 Hadoop 生态的倒退,关键在于分布式贮存和分布式计算的倒退,造就了现在近百种无关大数据的生态技术。数仓实践和建模实践基于大数据技术体系得以疾速倒退,其中离线数仓的标准化建设失去了广泛应用。数据的实质是一种行为的具象,业务在对数据的需要,外围在于对行为的可摸索和可察看。基于此,咱们须要明确一点,大数据技术是否齐全满足了业务对数据需要在工夫维度上的确定性了呢,这点是值得思考的。那么咱们先来看一下数据的工夫旅行。

业务冀望的数据:用户空间下的工夫数据,t1 工夫数据,用户天然工夫点或天然时间段的明细或者统计数据。

传输提早:App 用户,数据发送到网关或者日志服务零碎,或者 Server 日志落文件系统所产生的提早。Event 进入到存储空间,能够代表数据曾经是确定的,根本可察看,个别状况下,这个提早很小。然而,在某些状况,比方 APP 的日志产生之后,然而因为网络等问题始终没有发送,或者 Server 宕机,导致提早发送或者最终失落。总体而言,传输提早属于不可控提早,临时没有什么好的技术计划来解决。

存储空间:数据承载于理论的存储中,离线数仓承载于具体的分布式文件系统,实时数仓基于 Kafka 的音讯队列零碎,近实时数仓承载于数据湖存储中。这里能够形象来看离线数仓,Event 承载于分布式文件系统,以小时分区为例,某个小时的分区实质是天然工夫产生的文件的汇合,工夫精度进化为小时级别。

计算提早:数据进入存储之后,与进入计算空间的时间差,t3-t2。实时数仓中,计算提早是数据的 ProcessTime-IngestTime。离线数仓中,计算提早是调度产生实例运行工夫 - 数据进入存储空间的时间差。实质离线数仓和实时数仓的计算提早在形象上看是统一的。计算提早在不同的数仓体系下,产生的时效不同,咱们会划分为三种支流的数仓体系,秒级的实时数仓,分钟级的近实时数仓,小时级的离线数仓。能够看出,数仓的时效性差别,因为传输提早的不可控,进化为计算提早的差别。

二、离线、近实时、实时三种数仓在工夫维度下的成因

在离线数仓和实时数仓,经常会提到数据的有界和无界,认为离线数仓的数据是有界的,实时数仓的音讯流是无界的。精确与否在于数据的确定性考量。

离线数仓的确定性,在于文件天然生成工夫的确定性和不可更改性,某个小时的天然文件生成,近似等于事件工夫在天然工夫的确定性,反例就是咱们能看到数据漂移的状况,事件工夫会或多或少落入上个小时或者下个小时的天然文件生成工夫。那么离线数仓的确定性,本质是数据的 IngestTime 的确定性,具备人造的文件属性,易于宰割。当咱们说离线数仓计算的数据是精确的时候,默认了传输提早带来的影响很小或者默认了以后小时的数据指标的规范是文件的天然造成工夫。

实时数仓,经常会提及不确定性或者说 Lambda 架构理论是对实时数仓的不确定性的代替计划。这种不确定性的起因是什么呢?这里分为四类状况阐明,一是 ETL 的解决,从窗口上来说,是单条数据即为一个窗口,窗口的产生和销毁在一个 Event 中实现,y=window(data)。二是基于 EventTime 的工夫窗口,如果再定义延迟时间,y=window(datas, datas.EventTime, delay),第三种和第四种别离就是 IngestTime 和 ProcessTime 的工夫窗口函数。比照离线数仓,能够看出,基于 IngestTime 的工夫窗口和离线数仓的工夫语义最为统一。离线数仓在工夫窗口上,能够看做为数据进入文件的天然工夫所对应的小时窗口,数据所承载的文件的确定性,保障了小时窗口的数据确定性,y=window(files)。

近实时数仓,比方基于 Iceberg 的数据湖建设的近实时数仓,在于离线数仓比照中,理论是将基于小时文件细分到分钟级别的快照文件上来,y=window(snapshots)。比照实时数仓,因为 Kafka 的 IngestTime 目前在精确性上是不准确的,基于快照的文件划分,在精确性上有肯定的保障,同时在升高时效水平,从秒进化为分钟,很多场景是能够容忍的。

三种在工夫维度比照上看,一是在某个工夫,统计的实质对业务的需要都是近似的,这个实质是传输提早所带来的,然而这个在实践中,不影响数据的可用性和统计学意义。二是不同数仓的划分,是存储和计算技术倒退所带来的。三是离线数仓的确定性含糊了传输提早,实时数仓的不确定性,是对传输提早的一种取舍,人为的限定了 EventTime 的最大延迟时间,从而保障了指标的时效性,都是具备实际的意义所在。

三、Lambda 和 Kappa 架构在工夫维度下的取舍

当离线数仓刚刚倒退的时候,只有一种数仓架构,也是基于大数据分布式解决刚刚倒退的起因。随着实时技术的倒退,大家在时效性上有了更多要求,然而同离线数仓比照的时候,在数据的准确性上,因为统计的窗口不同,必然会导致某个时刻的指标后果的不严格统一。

为了解决这种不严格统一的状况,Lambda 架构(由 Storm 的作者 Nathan Marz 提出的)产生的,实时确保时效,离线确保精确。最终会以确保离线三个工夫窗口的统计一个事件工夫窗口的后果,来回补实时数仓认为 EventTime 窗口,因为时效性抛弃的提早数据的后果,从而保障业务上对 EventTime 窗口的要求,或者默认为离线的 IngestTime 所产生的文件分区近似认为 EventTime 的工夫窗口。这种带来的弊病,保护两套数据路线,而大家总在想方法解决。

Kappa 架构的提出,得益于实时计算的效率晋升,然而因为在批处理技术短板,生产实践推广受限。Kappa 架构是基于实时 EventTime 的一种数据窗口解决,因为 Kafka 的 IngestTime 不准确和为了同离线数仓比照而衡量思考,EventTime 在传输提早上的不可控,导致 Kappa 架构的准确性就会呈现折扣。尽管是业务上最精确的工夫范畴,可行性上确不佳。

近些年来,一直倒退的 MPP 架构的 OLAP 查问引擎,并不会波及到工夫窗口的计算取舍,OLAP 引擎实质是基于 ProcessTime 来减速查问的一种技术手段,是数仓不可分割的一部分,然而传输提早的不可控没有解决,然而将计算提早下推到了查问时,通过疾速查问来解决尽可能减少计算提早,同时保障了查问的灵活性,自助剖析摸索上有着宽泛的利用。

从数仓架构的倒退上看,一直在围绕后果的确定性,技术的可行性,数据的时效性,查问的灵活性上,一直的衡量,各个组件也是根据理论需要而倒退起来的。

四、数仓一体的可行性思考

基于三种数仓体系和两种架构的思考,每个设计都是兼顾一种或多种考量,那么能不能实现一种机制,可能较好的满足数仓需要体系建设呢?从目前的技术倒退上看,是有肯定的可能性的。架构体系的倒退一是基于技术根底,二是一直排汇组件的长处,做加法。

除去实时、近实时、离线数仓的划分,从技术的视角去看数仓建设的可行性。那么咱们就要选取一些重要的点,取舍掉一些不可能的实现。

第一点是后果的确定性,这点是基于离线数仓倒退的思考。不确定性带来的问题是信息的不对称,确定性的后果是能够含糊肯定的指标含意的。

第二点是数据的时效性,高时效必然可能满足低时效,反之不然。另外数据的时效性,自身是根底组件的技术倒退所限度的。

第三点是开发的便利性,排在时效性前面的思考是,便利性是基于利用层面建设的,难度个别是弱于根底组件的,能够通过一直实际优化,达到一个良好的应用体验。

第四点是查问的灵活性和高响应,OLAP 的根底设计保障了查问速度,那么 OLAP 的技术架构体现是能够复用或者拓展的。

那么基于下面四点思考,能够在实时数仓的根底上,优先解决掉确定性问题。这个是很重要的一个命题,要保障计算结果同离线数仓的一致性。这一点的实现方面,能够参考离线数仓,含糊 EventTime 和 IngestTime,用文件的 start 和 end 作为确定性的根据,文件的两头实时计算,确保时效性。那么基于 Flink,就须要实现一种基于文件天然宰割的 Watermark 机制,作为计算窗口划分的根据。

在确定性问题之后,须要解决计算的老本和应用的老本,这里比拟重要的是存储层,实时数仓依赖 Kafka,Kafka 倒退不具备数仓一些重要的点,老本是一个方面,查问是一个方面,Kafka 无奈架构在各种 OLAP 引擎或者计算引擎下面。这里,近实时数仓的依赖,比方数据湖或者 Paimon,数据湖分钟级的时效。不过,从倒退的角度上看,是一种可行的解决方案。数据湖兼顾了流计算和批计算,同时,如果将来 OLAP 引擎如果可能在数据湖上实现相似 MPP 架构的查问效率,这也是有可能的,比方短期能够用数据冗余,将数据湖格局的数据转换一份到 OLAP 对应的引擎上实现减速查问。

第三个方面,流式计算的治理和依赖机制,借鉴于离线数仓的治理形式,须要一套齐备的数据依赖治理,工作容错回跑机制。实时数仓个别是基于单个工作式的治理,离线数仓是基于工作流的治理,那么实时数仓的倒退,也必然要实现工作流的治理形式,笼罩整个开发链路。

为了实现一种统计的数仓架构,那么须要的倒退工作如下:一是着重倒退存储层,比方数据湖,既要比拟好的适应流和批引擎,又要可能高度适应 OLAP 查问引擎。二是在实时数仓或者近实时数仓,引入相似离线数仓的调度依赖治理和补数和容错回跑机制,或者在离线调度上兼容流工作依赖调度,实现工作流级别的治理和流批一体的数仓实现。三是在引擎层着重倒退 Flink 批处理能力。

最终的工作运行形式同时蕴含三种:实时模式、离线模式、业务模式,别离对应着不同的数据准确性级别。也能够任选其一或者其二作为运行形式。

五、基于 Flink 和数据湖的流批一体近实时数仓设计示例

数仓工作在离线调度和实时工作的简略形象示例:

数据源 => 同步工作 / 实时工作 =>

stg_table(partition=hour) => 计算工作(insert overwrite partition=hour)=>

dwd_table(partition=hour)=> 计算工作(insert overwrite partition=hour)=>

dws_table(partition=hour)=> 同步工作 =>OLAP 减速 => 数据服务

如果存储层是基于数据湖(以 Paimon 为例):

离线调度产生的表的版本信息,commit_kind: insert overwrite 类型的。同时离线工作的驱动,是基于调度依赖的驱动,one by one 的调度。

如果是基于流式计算,比方分钟级生成 snapshot 那么会演变为:

数据源 => 同步工作 / 实时工作 =>

stg_table(version=snapshot_id) => 计算工作(insert into version=snapshot_id)=>

dwd_table(version=snapshot_id)=> 计算工作(insert into version=snapshot_id)=>

dws_table(version=snapshot_id)=> 同步工作 =>OLAP 减速 => 数据服务

那么启动多个工作,工作是继续的运行。commit_kind: insert into 类型的。

那么要想实现流批一体的近实时数仓,须要解决如下问题:

1.Flink 工作反对批量计算能力要继续一直的增强

从 Flink 1.16/1.17 的版本公布状况,在批处理能力上有比拟大的晋升,同时,社区也在继续一直的增强批处理能力以及同 hive 的兼容能力。

2. 如何应用同一份 Flink SQL,既能够用于批任务调度,又能够用于流工作运行呢

两张表:dwd_partition_word_count,dws_partition_word_count,计算 word count

CREATE TABLE tablestore.tablestore_test.dwd_partition_word_count (
    logdate String,
    user_id bigint
) PARTITIONED BY (logdate)
WITH ('bucket' = '3');

CREATE TABLE tablestore.tablestore_test.dws_partition_word_count (
    logdate String,
    user_id bigint,
    cnt BIGINT,
    PRIMARY KEY (logdate,user_id) NOT ENFORCED
) PARTITIONED BY (logdate)
WITH ('bucket' = '3');

批工作的 Flink SQL:

insert overwrite tablestore.tablestore_test.dws_partition_word_count PARTITION(logdate=${start_date}) 
select user_id,count(1) as cnt from tablestore.tablestore_test.dwd_partition_word_count where logdate=${start_date} group by user_id;
-- 或者
insert overwrite tablestore.tablestore_test.dws_partition_word_count
select logdate, user_id,count(1) as cnt from tablestore.tablestore_test.dwd_partition_word_count where logdate=${start_date} group by logdate,user_id;

流工作的 Flink SQL:

insert into tablestore.tablestore_test.dws_partition_word_count 
select logdate,user_id,count(1) as cnt from tablestore.tablestore_test.dwd_partition_word_count group by logdate,user_id;

如何用一个 Flink SQL 来实现流批模型下的不同呢?

不同点:Insert into 和 Insert overwrite 的问题,这个通过在提交运行模式的时候,如果是批工作,则是 Insert Overwrite,如果是流工作,则转为 Insert into,这个在技术上没有什么难点。

不同点:Where 条件的数据范畴问题。形象来看,流工作和批工作的工夫范畴在表白上是能够对立的

insert overwrite tablestore.tablestore_test.dws_partition_word_count
select logdate, user_id,count(1) as cnt from tablestore.tablestore_test.dwd_partition_word_count where logdate>=${start_date} and logdate<=${end_date} group by logdate,user_id;

比方跑 4 月 22 号一天的数据,执行的批 SQL 为:

insert overwrite tablestore.tablestore_test.dws_partition_word_count
select logdate, user_id,count(1) as cnt from tablestore.tablestore_test.dwd_partition_word_count where logdate>='20230422' and logdate<='20230422' group by logdate,user_id;

如果用流模式跑,执行的 SQL 能够为:

select logdate, user_id,count(1) as cnt from tablestore.tablestore_test.dwd_partition_word_count where logdate>='19700101' and logdate<='99990101' group by logdate,user_id;

insert overwrite/into 和工夫范畴,能够由平台执行的时候主动转换和参数输出。

3. 批工作的调度和流工作的计算如何拆散

工作实现开发,在批模式下,用调度工作验证了逻辑无误,那么之后能够用流模式,始终继续一直的运行。一是计算逻辑变更或者历史数据修复怎么办,二是可不可以反对流批双跑。其实实质是一个问题。如果计算逻辑变更,那么能够批改流批一体的 SQL 逻辑,而后流工作重启利用新的计算逻辑。同时,流批一体的 SQL,在调度上回跑历史数据,从新刷写数据。

重刷历史数据的时候,流工作会不会读取到重刷的历史数据进行计算。

这个问题次要是通过上述说的数据湖版本 commit kind 解决。批工作只利用 insert overwrite,流工作利用 insert into. 如果流工作检测到 insert overwrite 的版本提交,间接跳过,不做理论的数据读取和解决。只解决 insert into 的数据。理论批工作的执行,对流工作不会产生影响。

目前在数据湖流式读取上,只须要加个开关选项就能够实现。

4. 流工作的 Insert into 如何实现主键写入

如果流工作的 Insert into 不能实现主键写入,那么分区数据的重复性无奈解决,那么就只能流批双跑来解决数据的重复性问题。也就是,上游如果是主键幂等写入,insert into 和 insert overwrite 语义等同。

这个能够通过数据湖主键表 (比方 Paimon 的主键表) 实现。Paimon 的主键表已初步具备生产可用性。

5. 流批工作的调度依赖

如果一个流工作,上游接的是批任务调度,如果实现调度依赖呢?

比拟优雅的实现能够是,在流工作写入上游表的时候,如果数据的 Watermark 写入到上游表的属性中,如果最晚的数据曾经是以后小时的 05 分,那么以后小时的上游调度工作,通过检查表的属性工夫,就能够判断批工作的调度实例是否应该拉起。或者也能够基于流工作的运行提早做查看依赖。

基于上述的实现和解决,咱们根本就能够实现流批一体的 Flink SQL 在批模式和流模式下运行,如果调度依赖做的比较完善的状况下,能够实现流批混跑。同时补数或者双跑对流工作的稳定性不会产生影响。

理论开发,就能够用批工作先开发验证,而后用流模式拉起,数据产出根本是分钟级别的。出问题能够用批工作修改。

作者|张剑

原文链接

本文为阿里云原创内容,未经容许不得转载。

退出移动版