关于大数据:Apache-Hudi-在袋鼠云数据湖平台的设计与实践

41次阅读

共计 4713 个字符,预计需要花费 12 分钟才能阅读完成。

在大数据处理中,实时数据分析是一个重要的需要。随着数据量的一直增长,对于实时剖析的挑战也在一直加大,传统的批处理形式曾经不能满足实时数据处理的需要,须要一种更加高效的技术来解决这个问题。Apache Hudi(Hadoop Upserts Deletes and Incremental Processing)就是这样一种技术,提供了高效的实时数据仓库治理性能。

本文将介绍袋鼠云基于 Hudi 构建数据湖的整体计划架构及其在实时数据仓库解决方面的特点,并且为大家展现一个应用 Apache Hudi 的简略示例,便于新手上路。

Apache Hudi 介绍

Apache Hudi 是一个开源的数据湖存储系统,能够在 Hadoop 生态系统中提供实时数据仓库解决性能。Hudi 最早由 Uber 开发,起初成为 Apache 顶级我的项目。

Hudi 次要个性

· 反对疾速插入和更新操作,以便在数据仓库中实时处理数据;

· 提供增量查问性能,可无效进步数据分析效率;

· 反对工夫点查问,以便查看数据在某一时刻的状态;

· 与 Apache Spark、Hive 等大数据分析工具兼容。

Hudi 架构

Apache Hudi 的架构包含以下几个次要组件:

· Hudi 数据存储:Hudi 数据存储是 Hudi 的外围组件,负责存储数据,数据存储有两种类型:Copy-On-Write(COW)和 Merge-On-Read(MOR);

· Copy-On-Write:COW 存储类型会在对数据进行更新时,创立一个新的数据文件正本,将更新的数据写入正本中,之后,新的数据文件正本会替换原始数据文件;

· Merge-On-Read:MOR 存储类型会在查问时,将更新的数据与原始数据进行合并,这种形式能够缩小数据存储的写入提早,但会减少查问的计算量;

· Hudi 索引:Hudi 索引用于保护数据记录的地位信息,索引有两种类型:内置索引(如 Bloom 过滤器)和内部索引(如 HBase 索引);

· Hudi 查问引擎:Hudi 查问引擎负责解决查问申请,Hudi 反对多种查问引擎,如 Spark SQL、Hive、Presto 等。

Hudi 的应用场景

Apache Hudi 能够帮忙企业和组织实现实时数据处理和剖析。实时数据处理须要疾速地解决和查问数据,同时还须要保证数据的一致性和可靠性。

Apache Hudi 的增量数据处理、ACID 事务性保障、写时合并等技术个性能够帮忙企业更好地实现实时数据处理和剖析,基于 Hudi 的个性能够在肯定水平上在实时数仓的构建过程中承当上下游数据链路的对接(相似 Kafka 的角色)。既能实现增量的数据处理,也能为批流一体的解决提供存储根底。

Hudi 的劣势和劣势

● 劣势

· 高效解决大规模数据集;

· 反对实时数据更新和查问;

· 实现了增量写入机制,进步了数据拜访效率;

· Hudi 能够与流解决管道集成;

· Hudi 提供了工夫旅行性能,容许回溯数据的历史版本。

● 劣势

· 在读写数据时须要付出额定的代价;

· 操作比较复杂,须要应用业余的编程语言和工具。

Hudi 在袋鼠云数据湖平台上的实际

Hudi 在袋鼠云数据湖的技术架构

Hudi 在袋鼠云的数据湖平台上次要对数据湖治理提供助力:

· 元数据的接入,让用户能够疾速的对表进行治理;

· 数据疾速接入,包含对符合条件的原有表数据进行转换,疾速搭建数据湖能力;

· 湖表的治理,监控小文件定期进行合并,晋升表的查问性能,外在丰盛的表操作性能,包含 time travel , 孤儿文件清理,过期快照清理等;

· 索引构建,提供多种索引包含 bloom filter,zorder 等,晋升计算引擎的查问性能。

Hudi 应用示例

在介绍了 Hudi 的根本信息和袋鼠云数据湖平台的构造之后,咱们来看一个应用示例,替换 Flink 在内存中的 join 过程。

在 Flink 中对多流 join 往往是比拟头疼的场景,须要思考 state ttl 工夫设置,设置太小数据常常关联不上,设置太大内存又须要很高能力保留,咱们通过 Hudi 的形式来换个思路实现。

● 构建 catalog

public String createCatalog(){
    String createCatalog = "CREATE CATALOG hudi_catalog WITH (\n" +
            "'type' = 'hudi',\n"+"    'mode' = 'hms',\n"+"    'default-database' = 'default',\n"+"    'hive.conf.dir' = '/hive_conf_dir',\n"+"    'table.external' = 'true'\n"+")";

    return createCatalog;
}

● 创立 hudi 表

public String createHudiTable(){

    String createTable = "CREATE TABLE if not exists hudi_catalog.flink_db.test_hudi_flink_join_2 (\n" +
            "id int ,\n" +
            "name VARCHAR(10),\n" +
            "age int ,\n" +
            "address VARCHAR(10),\n" +
            "dt VARCHAR(10),\n" +
            "primary key(id) not enforced\n" +
            ")\n" +
            "PARTITIONED BY (dt)\n" +
            "WITH (\n" +
            "'connector' = 'hudi',\n"+"  'table.type' = 'MERGE_ON_READ',\n"+"  'changelog.enabled' = 'true',\n"+"  'index.type' = 'BUCKET',\n"+"  'hoodie.bucket.index.num.buckets' = '2',\n" +
            String.format("'%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.NO_PRE_COMBINE) +"  'write.payload.class' = '"+ PartialUpdateAvroPayload.class.getName() +"'\n"+");";

    return createTable;
}

● 更新 hudi 表的 flink_db.test_hudi_flink_join_2 的 id, name, age, dt 列

01 从 kafka 中读取 topic1

public String createKafkaTable1(){
    String kafkaSource1 = "CREATE TABLE source1\n" +
            "(\n" +
            "id        INT,\n" +
            "name      STRING,\n" +
            "age        INT,\n" +
            "dt        String,\n" +
            "PROCTIME AS PROCTIME()\n" +
            ") WITH (\n" +
            "'connector' = 'kafka'\n"+"      ,'topic' = 'join_topic1'\n"+"      ,'properties.bootstrap.servers' = 'localhost:9092'\n"+"      ,'scan.startup.mode' = 'earliest-offset'\n"+"      ,'format' = 'json'\n"+"      ,'json.timestamp-format.standard' = 'SQL'\n"+"      )";

    return kafkaSource1;
}

02 从 kafka 中读取 topic2

public String createKafkaTable2(){
    String kafkaSource2 = "CREATE TABLE source2\n" +
            "(\n" +
            "id        INT,\n" +
            "name      STRING,\n" +
            "address        string,\n" +
            "dt        String,\n" +
            "PROCTIME AS PROCTIME()\n" +
            ") WITH (\n" +
            "'connector' = 'kafka'\n"+"      ,'topic' = 'join_topic2'\n"+"      ,'properties.bootstrap.servers' = 'localhost:9092'\n"+"      ,'scan.startup.mode' = 'earliest-offset'\n"+"      ,'format' = 'json'\n"+"      ,'json.timestamp-format.standard' = 'SQL'\n"+"      )";

    return kafkaSource2;
}

● 执行插入逻辑 1

String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,age,dt)" +
        "select id, name,age,dt from source1";

● 通过 spark 查问数据

20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 NULL 1

20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 NULL 1

● 执行插入逻辑 2

String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,address,dt)" +
        "select id, name, address,dt from source2";

● 运行胜利

运行胜利后在 spark 中查问对应的表数据:

20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 xc:address45 1

20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 xc:address30 1

能够发现在第二次数据运行之后,表数据的对应字段 address 曾经更新,达到了相似在 Flink 中间接执行 join 的成果。

insert into hudi_catalog.flink_db.test_hudi_flink_join_2
        select a.id, a.name, a.age,b.address a.dt from source1 a left join source2 b on a.id = b.id 

《数栈产品白皮书》:https://www.dtstack.com/resources/1004?src=szsm

《数据治理行业实际白皮书》下载地址:https://www.dtstack.com/resources/1001?src=szsm

想理解或征询更多无关袋鼠云大数据产品、行业解决方案、客户案例的敌人,浏览袋鼠云官网:https://www.dtstack.com/?src=szsf

同时,欢送对大数据开源我的项目有趣味的同学退出「袋鼠云开源框架钉钉技术 qun」,交换最新开源技术信息,qun 号码:30537511,我的项目地址:https://github.com/DTStack

正文完
 0