本文首发于 Nebula Graph Community 公众号
1. 图计算介绍
1.1 图数据库 vs 图计算
图数据库是面向 OLTP 场景,强调增删改查,并且一个查问往往只波及到全图中的大量数据,而图计算是面向 OLAP 场景,往往是针对全图数据进行剖析计算。
1.2 图计算零碎散布架构
依照散布架构,图计算零碎分为单机和分布式。
单机图计算零碎劣势在于模型简略,无需思考分布式通信,也无需进行图切分,但受制于单机系统资源,无奈进行更大规模的图数据分析。
分布式图计算平台将图数据划分到多个机器上,从而解决更大规模的图数据,但不可避免的引入了分布式通信的开销问题。
1.3 图的划分
图划分次要有两种形式边切割(Edge- Cut)和点切割(Vertex-Cut)。
边宰割:每个点的数据只会存储在一台机器上,但有的边会被打断分到多台机器上。
如图 (a) 所示,点 A 的数据只寄存在机器 1 上,点 B 的数据只寄存在机器 2 上。对于边 AB 而言,会存储在机器 1 和机器 2 上。因为点 A 和点 B 散布在不同的机器上,在迭代计算过程中,会带来通信上的开销。
点宰割:每条边只会存储在一台机器上,但有的点有可能宰割,调配在多台机器上。
如图 (b) 所示, 边 AB 存储在机器 1 上,边 BC 存储在机器 2 上,边 CD 存储在机器 3 上,而点 B 被调配到了 1, 2 两台机器上,点 C 被调配到了 2,3 两台机器上。因为点被存储在多台机器上,保护顶点数据的一致性同样也会带来通信上的开销。
1.4 计算模型
编程模型是针对图计算利用开发者,可分为以节点为核心的编程模型、以边或门路为核心的编程模型、以子图为核心的编程模型。
计算模型是图计算零碎开发者面临的问题,次要有同步执行模型和异步执行模型。比拟常见的有 BSP 模型(Bulk Synchronous Parallel Computing Model)和 GAS 模型。
BSP 模型:BSP 模型的计算过程是由一系列的迭代步组成,每个迭代步被称为超步。采纳 BSP 模型的零碎次要有 Pregel、Hama、Giraph 等。
BSP 模型具备程度和垂直两个方面的构造。垂直上看,BSP 模型有一系列串行的超步组成。程度上看(如图所示),一个超步又分三个阶段:
- 本地计算阶段,每个处理器只对存储本地内存中的数据进行计算。
- 全局通信阶段,机器节点之间相互交换数据。
- 栅栏同步阶段,期待所有通信行为的完结。
GAS 模型:GAS 模型是在 PowerGraph 零碎提出,分为信息收集阶段(Gather)、利用阶段(Apply)和散发阶段(Scatter)。
- Gather 阶段,负责从街坊顶点收集信息。
- Apply 阶段,负责将收集的信息在本地解决,更新到顶点上。
- Scatter 阶段,负责发送新的信息给街坊顶点。
2. Gemini 图计算零碎介绍
Gemini 在工业界较有影响力,它的次要技术点包含:CSR/CSC、push/pull、master 和 mirror、稠密和浓密图、通信与计算协同工作、chunk-based 式分区、NUMA 感知的子分区等。
Gemini 采纳边切割形式将图数据依照 chunk-based 的形式分区,并反对 Numa 构造。分区后的数据,用 CSR 存储出边信息,用 CSC 存储入边信息。在迭代计算过程中,对稠密图采纳 push 的形式更新其出边街坊,对浓密图采纳 pull 的形式拉取入边街坊的信息。
如果一条边被切割,边的一端顶点为 master,另一端顶点则为 mirror。mirror 被称为占位符(placeholder),在 pull 的计算过程中,各个机器上的 mirror 顶点会拉取其入边街坊 master 顶点的信息进行一次计算,在 BSP 的计算模型下通过网络同步给其 master 顶点。在 push 的计算过程中,各个机器的 master 顶点会将其信息先同步给它的 mirror 顶点,再由 mirror 更新其出边街坊。
在 BSP 的通信阶段,每台机器 Node_i
发送给它的下一个机器 Node_i+1
,最初一个机器会发送给第一个机器。在每台机器发送的同时也会收到 Node_i-1
的信息,收到信息后会立刻执行本地计算。通信和计算的重叠能够暗藏通信工夫,晋升整体的效率。
更多细节能够参考论文《Gemini: A Computation-Centric Distributed Graph Processing System》。
3. Plato 图计算零碎与 Nebula Graph 的集成
3.1 Plato 图计算零碎介绍
Plato 是腾讯开源的基于 Gemni 论文实现的工业级图计算零碎。Plato 可运行在通用的 x86 集群,如 Kubernetes 集群、Yarn 集群等。在文件系统层面,Plato 提供了多种接口反对支流的文件系统,如 HDFS、Ceph 等等。
3.2 与 Nebula Graph 的集成
咱们基于 Plato 做了二次开发,以接入 Nebula Graph 数据源。
3.2.1 Nebula Graph 作为输出和输入数据源
减少 Plato 的数据源,反对将 Nebula Graph 作为输出和输入数据源,间接从 Nebula Graph 中读取数据进行图计算,并将计算结果间接写回到 Nebula Graph 中。
Nebula Graph 的存储层提供了针对 partition 的 scan 接口,很容易通过该接口批量扫出顶点和边数据:
ScanEdgeIter scanEdgeWithPart(std::string spaceName,
int32_t partID,
std::string edgeName,
std::vector<std::string> propNames,
int64_t limit = DEFAULT_LIMIT,
int64_t startTime = DEFAULT_START_TIME,
int64_t endTime = DEFAULT_END_TIME,
std::string filter = "",
bool onlyLatestVersion = false,
bool enableReadFromFollower = true);
ScanVertexIter scanVertexWithPart(std::string spaceName,
int32_t partId,
std::string tagName,
std::vector<std::string> propNames,
int64_t limit = DEFAULT_LIMIT,
int64_t startTime = DEFAULT_START_TIME,
int64_t endTime = DEFAULT_END_TIME,
std::string filter = "",
bool onlyLatestVersion = false,
bool enableReadFromFollower = true);
实际中,咱们首先获取指定 space 下的 partition 散布状况,并将每个 partition 的 scan 工作别离调配给 Plato 集群的各个节点上,每个节点再进一步将 partition 的 scan 任务分配给运行在该节点的各个线程上,以达到并行疾速的读取数据。图计算实现之后,将计算结果通过 Nebula client 并行写入 Nebula Graph。
3.2.2 分布式 ID 编码器
Gemini 和 Plato 的要求顶点 ID 从 0 开始间断递增,但绝大多数的实在数据顶点 ID 并不满足这个需要,尤其是 Nebula Graph 从 2.0 版本开始反对 string 类型 ID。
因而,在计算之前,咱们须要将原始的 ID 从 int 或 string 类型转换为从 0 开始间断递增的 int。Plato 外部实现了一个单机版的 ID 编码器,即 Plato 集群的每台机器均冗余存储所有 ID 的映射关系。当点的数量比拟多时,每台机器仅 ID 映射表的存储就需上百 GB 的内存,因为咱们须要实现分布式的 ID 映射器,将 ID 映射关系切成多份,离开存储。
咱们通过哈希将原始 ID 打散在不同的机器,并行地调配全局从 0 开始间断递增的 ID。生成 ID 映射关系后,每台机器都会存有 ID 映射表的一部分。随后再将边数据别离按终点和起点哈希,发送到对应的机器进行编码,最终失去的数据即为可用于计算的数据。当计算运行完结后,须要数据须要映射回业务 ID,其过程和上述也是相似的。
3.2.3 补充算法
咱们在 Plato 的根底上减少了 sssp、apsp、jaccard similarity、三角计数等算法,并为每个算法减少了输出和输入到 Nebula Graph 数据源的反对。目前反对的算法有:
文件名 | 算法名称 | 分类 |
---|---|---|
apsp.cc | 全对最短门路 | 门路 |
sssp.cc | 单源最短门路 | 门路 |
tree_stat.cc | 树深度 / 宽度 | 图特色 |
nstepdegrees.cc | n 阶度 | 图特色 |
hyperanf.cc | 图均匀间隔估算 | 图特色 |
triangle_count.cc | 三角计数 | 图特色 |
kcore.cc | 节点核心性 | |
pagerank.cc | Pagerank | 节点核心性 |
bnc.cc | Betweenness | 节点核心性 |
cnc.cc | 靠近核心性(Closeness Centrality) | 节点核心性 |
cgm.cc | 连通重量计算 | 社区发现 |
lpa.cc | 标签流传 | 社区发现 |
hanp.cc | HANP | 社区发现 |
metapath_randomwalk.cc | 图示意学习 | |
node2vec_randomwalk.cc | 图示意学习 | |
fast_unfolding.cc | louvain | 聚类 |
infomap_simple.cc | 聚类 | |
jaccard_similarity.cc | 类似度 | |
mutual.cc | 其余 | |
torch.cc | 其余 | |
bfs.cc | 广度优先遍历 | 其余 |
4. Plato 部署装置与运行
4.1 集群部署
Plato 采纳 MPI 进行过程间通信,在集群上部署 Plato 时,须要将 Plato 装置在雷同的目录下,或者应用 NFS。操作方法见:https://mpitutorial.com/tutorials/running-an-mpi-cluster-within-a-lan/
4.2 运行算法的脚本及配置文件
scripts/run_pagerank_local.sh
#!/bin/bash
PROJECT="$(cd"$(dirname "$0")"&& pwd)/.."
MAIN="./bazel-bin/example/pagerank" # process name
WNUM=3
WCORES=8
#INPUT=${INPUT:="$PROJECT/data/graph/v100_e2150_ua_c3.csv"}
INPUT=${INPUT:="nebula:${PROJECT}/scripts/nebula.conf"}
#OUTPUT=${OUTPUT:='hdfs://192.168.8.149:9000/_test/output'}
OUTPUT=${OUTPUT:="nebula:$PROJECT/scripts/nebula.conf"}
IS_DIRECTED=${IS_DIRECTED:=true} # let plato auto add reversed edge or not
NEED_ENCODE=${NEED_ENCODE:=true}
VTYPE=${VTYPE:=uint32}
ALPHA=-1
PART_BY_IN=false
EPS=${EPS:=0.0001}
DAMPING=${DAMPING:=0.8}
ITERATIONS=${ITERATIONS:=5}
export MPIRUN_CMD=${MPIRUN_CMD:="${PROJECT}/3rd/mpich-3.2.1/bin/mpiexec.hydra"}
PARAMS+="--threads ${WCORES}"
PARAMS+="--input ${INPUT} --output ${OUTPUT} --is_directed=${IS_DIRECTED} --need_encode=${NEED_ENCODE} --vtype=${VTYPE}"
PARAMS+="--iterations ${ITERATIONS} --eps ${EPS} --damping ${DAMPING}"
# env for JAVA && HADOOP
export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/amd64/server:${LD_LIBRARY_PATH}
# env for hadoop
export CLASSPATH=${HADOOP_HOME}/etc/hadoop:`find ${HADOOP_HOME}/share/hadoop/ | awk '{path=path":"$0}END{print path}'`
export LD_LIBRARY_PATH="${HADOOP_HOME}/lib/native":${LD_LIBRARY_PATH}
chmod 777 ./${MAIN}
${MPIRUN_CMD} -n ${WNUM} -f ${PROJECT}/scripts/cluster ./${MAIN} ${PARAMS}
exit $?
参数阐明
INPUT
参数和OUPUT
参数别离指定算法的输出数据源和输入数据源,目前反对本地 csv 文件、HDFS 文件、Nebula Graph。当输入输出数据源为 Nebula Graph 时,INPUT
和OUPUT
模式为nebula:/path/to/nebula.conf
- WNUM 为集群所有机器所运行的过程数之和,举荐每台机器运行为 1 或者 NUMA node 数个过程,WCORE 为每个过程的线程数,举荐最大设置为机器的硬件线程数。
scripts/nebula.conf
## read/write
--retry=3 # 连贯 Nebula Graph 时的重试次数
--space=sf30 # 要读取或写入的 space 名称
## read from nebula
--meta_server_addrs=192.168.8.94:9559 # Nebula Graph 的 metad 服务地址
--edge=LIKES # 要读取的边的名称
#--edge_data_field # 要读取的作为边的权重属性的名称
--read_batch_size=10000 # 每次 scan 时的 batch 的大小
## write to nebula
--graph_server_addrs=192.168.8.94:9669 # Nebula Graph 的 graphd 服务地址
--user=root # graphd 服务的登陆用户名
--password=nebula # graphd 服务的登陆密码
# insert or update
--mode=insert # 写回 Nebula Graph 时采纳的模式: insert/update
--tag=pagerank # 写回到 Nebula Graph 的 tag 名称
--prop=pr # 写回到 Nebula Graph 的 tag 对应的属性名称
--type=double # 写回到 Nebula Graph 的 tag 对应的属性的类型
--write_batch_size=1000 # 写回时的 batch 大小
--err_file=/home/plato/err.txt # 写回失败的数据所存储的文件
scripts/cluster
cluster 文件指定要运行该算法所在的集群机器的 IP
192.168.15.3
192.168.15.5
192.168.15.6
以上为 Plato 在 Nebula Graph 中的利用,目前该性能集成在 Nebula Graph 企业版中,如果你应用的是开源版本的 Nebula Graph,需依照本人的需要本人对接 Plato。
交换图数据库技术?退出 Nebula 交换群请先填写下你的 Nebula 名片,Nebula 小助手会拉你进群~~
关注公众号