摘要:本文整顿自阿里云资深技术专家,阿里云Hologres负责人姜伟华,在FFA实时湖仓专场的分享。点击查看>>本篇内容次要分为四个局部:

一、实时数仓分层的技术需要

二、阿里云一站式实时数仓Hologres介绍

三、Flink x Hologres:天作之合

四、基于Flink Catalog的Streaming Warehouse实际

点击查看视频回放

一、实时数仓分层的技术需要

首先,咱们讲一讲数仓的分层技术以及分层技术的现状。

1、实时数仓分层技术现状

大数据当初越来越考究实时化,在各种场景下都须要实时,例如春晚实时直播大屏,双11 GMV实时大屏、实时个性化举荐等场景,都对数据的实时性有着十分高的要求。为了满足业务的实时性需要,大数据技术也开始逐渐倒退出实时数仓。

但如何构建实时数仓呢?

相比离线数仓,实时数仓没有明确的方法论体系。因而在实践中,有各种各样的办法,但没有一个办法是万能。最近行业内提出了Streaming Warehouse的概念。Streaming Warehouse的实质是分层之间可能做到实时数据的流动,从而解决实时数仓分层的问题。

上面,咱们先来理解下实时数仓的支流分层计划。

2、实时数仓支流分层计划

实时数仓的支流分层计划次要有4个。

计划1:流式ETL

ETL(Extract- Transform-Load)是比拟传统的数据仓库建设办法,而流式ETL就是指:实时数据通过Flink实时ETL解决之后,将后果写入到KV引擎中,供给用查问。而为了解决中间层不不便排查的问题,也须要将中间层数据同步到实时数仓中供剖析之用。最常见的做法就是数据通过Flink荡涤后,写到Kafka造成ODS层。再从Kafka生产,通过加工造成DWD层。而后Flink加工成DWS层,最初通过加工造成ADS层的数据写到KV引擎并对接下层利用。因为间接应用Kafka数据进行剖析和探查很麻烦,所以也会同步一份Kafka数据到实时数仓,通过实时数仓进行剖析和探查。

这个计划的劣势是档次明确,分工明确。但劣势是须要有大量的同步工作、数据资源耗费很大、数据有很多冗余、解决链路较简单须要很多的组件。除此之外,这个计划构建的实时数仓分层,尤其是Kafka分层,复用性十分差,也没方法响应schema的动态变化。

计划2:流式ELT

而流式ELT则是将计算后置,间接将明细数据写进实时数仓(EL),不须要严格的数仓分层,整个架构只须要一层,下层利用查问的时候进行数据的变换(T)或者分层。常见的做法就是把数据加工荡涤后,写到实时数仓里,造成DWD层,所有的查问都基于DWD层的明细数据进行。

这个计划的益处在于,没有ETL,只有一层;数据订正很不便。但它的弊病有两个方面:

  • 在查问性能方面,因为是明细数据查问,所以在某些场景下不能满足QPS或提早的要求。
  • 因为没有严格的数仓分层,所以数据复用很艰难,很难兼顾各方面的诉求。

计划3:定时调度

既然实时流式无奈实现数据的实时数仓分层,咱们能够将数据实时写入实时数仓的DWD层。DWS层、ADS层用离线的高频调度办法,实现分钟级的调度,从而借用离线数仓,进行分层结构。这个也就是业界罕用的计划3。

这个计划的益处在于能够复用很多离线教训,计划成本低且成熟。但计划也存在如下毛病:

  • 提早大:每一层的提早都跟调度相干,随着档次越多,调度提早越大,实时数仓也变成了准实时数仓。
  • 不能齐全复用离线计划:离线调度个别是小时级或天级,咱们能够应用全量计算。但在分钟级调度时,必须做增量计算,否则无奈及时调度。

计划4:实时物化视图

第4种计划就是通过实时数仓的物化视图能力实现数仓分层。常见的做法就是Flink实时加工后,将数据写到实时数仓造成DWD层,DWS层或ADS层的结构依赖于实时数仓的实时物化视图能力。

当初支流实时数仓都开始提供物化视图的能力,但实质上都是提供了一些简略的聚合类物化视图。如果物化视图的需要比较简单,能够利用实时数仓里的实时物化视图能力,将DWS层到ADS层的构建自动化,从而让物化视图的查问保障较高的QPS。但这个计划最大的毛病在于,当初的实时物化视图技术都还不成熟,能力无限,反对的场景也比拟无限。

二、阿里云一站式实时数仓Hologres介绍

接下来,先介绍一下阿里云一站式实时数仓Hologres产品。Hologres是阿里云自研的一站式实时数仓,它同时蕴含三种能力:

  • OLAP能力:同传统的实时数仓一样,能够反对数据的实时写入、以及简单OLAP实时多维分析疾速响应,满足业务的极致数据摸索能力。
  • 在线服务Serving(KV):能够反对KV查问场景,提供十分高的QPS和毫秒级的低提早。
  • 湖仓一体:可能间接查问数据湖的数据,以及可能减速阿里云离线数仓MaxCompute,助力业务更低成本实现湖仓一体。

上面为具体介绍:

  • 首先,大家能够把Hologres当做一个常见的实时数仓。它的特点在于写入侧反对百万RPS的实时写入,写入即可查,没有提早。同时也反对高性能的实时整行更新和部分更新。其中,整行更新是把整行替换掉,部分更新能够更新一行中的部分字段,二者都是实时更新。
  • 在查问侧,一方面反对简单的OLAP多维分析,能够十分好的反对实时大屏、实时报表等场景。近期Hologres拿到了TPC-H 30TB的性能世界第一的TPC官网认证问题,见>>阿里云 ODPS-Hologres刷新世界纪录,当先第二名23%。其次,Hologres也反对在线服务查问,不仅反对百万QPS KV点查,而且也反对阿里云达摩院的Proxima向量检索引擎,能够反对十分高效的向量检索能力。同时这些能力在Hologres中是全用SQL表白,对用户应用十分敌对。此外,为了兼顾数据服务和实时数仓的需要,Hologres在行存、列存的数据格式根底上,也反对行列共存,即行列共存的表即一份行存,又有一份列存,并且零碎保障这两份数据是强统一的,对于OLAP剖析,优化器会主动抉择列存,对于线上服务,会主动抉择行存,通过行列共存能够十分敌对的实现一份数据撑持多个利用场景。
  • 因为Hologres同时反对OLAP剖析和线上服务,其中线上服务要求十分高的稳定性和SLA。为了保障OLAP剖析和线上服务时不会发生冲突,咱们反对了读写拆散,从而实现OLAP与数据服务的强隔离。
  • 最初,在湖仓数据交互式剖析方面,Hologres对阿里云MaxCompute离线数仓里的数据,数据湖中的数据都能够秒级交互式剖析,且不须要做任何的数据搬迁。
  • 除此之外,Hologres的定位是一站式的企业级实时数仓,所以除了上述能力,咱们还有很多其余能力。包含数据的治理、老本治理、数据血统、数据脱敏、数据加密、IP白名单、数据的备份和复原等等。

三、Flink x Hologres:天作之合

1、Hologres与Flink深度集成

Flink对于实时数仓可能提供十分丰盛的数据处理、数据入湖仓的能力。Hologres与Flink有些十分深度的整合能力,具体包含:

  • Hologres能够作为Flink的维表:在实时计算的场景下,Flink对维表的需要很强,Hologres反对百万级至千万级RPS的KV点查能力,能够间接当做Flink维表应用,且能够做到实时更新,对于像实时特色存储等维表关联场景就也能够十分高效的反对。
  • Hologres能够作为Flink的后果表:Hologres反对高性能的实时写入和整行实时更新的能力,能够联合Flink,输入须要弱小的Update能力,满足数仓场景下的实时更新、笼罩等需要。与此同时,Hologres还有很强的部分更新能力。部分更新能力在很多场景下,能够代替Flink的多流Join,为客户节省成本。
  • Hologres能够作为Flink的源表:Hologres反对Binlog能力,一张表的任何变动,比方insert、update、delete等等,都会产生Binlog事件。Flink能够订阅Hologres Binlog,进行驱动计算。因为Flink反对Hologres的整表读取,二者联合形成了Flink全增量一体化的读取能力。并且,Hologres也对了接Flink CDC,它能够驱动Flink CDC的计算。
  • 反对Hologres Catalog:通过Hologres Catalog的任何操作,都会间接实时反映到Hologres里,用户也不须要在Flink建Hologres表,这样就使得Flink+Hologres就具备了整库同步、Schema Evolution的能力。

2、基于Flink+Hologres的Streaming Warehouse计划

那Flink和Hologres如何构建Streaming Warehouse?

Streaming Warehouse:数据能在数仓之间实时的流动,实质上就是解决实时数仓分层的问题

最开始咱们介绍了常见的数仓分层计划,Flink+Hologres的Streaming Warehouse计划则是能够齐全将Flink+Kafka替换。具体做法如下:

  1. 将Flink写到Hologres里,造成ODS层。Flink订阅ODS层的Hologres Binlog进行加工,将Flink从DWD层再次写入Hologres里。
  2. Flink再订阅DWD层的Hologres Binlog,通过计算造成DWS层,将其再次写入Hologres里。
  3. 最初,由Hologres对外提供利用查问。

该计划相比Kafka有如下长处:

  • 解决了传统中间层Kafka数据不易查、不易更新、不易修改的问题。Hologres的每一层都可查、可更新、可修改。
  • Hologres的每一层都能够独自对外提供服务。因为每一层的数据都是可查的,所以数据的复用会更好,真正实现数仓分层复用的指标。
  • Hologres反对数据复用,模型对立,架构简化。通过Flink+Hologres,就能实现实时数仓分层,简化架构和降低成本。

3、Flink+Hologres外围能力:Binlog、行列共存、资源隔离

下面讲的Flink+Hologres的Streaming Warehouse计划,其强依赖于以下三个Hologres外围能力:

  1. Binlog:因为实时数仓个别没有Binlog,但Hologres提供了Binlog能力,用来驱动Flink做实时计算,正因为有了Binlog,Hologres能力作为流式计算的上游。
  2. 行列共存。一张表既有行存数据,又有列存数据。这两份数据是强统一的。行列共存的个性让中间层的每张表,岂但可能给Flink应用,而且能够给其余利用(比方OLAP、或者线上服务)应用。
  3. 资源强隔离。实时数仓个别是弱隔离或软隔离,通过资源组、资源队列的办法实现资源隔离。如果Flink的资源耗费很大,可能影响中间层的点查性能。但在Hologres强隔离的能力下,Flink对Hologres Binlog的数据拉取,不会影响线上服务。

通过Binlog、行列共存、资源强隔离的三个特点,不仅能让Flink+Hologres造成Streaming Warehouse,并且可能使两头的每层数据复用,被其余利用或线上服务应用,助力企业构建最简略最残缺的实时数仓。

4、基于Flink+Hologres的多流合并

接下来,讲一讲基于Flink+Hologres的多流合并。

因为Hologres有特地弱小的部分更新能力,基于此咱们能够简化Flink的多流Join。比方在风控场景下,咱们须要基于用户ID构建用户的多侧面画像,用户画像来自很多数据源,比方客户的浏览行为、成交行为、履约行为等等。把数据源的数据依照用户ID,把每个用户放到一行里,造成不同的字段,造成用户的残缺画像。

传统的形式须要用Flink多流Join实现,Flink把上游的多个数据源关联到一起,Join后写到Kafka里,而后驱动上游的Flink,加工这行残缺的数据。这就使得多流Join十分耗资源。

所以在Flink+Hologres的Streaming Warehouse计划中,能够利用Hologres的部分更新能力,把一张表定为定义成Hologres的行存表或行列共存表。此时,整个计划就简化成上游每个数据源,同步数据到Hologres表的若干个字段里,若干个工作同时写入这张表,而后利用Hologres的部分更新能力,把数据汇总在一起。

如果关上这张Hologres表的 Binlog,上游任何数据源的变动都会更新这张表,使这张表的Binlog中生成行数据的最新状态,而后驱动上游的Flink持续计算,从而完满匹配常见的风控场景。这种用法下,资源耗费、运维都失去了极大的简化。

四、基于Flink Catalog的Streaming Warehouse实际

Flink+Hologres的Streaming Warehouse计划曾经十分成熟,但惟一的毛病在于,用户须要在两个零碎之间切换,过程比拟繁琐。为了让用户操作更简略,咱们基于Flink Catalog提供了更加简略的应用体验。

上面咱们来看看怎么样基于Flink Catalog去构建基于Flink+Hologres的Streaming Warehouse。咱们会发现,有了Flink Catalog后,整个应用体验会很简略,并能充分发挥Flink和Hologres两个产品的弱小能力。

下图是一个典型的Flink+Hologres实时ETL链路:

  1. ODS层、DWD层、ODS层的数据都存在Hologres中。
  2. 链路中所有的数据加工都是通过Flink SQL实现。在整个ETL链路中,用户不须要任何Hologres SQL,间接写Flink SQL即可。
  3. Flink用户能够通过Flink SQL对每层中的Hologres数据进行数据探查(流模式和批模式都能够)。比方:当咱们发现DWS层的数据后果呈现问题,须要查看哪层的后果有问题或逻辑有谬误。此时,咱们能够复用原来的Flink SQL来进行探查、定位或者数据从新生产。
  4. Hologres中的每层数据都能够对外提供查问和服务(通过Hologres SQL)。

![]()

接下来,以某个电商场景为例,演示一下基于Flink Catalog的Streaming Warehouse。如下图所示,有一个MySQL数据库作为订单库,外面有订单表orders、订单领取表orders_pay、以及产品品类表product_catalog。

  1. 第一步,咱们通过Flink的实时数仓,把数据实时同步到Hologres里,造成ODS层。
  2. 第二步,加工DWD层。将DWD层的数据写到Hologres里。在这个过程中,咱们须要把订单表和订单领取表,合并成一张表,实现多路合并。与此同时,咱们心愿orders表关联商品品类表product_catalog。
  3. 第三步,驱动上游计算,构建DWS层。以用户维度和商店维度,收集统计数据。比方用户每天的订单金额和商店每天的订单金额,从而造成一条残缺的链路。
  4. 第四步,将DWS层的表举荐给零碎应用。作为用户和商店的特色,用做举荐用处。
  5. 第五步,DWD层的表可能间接用来做实时统计分析、统计产品、实时大屏、实时报表。

上图中的绿色链路,全副应用Flink SQL实现。橙色链路对外提供服务,由Hologres SQL实现。

接下来,讲一讲每个步骤是如何运行的。

第一步,在Flink实时数仓,造成ODS层。首先,创立一个Hologres的Catalog。MySQL中存储订单、领取以及商品信息3张表,通过Flink Catalog性能,将MySQL整库的数据实时同步至Hologres,造成ODS。相干代码如下所示。咱们能够看到,MySQL整库同步到Hologres,通过Flink SQL来表白是非常简单的。

-- 创立Hologres CatalogCREATE CATALOG holo WITH ( ‘type’ = ‘hologres’ … ); -- MySQL整库同步到HologresCREATE DATABASE IF NOT EXISTS holo.order_dw AS DATABASE mysql.sw INCLUDING all tables;

第二步,DWD实时构建。数据实时写入ODS层后,Flink读取Hologres Binlog,并用多流合并、维表关联将订单、交易、商品3个表打成一个大宽表,实时写入至Hologres的订单汇总表中,造成DWD层。

如下SQL是DWD层表的建表语句。这张指标表蕴含了来自orders、orders_pay、product_catalog的字段,关联了相干的用户信息、商户信息、订单信息、商品品类信息等等,造成了一张宽表。

CREATE TABLE holo.order_dw.dwd_orders (  order_id bigint not null primary key,  --字段来自order 表  order_user_id bigint,  order_shop_id bigint,  order_product_id string,  order_fee numeric(20,2),  order_create_time timestamp_ltz,  order_update_time timestamp_ltz,  order_state int,  --字段来自product_catalog表  order_product_catalog_name string,  --字段来自orders_pay表  pay_id bigint,  pay_platfrom int,  pay_create_time timestamp_ltz) ;

上面的SQL是真正的计算逻辑,这里蕴含两个INSERT语句:

  • 第一个INSERT语句是从orders表实时打宽后写入。这里用到了Hologres的维表关联能力。实时打宽后,写入指标表的局部字段。
  • 第二个INSERT语句是从orders_pay表实时同步到同一张指标表,更新另外一些字段。

这两个INSERT语句最大的关联在于,它们写的是同一张表,会主动利用指标表的主键ID进行关联。每个INSERT都是做了指标表的部分更新,两者的合力后果是实时更新的指标宽表。

BEGIN STATEMENT SET;-- 从orders表实时打宽后写入INSERT INTO holo.order_dw.dwd_orders (order_id,order_user_id,order_shop_id,order_product_id,order_fee,order_create_time,order_update_time,order_state,order_product_catalog_name)SELECT o.*,dim.catalog_nameFROMholo.order_dw.orders o LEFT JOIN holo.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime () AS dimONo.product_id = dim.product_id;-- 从order_pays表实时写入INSERT INTO holo.order_dw.dwd_orders (pay_id,order_id,pay_platform,pay_create_time)SELECT *FROMholo.order_dw.orders_pay;END;

第三步,DWS层的实时聚合。在DWD的根底上,通过Flink读取Hologres DWD的Binlog数据,进行实时指标聚合计算,比方依照用户维度聚合,依照商户维度聚合等,而后实时写入Hologres,造成DWS层。

  • 先是创立对应的聚合指标表,DDL语句如下
-- 用户维度聚合指标表CREATE TABLE holo.order_dw.dws_users (  user_id bigint not null,  ds string not null,  -- 当日实现领取总金额     payed_buy_fee_sum numeric(20,2) not null,        primary key(user_id,ds) NOT ENFORCED); -- 商户维度聚合指标表CREATE TABLE holo.order_dw.dws_shops (  shop_id bigint not null,  ds string not null,  -- 当日实现领取总金额  payed_buy_fee_sum numeric(20,2) not null,       primary key(shop_id,ds) NOT ENFORCED);
  • 而后将数据写入Hologres中,通过简略的三步后,Flink SQL构建了残缺的Streaming Warehouse分层体系。
--数据写入Hologres BEGIN STATEMENT SET;INSERT INTO holo.order_dw.dws_usersSELECT   order_user_id,  DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,  SUM (order_fee)FROM holo.order_dw.dwd_orders cWHERE pay_id IS NOT NULL AND order_fee IS NOT NULLGROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd'); INSERT INTO holo.order_dw.dws_shopsSELECT   order_shop_id,  DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,  SUM (order_fee)FROM holo.order_dw.dwd_orders cWHERE pay_id IS NOT NULL AND order_fee IS NOT NULLGROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');END;

第四步,构建利用,基于DWS层,对外提供服务。

数据的分层和加工实现后,业务就能够通过Hologres查问数据并利用。在这个例子里,举荐零碎要求十分高的点查性能,所以要求百万级的QPS查看能力。Hologres的行存表或者行列共存表齐全能够满足。

这个计划和传统的实时数仓最大的差异是:传统的实时数仓只有最初一层的数据,可对外提供服务。而在Hologres里,DWD等中间层数据也能够对外提供服务,进行实时报表统计。用户能够在中间层进行查问操作,对接各种实时利用、实时大屏。比方

  • 间接查DWD层的数据,典型的如依据用户ID返回举荐商品(KV场景)
--场景4: 依据用户特色举荐商品SELECT * FROM dws_users WHERE user_id = ? AND ds = '2022-11-09’;--场景4: 依据店铺特色举荐商品SELECT * FROM dws_shops WHERE shop_id = ? AND ds = '2022-11-09’;
  • 实时报表查看订单量和退单量(OLAP)。
--场景6:基于宽表数据展现实时报表-- 最近30天,每个品类的订单总量和退单总量SELECTTO_CHAR(order_create_time, 'YYYYMMDD'),order_product_catalog_name,COUNT(*),COUNT(CASE WHEN refund_id IS NOT NULL THEN 1 ELSE null END)FROMdwd_ordersWHEREorder_create_time > now() - '30 day' :: intevalGROUP BY1, 2ORDER BY1, 2;

第五步,问题排查:Flink数据探查。如果某个业务指标出现异常,Flink能够间接探查每层表的数据来疾速定位。比方用Flink探查Hologres DWD层的orders表。Hologres反对Flink的流模式和批模式对数据的探查。

因为流模式是Flink的默认模式,因而咱们不须要设置执行模式。它能够间接记录数据变动,从而十分不便的查看数据异样。流模式能够探查获取一段时间范畴内的数据及其变动状况。

-- 流模式探查 SELECT   * FROM holo.order_dw.dwd_orders /*+ OPTIONS('cdcMode'='false', 'startTime'='2022-11-09 00:00:00') */ c WHERE   user_id = 0;

与此同时,批模式探查是获取以后时刻的最新数据。Hologres也反对Flink批模式的数据探查。批模式和流模式的区别在于,流模式关注的是变动,批模式关注的是表中的最新状态。

-- 批模式探查 set 'execution.runtime-mode'='batch’;SELECT   * FROM   holo.order_dw.dwd_orders WHERE   user_id = 0  AND order_create_time>'2022-11-09 00:00:00';

五、总结

Hologres跟Flink深度集成。实现残缺的Streaming Warehouse计划,该计划有如下显著劣势:

  1. 一站式:全链路都能够用SQL示意,并且只须要用到Flink和Hologres两个组件,操作十分不便。实时ETL链路、数据分层齐全能够用Flink SQL实现,Hologres提供对外提供在线服务和OLAP查问,每层数据可复用、可查,不便构建实时数仓的数据分层和复用体系。
  2. 高性能:这种计划能够使得使得Hologres施展极致的实时写入、实时更新能力和多维OLAP、高并发点查能力,Flink施展实时加工能力。
  3. 企业级:自带多种企业级能力,不仅运维更简略,可观测性更好,平安能力更强,也提供多种高可用能力,从而企业更加不便的构建企业级的Streaming Warehouse。