关于数据库:Nebula-Importer-数据导入实践

29次阅读

共计 4665 个字符,预计需要花费 12 分钟才能阅读完成。

本文首发于 Nebula Graph Community 公众号

前言

Nebula 目前作为较为成熟的产品,曾经有着很丰盛的生态。数据导入的维度而言就曾经提供了多种抉择。有大而全的 Nebula Exchange,小而精简的 Nebula Importer, 还有为 Spark / Flink 引擎提供的 Nebula Spark Connector 和 Nebula Flink Connector。

在泛滥的导入形式中,到底哪种较为不便呢?

应用场景介绍:

  • Nebula Exchange

    • 须要将来自 Kafka、Pulsar 平台的流式数据, 导入 Nebula Graph 数据库
    • 须要从关系型数据库(如 MySQL)或者分布式文件系统(如 HDFS)中读取批式数据
    • 须要将大批量数据生成 Nebula Graph 能辨认的 SST 文件
  • Nebula Importer

    • Importer 实用于将本地 CSV 文件的内容导入至 Nebula Graph 中
  • Nebula Spark Connector:

    • 在不同的 Nebula Graph 集群之间迁徙数据
    • 在同一个 Nebula Graph 集群内不同图空间之间迁徙数据
    • Nebula Graph 与其余数据源之间迁徙数据
    • 联合 Nebula Algorithm 进行图计算
  • Nebula Flink Connector

    • 在不同的 Nebula Graph 集群之间迁徙数据
    • 在同一个 Nebula Graph 集群内不同图空间之间迁徙数据
    • Nebula Graph 与其余数据源之间迁徙数据

以上摘自 Nebula 官网文档:https://docs.nebula-graph.com.cn/2.6.2/1.introduction/1.what-is-nebula-graph/

总体来说,Exchange 大而全,能够和大部分的存储引擎联合,导入到 Nebula 中,然而须要部署 Spark 环境。

Importer 应用简略,所需依赖较少,但须要本人提前生成数据文件,配置好 schema 一劳永逸,然而不反对断点续传,适宜数据量中等。

Spark / Flink Connector 须要和流数据联合。

不同的场景抉择不同的工具,如果作为新人应用 Nebula 在导入数据时,倡议应用 Nebula Importer 工具,简略疾速上手

Nebula Importer 的应用

在咱们接触 Nebula Graph 初期,过后生态不够欠缺, 加上只有局部业务迁徙到 Nebula Graph 上,咱们对 Nebula Graph 数据的导入不论全量还是增量都是采纳 Hive 表推到 Kafka,生产 Kafka 批量写入 Nebula Graph 的形式。起初随着越来越多的数据和业务切换到 Nebula Graph,导入的数据效率问题愈发严厉,导入时长的减少,使得业务高峰期时依然在全量的数据导入,这是不可承受的。

针对以上问题,在尝试 Nebula Spark Connector 和 Nebula Importer 之后,由便于保护和迁徙多方面思考,采纳 Hive table -> CSV -> Nebula Server -> Nebula Importer 的形式进行全量的导入,整体耗时时长也有较大的晋升。

Nebula Importor 的相干配置

零碎环境

[root@nebula-server-prod-05 importer]# lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                16
On-line CPU(s) list:   0-15
Thread(s) per core:    2
Core(s) per socket:    8
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 85
Model name:            Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
Stepping:              7
CPU MHz:               2499.998
BogoMIPS:              4999.99
Hypervisor vendor:     KVM
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              1024K
L3 cache:              36608K
NUMA node0 CPU(s):     0-15

Disk:SSD
Memory: 128G

集群环境

  • Nebula Version:v2.6.1
  • 部署形式:RPM
  • 集群规模:三正本,六节点

数据规模

+---------+--------------------------+-----------+
| "Space" | "vertices"               | 559191827 |
+---------+--------------------------+-----------+
| "Space" | "edges"                  | 722490436 |
+---------+--------------------------+-----------+

Importer 配置

# Graph 版本,连贯 2.x 时设置为 v2。version: v2

description: Relation Space import data

# 是否删除长期生成的日志和谬误数据文件。removeTempFiles: false

clientSettings:

  # nGQL 语句执行失败的重试次数。retry: 3

  # Nebula Graph 客户端并发数。concurrency: 5

  # 每个 Nebula Graph 客户端的缓存队列大小。channelBufferSize: 1024

  # 指定数据要导入的 Nebula Graph 图空间。space: Relation

  # 连贯信息。connection:
    user: root
    password: ******
    address: 10.0.XXX.XXX:9669,10.0.XXX.XXX:9669

  postStart:
    # 配置连贯 Nebula Graph 服务器之后,在插入数据之前执行的一些操作。commands: |

    # 执行上述命令后到执行插入数据命令之间的距离。afterPeriod: 1s

  preStop:
    # 配置断开 Nebula Graph 服务器连贯之前执行的一些操作。commands: |

# 谬误等日志信息输入的文件门路。logPath: /mnt/csv_file/prod_relation/err/test.log
....

因为篇幅 只展现些全局相干的配置,点边相干的配置较多,不再开展,详情能够参考 GitHub。

设置 Crontab,Hive 生成表之后传输到 Nebula Server,在夜间流量较低的时候跑起 Nebula Importer 工作:

50 03 15 * * /mnt/csv_file/importer/nebula-importer -config /mnt/csv_file/importer/rel.yaml >> /root/rel.log

总共耗时 2h,在六点左右实现全量数据的导入。

局部 log 如下,导入速度最高维持在 200,000/s 左右:

2022/05/15 03:50:11 [INFO] statsmgr.go:62: Tick: Time(10.00s), Finished(1952500), Failed(0), Read Failed(0), Latency AVG(4232us), Batches Req AVG(4582us), Rows AVG(195248.59/s)
2022/05/15 03:50:16 [INFO] statsmgr.go:62: Tick: Time(15.00s), Finished(2925600), Failed(0), Read Failed(0), Latency AVG(4421us), Batches Req AVG(4761us), Rows AVG(195039.12/s)
2022/05/15 03:50:21 [INFO] statsmgr.go:62: Tick: Time(20.00s), Finished(3927400), Failed(0), Read Failed(0), Latency AVG(4486us), Batches Req AVG(4818us), Rows AVG(196367.10/s)
2022/05/15 03:50:26 [INFO] statsmgr.go:62: Tick: Time(25.00s), Finished(5140500), Failed(0), Read Failed(0), Latency AVG(4327us), Batches Req AVG(4653us), Rows AVG(205619.44/s)
2022/05/15 03:50:31 [INFO] statsmgr.go:62: Tick: Time(30.00s), Finished(6080800), Failed(0), Read Failed(0), Latency AVG(4431us), Batches Req AVG(4755us), Rows AVG(202693.39/s)
2022/05/15 03:50:36 [INFO] statsmgr.go:62: Tick: Time(35.00s), Finished(7087200), Failed(0), Read Failed(0), Latency AVG(4461us), Batches Req AVG(4784us), Rows AVG(202489.00/s)

而后在七点,依据工夫戳,从新生产 Kafka 导入当天凌晨到七点的增量数据, 避免 T+1 的全量数据笼罩当天的增量数据。

50 07 15 * * python3  /mnt/code/consumer_by_time/relation_consumer_by_timestamp.py

增量的生产大略耗时 10-15min。

实时性

依据 MD5 比照之后失去的增量数据,导入 Kafka 中,实时生产 Kafka 的数据,确保数据的提早不超过 1 分钟。

另外长时间的实时可能会有非预期的数据问题呈现而未发现,所以每 30 天会导入一次全量数据,下面介绍的 Importer 导入。而后给 Space 的点边增加 TTL=35 天确保未及时更新的数据会被 Filter 和后续回收。

一些留神点

论坛帖子 https://discuss.nebula-graph.com.cn/t/topic/361 这里提到了对于 CSV 导入常遇到的问题,大家能够参考下。另外依据教训这边有几点倡议:

  1. 对于并发度,问题中有提到,这个 concurrency 指定为你的 cpu cores 就能够, 示意起多少个 client 连贯 Nebula Server。实际操作中,要去 trade off 导入速度和服务器压力的影响。在咱们这边测试,如果并发过高,会导致磁盘 IO 过高,引发设置的一些告警,不倡议一下把并发拉太高,能够依据理论业务测试下做衡量。
  2. Importer 并不能断点续传,如果呈现谬误,须要手动解决。在实际操作中,咱们会程序剖析 Importer 的 log,依据状况解决,如果哪局部数据呈现了非预期的谬误,会告警告诉,人工染指,避免出现意外。
  3. Hive 生成表之后传输到 Nebula Server,这部分工作 理论耗时是和 Hadoop 资源状况密切相关的,有可能会呈现资源不够导致 Hive 和 CSV 表生成工夫滞缓,而 Importer 失常在跑的状况,这部分须要提前做好预判。咱们这边是依据 hive 工作完结工夫和 Importer 工作开始工夫做比照,判断是否须要 Importer 的过程失常运行。

交换图数据库技术?退出 Nebula 交换群请先填写下你的 Nebula 名片,Nebula 小助手会拉你进群~~

正文完
 0