作者:张武科

概述

严密核心度(Closeness Centrality)计量了一个节点到其余所有节点的紧密性,即该节点到其余节点的间隔的倒数;节点对应的值越高示意紧密性越好,可能在图中流传信息的能力越强,可用以掂量信息流入或流出该节点的能力,多用与社交网络中要害节点挖掘等场景。

算法介绍

对于图中一个给定节点,紧密性核心性是该节点到其余所有节点的最小间隔和的倒数:

其中,u示意待计算严密核心度的节点,d(u, v)示意节点u到节点v的最短门路间隔;理论场景中,更多地应用归一化后的严密核心度,即计算指标节点到其余节点的均匀间隔的倒数:

其中,n示意图中节点数。

算法实现

首先,基于AlgorithmUserFunction接口实现ClosenessCentrality,如下:

@Description(name = "closeness_centrality", description = "built-in udga for ClosenessCentrality")public class ClosenessCentrality implements AlgorithmUserFunction<Long, Long> {    private AlgorithmRuntimeContext context;    private long sourceId;    @Override    public void init(AlgorithmRuntimeContext context, Object[] params) {        this.context = context;        if (params.length != 1) {            throw new IllegalArgumentException("Only support one arguments, usage: func(sourceId)");        }        this.sourceId = ((Number) params[0]).longValue();    }    @Override    public void process(RowVertex vertex, Iterator<Long> messages) {        List<RowEdge> edges = context.loadEdges(EdgeDirection.OUT);        if (context.getCurrentIterationId() == 1L) {            context.sendMessage(vertex.getId(), 1L);            context.sendMessage(sourceId, 1L);        } else if (context.getCurrentIterationId() == 2L) {            context.updateVertexValue(ObjectRow.create(0L, 0L));            if (vertex.getId().equals(sourceId)) {                long vertexNum = -2L;                while (messages.hasNext()) {                    messages.next();                    vertexNum++;                }                // 统计节点数                context.updateVertexValue(ObjectRow.create(0L, vertexNum));                // 向邻接点发送与指标点间隔                sendMessageToNeighbors(edges, 1L);            }        } else {            if (vertex.getId().equals(sourceId)) {                long sum = (long) vertex.getValue().getField(0, LongType.INSTANCE);                while (messages.hasNext()) {                    sum += messages.next();                }                long vertexNum = (long) vertex.getValue().getField(1, LongType.INSTANCE);                // 记录最短距离和                context.updateVertexValue(ObjectRow.create(sum, vertexNum));            } else {                if (((long) vertex.getValue().getField(1, LongType.INSTANCE)) < 1) {                    Long meg = messages.next();                    context.sendMessage(sourceId, meg);                    // 向邻接点发送与指标点间隔                    sendMessageToNeighbors(edges, meg + 1);                    // 标记该点已向指标点发送过音讯                    context.updateVertexValue(ObjectRow.create(0L, 1L));                }            }        }    }    private void sendMessageToNeighbors(List<RowEdge> outEdges, Object message) {        for (RowEdge rowEdge : outEdges) {            context.sendMessage(rowEdge.getTargetId(), message);        }    }    @Override    public void finish(RowVertex vertex) {        if (vertex.getId().equals(sourceId)) {            long len = (long) vertex.getValue().getField(0, LongType.INSTANCE);            long num = (long) vertex.getValue().getField(1, LongType.INSTANCE);            context.take(ObjectRow.create(vertex.getId(), (double) num / len));        }    }    @Override    public StructType getOutputType() {        return new StructType(            new TableField("id", LongType.INSTANCE, false),            new TableField("cc", DoubleType.INSTANCE, false)        );    }}

而后,可在 DSL 中引入自定义算法,间接调用应用,语法模式如下:

CREATE Function closeness_centrality AS 'com.antgroup.geaflow.dsl.udf.ClosenessCentrality';INSERT INTO tbl_resultCALL closeness_centrality(1) YIELD (vid, ccValue)RETURN vid, ROUND(ccValue, 3);

示例示意,计算图中 id = 1节点的严密核心度。

算法运行

在运行算法之前,要结构算法运行的底图数据。

图定义

首先,进行图定义:

CREATE GRAPH modern (    Vertex person (      id bigint ID,      name varchar,      age int    ),    Vertex software (      id bigint ID,      name varchar,      lang varchar    ),    Edge knows (      srcId bigint SOURCE ID,      targetId bigint DESTINATION ID,      weight double    ),    Edge created (      srcId bigint SOURCE ID,      targetId bigint DESTINATION ID,      weight double    )) WITH (    storeType='rocksdb',    shardNum = 1);

图构建

实现图定义之后,导入点边数据,结构数据底图:

CREATE TABLE modern_vertex (  id varchar,  type varchar,  name varchar,  other varchar) WITH (  type='file',  geaflow.dsl.file.path = 'resource:///data/modern_vertex.txt');CREATE TABLE modern_edge (  srcId bigint,  targetId bigint,  type varchar,  weight double) WITH (  type='file',  geaflow.dsl.file.path = 'resource:///data/modern_edge.txt');INSERT INTO modern.personSELECT cast(id as bigint), name, cast(other as int) as ageFROM modern_vertex WHERE type = 'person';INSERT INTO modern.softwareSELECT cast(id as bigint), name, cast(other as varchar) as langFROM modern_vertex WHERE type = 'software';INSERT INTO modern.knowsSELECT srcId, targetId, weightFROM modern_edge WHERE type = 'knows';INSERT INTO modern.createdSELECT srcId, targetId, weightFROM modern_edge WHERE type = 'created';

计算输入

最初,在底图数据上实现算法计算和后果输入;

CREATE TABLE tbl_result (  vid int,    ccValue double) WITH (    type='file',    geaflow.dsl.file.path='/tmp/result');CREATE Function closeness_centrality AS 'com.antgroup.geaflow.dsl.udf.ClosenessCentrality';USE GRAPH modern;INSERT INTO tbl_resultCALL closeness_centrality(1) YIELD (vid, ccValue)RETURN vid, ROUND(ccValue, 3);

运行示例

  • input

    // vertex1,person,marko,292,person,vadas,273,software,lop,java4,person,josh,325,software,ripple,java6,person,peter,35// edge1,3,created,0.41,2,knows,0.51,4,knows,1.04,3,created,0.44,5,created,1.03,6,created,0.2
  • output

    // result1,0.714

结语

在本篇文章中咱们介绍了如何在TuGraph Analytics上实现严密核心度算法,如果你感觉比拟乏味,欢送关注咱们的社区(https://github.com/TuGraph-family/tugraph-analytics)。开源不易,如果你感觉还不错,能够给咱们star反对一下~


GeaFlow(品牌名TuGraph-Analytics) 已正式开源,欢送大家关注!!!

欢送给咱们 Star 哦!

Welcome to give us a Star!

GitHubhttps://github.com/TuGraph-family/tugraph-analytics

更多精彩内容,关注咱们的博客 https://geaflow.github.io/