关于后端:巴别时代基于-Apache-Paimon-的-Streaming-Lakehouse-的探索与实践

52次阅读

共计 13942 个字符,预计需要花费 35 分钟才能阅读完成。

摘要:本文次要介绍巴别时代基于 Apache Paimon(Incubating) 构建 Streaming Lakehouse 的生产实践经验。咱们基于 Apache Paimon(Incubating) 构建 Streaming Lakehouse 的落地实际次要分为三期:

第一期是在调研验证的根底上进行数仓分层,并且上线一些简略的业务验证成果;第二期是实现流式数仓的基础设施建设,以便优先替换以后基于 Apache Kafka 构建的实时数仓;第三期次要是欠缺 Paimon 的生态建设,包含数据资产、数据服务等平台服务建设,次要指标是提供残缺的基于 Apache Paimon(Incubating) 端到端的平台服务能力。目前根本实现第一期的数仓分层,同时进行数据品质验证,根本能够满足业务需要。

咱们基于 Apache Paimon(Incubating) 构建 Streaming Lakehouse 的落地实际次要分为三期:第一期是在调研验证的根底上进行数仓分层,并且上线一些简略的业务验证成果;第二期是实现流式数仓的基础设施建设,以便优先替换以后基于 Apache Kafka 构建的实时数仓;第三期次要是欠缺 Paimon 的生态建设,包含数据资产、数据服务等平台服务建设,次要指标是提供残缺的基于 Apache Paimon(Incubating) 端到端的平台服务能力。目前根本实现第一期的数仓分层,同时进行数据品质验证,根本能够满足业务需要。

点击进入 Apache Paimon 官网

1. 业务背景

基于 Apache Kafka 构建的实时数仓过程中咱们遇到一些痛点,例如中间层数据不可剖析,数据保留工夫短等问题,同时咱们的实时数仓是基于 Flink+Kafka+Redis+ClickHouse 构建的,难以查问和剖析 Kafka 的中间层数据和 Redis 的维表数据。目前只有 ADS 层数据最终写入到 ClickHouse 里能力剖析,然而 ClickHouse 对于数据更新反对的不是很好,所以咱们须要通过写入反复数据的形式以达到更新的成果,ClickHouse 去重表执行操作也是异步的,这就须要在业务端进行数据去重,大大增加了业务 SQL 的复杂度,也有肯定水平的性能损耗,并且 ClickHouse 不反对事务,很难做到 Flink 到 ClickHouse 端到端的数据一致性保障。

基于以上痛点,咱们心愿可能借助当下比拟风行的数据湖存储计划简化咱们的数仓架构,进步数据分析的效率,升高数据存储和开发成本,最终抉择 Apache Paimon 作为湖仓底座,次要是基于以下几个方面的考量:

  • Apache Paimon(Incubating) 基于 LSM 的弱小的数据更新能力正是咱们须要的,基于 PK 进行数据更新以及 Partial Update 的局部更新和 Aggregate 表的预聚合能力可能大大简化咱们的业务开发的复杂度。
  • Apache Paimon(Incubating) 过后作为 Apache Flink 的子项目,对于 Flink 集成的成熟度也是咱们所考量的,Apache Paimon(Incubating) 反对所有的 Flink SQL 语法,对于 Flink 集成的优先反对是较其余数据湖框架劣势的中央。
  • Flink Forward Asia 2021 的主题演讲里,Apache Flink 中文社区发起人王峰老师提出流式数仓的概念,即整个数仓的数据全副实时流动起来,Paimon 就是在此背景下推出的流批一体的存储,是 Flink 在推动流批一体演进中存储畛域上的重要一环,流式数仓作为新型数仓架构演进的一种计划,而 Paimon 作为流式湖仓的标杆,毋庸置疑成为构建流式数仓的首选,随着社区一直倒退和框架自身的成熟,Paimon 将成为 Streaming Lakehouse 畛域的规范。
  • 调研测试过程中发现之前遇到的业务问题和需要通过在社区群中发问,可能失去社区各位老师的急躁答疑,反馈的相干问题可能失去社区的疾速响应和 Bug 修复,促成最终抉择 Apache Paimon(Incubating) 计划,打消应用 Apache Paimon(Incubating) 的诸多疑虑。

在此特地鸣谢之信老师、晓峰老师以及 Paimon 社区的各位开发者的反对。

2. 数仓架构

目前咱们实现基于 Paimon 的数仓分层的设计,包含 ODS,DWD,DIM 层的搭建以及 DWS 层一些业务模型的建设,整体架构如下:

2.1 数据起源

咱们的数据源次要包含前端和后端打点日志以及业务数据库的 Binlog,打点日志按我的项目通过 Filebeat 采集到 Kafka 对应的 Topic, 而后通过 Flink SQL 同步到湖仓的 ODS 层,业务库的数据通过 FlinkCDC 整库同步到湖仓的 ODS 层的 Paimon 表。

2.2 湖仓建设

湖仓次要基于 Apache Paimon(Incubating) 构建,各层都是通过 Flink SQL 进行数据的准实时同步。ODS 层采纳 Paimon 的 Append Only 表,保留数据原貌不做更新。DIM 层采纳 Paimon 的 PK 表,局部维表须要应用 Partial Update 能力保留最新的维表数据。

DWD 层也采纳 Paimon 的 PK 表,ODS 层的表数据经由 Flink SQL 做 ETL 荡涤,并通过 Retry Lookup Join 关联维表拉宽后写入到 DWD 层对应的 Paimon 表里,因为维表数据可能晚于事实数据达到湖仓,存在 Join 不上的状况,所以这里须要减少重试机制。DWS 层次要是分主题进行数仓建模,目前次要采纳 Paimon 的 Agg 表进行一些预聚合模型及大宽表的建设,ADS 层次要将 DWS 层的后果数据和 DWD 层的一些明细表数据流读到 ClickHouse 在线零碎,提供在线服务应用。

2.3 在线零碎

通过 Flink SQL 将 DWS 层的后果数据和 DWD 的一些明细表数据近实时地流读到 ClickHouse 在线零碎进行 OLAP 剖析,提供 BI 实时报表,大屏展现以及用户行为剖析零碎等应用,同时扩大 Paimon 的 Presto 连接器,数据分析师能够应用 Presto 引擎进行 Adhoc 查问和数据捞取工作。

2.4 平台服务

咱们的湖仓目前应用 Paimon 的 Hive Catalog, 基于 HMS 做元数据的对立治理,其中数据开发是基于 Dinky 做得二次开发,应用 Dinky 在 Flink SQL 开发这块儿的能力,数据指标基于不同类型的游戏进行梳理,以便构建对立的指标体系。前面思考基于 Paimon 构建数据资产以及数据服务。

3. 生产实践

介绍业务生产实践之前,首先介绍一些 Paimon 的正确应用姿态,以便更好了解以下的业务建表实际。

Merge Engine

指定 Merge Engine 的作用是把写到 Paimon 表的多条雷同 PK 的数据合并为一条,用户能够通过 merge-engine 配置项抉择以何种形式合并同 PK 的数据。

Paimon 反对的 Merge Engine 包含:

  • deduplicate:如果用户建表时不指定 merge-engine 配置, 创立的 PK 表默认的 Merge Engine 是 deduplicate 即只保留最新的记录,其余的同 PK 数据则被抛弃,如果最新的记录是 DELETE 记录,那么雷同 PK 的所有数据都将被删除。
  • partial-update:如果用户建表时指定 ’merge-engine’ = ‘partial-update’,那么就会应用局部更新表引擎,能够做到多个 Flink 流工作去更新同一张表,每条流工作只更新一张表的局部列,最终实现一行残缺的数据的更新,对于须要拉宽表的业务场景,partial-update 非常适合此场景,而且构建宽表的操作也绝对简略。这里所说的多个 Flink 流工作并不是指多个 Flink Job 并发写同一张 Paimon 表,这样须要拆分 Compaction 工作,就不能在每个 Job 的 Writer 端做 Compaction, 须要一个独立的 Compaction 工作,比拟麻烦。目前举荐将多条 Flink 流工作 UNION ALL 起来,启动一个 Job 写 Paimon 表。这里须要留神的是,对于流读场景,partial-update 表引擎须要联合 Lookup 或者 full-compaction 的 Changelog Producer 一起应用,同时 partial-update 不能接管和解决 DELETE 音讯,为了防止接管到 DELETE 消息报错,须要通过配置 ‘partial-update.ignore-delete’ = ‘true’ 疏忽 DELETE 音讯。
  • aggregation:如果用户建表时指定 ‘merge-engine’ = ‘aggregation’,此时应用聚合表引擎,能够通过聚合函数做一些预聚合,每个除主键以外的列都能够指定一个聚合函数,雷同主键的数据就能够依照列字段指定的聚合函数进行相应的预聚合,如果不指定则默认为 last-non-null-value,空值不会笼罩。Agg 表引擎也须要联合 Lookup 或者 full-compaction 的 Changelog Producer 一起应用,须要留神的是除了 SUM 函数,其余的 Agg 函数都不反对 Retraction,为了防止接管到 DELETE 和 UPDATEBEFORE 消息报错,须要通过给指定字段配置 ‘fields.${field_name}.ignore-retract’=’true’ 疏忽。

Changelog Producer

Changelog 次要利用在流读场景,在数仓各层的建设过程中,咱们须要流读上游的数据写入到上游,实现各层之间的数据同步,做到让整个数仓的数据全实时地流动起来。如果上游流读的 Source 是业务库的 Binlog 或者 Kafka 等音讯零碎的音讯,间接生成残缺的 Changelog 以供流读的。

然而目前数仓分层是在 Paimon 里做的,数据以 Table Format 的模式存储在文件系统上,如果上游的 Flink 工作要流读 Paimon 表数据,须要存储帮忙生成 Changelog(老本较低,但提早绝对较高),以便上游流读的,这时就须要咱们在建表时指定 Paimon 的 Changelog Producer 决定以何种形式在何时生成 Changelog。如果不指定则不会在写入 Paimon 表的时候生成 Changelog,那么上游工作须要在流读时生成一个物化节点来产生 Changelog。这种形式的老本绝对较高,同时官网不倡议这样应用,因为上游工作在 State 中存储一份全量的数据,即每条数据以及其变更记录都须要保留在状态中。

Paimon 反对的 Changelog Produer 包含:

  • none:如果不指定,默认就是 none,老本较高,不倡议应用。
  • input:如果咱们的 Source 源是业务库的 Binlog,即写入 Paimon 表 Writer 工作的输出是残缺的 Changelog,此时可能齐全依赖输出端的 Changelog, 并且将输出端的 Changelog 保留到 Paimon 的 Changelog 文件,由 Paimon Source 提供给上游流读。通过配置 ‘changelog-producer’ = ‘input’,将 Changelog Producer 设置为 input。
  • lookup:如果咱们的输出不是残缺的 Changelog, 并且不想在上游流读时通过 Normalize 节点生成 Changelog, 通过配置 ‘changelog-producer’ = ‘lookup’,通过 Lookup 的形式在数据写入的时候生成 Changelog,此 Changelog Produer 目前处于试验状态,暂未通过大量的生产验证。
  • full-compaction:除了以上几种形式,通过配置 ‘changelog-producer’ = ‘full-compaction’ 将 Changelog Producer 设置为 full-compaction,Writer 端在 Compaction 后产生残缺的 Changelog,并且写入到 Changelog 文件。通过设置 changelog-producer.compaction-interval 配置项管制 Compaction 的距离和频率,不过此参数打算弃用,倡议应用 full-compaction.delta-commits,此配置下默认为 1 即每次提交都做 Compaction。

Append Only Table

建表时配置 ‘write-mode’ = ‘append-only’,用户能够创立 Append Only 表。Append Only 表采纳追加写的形式,只能插入一条残缺的记录,不能更新和删除,也无需定义主键。Append Only 表次要用于无需更新的场景,例如 ODS 层数据将 Kafka 埋点日志数据解析后写入到 Paimon 表,保留原貌不做任何更新,此时举荐采纳 Paimon 的 Append Only 表。

须要留神的是因为 Append Only 表没有主键,用户必须指定 bucket-key,否则采纳整行数据做 Hash 效率偏低。

3.1 ODS 层入湖

3.1.1 业务库数据入湖

业务库数据入湖,咱们应用的是 FlinkCDC 的整库同步,目前是基于 Dinky 实现的 FlinkCDC 到 Paimon 的整库同步能力(这里要特地鸣谢文末老师的反对),能够主动建表,多表或整库同步业务库数据到 Paimon 的对应库。因为咱们是每个我的项目一个业务库,所以在 Paimon 中也是按我的项目建库,与 MySQL 中业务库对应,以下是局部我的项目的图示:

入湖 SQL:

上面以一个我的项目的入湖 SQL 为例:

EXECUTE CDCSOURCE cdc_demo WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'username',
    'password' = 'password',
    'checkpoint' = '30000',
    'scan.startup.mode' = 'initial',
    'source.server-time-zone' = 'Asia/Tokyo',
    'parallelism' = '4',
    'database-name' = 'demo',
    'sink.connector' = 'sql-catalog',
    'sink.catalog.name' = 'fts_hive',
    'sink.catalog.type' = 'fts_hive',
    'sink.catalog.uri' = 'thrift://localhost:9083',
    'sink.bucket' = '4',
    'sink.snapshot.time-retained' = '24h',
    'table-list' = 'A01,A02,A03,A04,A05',
    'sink.changelog-producer' = 'input',
    'sink.catalog.warehouse' = 'hdfs://cluster/warehouse/table_store',
    'sink.sink.db' = 'fts_ods_db_demo'
);

FlinkCDC 整库同步目前还是基于单表单 Task 的模式, 执行成果如下所示:

3.1.2 日志数据入湖

日志入湖是通过 Flink SQL 将 Kafka 中的日志数据同步到 ODS 层的 Paimon 表,ODS 层埋点日志没有确定类型,防止因为类型转换过滤掉数据,这里以用户登录日志为例,介绍日志数据入湖:

入湖 SQL:

--CREATE TABLE
create table t_ods_table(
      ......
    gn string,
    dt string 
 ) partitioned by (gn,dt) 
WITH (
    'bucket' = '8',
    'bucket-key' = 'id',
    'write-mode' = 'append-only', -- 创立 Append Anly 表
    'snapshot.time-retained' = '24h'
);

--INSERT
create table default_catalog.default_database.role_login (
    message string,
    fields row < project_id int,
    topic string,
    gn string >
) with (
    'connector' = 'kafka',
    'topic' = 'topic',
    'properties.bootstrap.servers' = '${kafka_server}',
    'properties.group.id' = 'topic_group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
insert into
    fts_ods_log.t_ods_table
select
      ......
      cast(SPLIT_INDEX(message, '|', 5) as int) log_create_unix_time,
    fields.gn gn,
    FROM_UNIXTIME(cast(SPLIT_INDEX(message, '|', 5) as int),
        'yyyy-MM-dd'
    ) dt
from
    default_catalog.default_database.role_login
where
  try_cast(SPLIT_INDEX(message, '|', 5) as int) is not null
  and cast(SPLIT_INDEX(message, '|', 5) as int) between 0 and 2147483647;

日志数据入湖的执行成果如下所示:

3.2 DIM 层入湖

DIM 层数据次要是将 ODS 层多个业务库的雷同表的数据同步到 DIM 层对应的表,比方 fts_ods_db_A 和 fts_ods_db_B 都有同名的表 A01,须要 ODS 不同业务库中同名的表同步 DIM 层的 fts_dim 库中的 t_dim_A01 表中,该表的更新频率较低且数据量较小。也有的是业务库表和日志表数据通过 Partial Update 能力拉宽后造成的维表。这里以 tdim_A01 表为例,介绍 DIM 层数据入湖:

入湖 SQL:

--CREATE TABLE
create table t_dim_A01(
    ......
    gn string,
    PRIMARY KEY (gn,lid) NOT ENFORCED
) WITH (
    'bucket' = '4',
    'snapshot.time-retained' = '24h'
);
--INSERT
insert into
    fts_dim.t_dim_A01
select
    'AA' as gn,
    ......
from
    fts_ods_db_A.A01
union all
select
    'BB' as gn,
    ......
from
    fts_ods_db_B.A01
......

3.3 DWD 层入湖

DWD 层数据入湖是通过 Flink SQL 荡涤过滤,关联维表后造成宽表写入到 DWD 层的 Paimon 表。维表也是在 Paimon 中,所以这里很不便通过 Lookup Join 关联维表,因为维表数据可能会晚于事实表数据达到 Paimon, 所以应用 Retry Lookup Join, 如果事实表一开始关联不上维表,能够减少一些重试,以便可能关联上维表数据,这里以用户登录表为例。

入湖 SQL:

--CREATE TABLE
create table t_dwd_table(
    ......
    id string,
    gn string,
    dt string,
    PRIMARY KEY (gn, id, log_create_unix_time, dt) NOT ENFORCED
) partitioned by (gn, dt) WITH (
    'bucket' = '8',
    'bucket-key' = 'id',
    'changelog-producer' = 'full-compaction',
    'changelog-producer.compaction-interval' = '54s',
    'snapshot.time-retained' = '24h'
);
--INSERT
create view default_catalog.default_database.t_table_view as (
    select
        ......
        PROCTIME() proc_time,
        gn,
        dt
    from
        fts_ods_log.t_ods_table
    where
        AA is not null
        and try_cast(BB as int) is not null
        and try_cast(CC as int) is not null
)
insert into
    fts_dwd.t_dwd_table
select
    /*+ LOOKUP('table'='fts_dim.t_dim_A01', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='30'),
     LOOKUP('table'='fts_dim.t_dim_A02', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='30'),
     LOOKUP('table'='fts_dim.t_dim_A03', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='30')*/
    ......
    cast(d.open_date_time as int) open_date_time,
    cast(d.merge_server_time as int) merge_server_time,
    CONCAT(a.aa, a.bb) id,
    a.gn,
    a.dt
from
    default_catalog.default_database.t_table_view as a
    left join fts_dim.t_dim_A01 for SYSTEM_TIME AS OF a.proc_time as b on a.AA = b.AA
    and a.BB = b.BB
    left join fts_dim.t_B01 for SYSTEM_TIME AS OF a.proc_time as c on a.AA = c.AA
    and a.BB = c.BB
    left join fts_dim.t_dim_C01 for SYSTEM_TIME AS OF a.proc_time as d on a.AA = d.AA
    and a.BB = d.BB;

3.4 DWS 层入湖

DWS 次要是分主题,按不同的维度进行聚合,咱们也有一些宽表须要有聚合的列,也放在 DWS 层构建。这里以角色域的一张角色宽表为例,介绍 DWS 层应用 Paimon 的 Agg 表做预聚合的场景。

角色宽表的建表语句如下:

CREATE TABLE t_dws_role(
    ......
    gn string,
    id bigint,
    aa int,
    bb int,
    acc int,
    PRIMARY KEY (gn, id) NOT ENFORCED
) WITH (
    'bucket' = '16',
    'bucket-key' = 'id',
    'merge-engine' = 'aggregation', -- 指定应用 Agg 表引擎
    'changelog-producer' = 'full-compaction', -- 指定 changelog producer 为 full-compaction
    'changelog-producer.compaction-interval' = '40s', -- 指定 campaction 的距离为 40 秒
    'fields.aa.aggregate-function' = 'max',
    'fields.bb.aggregate-function' = 'min',
    'fields.acc.aggregate-function' = 'sum',
      'fields.aa.ignore-retract' = 'true', -- 疏忽掉 retract, 防止接管到 DELETE 音讯出错
     ......
    'snapshot.time-retained' = '24h' -- 指定 snapshot 文件保留 24 小时
);

角色宽表须要由 DWD 层的多张表关联生成。基于 Paimon 的 Agg 表引擎创立,PK 为 gn + id 形成的联结主键,只有咱们须要关联的表与角色表有雷同的主键就能够很不便的做到局部更新和预聚合,所以角色表,登录登出表等都比拟好解决,然而注册表和前端日志表是没有 roleid 的,没法间接写入角色宽表做解决,因为主键不同。

这里咱们首先想到是不是能够通过流 Join 的模式,别离给注册表和前端日志表添上 roleid, 这样就与角色表有雷同的 PK, 就能够更新和聚合了。然而流 Join 的形式须要保留的状态就绝对要大。

所以咱们最终是通过 Paimon 的 Partial Update 将注册表和前端日志表做成一张维表,而后 Flink SQL 流读 DWD 层角色表写入角色宽表的时候和维表做 Lookup Join, 最终补全一些字段。因为注册表和前端日志表的数据都可能先于角色表的数据达到 Paimon,所以须要用 Retry Lookup Join,保障可能 Join 上。

还有须要做非凡解决的就是订单表,比方角色宽表中有累积付费字段,来自于订单表,每个角色的累积付费须要用订单表中的充值金额做 SUM 聚合,然而订单表可能呈现反复数据,比方发现订单数据有问题或是缺数,都可能在 CDC 端进行重跑来修复或补全数据,因为 DWD 层的订单表是 PK 表,过去反复数据就会在 changelog 文件中保留 -U 和 +U 的记录,这样 Flink SQL 流读订单表写到角色宽表做聚合时,过去反复的数据就会反复求和,计算结果就不准了,这里应用 audit_log 零碎表过滤 Changelog, 只有 +I 的记录,过滤掉 -U 和 +U 的记录,疏忽掉更新的订单数据,这样也会存在肯定的问题,比方失落订单的更新数据,不过充值金额个别是极少更新的。

audit_log 零碎表应用示例:

SELECT * FROM MyTable$audit_log where rowkind='+I'

上面先介绍下注册表和 foreend 表通过 Partial Update 构建维表的示例:

建表语句:

CREATE TABLE t_dim_table (
    gn string,
    ......
    PRIMARY KEY (gn, id) NOT ENFORCED
) WITH (
    'bucket' = '8',
    'bucket-key' = 'id'
    'merge-engine' = 'partial-update', -- 指定 merge engine 为局部更新列
    'changelog-producer' = 'full-compaction', -- 指定 changelog producer 为 full-compaction
    'changelog-producer.compaction-interval' = '48s', --compaction 距离 48 秒
    'snapshot.time-retained' = '24h',
      'partial-update.ignore-delete' = 'true' -- 疏忽 DELETE 数据
);

插入 SQL:

-- 省略 Flink SQL 参数设置
INSERT INTO 
        fts_dim.t_dim_table
SELECT 
    gn,
    id,
    ......
    CAST(NULL AS STRING),
    CAST(NULL AS STRING),
    unix_timestamp()
FROM
      fts_dwd.t_dwd_table
UNION ALL
SELECT 
      gn,
      id,
    CAST(NULL AS STRING),
    CAST(NULL AS INT),
    ......
    unix_timestamp()
FROM
      fts_dwd.t_dwd_foreend 
WHERE pid <> '0'

执行成果如下所示:

能够看到,在数据写入频率比拟高,Compaction 工夫距离设置的比拟短的时候,Writer 端存在肯定的压力,常常处于 Busy 状态。

角色宽表入湖 SQL:

-- 省略 Flink SQL 参数设置
CREATE view default_catalog.default_database.t_view AS
SELECT
    ......
    id,
    gn,
    PROCTIME() AS proc_time
FROM
    fts_dwd.t_dwd_table
-- 插入数据
INSERT INTO
    fts_dws.t_dws_role
SELECT
    ......
    id,
    CAST(NULL AS STRING),
    unix_timestamp(),
FROM
    (
        SELECT
            /*+ LOOKUP('table'='fts_dim.t_dim_register', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='30')*/
            a.*,
            b.aa,
            b.bb
        FROM
            (
                  SELECT
                    *
                FROM
                    default_catalog.default_database.t_view
            ) AS a
            LEFT JOIN fts_dim.t_dim_table for SYSTEM_TIME AS OF a.proc_time AS b ON a.AA = b.AA
            AND a.BB = b.BB
    )
UNION ALL
SELECT
    ......
    createt_time,
    unix_timestamp(),
    CAST(NULL AS INT)
FROM
    fts_dwd.`t_dwd_o_table$audit_log`
WHERE
    rowkind = '+I'
UNION ALL
......

宽表入湖执行成果如下:

能够看到,第一个 Source 因为用到了 Retry Lookup Join, 前面来的数据在排队,须要等后面的数据 Join 上或是重试次数用完,前面的数据才会解决,很多状况下这个节点处于 Busy 状态,导致效率很低。

3.5 ADS 层流读到 ClickHouse

--CREATE TABLE
CREATE TABLE t_role (
      gn string,
      id string,
    ......
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://localhoust:8123',
    'database-name' = 'streaming_warehouse',
      'table-name' = 't_role_all',
    'use-local' = 'false',
      'is-changelog' = 'true',
      'sink.batch-size' = '5000',
    'sink.flush-interval' = '2000',
    'sink.max-retries' = '3',
    'username' = 'username',
    'password' = 'password'
);
-- INSERT
INSERT INTO
    t_role
SELECT
    *
FROM
    fts_dws.t_dws_role
WHERE
    is_role = 1

综上所述,基于 Apache Paimon(Incubating) 构建 Streaming Lakehouse 可能解决咱们以后实时数仓中间层数据不可剖析,保留工夫短,问题排查艰难、数据更新解决简单等痛点,并可能将 Hive 离线数仓 T+1 的提早放大到分钟级,上线后将会优先替换实时数仓,前期缓缓赋能离线业务,用流批一体的计算引擎 + 流批一体的存储做到真正的流批一体的湖仓体验。

4. 实际总结

  • 应用 full-compaction Changelog Producer 时,changelog-producer.compaction-interval 和 checkpoint interval 设置值较小,比方一分钟以下时,Writer 端在写入数据和 Compaction 时的压力较大,须要较大的资源,如果工作暂停一段时间,再从 Savepoint 复原时,Writer 端反压重大,需一直调整资源。前面思考测试 lookup Changelog Producer,和 full-compaction Changelog Producer 比照,测试是否能够满足生产环境低提早流读场景需要。
  • 当数仓某一层的数据呈现问题,须要通过 Time Travel 从新读取某个快照或是某个工夫点开始的数据修复问题,此时须要 Snaphot 文件保留工夫可能满足问题回溯周期,然而目前 Checkpoint Interval 设置较小,写入数据提早较小,导致 Snapshot 保留工夫越长,生成越多的小文件,存在小文件过多的问题。
  • 目前的 Retry Lookup Join 是有序的,如果后面一条数据始终 Join 不上,那么前面来的数据也会排队,并不会解决,须要始终等到后面的数据 Join 上维表数据或重试次数用完,这样造成数据处理效率很低,此时如果应用 Unordered Output,则须要 Paimon 实现异步 Lookup Join,目前社区正在反对:(https://github.com/apache/incubator-paimon/issues/848)。

5. 将来布局

  • 欠缺基于 Apache Paimon(Incubating) 的流式数仓的建设。
  • 优化 Presto 查问,帮忙基于 Paimon 进行即席查问发挥作用。
  • 目前 Paimon 须要对接 BI 报表或是剖析零碎的数据都是当时流读到 ClickHouse, 而后再于 ClickHouse 进行可视化展现,前面思考是否间接在 Paimon 里预聚合完后果数据,与前端交互间接查问 Paimon,缩小数据处理链路以及升高复杂度。
  • 欠缺基于 Apache Paimon(Incubating) 的平台服务建设。

6. Paimon 信息

  • 官方网站:https://paimon.apache.org/
  • Github 我的项目:https://github.com/apache/incubator-paimon (欢送大家 star&fork 反对)
  • 钉钉交换群:10880001919 Apache Paimon 交换群

作者简介

石在虎,大数据研发工程师,专一于实时计算,对数据湖和数据集成有着浓重的趣味

史亚光,大数据平台工程师,专一于数据集成与平台开发,对流式数仓和数据湖有着浓重的趣味

点击进入 Apache Paimon 官网


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://free.aliyun.com/?pipCode=sc

正文完
 0