本文首发于 Nebula Graph Community 公众号
前言
Nebula 目前作为较为成熟的产品,曾经有着很丰盛的生态。数据导入的维度而言就曾经提供了多种抉择。有大而全的 Nebula Exchange,小而精简的 Nebula Importer, 还有为 Spark / Flink 引擎提供的 Nebula Spark Connector 和 Nebula Flink Connector。
在泛滥的导入形式中,到底哪种较为不便呢?
应用场景介绍:
-
Nebula Exchange
- 须要将来自 Kafka、Pulsar 平台的流式数据, 导入 Nebula Graph 数据库
- 须要从关系型数据库(如 MySQL)或者分布式文件系统(如 HDFS)中读取批式数据
- 须要将大批量数据生成 Nebula Graph 能辨认的 SST 文件
-
Nebula Importer
- Importer 实用于将本地 CSV 文件的内容导入至 Nebula Graph 中
-
Nebula Spark Connector:
- 在不同的 Nebula Graph 集群之间迁徙数据
- 在同一个 Nebula Graph 集群内不同图空间之间迁徙数据
- Nebula Graph 与其余数据源之间迁徙数据
- 联合 Nebula Algorithm 进行图计算
-
Nebula Flink Connector
- 在不同的 Nebula Graph 集群之间迁徙数据
- 在同一个 Nebula Graph 集群内不同图空间之间迁徙数据
- Nebula Graph 与其余数据源之间迁徙数据
以上摘自 Nebula 官网文档:https://docs.nebula-graph.com.cn/2.6.2/1.introduction/1.what-is-nebula-graph/
总体来说,Exchange 大而全,能够和大部分的存储引擎联合,导入到 Nebula 中,然而须要部署 Spark 环境。
Importer 应用简略,所需依赖较少,但须要本人提前生成数据文件,配置好 schema 一劳永逸,然而不反对断点续传,适宜数据量中等。
Spark / Flink Connector 须要和流数据联合。
不同的场景抉择不同的工具,如果作为新人应用 Nebula 在导入数据时,倡议应用 Nebula Importer 工具,简略疾速上手 。
Nebula Importer 的应用
在咱们接触 Nebula Graph 初期,过后生态不够欠缺, 加上只有局部业务迁徙到 Nebula Graph 上,咱们对 Nebula Graph 数据的导入不论全量还是增量都是采纳 Hive 表推到 Kafka,生产 Kafka 批量写入 Nebula Graph 的形式。起初随着越来越多的数据和业务切换到 Nebula Graph,导入的数据效率问题愈发严厉,导入时长的减少,使得业务高峰期时依然在全量的数据导入,这是不可承受的。
针对以上问题,在尝试 Nebula Spark Connector 和 Nebula Importer 之后,由便于保护和迁徙多方面思考,采纳 Hive table -> CSV -> Nebula Server -> Nebula Importer
的形式进行全量的导入,整体耗时时长也有较大的晋升。
Nebula Importor 的相干配置
零碎环境
[root@nebula-server-prod-05 importer]# lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
Thread(s) per core: 2
Core(s) per socket: 8
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 85
Model name: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
Stepping: 7
CPU MHz: 2499.998
BogoMIPS: 4999.99
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 1024K
L3 cache: 36608K
NUMA node0 CPU(s): 0-15
Disk:SSD
Memory: 128G
集群环境
- Nebula Version:v2.6.1
- 部署形式:RPM
- 集群规模:三正本,六节点
数据规模
+---------+--------------------------+-----------+
| "Space" | "vertices" | 559191827 |
+---------+--------------------------+-----------+
| "Space" | "edges" | 722490436 |
+---------+--------------------------+-----------+
Importer 配置
# Graph 版本,连贯 2.x 时设置为 v2。version: v2
description: Relation Space import data
# 是否删除长期生成的日志和谬误数据文件。removeTempFiles: false
clientSettings:
# nGQL 语句执行失败的重试次数。retry: 3
# Nebula Graph 客户端并发数。concurrency: 5
# 每个 Nebula Graph 客户端的缓存队列大小。channelBufferSize: 1024
# 指定数据要导入的 Nebula Graph 图空间。space: Relation
# 连贯信息。connection:
user: root
password: ******
address: 10.0.XXX.XXX:9669,10.0.XXX.XXX:9669
postStart:
# 配置连贯 Nebula Graph 服务器之后,在插入数据之前执行的一些操作。commands: |
# 执行上述命令后到执行插入数据命令之间的距离。afterPeriod: 1s
preStop:
# 配置断开 Nebula Graph 服务器连贯之前执行的一些操作。commands: |
# 谬误等日志信息输入的文件门路。logPath: /mnt/csv_file/prod_relation/err/test.log
....
因为篇幅 只展现些全局相干的配置,点边相干的配置较多,不再开展,详情能够参考 GitHub。
设置 Crontab,Hive 生成表之后传输到 Nebula Server,在夜间流量较低的时候跑起 Nebula Importer 工作:
50 03 15 * * /mnt/csv_file/importer/nebula-importer -config /mnt/csv_file/importer/rel.yaml >> /root/rel.log
总共耗时 2h,在六点左右实现全量数据的导入。
局部 log 如下,导入速度最高维持在 200,000/s 左右:
2022/05/15 03:50:11 [INFO] statsmgr.go:62: Tick: Time(10.00s), Finished(1952500), Failed(0), Read Failed(0), Latency AVG(4232us), Batches Req AVG(4582us), Rows AVG(195248.59/s)
2022/05/15 03:50:16 [INFO] statsmgr.go:62: Tick: Time(15.00s), Finished(2925600), Failed(0), Read Failed(0), Latency AVG(4421us), Batches Req AVG(4761us), Rows AVG(195039.12/s)
2022/05/15 03:50:21 [INFO] statsmgr.go:62: Tick: Time(20.00s), Finished(3927400), Failed(0), Read Failed(0), Latency AVG(4486us), Batches Req AVG(4818us), Rows AVG(196367.10/s)
2022/05/15 03:50:26 [INFO] statsmgr.go:62: Tick: Time(25.00s), Finished(5140500), Failed(0), Read Failed(0), Latency AVG(4327us), Batches Req AVG(4653us), Rows AVG(205619.44/s)
2022/05/15 03:50:31 [INFO] statsmgr.go:62: Tick: Time(30.00s), Finished(6080800), Failed(0), Read Failed(0), Latency AVG(4431us), Batches Req AVG(4755us), Rows AVG(202693.39/s)
2022/05/15 03:50:36 [INFO] statsmgr.go:62: Tick: Time(35.00s), Finished(7087200), Failed(0), Read Failed(0), Latency AVG(4461us), Batches Req AVG(4784us), Rows AVG(202489.00/s)
而后在七点,依据工夫戳,从新生产 Kafka 导入当天凌晨到七点的增量数据, 避免 T+1 的全量数据笼罩当天的增量数据。
50 07 15 * * python3 /mnt/code/consumer_by_time/relation_consumer_by_timestamp.py
增量的生产大略耗时 10-15min。
实时性
依据 MD5 比照之后失去的增量数据,导入 Kafka 中,实时生产 Kafka 的数据,确保数据的提早不超过 1 分钟。
另外长时间的实时可能会有非预期的数据问题呈现而未发现,所以每 30 天会导入一次全量数据,下面介绍的 Importer 导入。而后给 Space 的点边增加 TTL=35 天确保未及时更新的数据会被 Filter 和后续回收。
一些留神点
论坛帖子 https://discuss.nebula-graph.com.cn/t/topic/361 这里提到了对于 CSV 导入常遇到的问题,大家能够参考下。另外依据教训这边有几点倡议:
- 对于并发度,问题中有提到,这个 concurrency 指定为你的 cpu cores 就能够, 示意起多少个 client 连贯 Nebula Server。实际操作中,要去 trade off 导入速度和服务器压力的影响。在咱们这边测试,如果并发过高,会导致磁盘 IO 过高,引发设置的一些告警,不倡议一下把并发拉太高,能够依据理论业务测试下做衡量。
- Importer 并不能断点续传,如果呈现谬误,须要手动解决。在实际操作中,咱们会程序剖析 Importer 的 log,依据状况解决,如果哪局部数据呈现了非预期的谬误,会告警告诉,人工染指,避免出现意外。
- Hive 生成表之后传输到 Nebula Server,这部分工作 理论耗时是和 Hadoop 资源状况密切相关的,有可能会呈现资源不够导致 Hive 和 CSV 表生成工夫滞缓,而 Importer 失常在跑的状况,这部分须要提前做好预判。咱们这边是依据 hive 工作完结工夫和 Importer 工作开始工夫做比照,判断是否须要 Importer 的过程失常运行。
交换图数据库技术?退出 Nebula 交换群请先填写下你的 Nebula 名片,Nebula 小助手会拉你进群~~