关于后端:Apache-Paimon-在同程旅行的探索实践

35次阅读

共计 8012 个字符,预计需要花费 21 分钟才能阅读完成。

摘要:本文次要介绍 Apache Paimon 在同程旅行的生产落地实践经验。在同程旅行的业务场景下,通过应用 Paimon 替换 Hudi,实现了读写性能的大幅晋升(写入性能 3.3 倍,查问性能 7.7 倍),接下来将分为如下几个局部进行具体介绍:

  1. 湖仓场景现状和遇到的问题
  2. 遇见 Apache Paimon
  3. Apache Paimon 的利用实际
  4. 问题发现和解决
  5. 将来布局

点击查看更多技术内容

一、湖仓场景现状和遇到的问题

随着公司业务倒退,实时性业务需要越来越多,2021 年开始逐渐调研并引入湖仓架构,联合过后数据湖架构,最终咱们抉择 hudi 作为湖仓底座。通过外部自研数据集成能力可能一键将外部 base 层的 binglog 数据导入到湖仓内,逐渐代替了基于 hive 实时同步,凌晨合并的形式;另外还联合湖上的流读能力,通过增量读的形式将增量后果合并到 DWD 层;以及联合 flink 窗口计算实现了大量实时报表的革新,极大进步了数据时效性,同时也节俭了大量批处理合并计算资源。

然而随着工作和场景的增多,基于 hudi 的湖仓逐步暴露出了一些问题,让咱们不得不从新思考湖仓架构,以及后续演进方向。

1.1 湖仓利用现状

目前外部数据湖场景次要利用于以下几个场景:

  1. 数据库 base 层入湖,晋升 ods 层时效性
  2. 利用湖增量能力,构建上游 dwd 层,节俭计算资源
  3. 利用湖上部分更新能力,构建实时统计视图和报表
  4. 利用湖近实时更新能力,构建实时监控场景

整体架构如下:

利用湖仓的各项能力,咱们将 ODS 后置批处理工夫提前了近 1 小时,同时两头过程的计算存储老本也极大缩小。不过同时也遇到了不少问题,在基于 Hudi 湖仓的实际过程中咱们遇到的问题次要集中在写入性能,查问性能,资源耗费等方面。

1.2 湖仓写入性能问题

Apache Hudi 提供了两种写入模式 COW 和 MOR,COW 人造存在写入瓶颈,这里次要应用 MOR 类型,为了方便管理,同时开启工作异步 compact(5 个 commit/ 次)。

尽管 Hudi 应用类 LSM 模式进行数据写入与合并,不过有区别于 LSM 的 SSTable,合并过程全读全写,即便只变更了其中一条数据,也须要整个文件进行读取合并,这就造成 compact 过程须要比拟大的内存。尤其当存在热点数据时,工作须要从一开始便保留足够的资源来应答突增的大流量数据,从而造成肯定的内存资源节约。以下是一个 Hudi 入湖工作的资源配比状况:

<center>(上图为运行容器数)</center>

<center>(上图为容器资源配比)</center>

1.3 湖仓查问性能问题

咱们次要利用基于表主键的 bucket 索引,因为湖仓做到了近实时,所以带来了更多的点查场景,Hudi 利用分区和主键下推到查问引擎后可能剪枝掉大量的分区和文件,不过单 bucket 内依然须要 scan 整个文件来定位到具体的主键数据,点查性能略显吃力,联合 MOR 查问时的合并流程(如写入流程所形容)点查性能很难晋升,以下是基于 Hudi 的点查,耗时 21s。

<p><center>(上图为基于 Hudi 的点查耗时状况)</center></p>

最初是写入资源压力,咱们的湖仓次要架设在 HDFS 之上,大量上线湖仓工作之后 HDFS 的 IO 压力也逐渐升高,这与 Hudi 写入原理无关。

1.4 老本绝对较高

实时工作运行资源老本高,Hudi 有较多的调优参数,用户上手老本高,外部推广难,晚期 Hudi 与 Spark 强绑定,前期解耦后,Flink 集成呈现了不少问题,优化老本高。

综上所述,咱们在湖仓场景下面临的问题总结如下:

  • MOR 类型表写入工作并行度和资源资源配置过高,造成资源节约
  • 点查性能难以优化,不能很好的满足需要
  • 因为合并带来的存储 IO 压力变大

二、遇见 Apache Paimon

彼时还叫 Flink Table Store,现在胜利晋升为 Apache 孵化我的项目 Apache Paimon,官网地址:Apache Paimon,首次接触在 FLIP-188: Introduce Built-in Dynamic Table Storage – Apache Flink – Apache Software Foundation 中,就被基于原生 LSM 的写入设计以及 universal compaction 深深吸引,便继续关注,在 0.2 版本公布后咱们开始接入测试应用。

2.1 Apache Paimon 简介

Apache Paimon(incubating) is a streaming data lake platform that supports high-speed data ingestion, change data tracking and efficient real-time analytics

Apache Paimon 是一款反对高吞吐数据摄入,变更跟踪,高效剖析的数据湖平台。以下是官网的架构图

Apache Paimon 底层存储利用 LSM 构造,反对多分布式存储系统,且兼容当下所有支流的计算引擎(Flink,spark,hive,Trino),文件构造组织相似 Iceberg,绝对 Hudi 来说更加简略和容易了解:

同时涵盖了湖技术目前咱们特地关注的几大个性:

  • 近实时高效更新
  • 部分更新
  • 增量流读
  • 全增量混合流读
  • 多云存储反对
  • 多查问引擎反对
  • 特地的 Lookup 能力
  • CDC 摄入(进行中)
  • 构造演进(进行中)

2.2 基于 Apache Paimon 优化成果

写入性能和资源耗费方面,雷同的表(均开启异步 Compact)基于 Apache Paimon 的资源应用状况如下:

<p><center>(上图为 Apache Paimon 写入容器数)</center></p>

<p><center>(上图为 Apache Paimon 写入资源配比)</center></p>

在不升高写入性能的状况下 Apache Paimon 应用了更少的容器数和更低的资源配比。这得益于 SortRun 和 Universal-Compaction 策略的写优化能力,Upsert 效率绝对 Hudi MOR 也有较大晋升,如下 Flink 配置的状况下:

parallelish.default : 2
execution.checkpointing.interval : 2 min
taskmanager.memory.process.size : 6g

Upsert 4 亿数据,800 个分区(实际效果与集群性能相干与时间段相干,大略做个参考)的场景下,应用 Apache Paimon 总共耗时 3 小时左右,而 Apache Hudi MOR 须要耗时 10 小时左右。

再来看下点查性能

雷同的条件下 Apache Paimon 只须要 2.7 秒,比照 Hudi 21 秒晋升微小。性能晋升的次要起因在于有序的存储构造可能在数据检索时疾速定位和缩小 Scan 数量。

目前咱们上线了局部场景的利用,大批量上线之后再察看 HDFS IO 压力状况。

三、Apache Paimon 的利用实际

目前咱们在外部数据集成中退出了 Paimon 的反对,同时将多个场景切换到了 Paimon,次要包含 Binglog 集成,Partial Update 准实时宽表,以及 Append Only 场景。

3.1 Paimon 的自动化数据集成

咱们通过集成平台屏蔽了用户对 binglog 的感知,通过一键的形式实现底层 Base 表全量 + 增量的同步性能,大抵流程如下:

用户更加关注他们所相熟的 Mysql 以及咱们的最终湖仓表,大抵集成界面如下:

<center> 注:Paimon 原名 Flink Table Store</center>

同时咱们为了将 Hudi 表迁徙到 Paimon 之中,小数据量的咱们间接通过重做的形式,而大数据量会通过 Flink 批量导入形式进行初始化,通过测试,4 亿左右的表只须要不到 20 分钟即可导入实现,大抵导入配置如下:

INSERT INTO paimon.ods.order_info
/*+ OPTIONS('sink.parallelism'='100','write-buffer-size'='1024m','sink.partition-shuffle' = 'true') */
SELECT
*
FROM
hudi.ods.order_info/*+ OPTIONS('read.tasks' = '100') */
;

另外咱们的集成环境和监控针对 Paimon 也进行了一系列优化:

  • 依据表数据量来制订特定参数,使用户无感知
  • 调整分区策略和资源,优化大量随机写状况
  • 构建监控大盘,时刻关注工作运行状况,时刻维持工作失常运行和资源分配的一个平衡点

3.2 基于 Partial Update 的准实时宽表

准实时是介于离线和实时之间,其中准实时宽表是一个常见的案例,次要用来反对 Ad-Hoc Query。在准实时场景下,次要存在如下特点和挑战:

  • 通过微批调度(分钟,小时)进行数据更新,然而提早绝对较高
  • 通过流式引擎构建,则会存在保留大量状态造成资源重大节约的状况

Paimon 提供了 Partial Update 的性能,可通过 Merge-Engine 参数来指定:

'merge-engine' = 'partial-update'

Partial Update 的特点:

  • 后果表字段由多个数据源提供组成,可应用 Union All 的形式进行逻辑拼接
  • 数据在存储层进行 Join 拼接,与计算引擎无关,不须要保留状态,节俭资源

具体案例如下:

案例实际:数据写入

--FlinkSQL 参数设置
set `table.dynamic-table-options.enabled`=`true`;
SET `env.state.backend`=`rocksdb`; 
SET `execution.checkpointing.interval`=`60000`;
SET `execution.checkpointing.tolerable-failed-checkpoints`=`3`;
SET `execution.checkpointing.min-pause`=`60000`;

-- 创立 Paimon catalog
CREATE CATALOG paimon WITH (
  'type' = 'paimon',
  'metastore' = 'hive',
  'uri' = 'thrift://localhost:9083',
  'warehouse' = 'hdfs://paimon',
  'table.type' = 'EXTERNAL'
);

-- 创立 Partial update 后果表
CREATE TABLE if not EXISTS paimon.dw.order_detail
(
    `order_id` string 
    ,`product_type` string 
    ,`plat_name` string 
    ,`ref_id` bigint 
    ,`start_city_name` string 
    ,`end_city_name` string 
    ,`create_time` timestamp(3)
    ,`update_time` timestamp(3) 
    ,`dispatch_time` timestamp(3) 
    ,`decision_time` timestamp(3) 
    ,`finish_time` timestamp(3) 
    ,`order_status` int 
    ,`binlog_time` bigint
    ,PRIMARY KEY (order_id) NOT ENFORCED
) 
WITH (
  'bucket' = '20', -- 指定 20 个 bucket
  'bucket-key' = 'order_id',
  'sequence.field' = 'binlog_time', -- 记录排序字段
  'changelog-producer' = 'full-compaction',  -- 抉择 full-compaction,在 compaction 后产生残缺的 changelog
  'changelog-producer.compaction-interval' = '2 min', -- compaction 间隔时间
  'merge-engine' = 'partial-update',
  'partial-update.ignore-delete' = 'true' -- 疏忽 DELETE 数据,防止运行报错
);

INSERT INTO paimon.dw.order_detail
-- order_info 表提供次要字段
SELECT
order_id,
product_type,
plat_name,
ref_id,
cast(null as string) as start_city_name,
cast(null as string) as end_city_name,
create_time,
update_time,
dispatch_time,
decision_time,
finish_time,     
order_status,
binlog_time
FROM
paimon.ods.order_info /*+ OPTIONS ('scan.mode'='latest') */

union all 

-- order_address 表提供城市字段
SELECT
order_id,
cast(null as string) as product_type,
cast(null as string) as plat_name,
cast(null as bigint) as ref_id,
start_city_name,
end_city_name,
cast(null as timestamp(3)) as create_time,
cast(null as timestamp(3)) as update_time,
cast(null as timestamp(3)) as dispatch_time,
cast(null as timestamp(3)) as decision_time,
cast(null as timestamp(3)) as finish_time,  
cast(null as int) as order_status,
binlog_time
FROM
paimon.ods.order_address /*+ OPTIONS ('scan.mode'='latest') */
;

3.3 AppendOnly 利用

除了 Binlog 数据源,还有大量日志、埋点相干的 AppendOnly 数据源,这类数据根本都是数据量十分大的存在,一般来说,这类数据都是间接生产落在分布式文件系统上的。

当咱们采纳 Paimon 来构建 AppendOnly 表时,数据不仅能够实时写入,还能够实时读取,读写程序统一,而且实时资源耗费也升高了不少齐全能够替换局部音讯队列的场景,达到解耦和降本增效的成果。SQL 如下:

CREATE TABLE if not exists paimon.ods.event_log(.......) 
PARTITIONED BY (......)
WITH (
  'bucket' = '100',
  'bucket-key' = 'uuid',
  'snapshot.time-retained' = '7 d',
  'write-mode' = 'append-only'
);
INSERT INTO paimon.ods.event_log
SELECT 
    .......
FROM 
    realtime_event_kafka_source
;

写入成果如下:

四、问题发现和解决

4.1 Spark 跨 Warehouse 查问能力调整

以后 Hive Catalog 次要基于 Warehouse 门路组装 Paimon 表门路,在 Spark 环境内申明 Warehouse 之后不太容易跨 Warehouse 进行多 Paimon 表的查问。外部咱们重载了 HiveCatalog 的 getDataTableLocation 办法,基于 Hive 表构建 Paimon 表门路

    @Override
    public Path getDataTableLocation(Identifier identifier) {
        try {Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
            return new Path(table.getSd().getLocation());
        } catch (TException e) {throw new RuntimeException("Failed to get table location", e);
        }
    }

同时也减少了构建 Hive 内部表的能力,[[FLINK-29922] Support create external table for hive catalog](https://github.com/apache/incubator-paimon/pull/357)

4.2 大量分区 + Bucket 场景下 Flink 批读超过 Akka 音讯限度优化

实际过程中如果发现相似以下谬误,能够适当调大 Flink 中的 akka.framesize 参数,默认 10M。

2023-03-21 15:51:08,996 ERROR akka.remote.EndpointWriter                                   [] - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@hadoop-0xx-xxx:29413/user/rpc/taskmanager_0#1719925448]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 1077637236 bytes.

最终通过退出分批次 Split 形式进行解决,[[flink] Assign splits with fixed batch size in StaticFileStoreSplitEnumerator ](https://github.com/apache/incubator-paimon/pull/687),成果如下:

4.3 流读场景下,并行度调配不合理以及基于工夫戳读取过期工夫报错的问题

目前跟进中,[[Feature] Some problems with stream reading](https://github.com/apache/incubator-paimon/issues/699)

五、将来布局

  • 欠缺 Paimon 平台剖析等相干生态
  • 基于 Paimon 的流式数仓构建
  • 推广 Paimon 在团体外部的利用实际
  • 替换局部音讯队列的场景

Reference:

  • Apache Paimon 官网:https://paimon.apache.org/
  • Piamon Github:https://github.com/apache/incubator-paimon
  • 基于 Apache Flink Table Store 的全增量一体实时入湖:https://www.jianshu.com/p/ac2ba73367fe
  • Flink Table Store 0.3 构建流式数仓最佳实际:https://juejin.cn/post/7207659644486959163

作者简介:

吴祥平:同程旅行大数据计算组负责人,Apache Hudi & Paimon Contributor,对流计算和数据湖技术充满热情

曾思杨:同程旅行公共 BI 数据开发,酷爱流计算和数据湖技术及其理论利用

点击查看更多技术内容


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/product/bigdata/sc

正文完
 0