关于nebula:从-Neo4j-导入-Nebula-Graph-实践见-SPark-数据导入原理

28次阅读

共计 5216 个字符,预计需要花费 14 分钟才能阅读完成。

本文次要讲述如何应用数据导入工具 Nebula Graph Exchange 将数据从 Neo4j 导入到 Nebula Graph Database。在讲述如何实操数据导入之前,咱们先来理解下 Nebula Graph 外部是如何实现这个导入性能的。

Nebula Graph Exchange 的数据处理原理

咱们这个导入工具名字是 Nebula Graph Exchange,采纳 Spark 作为导入平台,来反对海量数据的导入和保障性能。Spark 自身提供了不错的形象——DataFrame,使得能够轻松反对多种数据源。在 DataFrame 的反对下,增加新的数据源只需提供配置文件读取的代码和返回 DataFrame 的 Reader 类,即可反对新的数据源。

DataFrame 能够视为一种分布式存表格。DataFrame 能够存储在多个节点的不同分区中,多个分区能够存储在不同的机器上,从而反对并行操作。Spark 还提供了一套简洁的 API 使用户轻松操作 DataFrame 如同操作本地数据集个别。当初大多数数据库提供间接将数据导出成 DataFrame 性能,即便某个数据库并未提供此性能也能够通过数据库 driver 手动构建 DataFrame。

Nebula Graph Exchange 将数据源的数据处理成 DataFrame 之后,会遍历它的每一行,依据配置文件中 fields 的映射关系,按列名获取对应的值。在遍历 batchSize 个行之后,Exchange 会将获取的数据一次性写入到 Nebula Graph 中。目前,Exchange 是通过生成 nGQL 语句再由 Nebula Client 异步写入数据,下一步会反对间接导出 Nebula Graph 底层存储的 sst 文件,以获取更好的性能。接下来介绍一下 Neo4j 数据源导入的具体实现。

Neo4j 数据导入具体实现

尽管 Neo4j 官网提供了可将数据间接导出为 DataFrame 的库,但应用它读取数据难以满足断点续传的需要,咱们未间接应用这个库,而是应用 Neo4j 官网的 driver 实现数据读取。Exchange 通过在不同分区调取 Neo4j driver 执行不同 skiplimit 的 Cypher 语句,将数据分布在不同的分区,来获取更好的性能。这个分区数量由配置项 partition 指定。

Exchange 中的 Neo4jReader 类会先将用户配置中的 exec Cypher 语句,return 后边的语句替换成 count(*) 执行获取数据总量,再依据分区数计算每个分区的起始偏移量和大小。这里如果用户配置了 check_point_path 目录,会读取目录中的文件,如果处于续传的状态,Exchange 会计算出每个分区应该的偏移量和大小。而后每个分区在 Cypher 语句后边增加不同的 skiplimit,调用 driver 执行。最初将返回的数据处理成 DataFrame 就实现了 Neo4j 的数据导入。

过程如下图所示:

Neo4j 数据导入实际

咱们这里导入演示的零碎环境如下:

  • cpu name:Intel(R) Xeon(R) CPU E5-2697 v3 @ 2.60GHz
  • cpu cores:14
  • memory size:251G

软件环境如下:

  • Neo4j:3.5.20 社区版
  • Nebula graph:docker-compose 部署,默认配置
  • Spark:单机版,版本为 2.4.6 pre-build for hadoop2.7

因为 Nebula Graph 是强 schema 数据库,数据导入前需先进行创立 Space,建 Tag 和 Edge 的 schema,具体的语法能够参考这里。

这里建了名为 test 的 Space,正本数为 1。这里创立了两种 Tag 别离为 tagA 和 tagB,均含有 4 个属性的点类型,此外,还创立一种名为 edgeAB 的边类型,同样含有 4 个属性。具体的 nGQL 语句如下所示:


# 创立图空间

CREATE SPACE test(replica_factor=1);

# 抉择图空间 test

USE test;

# 创立标签 tagA

CREATE TAG tagA(idInt int, idString string, tboolean bool, tdouble double);

# 创立标签 tagB

CREATE TAG tagB(idInt int, idString string, tboolean bool, tdouble double);

# 创立边类型 edgeAB

CREATE EDGE edgeAB(idInt int, idString string, tboolean bool, tdouble double);

同时向 Neo4j 导入 Mock 数据——标签为 tagA 和 tagB 的点,数量总共为 100 万,并且导入了连贯 tagA 和 tagB 类型点边类型为 edgeAB 的边,共 1000 万个。另外须要留神的是,从 Neo4j 导出的数据在 Nebula Graph 中必须存在属性,且数据对应的类型要同 Nebula Graph 统一。

最初为了晋升向 Neo4j 导入 Mock 数据的效率和 Mock 数据在 Neo4j 中的读取效率,这里为 tagA 和 tagB 的 idInt 属性建了索引。对于索引须要留神 Exchange 并不会将 Neo4j 中的索引、束缚等信息导入到 Nebula Graph 中,所以须要用户在执行数据写入在 Nebula Graph 之后,自行创立索引和 REBUILD 索引(为已有数据建设索引)。

接下来就能够将 Neo4j 数据导入到 Nebula Graph 中了,首先咱们须要下载和编译打包我的项目,我的项目在 nebula-java 这个仓库下 tools/exchange 文件夹中。可执行如下命令:


git clone https://github.com/vesoft-inc/nebula-java.git

cd nebula-java/tools/exchange

mvn package -DskipTests

而后就能够看到 target/exchange-1.0.1.jar 这个文件。

接下来编写配置文件,配置文件的格局为:HOCON(Human-Optimized Config Object Notation),能够基于 src/main/resources/server_application.conf 文件的根底上进行更改。首先对 nebula 配置项下的 address、user、pswd 和 space 进行配置,测试环境均为默认配置,所以这里不须要额定的批改。而后进行 tags 配置,须要 tagA 和 tagB 的配置,这里仅展现 tagA 配置,tagB 和 tagA 配置雷同。


{

# ======neo4j 连贯设置 =======

name: tagA

# 必须和 Nebula Graph 的中 tag 名字统一,须要在 Nebula Graph 中当时建好 tag

server: "bolt://127.0.0.1:7687"

# neo4j 的地址配置

user: neo4j

# neo4j 的用户名

password: neo4j

# neo4j 的明码

encryption: false

# (可选): 传输是否加密,默认值为 false

database: graph.db

# (可选): neo4j database 名称,社区版不反对

# ====== 导入设置 ============

type: {

source: neo4j

# 还反对 PARQUET、ORC、JSON、CSV、HIVE、MYSQL、PULSAR、KAFKA...

sink: client

# 写入 Nebula Graph 的形式,目前仅反对 client,将来会反对间接导出 Nebula Graph 底层数据库文件

}

nebula.fields: [idInt, idString, tdouble, tboolean]

fields : [idInt, idString, tdouble, tboolean]

# 映射关系 fields,上方为 nebula 的属性名,下方为 neo4j 的属性名,一一对应

# 映射关系的配置是 List 而不是 Map,是为了放弃 fields 的程序,将来间接导出 nebula 底层存储文件时须要

vertex: idInt

# 作为 nebula vid 的 neo4j field,类型须要是整数 (long or int)。partition: 10

# 分区数

batch: 2000

# 一次写入 nebula 多少数据

check_point_path: "file:///tmp/test"

# (可选): 保留导入进度信息的目录,用于断点续传

exec: "match (n:tagA) return n.idInt as idInt, n.idString as idString, n.tdouble as tdouble, n.tboolean as tboolean order by n.idInt"

}

边的设置大部分与点的设置无异,但因为边在 Nebula Graph 中有终点的 vid 和起点的 vid 标识,所以这里须要指定作为边终点 vid 的域和作为边起点 vid 的域。

上面给出边的特地配置。


source: {

field: a.idInt

# policy: "hash"

}

# 终点的 vid 设置

target: {

field: b.idInt

# policy: "uuid"

}

# 起点的 vid 设置

ranking: idInt

# (可选): 作为 rank 的 field

partition: 1

# 这里分区数设置为 1,起因在后边

exec: "match (a:tagA)-[r:edgeAB]->(b:tagB) return a.idInt, b.idInt, r.idInt as idInt, r.idString as idString, r.tdouble as tdouble, r.tboolean as tboolean order by id(r)"

点的 vertex 和边的 source、target 配置项下都能够设置 policy hash/uuid,它能够将类型为字符串的域作为点的 vid,通过 hash/uuid 函数将字符串映射成整数。

下面的例子因为作为点的 vid 为整数,所以并不需要 policy 的设置。hash/uuid 的 区别请看这里。

Cypher 规范中如果没有 order by 束缚的话就不能保障每次查问后果的排序统一,尽管看起来即使不加 order by Neo4j 返回的后果程序也是不变的,但为了避免可能造成的导入时数据失落,还是强烈建议在 Cypher 语句中退出 order by,尽管这会减少导入的工夫。为了晋升导入效率,order by 语句最好选取有索引的属性作为排序的属性。如果没有索引,也可察看默认的排序,抉择适合的排序属性以提高效率。如果默认的排序找不到法则,能够应用点 / 关系的 ID 作为排序属性,并且将 partition 的值尽量设小,缩小 Neo4j 的排序压力,本文中边 edgeABpartition 就设置为 1。

另外 Nebula Graph 在创立点和边时会将 ID 作为惟一主键,如果主键已存在则会笼罩该主键中的数据。所以如果将某个 Neo4j 属性值作为 Nebula Graph 的 ID,而这个属性值在 Neo4j 中是有反复的,就会导致“反复 ID”对应的数据有且只有一条会存入 Nebula Graph 中,其它的则会被笼罩掉。因为数据导入过程是并发地往 Nebula Graph 中写数据,最终保留的数据并不能保障是 Neo4j 中最新的数据。

这里还要注意下断点续传性能,在断点和续传之间,数据库不应该扭转状态,如增加数据或删除数据,且 partition 数量也不能更改,否则可能会有数据失落。

最初因为 Exchange 须要在不同分区执行不同 skiplimit 的 Cypher 语句,所以用户提供的 Cypher 语句不能含有 skiplimit 语句。

接下来就能够运行 Exchange 程序导数据了,执行如下命令:


$SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master "local[10]" target/exchange-1.0.1.jar -c /path/to/conf/neo4j_application.conf

在上述这些配置下,导入 100 万个点用时 13s,导入 1000 万条边用时 213s,总用时是 226s。

附:Neo4j 3.5 Community 和 Nebula Graph 1.0.1 的一些比拟

Neo4j 和 Nebula Graph 在零碎架构、数据模型和拜访形式上都有一些差别,下表列举了常见的异同

作者有话说:Hi,我是李梦捷,图数据库 Nebula Graph 的研发工程师,如果你对此文有疑难,欢送来咱们的 Nebula Graph 论坛交换下心得~~

喜爱这篇文章?来来来,给咱们的 GitHub 点个 star 表激励啦~~ ????‍♂️????‍♀️ [手动跪谢]

交换图数据库技术?交个敌人,Nebula Graph 官网小助手微信:NebulaGraphbot 拉你进交换群~~

举荐浏览

  • 360 数科实际:JanusGraph 到 NebulaGraph 迁徙

正文完
 0