乐趣区

关于大数据:Flink-X-Hologres构建企业级Streaming-Warehouse

摘要:本文整顿自阿里云资深技术专家,阿里云 Hologres 负责人姜伟华,在 FFA 实时湖仓专场的分享。点击查看 >> 本篇内容次要分为四个局部:

一、实时数仓分层的技术需要

二、阿里云一站式实时数仓 Hologres 介绍

三、Flink x Hologres:天作之合

四、基于 Flink Catalog 的 Streaming Warehouse 实际

点击查看视频回放

一、实时数仓分层的技术需要

首先,咱们讲一讲数仓的分层技术以及分层技术的现状。

1、实时数仓分层技术现状

大数据当初越来越考究实时化,在各种场景下都须要实时,例如春晚实时直播大屏,双 11 GMV 实时大屏、实时个性化举荐等场景,都对数据的实时性有着十分高的要求。为了满足业务的实时性需要,大数据技术也开始逐渐倒退出实时数仓。

但如何构建实时数仓呢?

相比离线数仓,实时数仓没有明确的方法论体系。因而在实践中,有各种各样的办法,但没有一个办法是万能。最近行业内提出了 Streaming Warehouse 的概念。Streaming Warehouse 的实质是分层之间可能做到实时数据的流动,从而解决实时数仓分层的问题。

上面,咱们先来理解下实时数仓的支流分层计划。

2、实时数仓支流分层计划

实时数仓的支流分层计划次要有 4 个。

计划 1:流式 ETL

ETL(Extract- Transform-Load)是比拟传统的数据仓库建设办法,而流式 ETL 就是指:实时数据通过 Flink 实时 ETL 解决之后,将后果写入到 KV 引擎中,供给用查问。而为了解决中间层不不便排查的问题,也须要将中间层数据同步到实时数仓中供剖析之用。最常见的做法就是数据通过 Flink 荡涤后,写到 Kafka 造成 ODS 层。再从 Kafka 生产,通过加工造成 DWD 层。而后 Flink 加工成 DWS 层,最初通过加工造成 ADS 层的数据写到 KV 引擎并对接下层利用。因为间接应用 Kafka 数据进行剖析和探查很麻烦,所以也会同步一份 Kafka 数据到实时数仓,通过实时数仓进行剖析和探查。

这个计划的劣势是档次明确,分工明确。但劣势是须要有大量的同步工作、数据资源耗费很大、数据有很多冗余、解决链路较简单须要很多的组件。除此之外,这个计划构建的实时数仓分层,尤其是 Kafka 分层,复用性十分差,也没方法响应 schema 的动态变化。

计划 2:流式 ELT

而流式 ELT 则是将计算后置,间接将明细数据写进实时数仓(EL),不须要严格的数仓分层,整个架构只须要一层,下层利用查问的时候进行数据的变换(T)或者分层。常见的做法就是把数据加工荡涤后,写到实时数仓里,造成 DWD 层,所有的查问都基于 DWD 层的明细数据进行。

这个计划的益处在于,没有 ETL,只有一层;数据订正很不便。但它的弊病有两个方面:

  • 在查问性能方面,因为是明细数据查问,所以在某些场景下不能满足 QPS 或提早的要求。
  • 因为没有严格的数仓分层,所以数据复用很艰难,很难兼顾各方面的诉求。

计划 3:定时调度

既然实时流式无奈实现数据的实时数仓分层,咱们能够将数据实时写入实时数仓的 DWD 层。DWS 层、ADS 层用离线的高频调度办法,实现分钟级的调度,从而借用离线数仓,进行分层结构。这个也就是业界罕用的计划 3。

这个计划的益处在于能够复用很多离线教训,计划成本低且成熟。但计划也存在如下毛病:

  • 提早大:每一层的提早都跟调度相干,随着档次越多,调度提早越大,实时数仓也变成了准实时数仓。
  • 不能齐全复用离线计划:离线调度个别是小时级或天级,咱们能够应用全量计算。但在分钟级调度时,必须做增量计算,否则无奈及时调度。

计划 4:实时物化视图

第 4 种计划就是通过实时数仓的物化视图能力实现数仓分层。常见的做法就是 Flink 实时加工后,将数据写到实时数仓造成 DWD 层,DWS 层或 ADS 层的结构依赖于实时数仓的实时物化视图能力。

当初支流实时数仓都开始提供物化视图的能力,但实质上都是提供了一些简略的聚合类物化视图。如果物化视图的需要比较简单,能够利用实时数仓里的实时物化视图能力,将 DWS 层到 ADS 层的构建自动化,从而让物化视图的查问保障较高的 QPS。但这个计划最大的毛病在于,当初的实时物化视图技术都还不成熟,能力无限,反对的场景也比拟无限。

二、阿里云一站式实时数仓 Hologres 介绍

接下来,先介绍一下阿里云一站式实时数仓 Hologres 产品。Hologres 是阿里云自研的一站式实时数仓,它同时蕴含三种能力:

  • OLAP 能力:同传统的实时数仓一样,能够反对数据的实时写入、以及简单 OLAP 实时多维分析疾速响应,满足业务的极致数据摸索能力。
  • 在线服务 Serving(KV):能够反对 KV 查问场景,提供十分高的 QPS 和毫秒级的低提早。
  • 湖仓一体:可能间接查问数据湖的数据,以及可能减速阿里云离线数仓 MaxCompute,助力业务更低成本实现湖仓一体。

上面为具体介绍:

  • 首先,大家能够把 Hologres 当做一个常见的实时数仓。它的特点在于写入侧反对百万 RPS 的实时写入,写入即可查,没有提早。同时也反对高性能的实时整行更新和部分更新。其中,整行更新是把整行替换掉,部分更新能够更新一行中的部分字段,二者都是实时更新。
  • 在查问侧,一方面反对简单的 OLAP 多维分析,能够十分好的反对实时大屏、实时报表等场景。近期 Hologres 拿到了 TPC-H 30TB 的性能世界第一的 TPC 官网认证问题,见 >> 阿里云 ODPS-Hologres 刷新世界纪录,当先第二名 23%。其次,Hologres 也反对在线服务查问,不仅反对百万 QPS KV 点查,而且也反对阿里云达摩院的 Proxima 向量检索引擎,能够反对十分高效的向量检索能力。同时这些能力在 Hologres 中是全用 SQL 表白,对用户应用十分敌对。此外,为了兼顾数据服务和实时数仓的需要,Hologres 在行存、列存的数据格式根底上,也反对行列共存,即行列共存的表即一份行存,又有一份列存,并且零碎保障这两份数据是强统一的,对于 OLAP 剖析,优化器会主动抉择列存,对于线上服务,会主动抉择行存,通过行列共存能够十分敌对的实现一份数据撑持多个利用场景。
  • 因为 Hologres 同时反对 OLAP 剖析和线上服务,其中线上服务要求十分高的稳定性和 SLA。为了保障 OLAP 剖析和线上服务时不会发生冲突,咱们反对了读写拆散,从而实现 OLAP 与数据服务的强隔离。
  • 最初,在湖仓数据交互式剖析方面,Hologres 对阿里云 MaxCompute 离线数仓里的数据,数据湖中的数据都能够秒级交互式剖析,且不须要做任何的数据搬迁。
  • 除此之外,Hologres 的定位是一站式的企业级实时数仓,所以除了上述能力,咱们还有很多其余能力。包含数据的治理、老本治理、数据血统、数据脱敏、数据加密、IP 白名单、数据的备份和复原等等。

三、Flink x Hologres:天作之合

1、Hologres 与 Flink 深度集成

Flink 对于实时数仓可能提供十分丰盛的数据处理、数据入湖仓的能力。Hologres 与 Flink 有些十分深度的整合能力,具体包含:

  • Hologres 能够作为 Flink 的维表:在实时计算的场景下,Flink 对维表的需要很强,Hologres 反对百万级至千万级 RPS 的 KV 点查能力,能够间接当做 Flink 维表应用,且能够做到实时更新,对于像实时特色存储等维表关联场景就也能够十分高效的反对。
  • Hologres 能够作为 Flink 的后果表:Hologres 反对高性能的实时写入和整行实时更新的能力,能够联合 Flink,输入须要弱小的 Update 能力,满足数仓场景下的实时更新、笼罩等需要。与此同时,Hologres 还有很强的部分更新能力。部分更新能力在很多场景下,能够代替 Flink 的多流 Join,为客户节省成本。
  • Hologres 能够作为 Flink 的源表:Hologres 反对 Binlog 能力,一张表的任何变动,比方 insert、update、delete 等等,都会产生 Binlog 事件。Flink 能够订阅 Hologres Binlog,进行驱动计算。因为 Flink 反对 Hologres 的整表读取,二者联合形成了 Flink 全增量一体化的读取能力。并且,Hologres 也对了接 Flink CDC,它能够驱动 Flink CDC 的计算。
  • 反对 Hologres Catalog:通过 Hologres Catalog 的任何操作,都会间接实时反映到 Hologres 里,用户也不须要在 Flink 建 Hologres 表,这样就使得 Flink+Hologres 就具备了整库同步、Schema Evolution 的能力。

2、基于 Flink+Hologres 的 Streaming Warehouse 计划

那 Flink 和 Hologres 如何构建 Streaming Warehouse?

Streaming Warehouse:数据能在数仓之间实时的流动,实质上就是解决实时数仓分层的问题

最开始咱们介绍了常见的数仓分层计划,Flink+Hologres 的 Streaming Warehouse 计划则是能够齐全将 Flink+Kafka 替换。具体做法如下:

  1. 将 Flink 写到 Hologres 里,造成 ODS 层。Flink 订阅 ODS 层的 Hologres Binlog 进行加工,将 Flink 从 DWD 层再次写入 Hologres 里。
  2. Flink 再订阅 DWD 层的 Hologres Binlog,通过计算造成 DWS 层,将其再次写入 Hologres 里。
  3. 最初,由 Hologres 对外提供利用查问。

该计划相比 Kafka 有如下长处:

  • 解决了传统中间层 Kafka 数据不易查、不易更新、不易修改的问题。Hologres 的每一层都可查、可更新、可修改。
  • Hologres 的每一层都能够独自对外提供服务。因为每一层的数据都是可查的,所以数据的复用会更好,真正实现数仓分层复用的指标。
  • Hologres 反对数据复用,模型对立,架构简化。通过 Flink+Hologres,就能实现实时数仓分层,简化架构和降低成本。

3、Flink+Hologres 外围能力:Binlog、行列共存、资源隔离

下面讲的 Flink+Hologres 的 Streaming Warehouse 计划,其强依赖于以下三个 Hologres 外围能力:

  1. Binlog:因为实时数仓个别没有 Binlog,但 Hologres 提供了 Binlog 能力,用来驱动 Flink 做实时计算,正因为有了 Binlog,Hologres 能力作为流式计算的上游。
  2. 行列共存。一张表既有行存数据,又有列存数据。这两份数据是强统一的。行列共存的个性让中间层的每张表,岂但可能给 Flink 应用,而且能够给其余利用(比方 OLAP、或者线上服务)应用。
  3. 资源强隔离。实时数仓个别是弱隔离或软隔离,通过资源组、资源队列的办法实现资源隔离。如果 Flink 的资源耗费很大,可能影响中间层的点查性能。但在 Hologres 强隔离的能力下,Flink 对 Hologres Binlog 的数据拉取,不会影响线上服务。

通过 Binlog、行列共存、资源强隔离的三个特点,不仅能让 Flink+Hologres 造成 Streaming Warehouse,并且可能使两头的每层数据复用,被其余利用或线上服务应用,助力企业构建最简略最残缺的实时数仓。

4、基于 Flink+Hologres 的多流合并

接下来,讲一讲基于 Flink+Hologres 的多流合并。

因为 Hologres 有特地弱小的部分更新能力,基于此咱们能够简化 Flink 的多流 Join。比方在风控场景下,咱们须要基于用户 ID 构建用户的多侧面画像,用户画像来自很多数据源,比方客户的浏览行为、成交行为、履约行为等等。把数据源的数据依照用户 ID,把每个用户放到一行里,造成不同的字段,造成用户的残缺画像。

传统的形式须要用 Flink 多流 Join 实现,Flink 把上游的多个数据源关联到一起,Join 后写到 Kafka 里,而后驱动上游的 Flink,加工这行残缺的数据。这就使得多流 Join 十分耗资源。

所以在 Flink+Hologres 的 Streaming Warehouse 计划中,能够利用 Hologres 的部分更新能力,把一张表定为定义成 Hologres 的行存表或行列共存表。此时,整个计划就简化成上游每个数据源,同步数据到 Hologres 表的若干个字段里,若干个工作同时写入这张表,而后利用 Hologres 的部分更新能力,把数据汇总在一起。

如果关上这张 Hologres 表的 Binlog,上游任何数据源的变动都会更新这张表,使这张表的 Binlog 中生成行数据的最新状态,而后驱动上游的 Flink 持续计算,从而完满匹配常见的风控场景。这种用法下,资源耗费、运维都失去了极大的简化。

四、基于 Flink Catalog 的 Streaming Warehouse 实际

Flink+Hologres 的 Streaming Warehouse 计划曾经十分成熟,但惟一的毛病在于,用户须要在两个零碎之间切换,过程比拟繁琐。为了让用户操作更简略,咱们基于 Flink Catalog 提供了更加简略的应用体验。

上面咱们来看看怎么样基于 Flink Catalog 去构建基于 Flink+Hologres 的 Streaming Warehouse。咱们会发现,有了 Flink Catalog 后,整个应用体验会很简略,并能充分发挥 Flink 和 Hologres 两个产品的弱小能力。

下图是一个典型的 Flink+Hologres 实时 ETL 链路:

  1. ODS 层、DWD 层、ODS 层的数据都存在 Hologres 中。
  2. 链路中所有的数据加工都是通过 Flink SQL 实现。在整个 ETL 链路中,用户不须要任何 Hologres SQL,间接写 Flink SQL 即可。
  3. Flink 用户能够通过 Flink SQL 对每层中的 Hologres 数据进行数据探查(流模式和批模式都能够)。比方:当咱们发现 DWS 层的数据后果呈现问题,须要查看哪层的后果有问题或逻辑有谬误。此时,咱们能够复用原来的 Flink SQL 来进行探查、定位或者数据从新生产。
  4. Hologres 中的每层数据都能够对外提供查问和服务(通过 Hologres SQL)。

![]()

接下来,以某个电商场景为例,演示一下基于 Flink Catalog 的 Streaming Warehouse。如下图所示,有一个 MySQL 数据库作为订单库,外面有订单表 orders、订单领取表 orders_pay、以及产品品类表 product_catalog。

  1. 第一步,咱们通过 Flink 的实时数仓,把数据实时同步到 Hologres 里,造成 ODS 层。
  2. 第二步,加工 DWD 层。将 DWD 层的数据写到 Hologres 里。在这个过程中,咱们须要把订单表和订单领取表,合并成一张表,实现多路合并。与此同时,咱们心愿 orders 表关联商品品类表 product_catalog。
  3. 第三步,驱动上游计算,构建 DWS 层。以用户维度和商店维度,收集统计数据。比方用户每天的订单金额和商店每天的订单金额,从而造成一条残缺的链路。
  4. 第四步,将 DWS 层的表举荐给零碎应用。作为用户和商店的特色,用做举荐用处。
  5. 第五步,DWD 层的表可能间接用来做实时统计分析、统计产品、实时大屏、实时报表。

上图中的绿色链路,全副应用 Flink SQL 实现。橙色链路对外提供服务,由 Hologres SQL 实现。

接下来,讲一讲每个步骤是如何运行的。

第一步,在 Flink 实时数仓,造成 ODS 层。首先,创立一个 Hologres 的 Catalog。MySQL 中存储订单、领取以及商品信息 3 张表,通过 Flink Catalog 性能,将 MySQL 整库的数据实时同步至 Hologres,造成 ODS。相干代码如下所示。咱们能够看到,MySQL 整库同步到 Hologres,通过 Flink SQL 来表白是非常简单的。

-- 创立 Hologres Catalog
CREATE CATALOG holo WITH (‘type’=‘hologres’…);

 -- MySQL 整库同步到 Hologres
CREATE DATABASE IF NOT EXISTS holo.order_dw 
AS DATABASE mysql.sw INCLUDING all tables;

第二步,DWD 实时构建。数据实时写入 ODS 层后,Flink 读取 Hologres Binlog,并用多流合并、维表关联将订单、交易、商品 3 个表打成一个大宽表,实时写入至 Hologres 的订单汇总表中,造成 DWD 层。

如下 SQL 是 DWD 层表的建表语句。这张指标表蕴含了来自 orders、orders_pay、product_catalog 的字段,关联了相干的用户信息、商户信息、订单信息、商品品类信息等等,造成了一张宽表。

CREATE TABLE holo.order_dw.dwd_orders (
  order_id bigint not null primary key,
  -- 字段来自 order 表
  order_user_id bigint,
  order_shop_id bigint,
  order_product_id string,
  order_fee numeric(20,2),
  order_create_time timestamp_ltz,
  order_update_time timestamp_ltz,
  order_state int,
  -- 字段来自 product_catalog 表
  order_product_catalog_name string,
  -- 字段来自 orders_pay 表
  pay_id bigint,
  pay_platfrom int,
  pay_create_time timestamp_ltz
) ;

上面的 SQL 是真正的计算逻辑,这里蕴含两个 INSERT 语句:

  • 第一个 INSERT 语句是从 orders 表实时打宽后写入。这里用到了 Hologres 的维表关联能力。实时打宽后,写入指标表的局部字段。
  • 第二个 INSERT 语句是从 orders_pay 表实时同步到同一张指标表,更新另外一些字段。

这两个 INSERT 语句最大的关联在于,它们写的是同一张表,会主动利用指标表的主键 ID 进行关联。每个 INSERT 都是做了指标表的部分更新,两者的合力后果是实时更新的指标宽表。

BEGIN STATEMENT SET;
-- 从 orders 表实时打宽后写入
INSERT INTO holo.order_dw.dwd_orders (
order_id,
order_user_id,
order_shop_id,
order_product_id,
order_fee,
order_create_time,
order_update_time,
order_state,
order_product_catalog_name
)
SELECT 
o.*,
dim.catalog_name
FROM
holo.order_dw.orders o 
LEFT JOIN holo.order_dw.product_catalog 
FOR SYSTEM_TIME AS OF proctime () AS dim
ON
o.product_id = dim.product_id;

-- 从 order_pays 表实时写入
INSERT INTO holo.order_dw.dwd_orders (
pay_id,
order_id,
pay_platform,
pay_create_time
)
SELECT *
FROM
holo.order_dw.orders_pay;
END;

第三步,DWS 层的实时聚合。在 DWD 的根底上,通过 Flink 读取 Hologres DWD 的 Binlog 数据,进行实时指标聚合计算,比方依照用户维度聚合,依照商户维度聚合等,而后实时写入 Hologres,造成 DWS 层。

  • 先是创立对应的聚合指标表,DDL 语句如下
-- 用户维度聚合指标表
CREATE TABLE holo.order_dw.dws_users (
  user_id bigint not null,
  ds string not null,
  -- 当日实现领取总金额     payed_buy_fee_sum numeric(20,2) not null,      
  primary key(user_id,ds) NOT ENFORCED
);
 -- 商户维度聚合指标表
CREATE TABLE holo.order_dw.dws_shops (
  shop_id bigint not null,
  ds string not null,
  -- 当日实现领取总金额
  payed_buy_fee_sum numeric(20,2) not null,     
  primary key(shop_id,ds) NOT ENFORCED
);
  • 而后将数据写入 Hologres 中,通过简略的三步后,Flink SQL 构建了残缺的 Streaming Warehouse 分层体系。
-- 数据写入 Hologres BEGIN STATEMENT SET;
INSERT INTO holo.order_dw.dws_users
SELECT 
  order_user_id,
  DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
  SUM (order_fee)
FROM holo.order_dw.dwd_orders c
WHERE pay_id IS NOT NULL 
AND order_fee IS NOT NULL
GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
 INSERT INTO holo.order_dw.dws_shops
SELECT 
  order_shop_id,
  DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
  SUM (order_fee)
FROM holo.order_dw.dwd_orders c
WHERE pay_id IS NOT NULL 
AND order_fee IS NOT NULL
GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
END;

第四步,构建利用,基于 DWS 层,对外提供服务。

数据的分层和加工实现后,业务就能够通过 Hologres 查问数据并利用。在这个例子里,举荐零碎要求十分高的点查性能,所以要求百万级的 QPS 查看能力。Hologres 的行存表或者行列共存表齐全能够满足。

这个计划和传统的实时数仓最大的差异是:传统的实时数仓只有最初一层的数据,可对外提供服务。而在 Hologres 里,DWD 等中间层数据也能够对外提供服务,进行实时报表统计。用户能够在中间层进行查问操作,对接各种实时利用、实时大屏。比方

  • 间接查 DWD 层的数据,典型的如依据用户 ID 返回举荐商品(KV 场景)
-- 场景 4: 依据用户特色举荐商品
SELECT * 
FROM dws_users 
WHERE 
user_id = ? 
AND ds = '2022-11-09’;

-- 场景 4: 依据店铺特色举荐商品

SELECT * 
FROM dws_shops 
WHERE 
shop_id = ? 
AND ds = '2022-11-09’;
  • 实时报表查看订单量和退单量(OLAP)。
-- 场景 6: 基于宽表数据展现实时报表
-- 最近 30 天, 每个品类的订单总量和退单总量
SELECT
TO_CHAR(order_create_time, 'YYYYMMDD'),
order_product_catalog_name,
COUNT(*),
COUNT(CASE WHEN refund_id IS NOT NULL THEN 1 ELSE null END)
FROM
dwd_orders
WHERE
order_create_time > now() - '30 day' :: inteval
GROUP BY
1, 2
ORDER BY
1, 2;

第五步,问题排查:Flink 数据探查。如果某个业务指标出现异常,Flink 能够间接探查每层表的数据来疾速定位。比方用 Flink 探查 Hologres DWD 层的 orders 表。Hologres 反对 Flink 的流模式和批模式对数据的探查。

因为流模式是 Flink 的默认模式,因而咱们不须要设置执行模式。它能够间接记录数据变动,从而十分不便的查看数据异样。流模式能够探查获取一段时间范畴内的数据及其变动状况。

-- 流模式探查 
SELECT 
  * 
FROM holo.order_dw.dwd_orders 
/*+ OPTIONS('cdcMode'='false', 'startTime'='2022-11-09 00:00:00') */ c 
WHERE 
  user_id = 0;

与此同时,批模式探查是获取以后时刻的最新数据。Hologres 也反对 Flink 批模式的数据探查。批模式和流模式的区别在于,流模式关注的是变动,批模式关注的是表中的最新状态。

-- 批模式探查 
set 'execution.runtime-mode'='batch’;

SELECT 
  * 
FROM 
  holo.order_dw.dwd_orders 
WHERE 
  user_id = 0
  AND order_create_time>'2022-11-09 00:00:00';

五、总结

Hologres 跟 Flink 深度集成。实现残缺的 Streaming Warehouse 计划,该计划有如下显著劣势:

  1. 一站式:全链路都能够用 SQL 示意,并且只须要用到 Flink 和 Hologres 两个组件,操作十分不便。实时 ETL 链路、数据分层齐全能够用 Flink SQL 实现,Hologres 提供对外提供在线服务和 OLAP 查问,每层数据可复用、可查,不便构建实时数仓的数据分层和复用体系。
  2. 高性能:这种计划能够使得使得 Hologres 施展极致的实时写入、实时更新能力和多维 OLAP、高并发点查能力,Flink 施展实时加工能力。
  3. 企业级:自带多种企业级能力,不仅运维更简略,可观测性更好,平安能力更强,也提供多种高可用能力,从而企业更加不便的构建企业级的 Streaming Warehouse。
退出移动版