乐趣区

关于数据库:应用实践-Apache-Doris-整合-Iceberg-Flink-CDC-构建实时湖仓一体的联邦查询分析架构

利用实际 | Apache Doris 整合 Iceberg + Flink CDC 构建实时湖仓一体的联邦查问剖析架构

导读:这是一篇十分残缺全面的利用技术干货,手把手教你如何应用 Doris+Iceberg+Flink CDC 构建实时湖仓一体的联邦查问剖析架构。依照本文中步骤一步步实现,残缺体验搭建操作的残缺过程。

作者|Apache Doris PMC 成员 张家锋

1. 概览

这篇教程将展现如何应用 Doris+Iceberg+Flink CDC 构建实时湖仓一体的联邦查问剖析,Doris 1.1 版本提供了 Iceberg 的反对,本文次要展现 Doris 和 Iceberg 怎么应用,同时本教程整个环境是都基于伪分布式环境搭建,大家依照步骤能够一步步实现。残缺体验整个搭建操作的过程。

1.1 软件环境

本教程的演示环境如下:

  1. Centos7
  2. Apahce doris 1.1
  3. Hadoop 3.3.3
  4. hive 3.1.3
  5. Fink 1.14.4
  6. flink-sql-connector-mysql-cdc-2.2.1
  7. Apache Iceberg 0.13.2
  8. JDK 1.8.0_311
  9. MySQL 8.0.29
wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.3/hadoop-3.3.3.tar.gz
wget https://archive.apache.org/dist/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz
wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
wget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jar
wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

1.2 零碎架构

咱们整顿架构图如下

  1. 首先咱们从 Mysql 数据中应用 Flink 通过 Binlog 实现数据的实时采集
  2. 而后再 Flink 中创立 Iceberg 表,Iceberg 的元数据保留在 hive 里
  3. 最初咱们在 Doris 中创立 Iceberg 表面
  4. 在通过 Doris 对立查问入口实现对 Iceberg 里的数据进行查问剖析,供前端利用调用,这里 iceberg 表面的数据能够和 Doris 外部数据或者 Doris 其余内部数据源的数据进行关联查问剖析

Doris 湖仓一体的联邦查问架构如下:

  1. Doris 通过 ODBC 形式反对:MySQL,Postgresql,Oracle,SQLServer
  2. 同时反对 Elasticsearch 表面
  3. 1.0 版本反对 Hive 表面
  4. 1.1 版本反对 Iceberg 表面
  5. 1.2 版本反对 Hudi 表面

2. 环境装置部署

2.1 装置 Hadoop、Hive

tar zxvf hadoop-3.3.3.tar.gz
tar zxvf apache-hive-3.1.3-bin.tar.gz

配置零碎环境变量

export HADOOP_HOME=/data/hadoop-3.3.3
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HIVE_HOME=/data/hive-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HIVE_HOME/bin:$HIVE_HOME/conf

2.2 配置 hdfs

2.2.1 core-site.xml

vi etc/hadoop/core-site.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
2.2.2 hdfs-site.xml

vi etc/hadoop/hdfs-site.xml

  <configuration>
    <property>
      <name>dfs.replication</name>
      <value>1</value>
    </property>
    <property>
      <name>dfs.namenode.name.dir</name>
      <value>/data/hdfs/namenode</value>
    </property>
    <property>
      <name>dfs.datanode.data.dir</name>
      <value>/data/hdfs/datanode</value>
    </property>
  </configuration>
2.2.3 批改 Hadoop 启动脚本

sbin/start-dfs.sh

sbin/stop-dfs.sh

在文件开始加上上面的内容

HDFS_DATANODE_USER=root
HADOOP_SECURE_DN_USER=hdfs
HDFS_NAMENODE_USER=root
HDFS_SECONDARYNAMENODE_USER=root

sbin/start-yarn.sh

sbin/stop-yarn.sh

在文件开始加上上面的内容

YARN_RESOURCEMANAGER_USER=root
HADOOP_SECURE_DN_USER=yarn
YARN_NODEMANAGER_USER=root

2.3 配置 yarn

这里我扭转了 Yarn 的一些端口,因为我是单机环境和 Doris 的一些端口抵触。你能够不启动 yarn

vi etc/hadoop/yarn-site.xml

<property>        
    <name>yarn.resourcemanager.address</name>  
    <value>jiafeng-test:50056</value> 
</property>  
<property>  
    <name>yarn.resourcemanager.scheduler.address</name> 
    <value>jiafeng-test:50057</value> 
</property> 
<property> 
    <name>yarn.resourcemanager.resource-tracker.address</name>  
    <value>jiafeng-test:50058</value> 
</property> 
<property>
    <name>yarn.resourcemanager.admin.address</name> 
    <value>jiafeng-test:50059</value> 
</property> 
<property>
    <name>yarn.resourcemanager.webapp.address</name> 
    <value>jiafeng-test:9090</value> 
</property> 
<property> 
    <name>yarn.nodemanager.localizer.address</name>
    <value>0.0.0.0:50060</value> 
</property> 
<property> 
    <name>yarn.nodemanager.webapp.address</name> 
    <value>0.0.0.0:50062</value>  
</property>

vi etc/hadoop/mapred-site.xm

<property>       
    <name>mapreduce.jobhistory.address</name>  
    <value>0.0.0.0:10020</value>  
</property> 
<property> 
    <name>mapreduce.jobhistory.webapp.address</name> 
    <value>0.0.0.0:19888</value> 
</property> 
<property> 
    <name>mapreduce.shuffle.port</name>
    <value>50061</value> 
</property>
2.2.4 启动 hadoop
sbin/start-all.sh

2.4 配置 Hive

2.4.1 创立 hdfs 目录
hdfs dfs -mkdir -p /user/hive/warehouse
hdfs dfs -mkdir /tmp
hdfs dfs -chmod g+w /user/hive/warehouse
hdfs dfs -chmod g+w /tmp
2.4.2 配置 hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
​
<configuration>
        <property>
            <name>javax.jdo.option.ConnectionURL</name>
            <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionDriverName</name>
            <value>com.mysql.jdbc.Driver</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionUserName</name>
            <value>root</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionPassword</name>
            <value>MyNewPass4!</value>
        </property>
        <property>
                <name>hive.metastore.warehouse.dir</name>
                <value>/user/hive/warehouse</value>
                <description>location of default database for the warehouse</description>
        </property>
        <property>
                <name>hive.metastore.uris</name>
                <value/>
                <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
        </property>
        <property>
                <name>javax.jdo.PersistenceManagerFactoryClass</name>
                <value>org.datanucleus.api.jdo.JDOPersistenceManagerFactory</value>
        </property>
        <property>
                <name>hive.metastore.schema.verification</name>
                <value>false</value>
        </property>
        <property>
                <name>datanucleus.schema.autoCreateAll</name>
                <value>true</value>
        </property>
</configuration>
2.4.3 配置 hive-env.sh

退出一下内容

HADOOP_HOME=/data/hadoop-3.3.3
2.4.4 hive 元数据初始化
schematool -initSchema -dbType mysql
2.4.5 启动 hive metaservice

后盾运行

nohup bin/hive --service metaservice 1>/dev/null 2>&1 &

验证

lsof -i:9083
COMMAND   PID USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
java    20700 root  567u  IPv6 54605348      0t0  TCP *:emc-pp-mgmtsvc (LISTEN)

2.5 装置 MySQL

具体请参照这里:

应用 Flink CDC 实现 MySQL 数据实时入 Apache Doris

2.5.1 创立 MySQL 数据库表并初始化数据
CREATE DATABASE demo;
USE demo;
CREATE TABLE userinfo (
  id int NOT NULL AUTO_INCREMENT,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone_number VARCHAR(512),
  email VARCHAR(255),
  PRIMARY KEY (`id`)
)ENGINE=InnoDB ;
INSERT INTO userinfo VALUES (10001,'user_110','Shanghai','13347420870', NULL);
INSERT INTO userinfo VALUES (10002,'user_111','xian','13347420870', NULL);
INSERT INTO userinfo VALUES (10003,'user_112','beijing','13347420870', NULL);
INSERT INTO userinfo VALUES (10004,'user_113','shenzheng','13347420870', NULL);
INSERT INTO userinfo VALUES (10005,'user_114','hangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10006,'user_115','guizhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10007,'user_116','chengdu','13347420870', NULL);
INSERT INTO userinfo VALUES (10008,'user_117','guangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10009,'user_118','xian','13347420870', NULL);

2.6 装置 Flink

tar zxvf flink-1.14.4-bin-scala_2.12.tgz

而后须要将上面的依赖拷贝到 Flink 装置目录下的 lib 目录下,具体的依赖的 lib 文件如下:

上面将几个 Hadoop 和 Flink 里没有的依赖下载地址放在上面

wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
wget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
wget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jar
wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

其余的:

hadoop-3.3.3/share/hadoop/common/lib/commons-configuration2-2.1.1.jar
hadoop-3.3.3/share/hadoop/common/lib/commons-logging-1.1.3.jar
hadoop-3.3.3/share/hadoop/tools/lib/hadoop-archive-logs-3.3.3.jar
hadoop-3.3.3/share/hadoop/common/lib/hadoop-auth-3.3.3.jar
hadoop-3.3.3/share/hadoop/common/lib/hadoop-annotations-3.3.3.jar
hadoop-3.3.3/share/hadoop/common/hadoop-common-3.3.3.jar
adoop-3.3.3/share/hadoop/hdfs/hadoop-hdfs-3.3.3.jar
hadoop-3.3.3/share/hadoop/client/hadoop-client-api-3.3.3.jar
hive-3.1.3/lib/hive-exec-3.1.3.jar
hive-3.1.3/lib/hive-metastore-3.1.3.jar
hive-3.1.3/lib/hive-hcatalog-core-3.1.3.jar
2.6.1 启动 Flink
bin/start-cluster.sh

启动后的界面如下:

2.6.2 进入 Flink SQL Client
 bin/sql-client.sh embedded 

开启 checkpoint,每隔 3 秒做一次 checkpoint

Checkpoint 默认是不开启的,咱们须要开启 Checkpoint 来让 Iceberg 能够提交事务。并且,mysql-cdc 在 binlog 读取阶段开始前,须要期待一个残缺的 checkpoint 来防止 binlog 记录乱序的状况。

留神:

这里是演示环境,checkpoint 的距离设置比拟短,线上应用,倡议设置为 3 - 5 分钟一次 checkpoint。

Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.
2.6.3 创立 Iceberg Catalog
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://localhost:8020/user/hive/warehouse'
);

查看 catalog

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|    hive_catalog |
+-----------------+
2 rows in set
2.6.4 创立 Mysql CDC 表
 CREATE TABLE user_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    `id` DECIMAL(20, 0) NOT NULL,
    name STRING,
    address STRING,
    phone_number STRING,
    email STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'MyNewPass4!',
    'database-name' = 'demo',
    'table-name' = 'userinfo'
  );

查问 CDC 表:

select * from user_source;

2.6.5 创立 Iceberg 表
--- 查看 catalog
show catalogs;
--- 应用 catalog
use catalog hive_catalog;
-- 创立数据库
CREATE DATABASE iceberg_hive; 
-- 应用数据库
use iceberg_hive;
​
2.6.5.1 创立表
CREATE TABLE all_users_info (
    database_name STRING,
    table_name    STRING,
    `id`          DECIMAL(20, 0) NOT NULL,
    name          STRING,
    address       STRING,
    phone_number  STRING,
    email         STRING,
    PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
  ) WITH ('catalog-type'='hive');

从 CDC 表里插入数据到 Iceberg 表里

use catalog default_catalog;
​
insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;

在 web 界面能够看到工作的运行状况

而后停掉工作,咱们去查问 iceberg 表

select * from hive_catalog.iceberg_hive.all_users_info

能够看到上面的后果

咱们去 hdfs 上能够看到 hive 目录下的数据及对应的元数据

咱们也能够通过 Hive 建好 Iceberg 表,而后通过 Flink 将数据插入到表里

下载 Iceberg Hive 运行依赖

 wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/0.13.2/iceberg-hive-runtime-0.13.2.jar

在 hive shell 下执行:

SET engine.hive.enabled=true; 
SET iceberg.engine.hive.enabled=true; 
SET iceberg.mr.catalog=hive; 
 add jar /path/to/iiceberg-hive-runtime-0.13.2.jar;

创立表

CREATE EXTERNAL TABLE iceberg_hive( 
  `id` int, 
  `name` string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
TBLPROPERTIES (
  'iceberg.mr.catalog'='hadoop', 
'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
  ); 

而后再 Flink SQL Client 下执行上面语句将数据插入到 Iceber 表里

INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(2, 'c');
INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(3, 'zhangfeng');

查问这个表

select * from hive_catalog.iceberg_hive.iceberg_hive

能够看到上面的后果

3. Doris 查问 Iceberg

Apache Doris 提供了 Doris 间接拜访 Iceberg 内部表的能力,内部表省去了繁琐的数据导入工作,并借助 Doris 自身的 OLAP 的能力来解决 Iceberg 表的数据分析问题:

  1. 反对 Iceberg 数据源接入 Doris
  2. 反对 Doris 与 Iceberg 数据源中的表联结查问,进行更加简单的剖析操作

3.1 装置 Doris

这里咱们不在具体解说 Doris 的装置,如果你不晓得怎么装置 Doris 请参照官网文档:疾速入门

3.2 创立 Iceberg 表面

CREATE TABLE `all_users_info` 
ENGINE = ICEBERG
PROPERTIES (
"iceberg.database" = "iceberg_hive",
"iceberg.table" = "all_users_info",
"iceberg.hive.metastore.uris"  =  "thrift://localhost:9083",
"iceberg.catalog.type"  =  "HIVE_CATALOG"
);
参数阐明:
  • ENGINE 须要指定为 ICEBERG
  • PROPERTIES 属性:

    • iceberg.hive.metastore.uris:Hive Metastore 服务地址
    • iceberg.database:挂载 Iceberg 对应的数据库名
    • iceberg.table:挂载 Iceberg 对应的表名,挂载 Iceberg database 时无需指定。
    • iceberg.catalog.type:Iceberg 中应用的 catalog 形式,默认为 HIVE_CATALOG,以后仅反对该形式,后续会反对更多的 Iceberg catalog 接入形式。
mysql> CREATE TABLE `all_users_info`
    -> ENGINE = ICEBERG
    -> PROPERTIES (
    -> "iceberg.database" = "iceberg_hive",
    -> "iceberg.table" = "all_users_info",
    -> "iceberg.hive.metastore.uris"  =  "thrift://localhost:9083",
    -> "iceberg.catalog.type"  =  "HIVE_CATALOG"
    -> );
Query OK, 0 rows affected (0.23 sec)
​
mysql> select * from all_users_info;
+---------------+------------+-------+----------+-----------+--------------+-------+
| database_name | table_name | id    | name     | address   | phone_number | email |
+---------------+------------+-------+----------+-----------+--------------+-------+
| demo          | userinfo   | 10004 | user_113 | shenzheng | 13347420870  | NULL  |
| demo          | userinfo   | 10005 | user_114 | hangzhou  | 13347420870  | NULL  |
| demo          | userinfo   | 10002 | user_111 | xian      | 13347420870  | NULL  |
| demo          | userinfo   | 10003 | user_112 | beijing   | 13347420870  | NULL  |
| demo          | userinfo   | 10001 | user_110 | Shanghai  | 13347420870  | NULL  |
| demo          | userinfo   | 10008 | user_117 | guangzhou | 13347420870  | NULL  |
| demo          | userinfo   | 10009 | user_118 | xian      | 13347420870  | NULL  |
| demo          | userinfo   | 10006 | user_115 | guizhou   | 13347420870  | NULL  |
| demo          | userinfo   | 10007 | user_116 | chengdu   | 13347420870  | NULL  |
+---------------+------------+-------+----------+-----------+--------------+-------+
9 rows in set (0.18 sec)

3.3 同步挂载

当 Iceberg 表 Schema 产生变更时,能够通过 REFRESH 命令手动同步,该命令会将 Doris 中的 Iceberg 表面删除重建。

-- 同步 Iceberg 表
REFRESH TABLE t_iceberg;
​
-- 同步 Iceberg 数据库
REFRESH DATABASE iceberg_test_db;

3.4 Doris 和 Iceberg 数据类型对应关系

反对的 Iceberg 列类型与 Doris 对应关系如下表:

ICEBERG DORIS 形容
BOOLEAN BOOLEAN
INTEGER INT
LONG BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DATE DATE
TIMESTAMP DATETIME Timestamp 转成 Datetime 会损失精度
STRING STRING
UUID VARCHAR 应用 VARCHAR 来代替
DECIMAL DECIMAL
TIME 不反对
FIXED 不反对
BINARY 不反对
STRUCT 不反对
LIST 不反对
MAP 不反对

3.5 注意事项

  • Iceberg 表 Schema 变更 不会主动同步,须要在 Doris 中通过 REFRESH 命令同步 Iceberg 表面或数据库。
  • 以后默认反对的 Iceberg 版本为 0.12.0,0.13.x,未在其余版本进行测试。后续后反对更多版本。

3.6 Doris FE 配置

上面几个配置属于 Iceberg 表面零碎级别的配置,能够通过批改 fe.conf 来配置,也能够通过 ADMIN SET CONFIG 来配置。

  • iceberg_table_creation_strict_mode

    创立 Iceberg 表默认开启 strict mode。strict mode 是指对 Iceberg 表的列类型进行严格过滤,如果有 Doris 目前不反对的数据类型,则创立表面失败。

  • iceberg_table_creation_interval_second

    主动创立 Iceberg 表的后台任务执行距离,默认为 10s。

  • max_iceberg_table_creation_record_size

    Iceberg 表创立记录保留的最大值,默认为 2000. 仅针对创立 Iceberg 数据库记录。

4. 总结

这里 Doris On Iceberg 咱们只演示了 Iceberg 单表的查问,你还能够联结 Doris 的表,或者其余的 ODBC 表面,Hive 表面,ES 表面等进行联结查问剖析,通过 Doris 对外提供对立的查问剖析入口。

自此咱们残缺从搭建 Hadoop,hive、flink、Mysql、Doris 及 Doris On Iceberg 的应用全副介绍完了,Doris 朝着数据仓库和数据交融的架构演进,反对湖仓一体的联邦查问,给咱们的开发带来更多的便当,更高效的开发,省去了很多数据同步的繁琐工作,快快来体验吧。
最初,欢送更多的开源技术爱好者退出 Apache Doris 社区,携手成长,共建社区生态。

SelectDB 是一家开源技术公司,致力于为 Apache Doris 社区提供一个由全职工程师、产品经理和反对工程师组成的团队,凋敝开源社区生态,打造实时剖析型数据库畛域的国内工业界规范。基于 Apache Doris 研发的新一代云原生实时数仓 SelectDB,运行于多家云上,为用户和客户提供开箱即用的能力。

相干链接:

SelectDB 官方网站:

https://selectdb.com (We Are Coming Soon)

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

dev@doris.apache.org

退出移动版