共计 5824 个字符,预计需要花费 15 分钟才能阅读完成。
摘要:次要介绍如何通过官网 ETL 工具 Exchange 将业务线上数据从 Neo4j 间接导入到 Nebula Graph 以及在导入过程中遇到的问题和优化办法。
本文首发于 Nebula 论坛:https://discuss.nebula-graph.com.cn/t/topic/2044
1 背景
随着业务数据量一直增长,业务对图数据库在线数据实时更新写入和查问的效率要求也一直减少。Neo4j 存在显著性能有余,Neo4j 社区开源版本只反对单机部署,扩大能力存在比拟大的问题,无奈满足读写性能的线性扩大以及读写拆散的业务需要,并且开源版本 Neo4j 对点和边的总数据量也有限度;而 Neo4j 企业版因果集群也存在单机主节点 Cypher 实时写入的性能瓶颈。
相比于 Neo4j,Nebula Graph 最大的特色便是采纳 shared-nothing 分布式的架构,无单主写入瓶颈问题,读写反对线性扩大,善于解决千亿节点、万亿条边的超大规模数据集。
本文次要介绍如何通过官网 ETL 工具 Exchange 将业务线上数据从 Neo4j 间接导入到 Nebula Graph 以及在导入过程中遇到的问题和优化办法。其中绝大部分问题都曾经通过论坛发帖的形式失去社区的反对和解决,本文会联合问题进行逐个列举。
2 部署环境
零碎环境:
- CPU name:Intel(R) Xeon(R) Silver 4114 CPU @ 2.20GHz
- CPU Cores:40
- Memory Size:376 GB
- Disk:HDD
- System:CentOS Linux release 7.4.1708 (Core)
软件环境:
- Neo4j:3.4 版本,五节点因果集群
-
Nebula Graph:
- 版本:nebula-graph v1.1.0 源码编译装置,
- 部署:单台服务器部署三节点 Nebula Graph 集群。
- Exchange:nebula-java v1.1.0 源码编译 jar 包
-
数仓环境:
- hadoop-2.7.4
- spark-2.3.1
留神:单台机器部署 Nebula 多节点的端口调配:每个 storage 还会将用户配置的端口号 + 1 的端口作为外部应用。请参考论坛帖子 nebula 从 neo4j 导入数据呈现 Get UUID Failed 谬误
3 全量 & 增量数据导入
3.1 全量导入
依据 Neo4j 点和边的属性信息创立 Nebula Graph 的 Tag 和 Edge 构造,这里须要留神一点,业务可能会依据不同需要只在局部点和边上减少 Neo4j 点和边的属性信息,其余点和边对应的属性为 NULL
,所以须要先跟业务明确一下点和边的全副属性信息,防止脱漏属性。Nebula Graph 的 Schema 信息相似 MySQL,反对 Create 和 Alter 增加属性,并且所有的 Tag 和 Edge 的元数据信息是统一的。
1、Nebula Graph 创立 Tag 和 Edge
# 示例
# 创立图空间,10 个分区,3 个 storage 正本。CREATE SPACE test(partition_num=10,replica_factor=3);
# 抉择图空间 test
USE test;
# 创立标签 tagA
CREATE TAG tagA(vid string, field-a0 string, field-a1 bool, field-a2 double);
# 创立标签 tagB
CREATE TAG tagB(vid string, field-b0 string, field-b1 bool, field-b2 double);
# 创立边类型 edgeAB
CREATE EDGE edgeAB(vid string, field-e0 string, field-e1 bool, field-e2 double);
2、Exchange 导入配置文件
- Exchange 配置目前不反对
bolt+routing
的形式连贯 neo4j,如果是因果集群,能够抉择一个从节点进行bolt
形式直连读取数据,缩小集群压力。 - 咱们业务的 Neo4j 数据点和边的 vid 是 string 类型,Nebula v1.x 版本还不反对 string 间接当做 vid(v2.0 反对),思考到官网文档中的形容:“当点数量达到十亿级别时,用 hash 函数生成 vid 有肯定的抵触概率。因而 Nebula Graph 提供 UUID 函数来防止大量点时的 vid 抵触。”抉择了 uuid() 作为转化函数,然而导入效率要比 hash 低,而且 uuid() 在将来版本可能存在兼容问题。
- partition: 是指 Exchange 从 Neo4j 拉取数据的分页个数。
- batch: 是指批量插入 Nebula 的 batch 大小。
{
# Spark relation config
spark: {
app: {name: Spark Writer}
driver: {
cores: 1
maxResultSize: 1G
}
cores {max: 16}
}
# Nebula Graph relation config
nebula: {
address:{graph:["xxx.xxx.xxx.xx:3699"]
meta:["xxx.xxx.xxx.xx:45500"]
}
user: user
pswd: password
space: test
connection {
timeout: 3000
retry: 3
}
execution {retry: 3}
error: {
max: 32
output: /tmp/errors
}
rate: {
limit: 1024
timeout: 1000
}
}
# Processing tags
tags: [
# Loading tag from neo4j
{
name: tagA
type: {
source: neo4j
sink: client
}
server: "bolt://xxx.xxx.xxx.xxx:7687"
user: neo4j
password: neo4j
exec: "match (n:tagA) where id(n) < 300000000 return n.vid as vid, n.field-a0 as field-a0, n.field-a1 as field-a1, n.field-a2 as field-a2 order by id(n)"
fields: [vid, field-a0, field-a1, field-a2]
nebula.fields: [vid, field-a0, field-a1, field-a2]
vertex: {
field: vid
policy: "uuid"
}
partition: 10
batch: 1000
check_point_path: /tmp/test
}
# Loading tag from neo4j
{
name: tagB
type: {
source: neo4j
sink: client
}
server: "bolt://xxx.xxx.xxx.xxx:7687"
user: neo4j
password: neo4j
exec: "match (n:tagB) where id(n) < 300000000 return n.vid as vid, n.field-b0 as field-b0, n.field-b1 as field-b1, n.field-b2 as field-b2 order by id(n)"
fields: [vid, field-b0, field-b1, field-b2]
nebula.fields: [vid, field-b0, field-b1, field-b2]
vertex: {
field: vid
policy: "uuid"
}
partition: 10
batch: 1000
check_point_path: /tmp/test
}
]
# Processing edges
edges: [
# Loading edges from neo4j
{
name: edgeAB
type: {
source: neo4j
sink: client
}
server: "bolt://xxx.xxx.xxx.xxx:7687"
user: neo4j
password: neo4j
exec: "match (a:tagA)-[r:edgeAB]->(b:tagB) where id(r) < 300000000 return n.vid as vid, n.field-e0 as field-e0, n.field-e1 as field-e1, n.field-e2 as field-e2 order by id(r)"
fields: [vid, field-e0, field-e1, field-e2]
nebula.fields: [vid, field-e0, field-e1, field-e2]
source: {
field: a.vid
policy: "uuid"
}
target: {
field: b.vid
policy: "uuid"
}
partition: 10
batch: 1000
check_point_path: /tmp/test
}
]
}
3、执行导入命令
nohup spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master "local" exchange-1.1.0.jar -c test.conf > test.log &
4、查看导入 Nebula Graph 的数据量
./bin/db_dump --space=test --db_path=./data/storage/nebula/ --meta_server=127.0.0.1:45500 -limit 0 --mode=stat --tags=tagA,tagB --edges=edgeAB
留神:Nebula 1.x 版本目前还只能用 db_dump 统计,2.0 会反对 nGQL 命令的形式统计数量。
3.2 增量导入
增量数据导入次要是通过 Neo4j 外部点和边的自增 id()
进行切割,在导入配置文件 exec 项执行 Neo4j Cypher 语句时减少 id()
范畴限度,但前提是须要业务停掉删数据操作,因为增量导入时,如果之前的数据被删除后 Neo4j 会复用 id()
,这会导致复用 id()
的增量数据导入时查问不到造成数据失落。 当然业务如果有条件反对 Neo4j Nebula 双写的话,增量导入就不会呈现这种问题 。
exec: "match (n:user) where id(n) >= 300000000 and id(n) < 400000000 return xxx order by id(n)"
请参考论坛帖子 neo4j 到 nebula 如何做增量导入
3.3 导入问题及解决
应用 Exchange 导入过程中遇到两个问题,及时的失去官网 @nicole 的反对和解决,具体请参考上面两个帖子:
- nebula 从 neo4j 导入数据,局部属性带回车,拼 insert 报错,有什么好方法解决吗?
- 应用 Exchange 从 neo4j 导入 nebula,label 中有些顶点的属性值是 null,导致导入失败
问题 1:Exchange 不反对「换行回车」等特殊字符的本义。如下 string 数据中带有回车,在拼接 insert
语句插入时会因为换行导致插入失败。
PR:https://github.com/vesoft-inc… 曾经合入 exchange v1.0 分支
问题 2:Exchange 不反对属性为 NULL
的数据导入。前文 3.1 中提到,业务可能会依据不同需要为某些点和边减少属性,这时其余点和边属性则是 NULL
,这样在应用 Exchange 导入时会报错。
参考帖子 2 给出的批改倡议解决:批改 com.vesoft.nebula.tools.importer.processor.Processor#extraValue
,减少 NULL
类型的转化值。
case NullType => {fieldTypeMap(field) match {
case StringType => ""
case IntegerType => 0
case LongType => 0L
case DoubleType => 0.0
case BooleanType => false
}
}
4 导入效率优化
对于导入效率的优化,请参考上面两个帖子:
- 对于应用 Exchange 从 neo4j 导入 nebula 的性能问题
- [应用 exchange 并发 spark-submit –master“local\[16\]”报错 ](https://discuss.nebula-graph….
优化 1:通过适当减少导入配置中的 partition 和 batch 值,晋升导入效率。
优化 2:如果是 string 类型做 vid 的话,1.x 版本尽量应用 hash() 函数转化,2.0 版本会反对 string id 类型;如果是 int 类型做 vid 的话,能够间接应用,不必转化效率更高。
优化 3:官网倡议 spark-submit 提交命令 master 配置改为 yarn-cluster
,若不应用 yarn,可配置成 spark://ip:port
;咱们是通过 spark-submit --master "local[16]"
的形式减少 spark 并发,导入效率比应用 "local"
晋升 4 倍 +,测试环境单机三节点 HDD 盘 IO 峰值能到 200-300 MB/s。但在指定 --master "local[16]"
并发导入时遇到 hadoop 缓存问题,采纳减少 hdfs 配置 fs.hdfs.impl.disable.cache=true
后重启 hadoop 解决。具体请参考第二个帖子。
5 总结
应用 Exchange 从 Neo4j 导入 Nebula Graph 过程中遇到一些问题,通过踊跃与社区进行沟通失去了官网 @nicole 及其他小伙伴的疾速响应和大力支持,这一点在 Neo4j 导入 Nebula Graph 的实际过程中起到了非常要害的作用,感激社区的大力支持。期待反对 openCypher 的 Nebula Graph 2.0。
6 参考链接
- https://nebula-graph.com.cn/posts/how-to-import-data-from-neo4j-to-nebula-graph/
- https://github.com/vesoft-inc/nebula-java/tree/v1.0
- https://docs.nebula-graph.com.cn/manual-CN/2.query-language/2.functions-and-operators/uuid/
- http://arganzheng.life/hadoop-filesystem-closed-exception.html
举荐浏览
- 在 Spark 数据导入中的一些实际细节
- Neo4j 导入 Nebula Graph 的实现原理与实际