共计 20010 个字符,预计需要花费 51 分钟才能阅读完成。
搭建一套数据治理体系耗时耗力,但或者咱们没有必要从头开始搞本人的数据血统我的项目。本文分享如何用开源、古代的 DataOps、ETL、Dashboard、元数据、数据血统管理系统构建大数据治理基础设施。
元数据治理零碎
元数据治理 零碎是一个提供了所有数据在哪、格式化形式、生成、转换、依赖、出现和所属的 一站式视图。
元数据治理零碎是所有数据仓库、数据库、表、仪表板、ETL 作业等的 目录接口(catalog),有了它,咱们就不必在群里喊“大家好,我能够更改这个表的 schema 吗?”、“请问谁晓得我如何找到 table-view-foo-bar 的原始数据?”…一个成熟的数据治理计划中的元数据治理零碎,对数据团队来说十分必要。
而 数据血统 则是元数据治理零碎泛滥须要治理的元数据之一,例如,某些 Dashboard 是某一个 Table View 的上游,而这个 Table View 又是从另外两个上游表 JOIN 而来。显然,应该清晰地把握、治理这些信息,去构建一个可信、可控的零碎和数据品质管制体系。
数据治理的可行计划
数据治理方案设计
元数据和数据血统实质上非常适合采纳图数据建模、图数据库。因为数据治理波及的典型查问便是面向图关系的查问,像“查找指定组件(即表)的所有 n 度(深度)的数据血统”就是图查问语句 FIND ALL PATH
跑起来的事。从日常大家在论坛、微信群里探讨的查问和图建模来看,NebulaGraph 社区很多人在从零开始搭建数据血统零碎,而这些工作看起来大多是在反复造轮子,而且还是不容易造的轮子。
既然如此,前人种树后人乘凉,这里我决定搭建一个齐备、端到端(不只有元数据管理)的数据系统,供大家参考解决数据血统、数据治理问题。这个套数据系统会采纳市面上优良的开源我的项目,而图数据库这块还是采纳大家的老朋友——NebulaGraph。心愿对大家能有所启发,在此基础之上领有一个绝对欠缺的图模型,以及设计精美、开箱即用的元数据治理零碎。
上面,来看看元数据治理零碎的轮子都须要哪些性能组件:
-
元数据抽取
- 这部分须要从不同的数据栈拉 / 推数据,像是从数据库、数仓、Dashboard,甚至是 ETL Pipeline 和利用、服务中搞数据。
-
元数据存储
- 能够存在数据库、图数据库里,甚至存成超大的 JSON manifest 文件都行
-
元数据目录接口零碎 Catalog
- 提供 API / GUI 来读写元数据和数据血统零碎
下图是整个计划的简略示意图:
其中,下面的虚线框是元数据的起源与导入、上面的虚线框是元数据的存储与展现、发现。
开源技术栈
上面,介绍下数据治理零碎的每个局部。
数据库和数仓
为了解决和应用原始和两头数据,这里肯定波及至多一个数据库或者数仓。它能够是 Hive、Apache Delta、TiDB、Cassandra、MySQL 或 Postgres。
在这个参考我的项目中,咱们选一个简略、风行的 Postgres。
✓ 数据仓库:Postgres
数据运维 DataOps
咱们应该有某种 DataOps 的计划,让 Pipeline 和环境具备可重复性、可测试性和版本控制性。
在这里,咱们应用了 GitLab 创立的 Meltano。
Meltano 是一个 just-work 的 DataOps 平台,它能够用奇妙且优雅的形式将 Singer 作为 EL 和 dbt 作为 T 连接起来。此外,它还连贯到其余一些 dataInfra 实用程序,例如 Apache Superset 和 Apache Airflow 等。
至此,咱们又纳入了一个成员:
✓ GitOps:Meltano https://gitlab.com/meltano/meltano
ETL 工具
下面咱们提到过组合 Singer 与 Meltano 将来自许多不同数据源的数据 E(提取)和 L(加载)数据指标,并应用 dbt 作为 Transform 的平台。于是咱们失去:
✓ EL:Singer
✓ T: dbt
数据可视化
在数据之上创立 Dashboard、图表和表格失去数据的洞察是很合乎直觉的,相似大数据之上的 Excel 图标性能。
Apache Superset 是我很喜爱的开源数据可视化我的项目,我筹备用它来作为被治理治理的指标之一。同时,还会利用它实现可视化性能来实现元数据洞察。于是,
✓ Dashboard:Apache Superset
工作编排(DAG Job Orchestration)
在大多数状况下,咱们的 DataOps 作业、工作调演变成须要编排零碎的规模,咱们能够用 Apache Airflow 来负责这一块。
✓ DAG:Apache Airflow https://airflow.apache.org/
元数据治理
随着越来越多的组件和数据被引入数据基础设施,在数据库、表、数据建模(schema)、Dashboard、DAG(编排零碎中的有向无环图)、利用与服务的各个生命周期阶段中都将存着海量的元数据,须要对它们的管理员和团队进行协同治理、连贯和发现。
Linux Foundation Amundsen 是解决该问题的最佳我的项目之一。Amundsen 用图数据库为事实源(single source of truth)以减速多跳查问,Elasticsearch 为全文搜索引擎。它在顺滑地解决所有元数据及其血统之余,还提供了优雅的 UI 和 API。Amundsen 反对多种图数据库为后端,这里咱们用 NebulaGraph。
当初的技术栈:
✓ 数据发现:Linux Foundation Amundsen
✓ 全文搜寻:Elasticsearch
✓ 图数据库:NebulaGraph
好的,所有组件都齐正了,开始组装它们吧。
环境搭建与各组件初识
本次实际的我的项目计划已开源,你能够拜访 https://github.com/wey-gu/data-lineage-ref-solution 来取得对应的代码。
整个实际过程,我遵循了尽量洁净、激励共建的准则。我的项目预设在一个 unix-like 零碎上运行,且联网和装有 Docker-Compose。
这里,我将在 Ubuntu 20.04 LTS X86_64 上运行它,当然在其余发行版或 Linux 版本上应该也没有问题。
运行一个数仓、数据库
首先,装置 Postgres 作为咱们的数仓。
这个单行命令会创立一个应用 Docker 在后盾运行的 Postgres,过程敞开之后容器不会残留而是被清理掉(因为参数--rm
)。
docker run --rm --name postgres \
-e POSTGRES_PASSWORD=lineage_ref \
-e POSTGRES_USER=lineage_ref \
-e POSTGRES_DB=warehouse -d \
-p 5432:5432 postgres
咱们能够用 Postgres CLI 或 GUI 客户端来验证命令是否执行胜利。
DataOps 工具链部署
接下来,装置有机联合了 Singer 和 dbt 的 Meltano。
Meltano 帮忙咱们治理 ETL 工具(作为插件)及其所有配置和 pipeline。这些元信息位于 Meltano 配置及其零碎数据库中,其中配置是基于文件的(能够应用 GitOps 治理),它的默认零碎数据库是 SQLite。
装置 Meltano
应用 Meltano 的工作流是启动一个“meltano 我的项目”并开始将 E、L 和 T 增加到配置文件中。Meltano 我的项目的启动只须要一个 CLI 命令 meltano init yourprojectname
。不过,在那之前,先用 Python 的包管理器 pip 或者 Docker 镜像装置 Meltano,像我示范的这样:
在 Python 虚拟环境中应用 pip 装置 Meltano:
mkdir .venv
# example in a debian flavor Linux distro
sudo apt-get install python3-dev python3-pip python3-venv python3-wheel -y
python3 -m venv .venv/meltano
source .venv/meltano/bin/activate
python3 -m pip install wheel
python3 -m pip install meltano
# init a project
mkdir meltano_projects && cd meltano_projects
# replace <yourprojectname> with your own one
touch .env
meltano init <yourprojectname>
或者,用 Docker 容器装置 Meltano:
docker pull meltano/meltano:latest
docker run --rm meltano/meltano --version
# init a project
mkdir meltano_projects && cd meltano_projects
# replace <yourprojectname> with your own one
touch .env
docker run --rm -v "$(pwd)":/projects \
-w /projects --env-file .env \
meltano/meltano init <yourprojectname>
除了通晓 meltano init
之外,最好把握 Meltano 局部命令,例如 meltano etl
示意 ETL 的执行,meltano invoke <plugin>
来调用插件命令。具体能够参考它的速查表 https://docs.meltano.com/reference/command-line-interface。
Meltano GUI 界面
Meltano 自带一个基于 Web 的 UI,执行 ui
子命令就能启动它:
meltano ui
它默认会跑在 http://localhost:5000
上。
针对 Docker 的运行环境,在裸露 5000 端口的状况下运行容器即可。因为容器的默认命令曾经是 meltano ui
,所以 run
的命令只需:
docker run -v "$(pwd)":/project \
-w /project \
-p 5000:5000 \
meltano/meltano
Meltano 我的项目示例
GitHub 用户 Pat Nadolny 在开源我的项目 singer_dbt_jaffle 中做了很好的示例。他采纳 dbt 的 Meltano 示例数据集,利用 Airflow 编排 ETL 工作。
不只这样,他还有利用 Superset 的例子,见 jaffle_superset。
前人种树咱们来吃果,依照 Pat Nadolny 的实际,咱们能够这样地运行数据管道(pipeline):
- tap-CSV(Singer)从 CSV 文件中提取数据
- target-postgres(Singer)将数据加载到 Postgres
- dbt 将数据转换为聚合表或视图
留神,下面咱们曾经启动了 Postgres,能够跳过容器启动 Postgres 这步。
操作过程是:
git clone https://github.com/pnadolny13/meltano_example_implementations.git
cd meltano_example_implementations/meltano_projects/singer_dbt_jaffle/
meltano install
touch .env
echo PG_PASSWORD="lineage_ref" >> .env
echo PG_USERNAME="lineage_ref" >> .env
# Extract and Load(with Singer)
meltano run tap-csv target-postgres
# Trasnform(with dbt)
meltano run dbt:run
# Generate dbt docs
meltano invoke dbt docs generate
# Serve generated dbt docs
meltano invoke dbt docs to serve
# Then visit http://localhost:8080
当初,咱们能够连贯到 Postgres 来查看加载和转换后的数据预览。如下所示,截图来自 VS Code 的 SQLTool。
payments 表里长这样子:
搭一个 BI Dashboard 零碎
当初,咱们的数据仓库有数据了。接下来,能够试着用下这些数据。
像仪表盘 Dashbaord 这样的 BI 工具能帮咱们从数据中取得有用的洞察。应用可视化工具 Apache Superset 能够很容易地创立和治理这些基于数据源的 Dashboard 和各式各样的图表。
本章的重点不在于 Apache Superset 自身,所以,咱们还是复用 Pat Nadolny 的 jaffle_superset 例子。
Bootstrap Meltano 和 Superset
创立一个装置了 Meltano 的 Python VENV:
mkdir .venv
python3 -m venv .venv/meltano
source .venv/meltano/bin/activate
python3 -m pip install wheel
python3 -m pip install meltano
参考 Pat 的小抄,做一些轻微的调整:
克隆 repo,进入 jaffle_superset
我的项目:
git clone https://github.com/pnadolny13/meltano_example_implementations.git
cd meltano_example_implementations/meltano_projects/jaffle_superset/
批改 meltano 配置文件,让 Superset 连贯到咱们创立的 Postgres:
vim meltano_projects/jaffle_superset/meltano.yml
这里,我将主机名更改为“10.1.1.111”,这是我以后主机的 IP。如果你是 Windows 或者 macOS 上的 Docker Desktop,这里不要批改主机名,否则就要和我一样手动改成理论地址:
--- a/meltano_projects/jaffle_superset/meltano.yml
+++ b/meltano_projects/jaffle_superset/meltano.yml
@@ -71,7 +71,7 @@ plugins:
A list of database driver dependencies can be found here https://superset.apache.org/docs/databases/installing-database-drivers
config:
database_name: my_postgres
- sqlalchemy_uri: postgresql+psycopg2://${PG_USERNAME}:${PG_PASSWORD}@host.docker.internal:${PG_PORT}/${PG_DATABASE}
+ sqlalchemy_uri: postgresql+psycopg2://${PG_USERNAME}:${PG_PASSWORD}@10.1.1.168:${PG_PORT}/${PG_DATABASE}
tables:
- model.my_meltano_project.customers
- model.my_meltano_project.orders
增加 Postgres 登录的信息到 .env
文件:
echo PG_USERNAME=lineage_ref >> .env
echo PG_PASSWORD=lineage_ref >> .env
装置 Meltano 我的项目,运行 ETL 工作:
meltano install
meltano run tap-csv target-postgres dbt:run
调用、启动 Superset,这里留神 ui
不是 meltano 的外部命令,而是一个配置进去的自定义行为(user-defined action):
meltano invoke superset:ui
在另一个命令行终端执行自定义的命令 load_datasources
:
meltano invoke superset:load_datasources
通过浏览器拜访 http://localhost:8088/
就是 Superset 的图形界面了:
创立一个 Dashboard
当初,咱们站在 Meltano、Postgres 的肩膀上,用 ETL 数据建一个 Dashboard 吧:
点击 + DASHBOARD
,填写仪表盘名称,再先后点击 SAVE
和 + CREATE A NEW CHART
:
在新图表(Create a new chart)视图中,抉择图表类型和数据集。在这里,我抉择了 orders
表作为数据源和 Pie Chart
图表类型:
点击 CREATE NEW CHART
后,在图表定义视图中抉择“status”的“Query”为“DIMENSIONS”,“COUNT(amount)”为“METRIC”。至此,咱们就能够看到每个订单状态散布的饼图了。
点击 SAVE
,零碎会询问应该将此图表增加到哪个 Dashboard。抉择后,单击 SAVE & GO TO DASHBOARD
。
在 Dashboard 中,咱们能够看到所有的图表。这不,你能够看到我额定增加的、用来显示客户订单数量散布的图表:
点 ···
能看到刷新率设置、下载渲染图等其余的性能。
当初,咱们有一个简略但典型的 HomeLAB 数据技术栈了,并且所有货色都是开源的!
设想一下,咱们在 CSV 中有 100 个数据集,在数据仓库中有 200 个表,并且有几个数据工程师在运行不同的我的项目,这些我的项目应用、生成不同的利用与服务、Dashbaord 和数据库。当有人想要查找、发现或者批改其中的一些表、数据集、Dashbaord 和管道,在沟通和工程方面可能都是十分不好治理的。
下面咱们提到,这个示例我的项目的 次要性能是元数据发现零碎。
元数据发现零碎
当初,须要咱们部署一个带有 NebulaGraph 和 Elasticsearch 的 Amundsen。有了 Amundsen,咱们能够在一个中央发现和治理整个数据栈中的所有元数据。
Amundsen 次要有两个局部组成:
-
元数据导入 Metadata Ingestion
- Amundsen Databuilder
-
元数据目录服务 Metadata Catalog
- Amundsen Frontend Service
- Amundsen Metadata Service
- Amundsen Search Service
它的工作原理是:利用 Databuilder
提取不同数据源的元数据,并将元数据长久化到 Metadata Service
和 Search Service
中,用户从 Frontend Service
或 Metadata Service
的 API 获取数据。
部署 Amundsen
元数据服务 Metadata Service
咱们用 docker-compose 部署一个 Amundsen 集群。因为 Amundsen 对 NebulaGraph 后端的反对 pr#1817 尚未合并,还不能用官网的代码。这里,先用我的 fork 版本。
先克隆蕴含所有子模块的 repo:
git clone -b amundsen_nebula_graph --recursive git@github.com:wey-gu/amundsen.git
cd amundsen
启动所有目录服务 catalog services 及其后端存储:
docker-compose -f docker-amundsen-nebula.yml up
因为这个 docker-compose 文件是供开发人员试玩、调试 Amundsen 用的,而不是给生产部署筹备的,它在启动的时候会从代码库构建镜像,第一次跑的时候启动会慢一些。
部署好了之后,咱们应用 Databuilder 将一些示例、虚构的数据加载存储里。
抓取元数据 Databuilder
Amundsen Databuilder 就像 Meltano 零碎一样,只不过是用在元数据的上的 ETL,它把元数据加载到 Metadata Service
和 Search Service
的后端存储:NebulaGraph 和 Elasticsearch 里。这里的 Databuilder 只是一个 Python 模块,所有的元数据 ETL 作业能够作为脚本运行,也能够用 Apache Airflow 等 DAG 平台进行编排。
装置 Amundsen Databuilder:
cd databuilder
python3 -m venv .venv
source .venv/bin/activate
python3 -m pip install wheel
python3 -m pip install -r requirements.txt
python3 setup.py install
调用这个示例数据构建器 ETL 脚本来把示例的虚构数据导进去。
python3 example/scripts/sample_data_loader_nebula.py
验证一下 Amundsen
在拜访 Amundsen 之前,咱们须要创立一个测试用户:
# run a container with curl attached to amundsenfrontend
docker run -it --rm --net container:amundsenfrontend nicolaka/netshoot
# Create a user with id test_user_id
curl -X PUT -v http://amundsenmetadata:5002/user \
-H "Content-Type: application/json" \
--data \
'{"user_id":"test_user_id","first_name":"test","last_name":"user","email":"test_user_id@mail.com"}'
exit
而后咱们能够在 http://localhost:5000
查看 UI 并尝试搜寻 test
,它应该会返回一些后果。
而后,能够单击并浏览在 sample_data_loader_nebula.py
期间加载到 Amundsen 的那些示例元数据。
此外,咱们还能够通过 NebulaGraph Studio 的地址 http://localhost:7001
拜访 NebulaGraph 里的这些数据。
下图显示了无关 Amundsen 组件的更多详细信息:
┌────────────────────────┐ ┌────────────────────────────────────────┐
│ Frontend:5000 │ │ Metadata Sources │
├────────────────────────┤ │ ┌────────┐ ┌─────────┐ ┌─────────────┐ │
│ Metaservice:5001 │ │ │ │ │ │ │ │ │
│ ┌──────────────┐ │ │ │ Foo DB │ │ Bar App │ │ X Dashboard │ │
┌────┼─┤ Nebula Proxy │ │ │ │ │ │ │ │ │ │
│ │ └──────────────┘ │ │ │ │ │ │ │ │ │
│ ├────────────────────────┤ │ └────────┘ └─────┬───┘ └─────────────┘ │
┌─┼────┤ Search searvice:5002 │ │ │ │
│ │ └────────────────────────┘ └──────────────────┼─────────────────────┘
│ │ ┌─────────────────────────────────────────────┼───────────────────────┐
│ │ │ │ │
│ │ │ Databuilder ┌───────────────────────────┘ │
│ │ │ │ │
│ │ │ ┌───────────────▼────────────────┐ ┌──────────────────────────────┐ │
│ │ ┌──┼─► Extractor of Sources ├─► nebula_search_data_extractor │ │
│ │ │ │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ │
│ │ │ │ ┌───────────────▼────────────────┐ ┌──────────────▼───────────────┐ │
│ │ │ │ │ Loader filesystem_csv_nebula │ │ Loader Elastic FS loader │ │
│ │ │ │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ │
│ │ │ │ ┌───────────────▼────────────────┐ ┌──────────────▼───────────────┐ │
│ │ │ │ │ Publisher nebula_csv_publisher │ │ Publisher Elasticsearch │ │
│ │ │ │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ │
│ │ │ └─────────────────┼─────────────────────────────────┼─────────────────┘
│ │ └────────────────┐ │ │
│ │ ┌─────────────┼───►─────────────────────────┐ ┌─────▼─────┐
│ │ │ NebulaGraph │ │ │ │ │
│ └────┼─────┬───────┴───┼───────────┐ ┌─────┐ │ │ │
│ │ │ │ │ │MetaD│ │ │ │
│ │ ┌───▼──┐ ┌───▼──┐ ┌───▼──┐ └─────┘ │ │ │
│ ┌────┼─►GraphD│ │GraphD│ │GraphD│ │ │ │
│ │ │ └──────┘ └──────┘ └──────┘ ┌─────┐ │ │ │
│ │ │ :9669 │MetaD│ │ │ Elastic │
│ │ │ ┌────────┐ ┌────────┐ ┌────────┐ └─────┘ │ │ Search │
│ │ │ │ │ │ │ │ │ │ │ Cluster │
│ │ │ │StorageD│ │StorageD│ │StorageD│ ┌─────┐ │ │ :9200 │
│ │ │ │ │ │ │ │ │ │MetaD│ │ │ │
│ │ │ └────────┘ └────────┘ └────────┘ └─────┘ │ │ │
│ │ ├───────────────────────────────────────────┤ │ │
│ └────┤ Nebula Studio:7001 │ │ │
│ └───────────────────────────────────────────┘ └─────▲─────┘
└──────────────────────────────────────────────────────────┘
穿针引线:元数据发现
设置好根本环境后,让咱们把所有货色穿起来。还记得咱们有 ELT 一些数据到 PostgreSQL 吗?
那么,咱们如何让 Amundsen 发现这些数据和 ETL 的元数据呢?
提取 Postgres 元数据
咱们从数据源开始:首先是 Postgres。
咱们为 Python3 装置 Postgres 客户端:
sudo apt-get install libpq-dev
pip3 install Psycopg2
执行 Postgres 元数据 ETL
运行一个脚本来解析 Postgres 元数据:
export CREDENTIALS_POSTGRES_USER=lineage_ref
export CREDENTIALS_POSTGRES_PASSWORD=lineage_ref
export CREDENTIALS_POSTGRES_DATABASE=warehouse
python3 example/scripts/sample_postgres_loader_nebula.py
咱们看看把 Postgres 元数据加载到 NebulaGraph 的示例脚本的代码,非常简单间接:
# part 1: PostgresMetadata --> CSV --> NebulaGraph
job = DefaultJob(
conf=job_config,
task=DefaultTask(extractor=PostgresMetadataExtractor(),
loader=FsNebulaCSVLoader()),
publisher=NebulaCsvPublisher())
...
# part 2: Metadata stored in NebulaGraph --> Elasticsearch
extractor = NebulaSearchDataExtractor()
task = SearchMetadatatoElasticasearchTask(extractor=extractor)
job = DefaultJob(conf=job_config, task=task)
第一个工作门路是:PostgresMetadata --> CSV --> NebulaGraph
PostgresMetadataExtractor
用于从 Postgres 中提取元数据,能够参考文档 https://www.amundsen.io/amundsen/databuilder/#postgresmetadataextractor。FsNebulaCSVLoader
用于将提取的数据转为 CSV 文件NebulaCsvPublisher
用于将元数据以 CSV 格局公布到 NebulaGraph
第二个工作门路是:Metadata stored in NebulaGraph --> Elasticsearch
NebulaSearchDataExtractor
用于获取存储在 NebulaGraph 中的元数据SearchMetadatatoElasticasearchTask
用于使 Elasticsearch 对元数据进行索引。
请留神,在生产环境中,咱们能够在脚本中或应用 Apache Airflow 等编排平台触发这些作业。
验证 Postgres 中元数据的获取
搜寻 payments
或者间接拜访 http://localhost:5000/table_detail/warehouse/postgres/public/payments
,你能够看到咱们 Postgres 的元数据,比方:
像下面的屏幕截图一样,咱们能够轻松实现元数据管理操作,如:增加标签、所有者和形容。
提取 dbt 元数据
其实,咱们也能够从 dbt 自身提取元数据。
Amundsen DbtExtractor 会解析 catalog.json
或 manifest.json
文件并将元数据加载到 Amundsen 存储,这里当然指的是 NebulaGraph 和 Elasticsearch。
在下面的 Meltano 章节中,咱们曾经应用 meltano invoke dbt docs generate
生成了这个文件:
14:23:15 Done.
14:23:15 Building catalog
14:23:15 Catalog written to /home/ubuntu/ref-data-lineage/meltano_example_implementations/meltano_projects/singer_dbt_jaffle/.meltano/transformers/dbt/target/catalog.json
dbt 元数据 ETL 的执行
咱们试着解析示例 dbt 文件中的元数据吧:
$ ls -l example/sample_data/dbt/
total 184
-rw-rw-r-- 1 w w 5320 May 15 07:17 catalog.json
-rw-rw-r-- 1 w w 177163 May 15 07:17 manifest.json
我写的这个示例的加载例子如下:
python3 example/scripts/sample_dbt_loader_nebula.py
其中次要的代码如下:
# part 1: dbt manifest --> CSV --> NebulaGraph
job = DefaultJob(
conf=job_config,
task=DefaultTask(extractor=DbtExtractor(),
loader=FsNebulaCSVLoader()),
publisher=NebulaCsvPublisher())
...
# part 2: Metadata stored in NebulaGraph --> Elasticsearch
extractor = NebulaSearchDataExtractor()
task = SearchMetadatatoElasticasearchTask(extractor=extractor)
job = DefaultJob(conf=job_config, task=task)
它和 Postgres 元数据 ETL 的惟一区别是 extractor=DbtExtractor()
,它带有以下配置以获取无关 dbt 我的项目的以下信息:
- 数据库名称
- 目录_json
- manifest_json
job_config = ConfigFactory.from_dict({
'extractor.dbt.database_name': database_name,
'extractor.dbt.catalog_json': catalog_file_loc, # File
'extractor.dbt.manifest_json': json.dumps(manifest_data), # JSON Dumped objecy
'extractor.dbt.source_url': source_url})
验证 dbt 抓取后果
搜寻 dbt_demo
或者间接拜访 http://localhost:5000/table_detail/dbt_demo/snowflake/public/raw_inventory_value
,能够看到
这里给一个小提示,其实,咱们能够抉择启用 DEBUG log 级别去看已发送到 Elasticsearch 和 NebulaGraph 的内容。
- logging.basicConfig(level=logging.INFO)
+ logging.basicConfig(level=logging.DEBUG)
或者,在 NebulaGraph Studio 中摸索导入的数据:
先点击 Start with Vertices
,并填写顶点 vid:snowflake://dbt_demo.public/fact_warehouse_inventory
咱们能够看到顶点显示为粉红色的点。再让咱们批改下 Expand
/”拓展“选项:
- 方向:双向
- 步数:单向、三步
并双击顶点(点),它将双向拓展 3 步:
像截图展现的那般,在可视化之后的图数据库中,这些元数据能够很容易被查看、剖析,并从中取得洞察。
而且,咱们在 NebulaGraph Studio 中看到的同 Amundsen 元数据服务的数据模型相响应:
最初,请记住咱们曾利用 dbt 来转换 Meltano 中的一些数据,并且清单文件门路是 .meltano/transformers/dbt/target/catalog.json
,你能够尝试创立一个数据构建器作业来导入它。
提取 Superset 中的元数据
Amundsen 的 Superset Extractor 能够获取
- Dashboard 元数据抽取,见 apache_superset_metadata_extractor.py
- 图表元数据抽取,见 apache_superset_chart_extractor.py
- Superset 元素与数据源(表)的关系抽取,见 apache_superset_table_extractor.py
来,当初试试提取之前创立的 Superset Dashboard 的元数据。
Superset 元数据 ETL 的执行
下边执行的示例 Superset 提取脚本能够获取数据并将元数据加载到 NebulaGraph 和 Elasticsearch 中。
python3 sample_superset_data_loader_nebula.py
如果咱们将日志记录级别设置为 DEBUG
,咱们实际上能够看到这些两头的过程日志:
# fetching metadata from superset
DEBUG:urllib3.connectionpool:http://localhost:8088 "POST /api/v1/security/login HTTP/1.1" 200 280
INFO:databuilder.task.task:Running a task
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:8088
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /api/v1/dashboard?q=(page_size:20,page:0,order_direction:desc) HTTP/1.1" 308 374
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /api/v1/dashboard/?q=(page_size:20,page:0,order_direction:desc) HTTP/1.1" 200 1058
...
# insert Dashboard
DEBUG:databuilder.publisher.nebula_csv_publisher:Query: INSERT VERTEX `Dashboard` (`dashboard_url`, `name`, published_tag, publisher_last_updated_epoch_ms) VALUES "superset_dashboard://my_cluster.1/3":("http://localhost:8088/superset/dashboard/3/","my_dashboard","unique_tag",timestamp());
...
# insert a DASHBOARD_WITH_TABLE relationship/edge
INFO:databuilder.publisher.nebula_csv_publisher:Importing data in edge files: ['/tmp/amundsen/dashboard/relationships/Dashboard_Table_DASHBOARD_WITH_TABLE.csv']
DEBUG:databuilder.publisher.nebula_csv_publisher:Query:
INSERT edge `DASHBOARD_WITH_TABLE` (`END_LABEL`, `START_LABEL`, published_tag, publisher_last_updated_epoch_ms) VALUES "superset_dashboard://my_cluster.1/3"->"postgresql+psycopg2://my_cluster.warehouse/orders":("Table","Dashboard","unique_tag", timestamp()), "superset_dashboard://my_cluster.1/3"->"postgresql+psycopg2://my_cluster.warehouse/customers":("Table","Dashboard","unique_tag", timestamp());
验证 Superset Dashboard 元数据
通过在 Amundsen 中搜寻它,咱们当初能够取得 Dashboard 信息。
咱们也能够从 NebulaGraph Studio 进行验证。
注:能够参阅 Dashboard 抓取指南中的 Amundsen Dashboard 图建模:
用 Superset 预览数据
Superset 能够用来预览表格数据,文档能够参考 https://www.amundsen.io/amundsen/frontend/docs/configuration/#preview-client,其中 /superset/sql_json/
的 API 被 Amundsen Frontend Service
调用,获得预览信息。
开启数据血统信息
默认状况下,数据血统是敞开的,咱们能够通过以下形式启用它:
第一步,cd
到 Amundsen 代码仓库下,这也是咱们运行 docker-compose -f docker-amundsen-nebula.yml up
命令的中央:
cd amundsen
第二步,批改 frontend 下的 TypeScript 配置
--- a/frontend/amundsen_application/static/js/config/config-default.ts
+++ b/frontend/amundsen_application/static/js/config/config-default.ts
tableLineage: {
- inAppListEnabled: false,
- inAppPageEnabled: false,
+ inAppListEnabled: true,
+ inAppPageEnabled: true,
externalEnabled: false,
iconPath: 'PATH_TO_ICON',
isBeta: false,
第三步,从新构建 Docker 镜像,其中将重建前端图像。
docker-compose -f docker-amundsen-nebula.yml build
第四步,从新运行 up -d
以确保前端用新的配置:
docker-compose -f docker-amundsen-nebula.yml up -d
后果大略长这样子:
$ docker-compose -f docker-amundsen-nebula.yml up -d
...
Recreating amundsenfrontend ... done
之后,咱们能够拜访 http://localhost:5000/lineage/table/gold/hive/test_schema/test_table1
看到 Lineage(beta)
血统按钮曾经显示进去了:
咱们能够点击 Downstream
查看该表的上游资源:
或者点击血统按钮查看血统的图表式:
也有用于血统查问的 API。
上面这个例子中,咱们用 cURL 调用下这个 API:
docker run -it --rm --net container:amundsenfrontend nicolaka/netshoot
curl "http://amundsenmetadata:5002/table/snowflake://dbt_demo.public/raw_inventory_value/lineage?depth=3&direction=both"
下面的 API 调用是查问上游和上游方向的 linage,表 snowflake://dbt_demo.public/raw_inventory_value
的深度为 3。
后果应该是这样的:
{
"depth": 3,
"downstream_entities": [
{
"level": 2,
"usage": 0,
"key": "snowflake://dbt_demo.public/fact_daily_expenses",
"parent": "snowflake://dbt_demo.public/fact_warehouse_inventory",
"badges": [],
"source": "snowflake"
},
{
"level": 1,
"usage": 0,
"key": "snowflake://dbt_demo.public/fact_warehouse_inventory",
"parent": "snowflake://dbt_demo.public/raw_inventory_value",
"badges": [],
"source": "snowflake"
}
],
"key": "snowflake://dbt_demo.public/raw_inventory_value",
"direction": "both",
"upstream_entities": []}
实际上,这个血统数据就是在咱们的 dbtExtractor 执行期间提取和加载的,其中 extractor .dbt.{DbtExtractor.EXTRACT_LINEAGE}
默认为 true
,因而,创立了血统元数据并将其加载到了 Amundsen。
在 NebulaGraph 中洞察血统
应用图数据库作为元数据存储的两个长处是:
图查问自身是一个灵便的 DSL for lineage API,例如,这个查问帮忙咱们执行 Amundsen 元数据 API 的等价的查问:
MATCH p=(t:`Table`) -[:`HAS_UPSTREAM`|:`HAS_DOWNSTREAM` *1..3]->(x)
WHERE id(t) == "snowflake://dbt_demo.public/raw_inventory_value" RETURN p
来,在 NebulaGraph Studio 或者 Explorer 的控制台中查问下:
渲染下这个后果:
提取数据血统
这些血统信息是须要咱们明确指定、获取的,获取的形式能够是本人写 Extractor,也能够是用已有的形式。比方:dbt 的 Extractor 和 Open Lineage 我的项目的 Amundsen Extractor。
通过 dbt
这个在方才曾经展现过了,dbt 的 Extractor 会从表级别获取血统同其余 dbt 中产生的元数据信息一起被拿到。
通过 Open Lineage
Amundsen 中的另一个开箱即用的血统 Extractor 是 OpenLineageTableLineageExtractor。
Open Lineage 是一个凋谢的框架,能够将不同起源的血统数据收集到一个中央,它能够将血统信息输入为 JSON 文件,参见文档 https://www.amundsen.io/amundsen/databuilder/#openlineagetablelineageextractor。
下边是它的 Amundsen Databuilder 例子:
dict_config = {
# ...
f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.CLUSTER_NAME}': 'datalab',
f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.OL_DATASET_NAMESPACE_OVERRIDE}': 'hive_table',
f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.TABLE_LINEAGE_FILE_LOCATION}': 'input_dir/openlineage_nd.json',
}
...
task = DefaultTask(extractor=OpenLineageTableLineageExtractor(),
loader=FsNebulaCSVLoader())
回顾
整套元数据治理 / 发现的计划思路如下:
- 将整个数据技术栈中的组件作为元数据源(从任何数据库、数仓,到 dbt、Airflow、Openlineage、Superset 等各级我的项目)
- 应用 Databuilder(作为脚本或 DAG)运行元数据 ETL,以应用 NebulaGraph 和 Elasticsearch 存储和索引
- 从前端 UI(应用 Superset 预览)或 API 去应用、生产、治理和发现元数据
- 通过查问和 UI 对 NebulaGraph,咱们能够取得更多的可能性、灵活性和数据、血统的洞察
波及到的开源
此参考我的项目中应用的所有我的项目都按字典程序在上面列出。
- Amundsen
- Apache Airflow
- Apache Superset
- dbt
- Elasticsearch
- meltano
- NebulaGraph
- Open Lineage
- Singer
谢谢你读完本文 (///▽///)
要来近距离体验一把图数据库吗?当初能够用用 NebulaGraph Cloud 来搭建本人的图数据系统哟,快来节俭大量的部署安装时间来搞定业务吧~ NebulaGraph 阿里云计算巢现 30 天收费应用中,点击链接来用用图数据库吧~
想看源码的小伙伴能够返回 GitHub 浏览、应用、(^з^)-☆ star 它 -> GitHub;和其余的 NebulaGraph 用户一起交换图数据库技术和利用技能,留下「你的名片」一起游玩呢~