利用实际 | Apache Doris 整合 Iceberg + Flink CDC 构建实时湖仓一体的联邦查问剖析架构

导读:这是一篇十分残缺全面的利用技术干货,手把手教你如何应用 Doris+Iceberg+Flink CDC 构建实时湖仓一体的联邦查问剖析架构。依照本文中步骤一步步实现,残缺体验搭建操作的残缺过程。

作者Apache Doris PMC 成员 张家锋

1.概览

这篇教程将展现如何应用 Doris+Iceberg+Flink CDC 构建实时湖仓一体的联邦查问剖析,Doris 1.1版本提供了Iceberg的反对,本文次要展现Doris和Iceberg怎么应用,同时本教程整个环境是都基于伪分布式环境搭建,大家依照步骤能够一步步实现。残缺体验整个搭建操作的过程。

1.1 软件环境

本教程的演示环境如下:

  1. Centos7
  2. Apahce doris 1.1
  3. Hadoop 3.3.3
  4. hive 3.1.3
  5. Fink 1.14.4
  6. flink-sql-connector-mysql-cdc-2.2.1
  7. Apache Iceberg 0.13.2
  8. JDK 1.8.0_311
  9. MySQL 8.0.29
wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.3/hadoop-3.3.3.tar.gzwget https://archive.apache.org/dist/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gzwget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgzwget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jarwget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

1.2 零碎架构

咱们整顿架构图如下

  1. 首先咱们从Mysql数据中应用Flink 通过 Binlog实现数据的实时采集
  2. 而后再Flink 中创立 Iceberg 表,Iceberg的元数据保留在hive里
  3. 最初咱们在Doris中创立Iceberg表面
  4. 在通过Doris 对立查问入口实现对Iceberg里的数据进行查问剖析,供前端利用调用,这里iceberg表面的数据能够和Doris外部数据或者Doris其余内部数据源的数据进行关联查问剖析

Doris湖仓一体的联邦查问架构如下:

  1. Doris 通过 ODBC 形式反对:MySQL,Postgresql,Oracle ,SQLServer
  2. 同时反对 Elasticsearch 表面
  3. 1.0版本反对Hive表面
  4. 1.1版本反对Iceberg表面
  5. 1.2版本反对Hudi 表面

2.环境装置部署

2.1 装置Hadoop、Hive

tar zxvf hadoop-3.3.3.tar.gztar zxvf apache-hive-3.1.3-bin.tar.gz

配置零碎环境变量

export HADOOP_HOME=/data/hadoop-3.3.3export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoopexport HADOOP_HDFS_HOME=$HADOOP_HOMEexport HIVE_HOME=/data/hive-3.1.3export PATH=$PATH:$HADOOP_HOME/bin:$HIVE_HOME/bin:$HIVE_HOME/conf

2.2 配置hdfs

2.2.1 core-site.xml

vi etc/hadoop/core-site.xml

<configuration>    <property>        <name>fs.defaultFS</name>        <value>hdfs://localhost:9000</value>    </property></configuration>
2.2.2 hdfs-site.xml

vi etc/hadoop/hdfs-site.xml

  <configuration>    <property>      <name>dfs.replication</name>      <value>1</value>    </property>    <property>      <name>dfs.namenode.name.dir</name>      <value>/data/hdfs/namenode</value>    </property>    <property>      <name>dfs.datanode.data.dir</name>      <value>/data/hdfs/datanode</value>    </property>  </configuration>
2.2.3 批改Hadoop启动脚本

sbin/start-dfs.sh

sbin/stop-dfs.sh

在文件开始加上上面的内容

HDFS_DATANODE_USER=rootHADOOP_SECURE_DN_USER=hdfsHDFS_NAMENODE_USER=rootHDFS_SECONDARYNAMENODE_USER=root

sbin/start-yarn.sh

sbin/stop-yarn.sh

在文件开始加上上面的内容

YARN_RESOURCEMANAGER_USER=rootHADOOP_SECURE_DN_USER=yarnYARN_NODEMANAGER_USER=root

2.3 配置yarn

这里我扭转了Yarn的一些端口,因为我是单机环境和Doris 的一些端口抵触。你能够不启动yarn

vi etc/hadoop/yarn-site.xml

<property>            <name>yarn.resourcemanager.address</name>      <value>jiafeng-test:50056</value> </property>  <property>      <name>yarn.resourcemanager.scheduler.address</name>     <value>jiafeng-test:50057</value> </property> <property>     <name>yarn.resourcemanager.resource-tracker.address</name>      <value>jiafeng-test:50058</value> </property> <property>    <name>yarn.resourcemanager.admin.address</name>     <value>jiafeng-test:50059</value> </property> <property>    <name>yarn.resourcemanager.webapp.address</name>     <value>jiafeng-test:9090</value> </property> <property>     <name>yarn.nodemanager.localizer.address</name>    <value>0.0.0.0:50060</value> </property> <property>     <name>yarn.nodemanager.webapp.address</name>     <value>0.0.0.0:50062</value>  </property>

vi etc/hadoop/mapred-site.xm

<property>           <name>mapreduce.jobhistory.address</name>      <value>0.0.0.0:10020</value>  </property> <property>     <name>mapreduce.jobhistory.webapp.address</name>     <value>0.0.0.0:19888</value> </property> <property>     <name>mapreduce.shuffle.port</name>    <value>50061</value> </property>
2.2.4 启动hadoop
sbin/start-all.sh

2.4 配置Hive

2.4.1 创立hdfs目录
hdfs dfs -mkdir -p /user/hive/warehousehdfs dfs -mkdir /tmphdfs dfs -chmod g+w /user/hive/warehousehdfs dfs -chmod g+w /tmp
2.4.2 配置hive-site.xml
<?xml version="1.0"?><?xml-stylesheet type="text/xsl" href="configuration.xsl"?><configuration>        <property>            <name>javax.jdo.option.ConnectionURL</name>            <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>        </property>        <property>            <name>javax.jdo.option.ConnectionDriverName</name>            <value>com.mysql.jdbc.Driver</value>        </property>        <property>            <name>javax.jdo.option.ConnectionUserName</name>            <value>root</value>        </property>        <property>            <name>javax.jdo.option.ConnectionPassword</name>            <value>MyNewPass4!</value>        </property>        <property>                <name>hive.metastore.warehouse.dir</name>                <value>/user/hive/warehouse</value>                <description>location of default database for the warehouse</description>        </property>        <property>                <name>hive.metastore.uris</name>                <value/>                <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>        </property>        <property>                <name>javax.jdo.PersistenceManagerFactoryClass</name>                <value>org.datanucleus.api.jdo.JDOPersistenceManagerFactory</value>        </property>        <property>                <name>hive.metastore.schema.verification</name>                <value>false</value>        </property>        <property>                <name>datanucleus.schema.autoCreateAll</name>                <value>true</value>        </property></configuration>
2.4.3 配置 hive-env.sh

退出一下内容

HADOOP_HOME=/data/hadoop-3.3.3
2.4.4 hive元数据初始化
schematool -initSchema -dbType mysql
2.4.5 启动hive metaservice

后盾运行

nohup bin/hive --service metaservice 1>/dev/null 2>&1 &

验证

lsof -i:9083COMMAND   PID USER   FD   TYPE   DEVICE SIZE/OFF NODE NAMEjava    20700 root  567u  IPv6 54605348      0t0  TCP *:emc-pp-mgmtsvc (LISTEN)

2.5 装置MySQL

具体请参照这里:

应用 Flink CDC 实现 MySQL 数据实时入 Apache Doris

2.5.1 创立MySQL数据库表并初始化数据
CREATE DATABASE demo;USE demo;CREATE TABLE userinfo (  id int NOT NULL AUTO_INCREMENT,  name VARCHAR(255) NOT NULL DEFAULT 'flink',  address VARCHAR(1024),  phone_number VARCHAR(512),  email VARCHAR(255),  PRIMARY KEY (`id`))ENGINE=InnoDB ;INSERT INTO userinfo VALUES (10001,'user_110','Shanghai','13347420870', NULL);INSERT INTO userinfo VALUES (10002,'user_111','xian','13347420870', NULL);INSERT INTO userinfo VALUES (10003,'user_112','beijing','13347420870', NULL);INSERT INTO userinfo VALUES (10004,'user_113','shenzheng','13347420870', NULL);INSERT INTO userinfo VALUES (10005,'user_114','hangzhou','13347420870', NULL);INSERT INTO userinfo VALUES (10006,'user_115','guizhou','13347420870', NULL);INSERT INTO userinfo VALUES (10007,'user_116','chengdu','13347420870', NULL);INSERT INTO userinfo VALUES (10008,'user_117','guangzhou','13347420870', NULL);INSERT INTO userinfo VALUES (10009,'user_118','xian','13347420870', NULL);

2.6 装置 Flink

tar zxvf flink-1.14.4-bin-scala_2.12.tgz

而后须要将上面的依赖拷贝到Flink装置目录下的lib目录下,具体的依赖的lib文件如下:

上面将几个Hadoop和Flink里没有的依赖下载地址放在上面

wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jarwget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jarwget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jarwget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

其余的:

hadoop-3.3.3/share/hadoop/common/lib/commons-configuration2-2.1.1.jarhadoop-3.3.3/share/hadoop/common/lib/commons-logging-1.1.3.jarhadoop-3.3.3/share/hadoop/tools/lib/hadoop-archive-logs-3.3.3.jarhadoop-3.3.3/share/hadoop/common/lib/hadoop-auth-3.3.3.jarhadoop-3.3.3/share/hadoop/common/lib/hadoop-annotations-3.3.3.jarhadoop-3.3.3/share/hadoop/common/hadoop-common-3.3.3.jaradoop-3.3.3/share/hadoop/hdfs/hadoop-hdfs-3.3.3.jarhadoop-3.3.3/share/hadoop/client/hadoop-client-api-3.3.3.jarhive-3.1.3/lib/hive-exec-3.1.3.jarhive-3.1.3/lib/hive-metastore-3.1.3.jarhive-3.1.3/lib/hive-hcatalog-core-3.1.3.jar
2.6.1 启动Flink
bin/start-cluster.sh

启动后的界面如下:

2.6.2 进入 Flink SQL Client
 bin/sql-client.sh embedded 

开启 checkpoint,每隔3秒做一次 checkpoint

Checkpoint 默认是不开启的,咱们须要开启 Checkpoint 来让 Iceberg 能够提交事务。 并且,mysql-cdc 在 binlog 读取阶段开始前,须要期待一个残缺的 checkpoint 来防止 binlog 记录乱序的状况。

留神:

这里是演示环境,checkpoint的距离设置比拟短,线上应用,倡议设置为3-5分钟一次checkpoint。

Flink SQL> SET execution.checkpointing.interval = 3s;[INFO] Session property has been set.
2.6.3 创立Iceberg Catalog
CREATE CATALOG hive_catalog WITH (  'type'='iceberg',  'catalog-type'='hive',  'uri'='thrift://localhost:9083',  'clients'='5',  'property-version'='1',  'warehouse'='hdfs://localhost:8020/user/hive/warehouse');

查看catalog

Flink SQL> show catalogs;+-----------------+|    catalog name |+-----------------+| default_catalog ||    hive_catalog |+-----------------+2 rows in set
2.6.4 创立 Mysql CDC 表
 CREATE TABLE user_source (    database_name STRING METADATA VIRTUAL,    table_name STRING METADATA VIRTUAL,    `id` DECIMAL(20, 0) NOT NULL,    name STRING,    address STRING,    phone_number STRING,    email STRING,    PRIMARY KEY (`id`) NOT ENFORCED  ) WITH (    'connector' = 'mysql-cdc',    'hostname' = 'localhost',    'port' = '3306',    'username' = 'root',    'password' = 'MyNewPass4!',    'database-name' = 'demo',    'table-name' = 'userinfo'  );

查问CDC表:

select * from user_source;

2.6.5 创立Iceberg表
---查看catalogshow catalogs;---应用cataloguse catalog hive_catalog;--创立数据库CREATE DATABASE iceberg_hive; --应用数据库use iceberg_hive;
2.6.5.1 创立表
CREATE TABLE all_users_info (    database_name STRING,    table_name    STRING,    `id`          DECIMAL(20, 0) NOT NULL,    name          STRING,    address       STRING,    phone_number  STRING,    email         STRING,    PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED  ) WITH (    'catalog-type'='hive'  );

从CDC表里插入数据到Iceberg表里

use catalog default_catalog;insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;

在web界面能够看到工作的运行状况

而后停掉工作,咱们去查问iceberg表

select * from hive_catalog.iceberg_hive.all_users_info

能够看到上面的后果

咱们去hdfs上能够看到hive目录下的数据及对应的元数据

咱们也能够通过Hive建好Iceberg表,而后通过Flink将数据插入到表里

下载Iceberg Hive运行依赖

 wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/0.13.2/iceberg-hive-runtime-0.13.2.jar

在hive shell下执行:

SET engine.hive.enabled=true; SET iceberg.engine.hive.enabled=true; SET iceberg.mr.catalog=hive;  add jar /path/to/iiceberg-hive-runtime-0.13.2.jar;

创立表

CREATE EXTERNAL TABLE iceberg_hive(   `id` int,   `name` string)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'TBLPROPERTIES (  'iceberg.mr.catalog'='hadoop', 'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'  ); 

而后再Flink SQL Client下执行上面语句将数据插入到Iceber表里

INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(2, 'c');INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(3, 'zhangfeng');

查问这个表

select * from hive_catalog.iceberg_hive.iceberg_hive

能够看到上面的后果

3. Doris 查问 Iceberg

Apache Doris 提供了 Doris 间接拜访 Iceberg 内部表的能力,内部表省去了繁琐的数据导入工作,并借助 Doris 自身的 OLAP 的能力来解决 Iceberg 表的数据分析问题:

  1. 反对 Iceberg 数据源接入Doris
  2. 反对 Doris 与 Iceberg 数据源中的表联结查问,进行更加简单的剖析操作

3.1装置Doris

这里咱们不在具体解说Doris的装置,如果你不晓得怎么装置Doris请参照官网文档:疾速入门

3.2 创立Iceberg表面

CREATE TABLE `all_users_info` ENGINE = ICEBERGPROPERTIES ("iceberg.database" = "iceberg_hive","iceberg.table" = "all_users_info","iceberg.hive.metastore.uris"  =  "thrift://localhost:9083","iceberg.catalog.type"  =  "HIVE_CATALOG");
参数阐明:
  • ENGINE 须要指定为 ICEBERG
  • PROPERTIES 属性:

    • iceberg.hive.metastore.uris:Hive Metastore 服务地址
    • iceberg.database:挂载 Iceberg 对应的数据库名
    • iceberg.table:挂载 Iceberg 对应的表名,挂载 Iceberg database 时无需指定。
    • iceberg.catalog.type:Iceberg 中应用的 catalog 形式,默认为 HIVE_CATALOG,以后仅反对该形式,后续会反对更多的 Iceberg catalog 接入形式。
mysql> CREATE TABLE `all_users_info`    -> ENGINE = ICEBERG    -> PROPERTIES (    -> "iceberg.database" = "iceberg_hive",    -> "iceberg.table" = "all_users_info",    -> "iceberg.hive.metastore.uris"  =  "thrift://localhost:9083",    -> "iceberg.catalog.type"  =  "HIVE_CATALOG"    -> );Query OK, 0 rows affected (0.23 sec)mysql> select * from all_users_info;+---------------+------------+-------+----------+-----------+--------------+-------+| database_name | table_name | id    | name     | address   | phone_number | email |+---------------+------------+-------+----------+-----------+--------------+-------+| demo          | userinfo   | 10004 | user_113 | shenzheng | 13347420870  | NULL  || demo          | userinfo   | 10005 | user_114 | hangzhou  | 13347420870  | NULL  || demo          | userinfo   | 10002 | user_111 | xian      | 13347420870  | NULL  || demo          | userinfo   | 10003 | user_112 | beijing   | 13347420870  | NULL  || demo          | userinfo   | 10001 | user_110 | Shanghai  | 13347420870  | NULL  || demo          | userinfo   | 10008 | user_117 | guangzhou | 13347420870  | NULL  || demo          | userinfo   | 10009 | user_118 | xian      | 13347420870  | NULL  || demo          | userinfo   | 10006 | user_115 | guizhou   | 13347420870  | NULL  || demo          | userinfo   | 10007 | user_116 | chengdu   | 13347420870  | NULL  |+---------------+------------+-------+----------+-----------+--------------+-------+9 rows in set (0.18 sec)

3.3 同步挂载

当 Iceberg 表 Schema 产生变更时,能够通过 REFRESH 命令手动同步,该命令会将 Doris 中的 Iceberg 表面删除重建。

-- 同步 Iceberg 表REFRESH TABLE t_iceberg;-- 同步 Iceberg 数据库REFRESH DATABASE iceberg_test_db;

3.4 Doris 和 Iceberg 数据类型对应关系

反对的 Iceberg 列类型与 Doris 对应关系如下表:

ICEBERGDORIS形容
BOOLEANBOOLEAN
INTEGERINT
LONGBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
TIMESTAMPDATETIMETimestamp 转成 Datetime 会损失精度
STRINGSTRING
UUIDVARCHAR应用 VARCHAR 来代替
DECIMALDECIMAL
TIME-不反对
FIXED-不反对
BINARY-不反对
STRUCT-不反对
LIST-不反对
MAP-不反对

3.5 注意事项

  • Iceberg 表 Schema 变更不会主动同步,须要在 Doris 中通过 REFRESH 命令同步 Iceberg 表面或数据库。
  • 以后默认反对的 Iceberg 版本为 0.12.0,0.13.x,未在其余版本进行测试。后续后反对更多版本。

3.6 Doris FE 配置

上面几个配置属于 Iceberg 表面零碎级别的配置,能够通过批改 fe.conf 来配置,也能够通过 ADMIN SET CONFIG 来配置。

  • iceberg_table_creation_strict_mode

    创立 Iceberg 表默认开启 strict mode。 strict mode 是指对 Iceberg 表的列类型进行严格过滤,如果有 Doris 目前不反对的数据类型,则创立表面失败。

  • iceberg_table_creation_interval_second

    主动创立 Iceberg 表的后台任务执行距离,默认为 10s。

  • max_iceberg_table_creation_record_size

    Iceberg 表创立记录保留的最大值,默认为 2000. 仅针对创立 Iceberg 数据库记录。

4. 总结

这里Doris On Iceberg咱们只演示了Iceberg单表的查问,你还能够联结Doris的表,或者其余的ODBC表面,Hive表面,ES表面等进行联结查问剖析,通过Doris对外提供对立的查问剖析入口。

自此咱们残缺从搭建Hadoop,hive、flink 、Mysql、Doris 及Doris On Iceberg的应用全副介绍完了,Doris朝着数据仓库和数据交融的架构演进,反对湖仓一体的联邦查问,给咱们的开发带来更多的便当,更高效的开发,省去了很多数据同步的繁琐工作,快快来体验吧。
最初,欢送更多的开源技术爱好者退出 Apache Doris 社区,携手成长,共建社区生态。

SelectDB 是一家开源技术公司,致力于为 Apache Doris 社区提供一个由全职工程师、产品经理和反对工程师组成的团队,凋敝开源社区生态,打造实时剖析型数据库畛域的国内工业界规范。基于 Apache Doris 研发的新一代云原生实时数仓 SelectDB,运行于多家云上,为用户和客户提供开箱即用的能力。

相干链接:

SelectDB 官方网站:

https://selectdb.com (We Are Coming Soon)

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

dev@doris.apache.org