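On Risk Control: A Fraud Detection Solution Built on the NebulaGraph Graph Database, with Code Examples

This article surveys fraud detection approaches built on NebulaGraph: graph algorithms, the graph database itself, machine learning, and GNNs. While reading it to learn the basic ways fraud detection is implemented, you can also run the data yourself in the Playground I have prepared.

Let's get into fraud detection in practice with a graph database: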

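Building an anti-fraud graph

The first step is to model the existing historical data and labels as a property graph organized around relationships. Typically, the raw data is spread across multiple tables. It consists of transaction event records, user data, and risk-control labels from banking, e-commerce, or insurance. Modeling means abstracting three things: the entities we care about, the relationships among them, and the meaningful properties they carry.

In general, the following are entities: natural persons, companies, phone numbers, addresses, devices (for example terminal devices, network addresses, and the Wi-Fi SSIDs that terminals connect to), and orders. Other information is modeled as entity properties. This includes risk labels (whether an entity is high-risk, a description of the risk, and so on) and attributes of persons and companies (occupation, income, education, and so on).

The figure below shows an example model for loan anti-fraud. You can get the data generator code and sample data at https://github.com/wey-gu/fraud-detection-datagen.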
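As a rough illustration of how such a model maps onto NebulaGraph, the sketch below turns a small schema description into nGQL CREATE TAG / CREATE EDGE statements. Several names come from the queries later in this article: the tags applicant and phone_num, the edges used_device and with_phone_num, and the is_risky property. The device tag name and the other properties are assumptions, and the actual schema in fraud-detection-datagen may differ.

```python
# Hypothetical, trimmed-down schema for the loan anti-fraud model. The names
# applicant, phone_num, used_device, with_phone_num and is_risky appear in the
# queries in this article; "device" and the other properties are assumptions.
SCHEMA = {
    "tags": {
        "applicant": {"name": "string", "is_risky": "string"},
        "device": {"name": "string"},
        "phone_num": {"num": "string"},
    },
    "edges": {
        "used_device": {},
        "with_phone_num": {},
    },
}


def schema_to_ngql(schema):
    """Render the schema dict as nGQL DDL statements (tags first, then edges)."""
    statements = []
    for section, keyword in (("tags", "TAG"), ("edges", "EDGE")):
        for name, props in schema[section].items():
            columns = ", ".join(f"{prop} {ptype}" for prop, ptype in props.items())
            statements.append(f"CREATE {keyword} IF NOT EXISTS {name}({columns});")
    return statements


for stmt in schema_to_ngql(SCHEMA):
    print(stmt)
```

Risk labels and person or company attributes become tag properties, following the modeling rule described above.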

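Identifying risk with graph database queries

We now have a graph covering people, companies, historical loan applications, phone numbers, and the network devices used for online applications. With it, we can uncover some interesting information.

In fact, many fraud schemes that were detected and stopped before causing losses show group clustering. For example, a fraud ring might be a small group of people (say, 3 to 5). They systematically collect a much larger set of identity documents (say, 30 ID cards) and use them to apply for many loans at multiple financial institutions at the same time. Once the loans are disbursed, the ring abandons these ID cards, which now carry default records, and repeats the scheme with the next batch.

Because such rings use many fresh identities, relying solely on blacklists built from historical records does not work. From the perspective of relationships, however, these patterns can to a certain extent be identified in time.

I divide the fraud that can be identified this way into two kinds:

The first kind can be described directly as a pattern by risk-control experts. An example is having a direct or indirect relationship with entities already labeled high-risk, such as a new order applicant who used the same network device as a past high-risk record. Mapped onto the graph, such a pattern can be answered in real time by a single graph query.

The second kind consists of risk signals hidden in the relationships in the data, which have to be mined with graph algorithms. For example, a given entity may have no matching relationship with the limited set of entities labeled high-risk, yet still form a cluster in the graph. This suggests it may be part of an ongoing ring loan-fraud scheme that has not yet succeeded. Such cases can be found in three steps:

1. Periodically run community detection algorithms in batch over historical data.
2. Apply centrality algorithms within highly clustered communities to identify the core entities.
3. Pass those entities to risk experts for follow-up assessment and labeling.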
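Here is a minimal sketch of the second kind, assuming the entity graph is available as an in-memory edge list. It uses connected components as a deliberately simple stand-in for a real community detection algorithm (such as the Louvain algorithm used later in this article). Degree serves as the centrality measure. The function name clusters_with_core and the min_size threshold are hypothetical.

```python
from collections import defaultdict, deque


def clusters_with_core(edges, min_size=3):
    """Group entities into connected components and report the highest-degree member of each."""
    adj = defaultdict(set)
    for u, v in edges:
        adj[u].add(v)
        adj[v].add(u)
    seen, result = set(), []
    for start in list(adj):
        if start in seen:
            continue
        # Breadth-first search collects one connected component
        component, queue = [], deque([start])
        seen.add(start)
        while queue:
            node = queue.popleft()
            component.append(node)
            for nxt in adj[node]:
                if nxt not in seen:
                    seen.add(nxt)
                    queue.append(nxt)
        if len(component) >= min_size:
            # Degree centrality: the member with the most links is reported as the core
            core = max(component, key=lambda n: (len(adj[n]), str(n)))
            result.append((sorted(component, key=str), core))
    return result


# Toy data: three applicants share device d1; a4 alone uses d2
print(clusters_with_core([("a1", "d1"), ("a2", "d1"), ("a3", "d1"), ("a4", "d2")]))
```

In practice, a real community detection algorithm separates densely connected groups that merely touch each other, which plain connected components cannot do.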

Fraud detection with expert-defined graph pattern matching

Before we begin, we use Nebula-UP to deploy a NebulaGraph database with a single command:

curl -fsSL nebula-up.siwei.io/install.sh | bash

First, load the graph we modeled above into NebulaGraph:

# Clone the dataset repository and enter it
git clone https://github.com/wey-gu/fraud-detection-datagen.git
cd fraud-detection-datagen
cp -r data_sample_numerical_vertex_id data
# Remove the header row from each CSV file
sed -i '1d' data/*.csv
docker run --rm -ti \
    --network=nebula-net \
    -v ${PWD}:/root/ \
    -v ${PWD}/data/:/data \
    vesoft/nebula-importer:v3.1.0 \
    --config /root/nebula_graph_importer.yaml

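With this graph in place, risk-control experts can use the visual exploration tool NebulaGraph Studio to explore relationships between entities on demand and sketch the corresponding risk patterns:

As shown above, we can clearly see a risk pattern of group-controlled devices (device farming). This pattern can be handed to graph database developers. They can turn it into a query that risk-control applications run periodically or in real time: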

## Relationship query for a single transaction application
MATCH (n) WHERE id(n) == "200000010265"
OPTIONAL MATCH p_shared_d=(n)-[:used_device]->(d)<-[:used_device]-(:applicant)-[:with_phone_num]->(pn:phone_num)<-[e:with_phone_num]-(:applicant)
RETURN p_shared_d

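On top of this pattern, we can easily change what the query returns to a count of the matched associations. That count can then serve as a decision API for indicator queries: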

## Device-farming indicator
MATCH (n) WHERE id(n) == "200000010265"
OPTIONAL MATCH p_shared_d=(n)-[:used_device]->(d)<-[:used_device]-(:applicant)-[:with_phone_num]->(pn:phone_num)<-[e:with_phone_num]-(:applicant)
RETURN count(e)

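With limited labeled data and expert resources, this approach yields a reasonably effective risk-control system that controls the risk of organized fraud rings more efficiently.

Another query that uses labeled risk nodes counts how many related nodes carry the high-risk property: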

MATCH p_=(p:applicant)-[*1..2]-(p2:applicant) WHERE id(p)=="200000014810" AND p2.applicant.is_risky == "True" RETURN p_ LIMIT 100

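This path query shows that many of the applicants connected to 200000014810 are high-risk (and the clustered devices are visible as well).

We can therefore define the number of connected high-risk nodes as an indicator: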

MATCH (p:applicant)-[*1..2]-(p2:applicant) WHERE id(p)=="200000014810" AND p2.applicant.is_risky == "True" RETURN count(p2)
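Before turning an indicator like this into a production query, it can help to prototype it in memory. The sketch below uses breadth-first search to count distinct high-risk applicants within 1 to 2 hops of a given applicant. The adjacency dict and the is_risky callback are hypothetical stand-ins for data pulled from the graph. Note one difference: MATCH returns one row per matched path, so count(p2) above counts an applicant once for every path that reaches it. This sketch counts each applicant once.

```python
from collections import deque


def count_risky_within(adj, start, is_risky, max_hops=2):
    """Count distinct nodes 1..max_hops hops from start for which is_risky(node) is True."""
    dist = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if dist[node] == max_hops:
            continue  # do not expand beyond the hop limit
        for neighbor in adj.get(node, ()):
            if neighbor not in dist:
                dist[neighbor] = dist[node] + 1
                queue.append(neighbor)
    return sum(1 for node, d in dist.items() if 1 <= d <= max_hops and is_risky(node))


# Hypothetical toy data: applicants a1..a3 linked to 200000014810 through devices d1 and d2
adj = {
    "200000014810": ["d1"],
    "d1": ["200000014810", "a1", "a2"],
    "a1": ["d1"],
    "a2": ["d1", "d2"],
    "d2": ["a2", "a3"],
    "a3": ["d2"],
}
risky = {"a1", "a3"}
# a1 is 2 hops away and counted; a3 is 4 hops away and is not
print(count_risky_within(adj, "200000014810", lambda n: n in risky))
```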

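In real-world applications, however, labeled data is usually too expensive to obtain. Is there a way to make more effective use of limited risk labels and the graph structure to predict risk?

The answer is yes.

Expanding labels with the graph

Xiaojin Zhu and Zoubin Ghahramani describe this approach in the paper Learning from Labeled and Unlabeled Data with Label Propagation (CMU-CALD-02-107). They use the Label Propagation algorithm to spread limited label information through the relationships in a graph to many more entities.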
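The LabelPropagation and LabelSpreading classes later in this section implement an iteration that can be written in matrix form as follows. The symbols are:

- W is the adjacency matrix and D its diagonal degree matrix.
- F holds the per-class label scores.
- Y holds the one-hot known labels, with zero rows for unlabeled nodes.
- L is the set of labeled nodes.
- α is the LabelSpreading clamping factor.

Iteration stops once the summed absolute change in F drops below a tolerance.

```latex
\begin{aligned}
\text{Initialization:}\quad & F^{(0)} = Y \\
\text{Label Propagation:}\quad & F^{(t+1)} = D^{-1} W F^{(t)}, \qquad F^{(t+1)}_{L} \leftarrow Y_{L} \\
\text{Label Spreading:}\quad & F^{(t+1)} = \alpha\, D^{-1/2} W D^{-1/2} F^{(t)} + (1 - \alpha)\, Y \\
\text{Prediction:}\quad & \hat{y}_i = \arg\max_{c} F_{ic}
\end{aligned}
```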

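With label propagation, our graph can take a limited set of high-risk labels and "propagate" them to generate more labels. The expanded labels serve two purposes. They let real-time graph queries return more results, and they give risk experts valuable input that helps drive anti-fraud investigations.

Typically, the process runs periodically and offline: scan the full graph, expand and update the labels with a graph algorithm, and then write the valid updated labels back into the graph.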
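The write-back step can be as simple as generating nGQL UPDATE VERTEX statements for vertices whose propagated risk score clears a threshold. The sketch below makes several assumptions:

- vertex IDs are strings, and vertices carry the applicant tag;
- propagated labels go into a hypothetical is_risky_propagated property, kept separate from expert labels (the property would have to be added to the schema first);
- the default threshold of 0.9 is arbitrary.

```python
def writeback_statements(scores, tag="applicant", prop="is_risky_propagated", threshold=0.9):
    """Build nGQL UPDATE VERTEX statements for vertices whose score >= threshold.

    scores: dict mapping a string vertex ID to a propagated risk score in [0, 1].
    prop is a hypothetical property name; it must exist on the tag before updating.
    """
    statements = []
    for vid, score in sorted(scores.items()):
        if score >= threshold:
            # String VIDs are quoted; this sketch does not escape quotes inside IDs
            statements.append(f'UPDATE VERTEX ON {tag} "{vid}" SET {prop} = "True";')
    return statements


# Toy scores for two hypothetical vertices
print(writeback_statements({"v1": 0.97, "v2": 0.42}))
```

Keeping propagated labels apart from expert labels makes it easy to audit or roll back a propagation run.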

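A similar method is SinDiffusion, if you are interested.

Expanding fraud-risk labels with graph algorithms

Below is an example you can run end to end. It uses the classic fraud detection dataset Yelp. The GNN example later uses the same data, so it is worth importing it into NebulaGraph now.

The data generation and import procedure is here: https://github.com/wey-gu/nebulagraph-yelp-frauddetection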

cd ~
git clone https://github.com/wey-gu/nebulagraph-yelp-frauddetection
cd nebulagraph-yelp-frauddetection
python3 -m pip install -r requirements.txt
python3 data_download.py

# Import into the graph database
docker run --rm -ti \
    --network=nebula-net \
    -v ${PWD}/yelp_nebulagraph_importer.yaml:/root/importer.yaml \
    -v ${PWD}/data:/root \
    vesoft/nebula-importer:v3.1.0 \
    --config /root/importer.yaml

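When the import finishes, let's check the graph statistics. SHOW STATS reports the result of the most recent SUBMIT JOB STATS job, so run that job first if the statistics come back empty:

~/.nebula-up/console.sh -e "USE yelp; SHOW STATS"

The output: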

(root@nebula) [(none)]> USE yelp; SHOW STATS
+---------+---------------------------------------+---------+
| Type    | Name                                  | Count   |
+---------+---------------------------------------+---------+
| "Tag"   | "review"                              | 45954   |
| "Edge"  | "shares_restaurant_in_one_month_with" | 1147232 |
| "Edge"  | "shares_restaurant_rating_with"       | 6805486 |
| "Edge"  | "shares_user_with"                    | 98630   |
| "Space" | "vertices"                            | 45954   |
| "Space" | "edges"                               | 8051348 |
+---------+---------------------------------------+---------+
Got 6 rows (time spent 1911/4488 us)

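Most available implementations of the LPA (label propagation) algorithm are built for community detection; few support label expansion (scikit-learn's sklearn.semi_supervised module is one of the few that do). Here we follow the implementation by Thibaud M.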
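For comparison, scikit-learn's semi-supervised API looks like this. Unlabeled samples are marked with -1. LabelSpreading builds its graph from feature similarity rather than accepting an explicit adjacency matrix. In this sketch that is a k-nearest-neighbor graph over toy 2-D points. This is one reason to prefer an implementation that works directly on the adjacency matrix of our subgraph, such as the one that follows. The data here is made up for illustration.

```python
import numpy as np
from sklearn.semi_supervised import LabelSpreading

# Toy data: two well-separated groups of 4 points, one labeled point per group
X = np.array([
    [0.0, 0.0], [0.0, 0.1], [0.1, 0.0], [0.1, 0.1],  # group A
    [5.0, 5.0], [5.0, 5.1], [5.1, 5.0], [5.1, 5.1],  # group B
])
# -1 marks an unlabeled sample in scikit-learn's semi-supervised API
y = np.array([0, -1, -1, -1, 1, -1, -1, -1])

# kernel="knn" builds a k-nearest-neighbor graph from the feature vectors
spreader = LabelSpreading(kernel="knn", n_neighbors=3)
spreader.fit(X, y)
print(spreader.transduction_)  # label inferred for every sample
```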

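To make the algorithm run faster, we extract a subgraph around a single vertex from NebulaGraph and expand labels on that small subgraph.

First, start a Jupyter Playground following the Playground steps in https://github.com/wey-gu/nebula-dgl: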

git clone https://github.com/wey-gu/nebula-dgl.git
cd nebula-dgl
# Run Jupyter Notebook
docker run -it --name dgl -p 8888:8888 --network nebula-net \
    -v "$PWD":/home/jovyan/work jupyter/datascience-notebook \
    start-notebook.sh --NotebookApp.token='nebulagraph'

Open http://localhost:8888/lab/tree/work?token=nebulagraph

Install the dependencies; the GNN example later uses them too:

!python3 -m pip install git+https://github.com/vesoft-inc/nebula-python.git@8c328c534413b04ccecfd42e64ce6491e09c6ca8
!python3 -m pip install .

Now we read a subgraph from the graph, exploring all edges within two steps of vertex 2048.

import torch
import json
from torch import tensor
from dgl import DGLHeteroGraph, heterograph

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

config = Config()
config.max_connection_pool_size = 2
connection_pool = ConnectionPool()
connection_pool.init([('graphd', 9669)], config)

vertex_id = 2048
client = connection_pool.get_session('root', 'nebula')
r = client.execute_json(
    "USE yelp;"
    f"GET SUBGRAPH WITH PROP 2 STEPS FROM {vertex_id} YIELD VERTICES AS nodes, EDGES AS relationships;")
# Return the session to the pool once the query has run
client.release()

r = json.loads(r)
data = r.get('results', [{}])[0].get('data')
columns = r.get('results', [{}])[0].get('columns')

# create node and nodedata
node_id_map = {} # key: vertex id in NebulaGraph, value: node id in dgl_graph
node_idx = 0
features = [[] for _ in range(32)] + [[]]
for i in range(len(data)):
    for index, node in enumerate(data[i]['meta'][0]):
        nodeid = data[i]['meta'][0][index]['id']
        if nodeid not in node_id_map:
            node_id_map[nodeid] = node_idx
            node_idx += 1
            for f in range(32):
                features[f].append(data[i]['row'][0][index][f"review.f{f}"])
            features[32].append(data[i]['row'][0][index]['review.is_fraud'])

rur_start, rur_end, rsr_start, rsr_end, rtr_start, rtr_end = [], [], [], [], [], []
for i in range(len(data)):
    for edge in data[i]['meta'][1]:
        edge = edge['id']
        if edge['name'] == 'shares_user_with':
            rur_start.append(node_id_map[edge['src']])
            rur_end.append(node_id_map[edge['dst']])
        elif edge['name'] == 'shares_restaurant_rating_with':
            rsr_start.append(node_id_map[edge['src']])
            rsr_end.append(node_id_map[edge['dst']])
        elif edge['name'] == 'shares_restaurant_in_one_month_with':
            rtr_start.append(node_id_map[edge['src']])
            rtr_end.append(node_id_map[edge['dst']])

data_dict = {}
if rur_start:
    data_dict[('review', 'shares_user_with', 'review')] = tensor(rur_start), tensor(rur_end)
if rsr_start:
    data_dict[('review', 'shares_restaurant_rating_with', 'review')] = tensor(rsr_start), tensor(rsr_end)
if rtr_start:
    data_dict[('review', 'shares_restaurant_in_one_month_with', 'review')] = tensor(rtr_start), tensor(rtr_end)
# construct a dgl_graph, ref: https://docs.dgl.ai/en/0.9.x/generated/dgl.heterograph.html

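# num_nodes_dict keeps every fetched vertex (even ones without edges) so that
# the label tensor below has exactly one entry per node
dgl_graph: DGLHeteroGraph = heterograph(data_dict, num_nodes_dict={'review': node_idx})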

# load node features to dgl_graph
dgl_graph.ndata['label'] = tensor(features[32])

# heterogeneous graph to homogeneous graph, keeping ndata['label']
import dgl
hg = dgl.to_homogeneous(dgl_graph, ndata=['label'])

Next, we apply the label propagation implementation mentioned above to our graph.

from abc import abstractmethod
import torch

class BaseLabelPropagation:
    """Base class for label propagation models.
    
    Parameters
    ----------
    adj_matrix: torch.FloatTensor
        Adjacency matrix of the graph.
    """
    def __init__(self, adj_matrix):
        self.norm_adj_matrix = self._normalize(adj_matrix)
        self.n_nodes = adj_matrix.size(0)
        self.one_hot_labels = None 
        self.n_classes = None
        self.labeled_mask = None
        self.predictions = None

    @staticmethod
    @abstractmethod
    def _normalize(adj_matrix):
        raise NotImplementedError("_normalize must be implemented")

    @abstractmethod
    def _propagate(self):
        raise NotImplementedError("_propagate must be implemented")

    def _one_hot_encode(self, labels):
        # Get the number of classes
        classes = torch.unique(labels)
        classes = classes[classes != -1]
        self.n_classes = classes.size(0)

        # One-hot encode labeled data instances and zero rows corresponding to unlabeled instances
        unlabeled_mask = (labels == -1)
        labels = labels.clone()  # defensive copying
        labels[unlabeled_mask] = 0
        self.one_hot_labels = torch.zeros((self.n_nodes, self.n_classes), dtype=torch.float)
        self.one_hot_labels = self.one_hot_labels.scatter(1, labels.unsqueeze(1), 1)
        self.one_hot_labels[unlabeled_mask, 0] = 0

        self.labeled_mask = ~unlabeled_mask

    def fit(self, labels, max_iter, tol):
        """Fits a semi-supervised learning label propagation model.
        
        labels: torch.LongTensor
            Tensor of size n_nodes indicating the class number of each node.
            Unlabeled nodes are denoted with -1.
        max_iter: int
            Maximum number of iterations allowed.
        tol: float
            Convergence tolerance: threshold to consider the system at steady state.
        """
        self._one_hot_encode(labels)

        self.predictions = self.one_hot_labels.clone()
        prev_predictions = torch.zeros((self.n_nodes, self.n_classes), dtype=torch.float)

        for i in range(max_iter):
            # Stop iterations if the system is considered at a steady state
            variation = torch.abs(self.predictions - prev_predictions).sum().item()
            
            if variation < tol:
                print(f"The method stopped after {i} iterations, variation={variation:.4f}.")
                break

            prev_predictions = self.predictions
            self._propagate()

    def predict(self):
        return self.predictions

    def predict_classes(self):
        return self.predictions.max(dim=1).indices

class LabelPropagation(BaseLabelPropagation):
    def __init__(self, adj_matrix):
        super().__init__(adj_matrix)

    @staticmethod
    def _normalize(adj_matrix):
        """Computes D^-1 * W"""
        degs = adj_matrix.sum(dim=1)
        degs[degs == 0] = 1  # avoid division by 0 error
        return adj_matrix / degs[:, None]

    def _propagate(self):
        self.predictions = torch.matmul(self.norm_adj_matrix, self.predictions)

        # Put back already known labels
        self.predictions[self.labeled_mask] = self.one_hot_labels[self.labeled_mask]

    def fit(self, labels, max_iter=1000, tol=1e-3):
        super().fit(labels, max_iter, tol)

class LabelSpreading(BaseLabelPropagation):
    def __init__(self, adj_matrix):
        super().__init__(adj_matrix)
        self.alpha = None

    @staticmethod
    def _normalize(adj_matrix):
        """Computes D^-1/2 * W * D^-1/2"""
        degs = adj_matrix.sum(dim=1)
        norm = torch.pow(degs, -0.5)
        norm[torch.isinf(norm)] = 1
        return adj_matrix * norm[:, None] * norm[None, :]

    def _propagate(self):
        self.predictions = (self.alpha * torch.matmul(self.norm_adj_matrix, self.predictions)
            + (1 - self.alpha) * self.one_hot_labels
        )
    
    def fit(self, labels, max_iter=1000, tol=1e-3, alpha=0.5):
        """
        Parameters
        ----------
        alpha: float
            Clamping factor.
        """
        self.alpha = alpha
        super().fit(labels, max_iter, tol)
        
import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt

nx_hg = hg.to_networkx()
adj_matrix = nx.adjacency_matrix(nx_hg).toarray()
labels = hg.ndata['label']
# Create input tensors
adj_matrix_t = torch.FloatTensor(adj_matrix)
labels_t = torch.LongTensor(labels)

# Learn with Label Propagation
label_propagation = LabelPropagation(adj_matrix_t)
print("Label Propagation:", end="")
label_propagation.fit(labels_t)
label_propagation_output_labels = label_propagation.predict_classes()

# Learn with Label Spreading
label_spreading = LabelSpreading(adj_matrix_t)
print("Label Spreading:", end="")
label_spreading.fit(labels_t, alpha=0.8)
label_spreading_output_labels = label_spreading.predict_classes()
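Every node in this subgraph carries an is_fraud label (0 or 1), so nothing is actually unlabeled. LabelPropagation writes the known labels back after each step and therefore returns the input labels unchanged. Only LabelSpreading, which mixes neighbor scores in through alpha, can change any prediction. One way to observe and measure propagation is to hide a random fraction of labels before fitting, then score the predictions on the hidden nodes. The NumPy sketch below does this. hide_labels and accuracy_on_hidden are hypothetical helpers, and the commented lines show how they might be wired into the notebook above.

```python
import numpy as np


def hide_labels(labels, fraction, seed=0):
    """Copy labels and set a random ~fraction of them to -1 (the 'unlabeled' marker)."""
    masked = np.asarray(labels).copy()
    rng = np.random.default_rng(seed)
    hidden = rng.random(masked.shape[0]) < fraction
    masked[hidden] = -1
    return masked, hidden


def accuracy_on_hidden(predicted, true_labels, hidden):
    """Fraction of hidden nodes whose predicted class equals the withheld label."""
    if not hidden.any():
        return float("nan")
    predicted = np.asarray(predicted)
    true_labels = np.asarray(true_labels)
    return float((predicted[hidden] == true_labels[hidden]).mean())


# Possible use in the notebook above (not executed here):
# masked, hidden = hide_labels(labels.numpy(), fraction=0.5)
# label_propagation = LabelPropagation(adj_matrix_t)
# label_propagation.fit(torch.from_numpy(masked).long())
# print(accuracy_on_hidden(label_propagation.predict_classes().numpy(), labels.numpy(), hidden))
```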

Now let's look at how the labels (colors) propagated:

color_map = {-1: "orange", 0: "blue", 1: "green"}  # -1 marks unlabeled nodes
input_labels_colors = [color_map[int(l)] for l in labels]
lprop_labels_colors = [color_map[int(l)] for l in label_propagation_output_labels.numpy()]
lspread_labels_colors = [color_map[int(l)] for l in label_spreading_output_labels.numpy()]

plt.figure(figsize=(14, 6))
ax1 = plt.subplot(1, 4, 1)
ax2 = plt.subplot(1, 4, 2)
ax3 = plt.subplot(1, 4, 3)

ax1.title.set_text("Raw data (2 classes)")
ax2.title.set_text("Label Propagation")
ax3.title.set_text("Label Spreading")

pos = nx.spring_layout(nx_hg)
nx.draw(nx_hg, ax=ax1, pos=pos, node_color=input_labels_colors, node_size=50)
nx.draw(nx_hg, ax=ax2, pos=pos, node_color=lprop_labels_colors, node_size=50)
nx.draw(nx_hg, ax=ax3, pos=pos, node_color=lspread_labels_colors, node_size=50)

# Legend
ax4 = plt.subplot(1, 4, 4)
ax4.axis("off")
legend_colors = ["orange", "blue", "green"]
legend_labels = ["unlabeled", "class 0 (not fraud)", "class 1 (fraud)"]
dummy_legend = [ax4.plot([], [], ls='-', c=c)[0] for c in legend_colors]
plt.legend(dummy_legend, legend_labels)

plt.show()

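The resulting plot:

Some blue labels spread. The result of this example is not ideal, since green is the label that matters here. It is only a demonstration, and you are welcome to improve on it.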

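Machine learning with graph features

Before the risk-control field started applying graph thinking and capabilities, many methods already used machine-learning classification algorithms to predict high-risk behavior from historical data. These methods train risk-prediction models on two inputs. The features are record fields that domain experts consider relevant (for example age, education, and income). The labels come from historical annotations.

You may wonder whether adding graph-structure properties as extra features to these methods would produce a more effective model. The answer is yes: many papers and engineering practices have shown that such models outperform algorithms that ignore graph features. Graph-structure features that have proven effective include an entity's PageRank value, its degree, and the community ID produced by a community detection algorithm.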
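To make the two simplest of these features concrete, the sketch below computes degree and PageRank in pure Python for every node in an edge list. It treats the graph as undirected and runs a fixed number of power iterations with the common damping factor of 0.85. In this article's setup, such features would come from a graph computing platform such as the nebula-algorithm project instead.

```python
def degree_and_pagerank(edges, damping=0.85, iterations=100):
    """Return {node: (degree, pagerank)} for an undirected edge list."""
    neighbors = {}
    for u, v in edges:
        neighbors.setdefault(u, set()).add(v)
        neighbors.setdefault(v, set()).add(u)
    nodes = list(neighbors)
    n = len(nodes)
    rank = {node: 1.0 / n for node in nodes}
    for _ in range(iterations):
        # Every node keeps the teleport share, then receives rank from its neighbors
        new_rank = {node: (1.0 - damping) / n for node in nodes}
        for node in nodes:
            share = damping * rank[node] / len(neighbors[node])
            for neighbor in neighbors[node]:
                new_rank[neighbor] += share
        rank = new_rank
    return {node: (len(neighbors[node]), rank[node]) for node in nodes}


# Toy star graph: one hub connected to three leaves
features = degree_and_pagerank([("hub", "a"), ("hub", "b"), ("hub", "c")])
print(features["hub"], features["a"])
```

Each (degree, pagerank) pair can be appended to a node's existing feature vector before training a classifier.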

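A production pipeline can run this periodically:

1. Pull up-to-date full-graph data from the graph.
2. Compute the required features on a graph computing platform.
3. Feed them through a predefined data pipeline into the machine learning model to obtain new risk signals.
4. Write some of the results back to the graph so that other systems and experts can retrieve and reference them.

Fraud detection with graph-feature machine learning

I won't demonstrate the machine learning part itself here, since it is an ordinary classification method. On top of it, we can derive new properties from the data with graph algorithms and process them into new features. I demonstrate only one community detection method: run the Louvain algorithm on the full graph to get the community each node belongs to. The community value is then treated as a categorical variable and turned into numeric features.

This example again uses the data from https://github.com/wey-gu/fraud-detection-datagen. The graph algorithm runs on nebula-algorithm, a graph computing project.

First, deploy Spark and NebulaGraph Algorithm, again with our familiar one-click tool Nebula-UP: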

curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark

Once the cluster is up, a single command runs the algorithm, because I have already placed the required configuration file inside Nebula-UP:

cd ~/.nebula-up/nebula-up/spark && ls -l

docker exec -it sparkmaster /spark/bin/spark-submit \
    --master "local" --conf spark.rpc.askTimeout=6000s \
    --class com.vesoft.nebula.algorithm.Main \
    --driver-memory 4g /root/download/nebula-algo.jar \
    -p /root/louvain.conf

The final results are in /output inside the sparkmaster container:

# docker exec -it sparkmaster bash
ls -l /output

After that, we can process this Louvain graph feature and start traditional model training.
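One way to turn the Louvain output into numeric features is sketched below. It assumes the result files are CSV with a header row and columns named _id and louvain. Check the header of the files in /output, since the exact column names may depend on the nebula-algorithm version and configuration. For each vertex, the sketch derives the community size and a one-hot encoding of the community ID. One-hot columns grow with the number of communities, so on large graphs the size feature alone, or grouping small communities together, may be more practical.

```python
import csv
import io
from collections import Counter


def louvain_features(csv_text, id_col="_id", community_col="louvain"):
    """Return (one-hot column order, {vertex_id: [community_size, one-hot...]})."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    community = {row[id_col]: row[community_col] for row in rows}
    sizes = Counter(community.values())
    columns = sorted(sizes)  # stable column order for the one-hot part
    features = {}
    for vid, comm in community.items():
        one_hot = [1 if comm == col else 0 for col in columns]
        features[vid] = [sizes[comm]] + one_hot
    return columns, features


# Toy content in the assumed format
sample = "_id,louvain\n1,10\n2,10\n3,20\n"
columns, features = louvain_features(sample)
print(columns, features)
```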

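Graph neural network approach

Graph-feature methods like these do not fully exploit relationships, and the feature engineering is tedious and expensive. This section brings in the main tool of this article: DGL, Deep Graph Library, https://www.dgl.ai/. I also built Nebula-DGL as a bridge between the NebulaGraph database and DGL.

With recent advances, some GNN-based methods learn embedding representations of both graph structure and attribute information. This lets us get better results than machine learning on traditional graph features, without graph feature extraction, feature engineering, or expert- and rule-based data annotation. Interestingly, these methods were invented and rapidly iterated during a period when graph-based deep learning was one of the hottest research directions in machine learning.

In addition, some graph deep learning methods support Inductive Learning: the model can make inferences on new vertices and edges. Combined with the graph database's online subgraph queries, this makes real-time online risk prediction simple and feasible.

A GNN fraud detection system based on graph representations

A GNN approach does not strictly require a graph database; the data can live in several other common storage systems. A graph database does, however, give the strongest support for model training, model updates, and refreshing online results. With the graph database as the single source of truth, online, offline, and graph-based methods integrate easily. Their strengths and results can be combined into a more effective composite fraud detection system.

In this example, we split the work into three parts: data processing, model training, and building the detection system.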

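Dataset

In this example we use the Yelp-Fraud dataset from the paper Enhancing Graph Neural Network-based Fraud Detectors against Camouflaged Fraudsters.

This graph has one vertex type and three relationship types:

  • Vertices: reviews of restaurants and hotels on Yelp, with two kinds of properties:

    • a label on each review indicating whether it is fake or fraudulent
    • 32 preprocessed numeric properties
  • Edges: three kinds of relationships between reviews

    • R-U-R: two reviews posted by the same user, shares_user_with
    • R-S-R: two reviews of the same restaurant with the same rating (ratings range from 1 to 5), shares_restaurant_rating_with
    • R-T-R: two reviews of the same restaurant submitted in the same month, shares_restaurant_in_one_month_with

The rest of this example assumes this graph is already loaded into NebulaGraph. If it is not, these steps load it: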

# Deploy NebulaGraph
curl -fsSL nebula-up.siwei.io/install.sh | bash

# Clone the repository for this dataset
git clone https://github.com/wey-gu/nebulagraph-yelp-frauddetection && cd nebulagraph-yelp-frauddetection

# Install dependencies, then download and generate the data
python3 -m pip install -r requirements.txt
python3 data_download.py

# Import into NebulaGraph
docker run --rm -ti \
 --network=nebula-net \
 -v ${PWD}/yelp_nebulagraph_importer.yaml:/root/importer.yaml \
 -v ${PWD}/data:/root \
 vesoft/nebula-importer:v3.1.0 \
 --config /root/importer.yaml

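For details, see https://github.com/wey-gu/nebulagraph-yelp-frauddetection

Data processing

This step takes the topology of the risk-related subgraph and the relevant features (properties) in the graph, engineers them, and serializes the result into a DGL graph object.

DGL itself can build graph objects from vertex and edge lists (edgelists) stored in CSV files, or from NetworkX graphs and SciPy sparse adjacency matrices. We could export the raw graph data, or all the data in the graph database, into these forms. But data in a graph database mostly changes in real time, so training GNNs directly on NebulaGraph subgraphs is generally preferable. The Nebula-DGL library makes this straightforward.

Now let's import this data. Before that, let me introduce Nebula-DGL.

Nebula-DGL applies a set of mapping and transformation rules, written in YAML, to the vertices, edges, and properties in NebulaGraph. It turns them into nodes, edges, labels, and features, and constructs a DGL graph object. The conversion from properties to features deserves a mention. A feature may be one of three things:

- a property value used as-is;
- the result of a mathematical transformation of one or more property values;
- a string property mapped to numbers by enumeration rules.

Nebula-DGL expresses each of these cases in its rules with a filter: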

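  • Feature taken directly from a property value:

In this example, the follow edges in NebulaGraph are extracted. The value of the degree property on each edge is used directly as a feature named degree.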

edge_types:
  - name: follow
    start_vertex_tag: player
    end_vertex_tag: player
    features:
      - name: degree
        properties:
          - name: degree
            type: int
            nullable: False
        filter:
          type: value
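  • Feature derived from properties via a mathematical transformation

In this example, two properties of the serve edge are transformed with (end_year - start_year) / 30 into a feature named service_time.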

edge_types:
  - name: serve
    start_vertex_tag: player
    end_vertex_tag: team
    features:
      - name: service_time
        properties:
          - name: start_year
            type: int
            nullable: False
          - name: end_year
            type: int
            nullable: False
        # The variable was mapped by order of properties
        filter:
          type: function
          function: "lambda start_year, end_year: (end_year - start_year) / 30"
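  • Enumerating property values into a numeric feature

In this example, the name property of team vertices is enumerated into a coast feature, based on which coast each team is on: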

vertex_tags:
  - name: team
    features:
      - name: coast
        properties:
          - name: name
            type: str
            nullable: False
        filter:
          # 0 stands for the east coast, 1 stands for the west coast
          type: enumeration
          enumeration:
            Celtics: 0
            Nets: 0
            Nuggets: 1
            Timberwolves: 1
            Thunder: 1
# ... not showing all teams here
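The small, hypothetical re-implementation below makes the semantics of the three filter types concrete in plain Python. It is not Nebula-DGL's code. It only mirrors what the rules above describe:

- value passes one property through unchanged;
- function binds the listed properties to the lambda's parameters in order;
- enumeration looks the value up in a mapping.

Calling eval on configuration strings is only acceptable for trusted rule files.

```python
def apply_filter(filter_cfg, values):
    """Hypothetical sketch of the value / function / enumeration filter semantics."""
    kind = filter_cfg["type"]
    if kind == "value":
        # A single property is passed through unchanged
        (value,) = values
        return value
    if kind == "function":
        # Properties are bound to the lambda's parameters in the order they are listed
        func = eval(filter_cfg["function"], {"__builtins__": {}})
        return func(*values)
    if kind == "enumeration":
        (value,) = values
        return filter_cfg["enumeration"][value]
    raise ValueError(f"unknown filter type: {kind}")


service_time = apply_filter(
    {"type": "function", "function": "lambda start_year, end_year: (end_year - start_year) / 30"},
    [1990, 2020],
)
coast = apply_filter({"type": "enumeration", "enumeration": {"Celtics": 0, "Nuggets": 1}}, ["Nuggets"])
print(service_time, coast)
```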

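As you can see, the transformation rules are simple and direct. For all the details, see the complete Nebula-DGL example at https://github.com/wey-gu/nebula-dgl/tree/main/example. With these processing rules in mind, we can start processing the Yelp graph data.

First, define the following rules. They map the review vertex and all three edge types, and map the properties on review with their original values.

The nebulagraph_yelp_dgl_mapper.yaml configuration is as follows: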

---
# If vertex id is string-typed, remap_vertex_id must be true.
remap_vertex_id: True
space: yelp
# str or int
vertex_id_type: int
vertex_tags:
  - name: review
    label:
      name: is_fraud
      properties:
        - name: is_fraud
          type: int
          nullable: False
      filter:
        type: value
    features:
      - name: f0
        properties:
          - name: f0
            type: float
            nullable: False
        filter:
          type: value
      - name: f1
        properties:
          - name: f1
            type: float
            nullable: False
        filter:
          type: value
# ...
      - name: f31
        properties:
          - name: f31
            type: float
            nullable: False
        filter:
          type: value
edge_types:
  - name: shares_user_with
    start_vertex_tag: review
    end_vertex_tag: review
  - name: shares_restaurant_rating_with
    start_vertex_tag: review
    end_vertex_tag: review
  - name: shares_restaurant_in_one_month_with
    start_vertex_tag: review
    end_vertex_tag: review

After installing Nebula-DGL, a few lines of code are enough to construct a DGL DGLHeteroGraph object from this graph in NebulaGraph:

import torch
import yaml
from nebula_dgl import NebulaLoader


nebula_config = {
    "graph_hosts": [
        ('graphd', 9669),
        ('graphd1', 9669),
        ('graphd2', 9669),
    ],
    "nebula_user": "root",
    "nebula_password": "nebula",
}

# load feature_mapper from yaml file
with open('nebulagraph_yelp_dgl_mapper.yaml', 'r') as f:
    feature_mapper = yaml.safe_load(f)

nebula_loader = NebulaLoader(nebula_config, feature_mapper)
g = nebula_loader.load()

# keep the graph and all later tensors on the CPU
g = g.to('cpu')
device = torch.device('cpu')

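Model training

Here I use GraphSAGE node classification as the example. The original GraphSAGE is an Inductive Learning algorithm. Unlike its counterpart, Transductive Learning, inductive learning can apply a model learned on an existing graph to completely new data. The trained model can therefore detect fraud on incremental online data, without reloading the full graph for retraining.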

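Model training system (left):

  • Input: historical transaction graph with fraud labels
  • Output: a GraphSAGE DGL model

Online inference system (right):

Model: trained with GraphSAGE on the historical transaction graph with fraud labels

  • Input: a new transaction
  • Output: whether the transaction is suspected of fraud

Splitting the dataset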

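Training a machine learning model requires splitting the available data into subsets for training, validation, and testing. They can be disjoint proper subsets of the full data, or sets that overlap with each other. In practice, labels are rarely sufficient, so splitting by label proportions tends to be more meaningful. The example below splits the dataset based on whether a node is labeled as fraud.

Two things are worth noting:

stratify=g.ndata['is_fraud'] in train_test_split makes the split preserve the distribution of is_fraud values, in line with the idea above.

We split the idx index, which yields indices for the three sets to use in training, validation, and testing. We also store the corresponding masks in the graph object g.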

# Split the graph into train, validation, and test sets

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# features are g.ndata['f0'], g.ndata['f1'], g.ndata['f2'], ... g.ndata['f31']
# label is in g.ndata['is_fraud']

# concatenate all features
features = []
for i in range(32):
    features.append(g.ndata['f' + str(i)])

g.ndata['feat'] = torch.stack(features, dim=1)
g.ndata['label'] = g.ndata['is_fraud']
# numpy array as an index of range n

idx = torch.tensor(np.arange(g.number_of_nodes()), device=device, dtype=torch.int64)

# split based on value distribution of label: the property "is_fraud", which is a binary variable.
X_train_and_val_idx, X_test_idx, y_train_and_val, y_test = train_test_split(idx, g.ndata['is_fraud'], test_size=0.2, random_state=42, stratify=g.ndata['is_fraud'])

# split train and val
X_train_idx, X_val_idx, y_train, y_val = train_test_split(X_train_and_val_idx, y_train_and_val, test_size=0.2, random_state=42, stratify=y_train_and_val)

# list of index to mask
train_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
train_mask[X_train_idx] = True

val_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
val_mask[X_val_idx] = True

test_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
test_mask[X_test_idx] = True

g.ndata['train_mask'] = train_mask
g.ndata['val_mask'] = val_mask
g.ndata['test_mask'] = test_mask
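As a quick sanity check of the resulting set sizes, the sketch below mirrors scikit-learn's rounding for a float test_size: the test set gets ceil(n * test_size) samples and the training set gets the rest. It applies this to the 45,954 review vertices reported by SHOW STATS for the yelp space, assuming the loaded graph g contains all of them. split_sizes is a hypothetical helper.

```python
import math


def split_sizes(n_samples, test_size):
    """Mirror scikit-learn's rounding for a float test_size: test = ceil(n * test_size), train = rest."""
    n_test = math.ceil(n_samples * test_size)
    return n_samples - n_test, n_test


n_total = 45954  # review vertices reported by SHOW STATS
n_train_val, n_test = split_sizes(n_total, 0.2)
n_train, n_val = split_sizes(n_train_val, 0.2)
print(n_train, n_val, n_test)  # 29410 7353 9191
```

Roughly 64% of the nodes end up in training, 16% in validation, and 20% in testing. Stratification keeps the fraud ratio approximately equal across the three sets.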

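Converting the heterogeneous graph to a homogeneous graph

GraphSAGE targets homogeneous graphs without edge features, whereas our Yelp graph is heterogeneous: one vertex type and three edge types. So how can we model the Yelp graph with GraphSAGE?

One option is to choose an Inductive Learning method designed for heterogeneous graphs. Another is to convert the heterogeneous graph into a homogeneous one. To keep the important edge type information during the conversion, we can turn the edge type into a numeric value.

Here I use a one-dimensional edge feature; a two-dimensional (3 - 1) encoding would work as well.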

# shares_restaurant_in_one_month_with: 1, b"001"
# shares_restaurant_rating_with: 2, b"010"
# shares_user_with: 4, b"100"

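In fact, if you prefer values such as 0, 1, 2, you can use hg.edata['_TYPE'] directly after the conversion to a homogeneous graph; see the example at https://docs.dgl.ai/en/0.9.x/generated/dgl.to_homogeneous.html.

The code is as follows: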

# three types of edges
In [1]: g.etypes
Out[1]:
['shares_restaurant_in_one_month_with',
 'shares_restaurant_rating_with',
 'shares_user_with']

In [2]:
g.edges['shares_restaurant_in_one_month_with'].data['he'] = torch.ones(g.number_of_edges('shares_restaurant_in_one_month_with'), dtype=torch.int64)
g.edges['shares_restaurant_rating_with'].data['he'] = torch.full((g.number_of_edges('shares_restaurant_rating_with'),), 2, dtype=torch.int64)
g.edges['shares_user_with'].data['he'] = torch.full((g.number_of_edges('shares_user_with'),), 4, dtype=torch.int64)

In [3]: g.edata['he']
Out[3]:
{('review',
  'shares_restaurant_in_one_month_with',
  'review'): tensor([1, 1, 1,  ..., 1, 1, 1]),
 ('review',
  'shares_restaurant_rating_with',
  'review'): tensor([2, 2, 2,  ..., 2, 2, 2]),
 ('review', 'shares_user_with', 'review'): tensor([4, 4, 4,  ..., 4, 4, 4])}
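The edge-type codes 1, 2 and 4 are powers of two (b"001", b"010", b"100"), so each type occupies its own bit. One possible motivation, which is our interpretation rather than something the original states, concerns merging. If parallel edges between the same pair of reviews were ever merged, the codes could be OR-ed together and still decoded unambiguously. The helper names in this sketch are hypothetical. When he is later used as an edge weight, the codes also scale messages by 1, 2 or 4, which is a modeling choice in itself.

```python
EDGE_TYPE_CODES = {
    "shares_restaurant_in_one_month_with": 1,  # b"001"
    "shares_restaurant_rating_with": 2,        # b"010"
    "shares_user_with": 4,                     # b"100"
}


def encode_edge_types(edge_types):
    """Combine edge type names into one integer by OR-ing their bit flags."""
    code = 0
    for name in edge_types:
        code |= EDGE_TYPE_CODES[name]
    return code


def decode_edge_types(code):
    """Recover the edge type names whose bit is set in code."""
    return sorted(name for name, bit in EDGE_TYPE_CODES.items() if code & bit)


merged = encode_edge_types(["shares_restaurant_in_one_month_with", "shares_user_with"])
print(merged, decode_edge_types(merged))
```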

Convert it into a homogeneous graph, keeping he as edata:

hg = dgl.to_homogeneous(g, edata=['he'], ndata=['feat', 'label', 'train_mask', 'val_mask', 'test_mask'])

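The default GraphSAGE implementation ignores edge features, so we need to modify the message passing step. We do that hands-on below.

Model training code

DGL provides an official GraphSAGE example at https://github.com/dmlc/dgl/tree/master/examples/pytorch/graphsage; I fixed a small bug in it while testing.

Our converted homogeneous graph carries an edge feature, so we cannot reuse the official GraphSAGE example code as-is. There are two ways to handle this: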

  1. Slightly modify the message passing part of SAGEConv, taking mean aggregation as an example:
  graph.update_all(msg_fn, fn.mean('m', 'neigh'))
+ graph.edata['he'] = graph.edata['he'].float()
+ graph.update_all(fn.copy_e('he', 'm'), fn.mean('m', 'neigh_e'))
- h_neigh = graph.dstdata['neigh']
+ h_neigh = torch.cat((graph.dstdata['neigh'], graph.dstdata['neigh_e'].reshape(-1, 1)), 1)

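Besides adding the edge feature to message passing as above, this approach also requires handling the feature dimension. h_neigh now has one extra column, so the layer that consumes it must accept one more input feature.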

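  2. Use the edge feature as an edge weight, again taking mean aggregation as an example:
- graph.update_all(msg_fn, fn.mean('m', 'neigh'))
+ # consider edge types with different weights, graph.edata['he'] as weight here
+ graph.edata['he'] = graph.edata['he'].float()
+ graph.update_all(fn.u_mul_e('h', 'he', 'm'), fn.mean('m', 'neigh'))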
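What option 2 computes can be checked numerically without DGL. The NumPy sketch below reproduces a u_mul_e message followed by a mean reduction. Each destination node averages h[u] * w over its incoming edges. Nodes without incoming edges get zeros, which to our understanding matches how DGL fills nodes that receive no messages. The function name is hypothetical.

```python
import numpy as np


def weighted_mean_aggregate(h, src, dst, weight, num_nodes):
    """Mean over in-edges of h[src] * weight, per destination node (zeros if no in-edges)."""
    h = np.asarray(h, dtype=float)
    src = np.asarray(src)
    dst = np.asarray(dst)
    messages = h[src] * np.asarray(weight, dtype=float)[:, None]  # like fn.u_mul_e
    total = np.zeros((num_nodes, h.shape[1]))
    count = np.zeros(num_nodes)
    np.add.at(total, dst, messages)  # scatter-add messages onto destination nodes
    np.add.at(count, dst, 1)
    has_in_edges = count > 0
    total[has_in_edges] /= count[has_in_edges][:, None]  # like fn.mean
    return total


h = [[1.0, 0.0], [0.0, 1.0], [2.0, 2.0]]
# Edge 0 -> 2 has type code 1; edge 1 -> 2 has type code 4
out = weighted_mean_aggregate(h, src=[0, 1], dst=[2, 2], weight=[1, 4], num_nodes=3)
print(out)
```

Node 2 receives [1, 0] * 1 and [0, 1] * 4, whose mean is [0.5, 2.0]. Nodes 0 and 1 have no incoming edges and stay at zero.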

Below, we work through the case that uses the edge type as the weight, with mean aggregation.

We subclass SAGEConv and override it; only the message passing part actually changes:

import torch
import torch.nn.functional as F
import dgl.nn as dglnn
from dgl import function as fn
from dgl.utils import check_eq_shape, expand_as_pair

class SAGEConv(dglnn.SAGEConv):
    def forward(self, graph, feat, edge_weight=None):
        r"""

        Description
        -----------
        Compute GraphSAGE layer.

        Parameters
        ----------
        graph : DGLGraph
            The graph.
        feat : torch.Tensor or pair of torch.Tensor
            If a torch.Tensor is given, it represents the input feature of shape
            :math:`(N, D_{in})`
            where :math:`D_{in}` is size of input feature, :math:`N` is the number of nodes.
            If a pair of torch.Tensor is given, the pair must contain two tensors of shape
            :math:`(N_{in}, D_{in_{src}})` and :math:`(N_{out}, D_{in_{dst}})`.
        edge_weight : torch.Tensor, optional
            Optional tensor on the edge. If given, the convolution will weight
            with regard to the message.

        Returns
        -------
        torch.Tensor
            The output feature of shape :math:`(N_{dst}, D_{out})`
            where :math:`N_{dst}` is the number of destination nodes in the input graph,
            :math:`D_{out}` is the size of the output feature.
        """
        self._compatibility_check()
        with graph.local_scope():
            if isinstance(feat, tuple):
                feat_src = self.feat_drop(feat[0])
                feat_dst = self.feat_drop(feat[1])
            else:
                feat_src = feat_dst = self.feat_drop(feat)
                if graph.is_block:
                    feat_dst = feat_src[:graph.number_of_dst_nodes()]
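            msg_fn = fn.copy_u('h', 'm')
            # Override: without an explicit edge_weight, fall back to the edge-type
            # codes in graph.edata['he'] (1, 2, 4), cast to float for u_mul_e
            if edge_weight is None and 'he' in graph.edata:
                edge_weight = graph.edata['he'].float()
            if edge_weight is not None:
                assert edge_weight.shape[0] == graph.number_of_edges()
                graph.edata['_edge_weight'] = edge_weight
                msg_fn = fn.u_mul_e('h', '_edge_weight', 'm')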

            h_self = feat_dst

            # Handle the case of graphs without edges
            if graph.number_of_edges() == 0:
                graph.dstdata['neigh'] = torch.zeros(feat_dst.shape[0], self._in_src_feats).to(feat_dst)

            # Determine whether to apply linear transformation before message passing A(XW)
            lin_before_mp = self._in_src_feats > self._out_feats

            # Message Passing
            if self._aggre_type == 'mean':
                graph.srcdata['h'] = self.fc_neigh(feat_src) if lin_before_mp else feat_src
                # msg_fn multiplies each source feature by its edge weight (h_u * w_e);
                # fn.mean then averages those messages per destination node
                graph.update_all(msg_fn, fn.mean('m', 'neigh'))
                h_neigh = graph.dstdata['neigh']
                if not lin_before_mp:
                    h_neigh = self.fc_neigh(h_neigh)
            elif self._aggre_type == 'gcn':
                check_eq_shape(feat)
                graph.srcdata['h'] = self.fc_neigh(feat_src) if lin_before_mp else feat_src
                if isinstance(feat, tuple):  # heterogeneous
                    graph.dstdata['h'] = self.fc_neigh(feat_dst) if lin_before_mp else feat_dst
                else:
                    if graph.is_block:
                        graph.dstdata['h'] = graph.srcdata['h'][:graph.num_dst_nodes()]
                    else:
                        graph.dstdata['h'] = graph.srcdata['h']
                graph.update_all(msg_fn, fn.sum('m', 'neigh'))
                # divide in_degrees
                degs = graph.in_degrees().to(feat_dst)
                h_neigh = (graph.dstdata['neigh'] + graph.dstdata['h']) / (degs.unsqueeze(-1) + 1)
                if not lin_before_mp:
                    h_neigh = self.fc_neigh(h_neigh)
            elif self._aggre_type == 'pool':
                graph.srcdata['h'] = F.relu(self.fc_pool(feat_src))
                graph.update_all(msg_fn, fn.max('m', 'neigh'))
                h_neigh = self.fc_neigh(graph.dstdata['neigh'])
            elif self._aggre_type == 'lstm':
                graph.srcdata['h'] = feat_src
                graph.update_all(msg_fn, self._lstm_reducer)
                h_neigh = self.fc_neigh(graph.dstdata['neigh'])
            else:
                raise KeyError('Aggregator type {} not recognized.'.format(self._aggre_type))

            # GraphSAGE GCN does not require fc_self.
            if self._aggre_type == 'gcn':
                rst = h_neigh
            else:
                rst = self.fc_self(h_self) + h_neigh

            # bias term
            if self.bias is not None:
                rst = rst + self.bias

            # activation
            if self.activation is not None:
                rst = self.activation(rst)
            # normalization
            if self.norm is not None:
                rst = self.norm(rst)
            return rst

Define the model:

class SAGE(nn.Module):
    def __init__(self, in_size, hid_size, out_size):
        super().__init__()
        self.layers = nn.ModuleList()
        # three-layer GraphSAGE-mean
        self.layers.append(dglnn.SAGEConv(in_size, hid_size, 'mean'))
        self.layers.append(dglnn.SAGEConv(hid_size, hid_size, 'mean'))
        self.layers.append(dglnn.SAGEConv(hid_size, out_size, 'mean'))
        self.dropout = nn.Dropout(0.5)
        self.hid_size = hid_size
        self.out_size = out_size

    def forward(self, blocks, x):
        h = x
        for l, (layer, block) in enumerate(zip(self.layers, blocks)):
            h = layer(block, h)
            if l != len(self.layers) - 1:
                h = F.relu(h)
                h = self.dropout(h)
        return h

    def inference(self, g, device, batch_size):
        """Conduct layer-wise inference to get all the node embeddings."""
        feat = g.ndata['feat']
        sampler = MultiLayerFullNeighborSampler(1, prefetch_node_feats=['feat'])
        dataloader = DataLoader(g, torch.arange(g.num_nodes()).to(g.device), sampler, device=device,
                batch_size=batch_size, shuffle=False, drop_last=False,
                num_workers=0)
        buffer_device = torch.device('cpu')
        pin_memory = (buffer_device != device)

        for l, layer in enumerate(self.layers):
            y = torch.empty(g.num_nodes(), self.hid_size if l != len(self.layers) - 1 else self.out_size,
                device=buffer_device, pin_memory=pin_memory)
            feat = feat.to(device)
            for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
                x = feat[input_nodes]
                h = layer(blocks[0], x) # len(blocks) = 1
                if l != len(self.layers) - 1:
                    h = F.relu(h)
                    h = self.dropout(h)
                # by design, our output nodes are contiguous
                y[output_nodes[0]:output_nodes[-1]+1] = h.to(buffer_device)
            feat = y
        return y
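To build intuition for what each dglnn.SAGEConv(..., 'mean') layer in SAGE computes, here is a minimal pure-Python sketch of a single GraphSAGE-mean layer. It follows the structure of the forward pass shown at the top of this section: a transform of the node's own features plus a transform of the mean of its neighbours' features. The toy graph and identity weights are made up for illustration. Bias, activation and normalization are left out. SAGE stacks three such layers, with ReLU and dropout between them.

```python
from typing import Dict, List

Vector = List[float]
Matrix = List[List[float]]


def matvec(w: Matrix, x: Vector) -> Vector:
    """Multiply an (out x in) weight matrix by an input vector."""
    return [sum(wij * xj for wij, xj in zip(row, x)) for row in w]


def sage_mean_layer(
    feats: Dict[int, Vector],
    in_neighbors: Dict[int, List[int]],
    w_self: Matrix,
    w_neigh: Matrix,
) -> Dict[int, Vector]:
    """One GraphSAGE-mean layer: W_self*h_v + W_neigh*mean(h_u for u in N(v))."""
    out = {}
    for v, h_v in feats.items():
        neigh = in_neighbors.get(v, [])
        if neigh:
            # element-wise mean of the neighbours' feature vectors
            mean = [sum(feats[u][k] for u in neigh) / len(neigh) for k in range(len(h_v))]
        else:
            # no in-neighbours: the aggregated message is all zeros
            mean = [0.0] * len(h_v)
        self_part = matvec(w_self, h_v)
        neigh_part = matvec(w_neigh, mean)
        out[v] = [a + b for a, b in zip(self_part, neigh_part)]
    return out


# toy graph: node 0 receives messages from nodes 1 and 2
feats = {0: [1.0, 0.0], 1: [0.0, 2.0], 2: [2.0, 2.0]}
in_neighbors = {0: [1, 2], 1: [0], 2: []}
identity = [[1.0, 0.0], [0.0, 1.0]]
print(sage_mean_layer(feats, in_neighbors, identity, identity)[0])  # [2.0, 2.0]
```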

Define the functions for training and inference:

def evaluate(model, graph, dataloader):
    model.eval()
    ys = []
    y_hats = []
    for it, (input_nodes, output_nodes, blocks) in enumerate(dataloader):
        with torch.no_grad():
            x = blocks[0].srcdata['feat']
            ys.append(blocks[-1].dstdata['label'])
            y_hats.append(model(blocks, x))
    return MF.accuracy(torch.cat(y_hats), torch.cat(ys))

def layerwise_infer(device, graph, nid, model, batch_size):
    model.eval()
    with torch.no_grad():
        pred = model.inference(graph, device, batch_size) # pred in buffer_device
        pred = pred[nid]
        label = graph.ndata['label'][nid].to(pred.device)
        return MF.accuracy(pred, label)

def train(device, g, model, train_idx, val_idx):
    # create sampler & dataloader
    sampler = NeighborSampler([10, 10, 10],  # fanout for [layer-0, layer-1, layer-2]
                              prefetch_node_feats=['feat'],
                              prefetch_labels=['label'])
    use_uva = False
    train_dataloader = DataLoader(g, train_idx, sampler, device=device,
                                  batch_size=1024, shuffle=True,
                                  drop_last=False, num_workers=0,
                                  use_uva=use_uva)

    val_dataloader = DataLoader(g, val_idx, sampler, device=device,
                                batch_size=1024, shuffle=True,
                                drop_last=False, num_workers=0,
                                use_uva=use_uva)

    opt = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=5e-4)
    
    for epoch in range(10):
        model.train()
        total_loss = 0
        for it, (input_nodes, output_nodes, blocks) in enumerate(train_dataloader):
            x = blocks[0].srcdata['feat']
            y = blocks[-1].dstdata['label']
            y_hat = model(blocks, x)
            loss = F.cross_entropy(y_hat, y)
            opt.zero_grad()
            loss.backward()
            opt.step()
            total_loss += loss.item()
        acc = evaluate(model, g, val_dataloader)
        print("Epoch {:05d} | Loss {:.4f} | Accuracy {:.4f}"
              .format(epoch, total_loss / (it+1), acc.item()))
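NeighborSampler([10, 10, 10]) caps how many neighbours each layer considers. The pure-Python sketch below illustrates uniform fanout sampling for a multi-layer mini-batch. Starting from the seed nodes, each hop keeps at most `fanout` randomly chosen in-neighbours per node. The fanout list is consumed from the output layer backwards, which is, to our understanding, what DGL's sampler does too. This is a conceptual illustration only. DGL's sampler also builds the message-flow blocks and prefetches features and labels, and none of that is modelled here.

```python
import random
from typing import Dict, List, Sequence, Set


def sample_frontiers(
    in_neighbors: Dict[int, List[int]],
    seeds: Sequence[int],
    fanouts: Sequence[int],
    rng: random.Random,
) -> List[Set[int]]:
    """Return the node set needed at each hop, from the seeds outwards.

    fanouts is given input-layer first (like [10, 10, 10]), so it is
    consumed in reverse: the output layer is sampled first.
    """
    frontier: Set[int] = set(seeds)
    layers = [set(frontier)]
    for fanout in reversed(fanouts):
        next_frontier = set(frontier)  # destination nodes also act as sources
        for v in frontier:
            neigh = in_neighbors.get(v, [])
            # uniform sampling without replacement, capped at `fanout`
            picked = neigh if len(neigh) <= fanout else rng.sample(neigh, fanout)
            next_frontier.update(picked)
        layers.append(next_frontier)
        frontier = next_frontier
    return layers


rng = random.Random(0)
star = {0: list(range(1, 21))}  # node 0 has 20 in-neighbours
hops = sample_frontiers(star, [0], [5], rng)
print(len(hops[1]))  # 6: the seed plus 5 sampled neighbours
```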

Load the graph from NebulaGraph into DGL. The result is a heterogeneous graph with one vertex type and three edge types:

import torch
import yaml
from nebula_dgl import NebulaLoader

nebula_config = {
    "graph_hosts": [
        ('graphd', 9669),
        ('graphd1', 9669),
        ('graphd2', 9669),
    ],
    "nebula_user": "root",
    "nebula_password": "nebula",
}

with open('nebulagraph_yelp_dgl_mapper.yaml', 'r') as f:
    feature_mapper = yaml.safe_load(f)

nebula_loader = NebulaLoader(nebula_config, feature_mapper)

g = nebula_loader.load()  # this will take some time

# Being rich, we just use the CPU
g = g.to('cpu')
device = torch.device('cpu')

Split the nodes into training, validation and test sets, then convert the graph to a homogeneous graph:

# Split the graph into train, validation and test sets

import dgl
import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split

# features are g.ndata['f0'], g.ndata['f1'], g.ndata['f2'], ... g.ndata['f31']
# label is in g.ndata['is_fraud']

# concatenate all features
features = []
for i in range(32):
    features.append(g.ndata['f'+str(i)])

g.ndata['feat'] = torch.stack(features, dim=1)
g.ndata['label'] = g.ndata['is_fraud']
# node IDs 0..n-1, used as the sample indices for the stratified split
idx = torch.tensor(np.arange(g.number_of_nodes()), device=device, dtype=torch.int64)

# split based on value distribution of label: the property "is_fraud", which is a binary variable.
X_train_and_val_idx, X_test_idx, y_train_and_val, y_test = train_test_split(idx, g.ndata['is_fraud'], test_size=0.2, random_state=42, stratify=g.ndata['is_fraud'])

# split train and val
X_train_idx, X_val_idx, y_train, y_val = train_test_split(X_train_and_val_idx, y_train_and_val, test_size=0.2, random_state=42, stratify=y_train_and_val)

# list of index to mask
train_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
train_mask[X_train_idx] = True

val_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
val_mask[X_val_idx] = True

test_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
test_mask[X_test_idx] = True

g.ndata['train_mask'] = train_mask
g.ndata['val_mask'] = val_mask
g.ndata['test_mask'] = test_mask

# shares_restaurant_in_one_month_with: 1, b"001"
# shares_restaurant_rating_with: 2, b"010"
# shares_user_with: 4, b"100"
# set edata of shares_restaurant_in_one_month_with to n of 1 tensor array
g.edges['shares_restaurant_in_one_month_with'].data['he'] = torch.ones(g.number_of_edges('shares_restaurant_in_one_month_with'), dtype=torch.float32)
g.edges['shares_restaurant_rating_with'].data['he'] = torch.full((g.number_of_edges('shares_restaurant_rating_with'),), 2, dtype=torch.float32)
g.edges['shares_user_with'].data['he'] = torch.full((g.number_of_edges('shares_user_with'),), 4, dtype=torch.float32)

# heterogeneous graph to homogeneous graph, keep ndata and edata
hg = dgl.to_homogeneous(g, edata=['he'], ndata=['feat', 'label', 'train_mask', 'val_mask', 'test_mask'])
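The `he` edge feature gives each relation a distinct bit (1, 2 or 4) before the relation types are merged. Conceptually, dgl.to_homogeneous concatenates the per-type edge lists into one edge list and carries each edge's data along. Node IDs stay the same here because there is only one node type. The pure-Python sketch below mimics that on made-up toy edges. It illustrates the idea and is not DGL's implementation. Because powers of two are used, codes would stay decodable even if parallel edges were ever OR-ed together. Note also that SAGE.forward above calls layer(block, h) without an edge_weight argument, so as written `he` is carried along but not consumed by the model.

```python
from typing import Dict, List, Tuple

# Bit code per relation, matching the comments above (b"001", b"010", b"100").
EDGE_TYPE_CODE = {
    "shares_restaurant_in_one_month_with": 1,
    "shares_restaurant_rating_with": 2,
    "shares_user_with": 4,
}


def flatten_typed_edges(
    typed_edges: Dict[str, List[Tuple[int, int]]],
) -> List[Tuple[int, int, int]]:
    """Merge per-type (src, dst) lists into one list of (src, dst, he)."""
    merged = []
    for etype in sorted(typed_edges):  # fixed, deterministic type order
        code = EDGE_TYPE_CODE[etype]
        merged.extend((src, dst, code) for src, dst in typed_edges[etype])
    return merged


def decode(code: int) -> List[str]:
    """Recover relation names from a (possibly OR-ed) bit code."""
    return [name for name, bit in EDGE_TYPE_CODE.items() if code & bit]


toy = {
    "shares_user_with": [(0, 1)],
    "shares_restaurant_rating_with": [(0, 1), (1, 2)],
}
print(flatten_typed_edges(toy))  # [(0, 1, 2), (1, 2, 2), (0, 1, 4)]
```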

Train and test the model:

# create GraphSAGE model
in_size = hg.ndata['feat'].shape[1]
out_size = 2
model = SAGE(in_size, 256, out_size).to(device)

# model training
print('Training...')
train(device, hg, model, X_train_idx, X_val_idx)

# test the model
print('Testing...')

acc = layerwise_infer(device, hg, X_test_idx, model, batch_size=4096)
print("Test Accuracy {:.4f}".format(acc.item()))

# Output
# Test Accuracy 0.9996

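Once we have a trained model, we can serialize it to a file. When we need it later, we re-create the model class, load the serialized weights into this PyTorch model, and use it for inference.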

# save model
torch.save(model.state_dict(), "fraud_d.model")

# load model
device = torch.device('cpu')
model = SAGE(32, 256, 2).to(device)
model.load_state_dict(torch.load("fraud_d.model"))

Finally, how do we put the model into our online fraud detection system?

Inference API

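As mentioned earlier, GraphSAGE is the simplest model that supports inductive learning. In the code above, however, training, inference and testing all used the same graph. Inductive learning is at its best when inference runs on completely new vertices and edges. To achieve this, we only need to split the data into two disjoint subgraphs, one for training and one for the final test: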

# Inductive Learning, our test dataset are new nodes and new edges
hg_train = hg.subgraph(torch.cat([X_train_idx, X_val_idx]))

# model training
print('Training...')
train(device, hg_train, model, torch.arange(X_train_idx.shape[0]), torch.arange(X_train_idx.shape[0], hg_train.num_nodes()))

# test the model
print('Testing...')

hg_test = hg.subgraph(torch.cat([X_test_idx]))

sg_X_test_idx = torch.arange(hg_test.num_nodes())

acc = layerwise_infer(device, hg_test, sg_X_test_idx, model, batch_size=4096)
print("Test Accuracy {:.4f}".format(acc.item()))

# Output
# Test Accuracy 0.9990
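The inductive split depends on how hg.subgraph(...) relabels nodes. The training call passes torch.arange(X_train_idx.shape[0]) as the training IDs and the remaining positions as the validation IDs. That assumes the subgraph numbers its nodes in the order they appear in torch.cat([X_train_idx, X_val_idx]). The pure-Python sketch below shows this kind of induced-subgraph relabelling: it keeps only edges whose endpoints are both selected and renumbers nodes by their position in the given list. It illustrates the assumption and does not reproduce DGL's code. Edges that cross between the training and test node sets are dropped, which is what keeps the test graph unseen.

```python
from typing import Dict, List, Sequence, Tuple


def induced_subgraph(
    edges: Sequence[Tuple[int, int]], nodes: Sequence[int]
) -> Tuple[List[Tuple[int, int]], Dict[int, int]]:
    """Keep edges whose endpoints are both in `nodes`; relabel by list position."""
    new_id = {old: pos for pos, old in enumerate(nodes)}
    kept = [(new_id[s], new_id[d]) for s, d in edges if s in new_id and d in new_id]
    return kept, new_id


edges = [(0, 1), (1, 2), (2, 3), (3, 0)]
train_val = [2, 0, 1]  # "train" = [2, 0], "val" = [1]; node 3 is held out for testing
kept, new_id = induced_subgraph(edges, train_val)
print(kept)    # [(1, 2), (2, 0)]
print(new_id)  # {2: 0, 0: 1, 1: 2}, so positions 0 and 1 are the training nodes
```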

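As you can see, in the inductive-learning code above the graph used for testing and the graph used for training are two completely separate sets of data. This means our online system can handle data it has never encountered before. All we need to do is write the new transaction request into NebulaGraph, fetch a small subgraph around that vertex that the online system can return, and feed it to the model as inference input to get labels for the subgraph!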

New transaction requests

Remember the flowchart of the online inference system we drew earlier?


      ┌────────────────────┐                            ┌─────────────────┐
─────▶│ Transaction Record ├───── 2. Fraud Risk ───────▶│  Inference API  │
      │                    │◀──── Prediction with ──────┤                 │
      │                    │         Sub Graph          │                 │
      └────────────────────┘                            └─────────────────┘
          │           ▲                                     │         ▲
      0. Insert  1. Get New                           3.req: Node  3.resp:
       Record    Record Sub                         Classification Predicted
          │         Graph                                   │       Risk
          ▼           │                                     ▼         │
  ┌───────────────────┴────────────────┐              ┌───────────────┴───┐
  │  Graph of Historical Transactions  │              │     GNN Model     │
  └────────────────────────────────────┘              └───────────────────┘
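The numbered steps in the flowchart can be read as a small orchestration function. The sketch below wires them together with injected callables. `insert_record`, `fetch_subgraph` and `classify_node` are hypothetical placeholders for the NebulaGraph write, the GET SUBGRAPH query and the GNN inference API described in this article. The 0.5 threshold is also an illustrative assumption. Treat this as an outline of the control flow, not a production service.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class FraudCheckPipeline:
    # 0. write the new transaction record into the graph, return its vertex ID (hypothetical hook)
    insert_record: Callable[[Dict[str, Any]], int]
    # 1. fetch the subgraph around the new vertex (hypothetical hook)
    fetch_subgraph: Callable[[int], Any]
    # 3. node classification on that subgraph, returning P(fraud) (hypothetical hook)
    classify_node: Callable[[Any, int], float]
    threshold: float = 0.5  # illustrative cut-off, not taken from the article

    def check(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """2. Fraud risk prediction for one new transaction record."""
        vertex_id = self.insert_record(record)
        subgraph = self.fetch_subgraph(vertex_id)
        risk = self.classify_node(subgraph, vertex_id)
        return {"vertex_id": vertex_id, "risk": risk, "flagged": risk >= self.threshold}


# usage with stand-in functions instead of a real cluster and model
pipeline = FraudCheckPipeline(
    insert_record=lambda record: 2048,
    fetch_subgraph=lambda vid: {"nodes": [vid]},
    classify_node=lambda subgraph, vid: 0.9,
)
print(pipeline.check({"amount": 100}))  # {'vertex_id': 2048, 'risk': 0.9, 'flagged': True}
```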

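Now suppose this new transaction request has just been made, and its record has already been written into the graph. Let's simply pick a vertex to act as such a request.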

MATCH (n:review) RETURN n LIMIT 1
(2048 :review{f0: 0.0, f1: 0.08034700155258179, f10: 0.3952670097351074, f11: 0.18671999871730804, f12: 0.2836120128631592, f13: 0.2843089997768402, f14: 0.38148200511932373, f15: 0.3816460072994232, f16: 0.9999740123748779, f17: 0.6430919766426086, f18: 0.9999740123748779, f19: 0.5051100254058838, f2: 0.12382200360298157, f20: 0.4940490126609802, f21: 0.7766339778900146, f22: 0.7705119848251343, f23: 0.9480599761009216, f24: 0.4032529890537262, f25: 0.12437800318002701, f26: 0.3184080123901367, f27: 0.5223879814147949, f28: 0.4278610050678253, f29: 0.343284010887146, f3: 0.42868199944496155, f30: 0.37313398718833923, f31: 0.328357994556427, f4: 0.9999849796295166, f5: 0.9999849796295166, f6: 0.9999849796295166, f7: 0.4850809872150421, f8: 0.454602986574173, f9: 0.8863419890403748, is_fraud: 0})

OK, it is vertex 2048.

Its next step is "1. Get New Record Subgraph", so let's fetch its subgraph:

GET SUBGRAPH WITH PROP FROM 2048 YIELD VERTICES AS nodes, EDGES AS relationships;

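As you can see, the query returns quite a lot of data. By the way, fetching this subgraph in NebulaGraph takes about 10 ms. If we use a graph visualization and exploration tool such as NebulaGraph Studio or Explorer, we can render the result, which makes it much easier for a human to understand. For a query meant for visualization, you can drop WITH PROP to save the browser some memory.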

Now let's implement the code for this step. Its input is the vertex ID, vertex_id, and its output is a dgl_graph that is passed to the inference API.

# get SUBGRAPH of one node

import json
from torch import tensor
from dgl import DGLHeteroGraph, heterograph

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

config = Config()
config.max_connection_pool_size = 2
connection_pool = ConnectionPool()
connection_pool.init([('graphd', 9669)], config)

vertex_id = 2048
client = connection_pool.get_session('root', 'nebula')
r = client.execute_json(
    "USE yelp;"
    f"GET SUBGRAPH WITH PROP 2 STEPS FROM {vertex_id} YIELD VERTICES AS nodes, EDGES AS relationships;")

r = json.loads(r)
data = r.get('results', [{}])[0].get('data')

Here I used nebula-python, the Python SDK/client for NebulaGraph, and ran the query with execute_json to get the subgraph for this transaction.
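If you wrap this step as a reusable function for an inference service, return the session to the pool even when the query fails. Below is a minimal sketch. It assumes a nebula3 ConnectionPool that has already been initialized as above, and integer vertex IDs as in this dataset, because the ID is formatted straight into the query. The function name `fetch_subgraph_rows` and its parameters are hypothetical. From nebula-python it uses only get_session, execute_json and release. The pool is passed in as an argument, so a fake pool can stand in for a real cluster when testing.

```python
import json
from typing import Any, List


def fetch_subgraph_rows(pool: Any, vertex_id: int, space: str = "yelp",
                        steps: int = 2, user: str = "root",
                        password: str = "nebula") -> List[Any]:
    """Run GET SUBGRAPH for one vertex and return the `data` rows of the first result."""
    session = pool.get_session(user, password)
    try:
        raw = session.execute_json(
            f"USE {space};"
            f"GET SUBGRAPH WITH PROP {steps} STEPS FROM {vertex_id} "
            "YIELD VERTICES AS nodes, EDGES AS relationships;"
        )
    finally:
        # always hand the session back, even if the query raised
        session.release()
    result = json.loads(raw)  # execute_json returns JSON bytes
    data = result.get("results", [{}])[0].get("data")
    if data is None:
        raise RuntimeError(f"no subgraph data returned for vertex {vertex_id}")
    return data
```

With the pool from the previous snippet, `data = fetch_subgraph_rows(connection_pool, 2048)` would replace the inline query and JSON parsing.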

Next, we need to construct a dgl_graph from the returned data:

# create node and nodedata
node_id_map = {} # key: vertex id in NebulaGraph, value: node id in dgl_graph
node_idx = 0
features = [[] for _ in range(32)] + [[]]
for i in range(len(data)):
    for index, node in enumerate(data[i]['meta'][0]):
        nodeid = data[i]['meta'][0][index]['id']
        if nodeid not in node_id_map:
            node_id_map[nodeid] = node_idx
            node_idx += 1
            for f in range(32):
                features[f].append(data[i]['row'][0][index][f"review.f{f}"])
            features[32].append(data[i]['row'][0][index]['review.is_fraud'])


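"""
- R-U-R: the two reviews were posted by the same user: shares_user_with
- R-S-R: the two reviews are for the same restaurant with the same rating (ratings range from 1 to 5): shares_restaurant_rating_with
- R-T-R: the two reviews are for the same restaurant and were submitted in the same month: shares_restaurant_in_one_month_with
"""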
rur_start, rur_end, rsr_start, rsr_end, rtr_start, rtr_end = [], [], [], [], [], []
for i in range(len(data)):
    for edge in data[i]['meta'][1]:
        edge = edge['id']
        if edge['name'] == 'shares_user_with':
            rur_start.append(node_id_map[edge['src']])
            rur_end.append(node_id_map[edge['dst']])
        elif edge['name'] == 'shares_restaurant_rating_with':
            rsr_start.append(node_id_map[edge['src']])
            rsr_end.append(node_id_map[edge['dst']])
        elif edge['name'] == 'shares_restaurant_in_one_month_with':
            rtr_start.append(node_id_map[edge['src']])
            rtr_end.append(node_id_map[edge['dst']])

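import torch

# always register all three edge types (possibly with no edges) so the
# per-type edge data assignment later works even for small subgraphs
data_dict = {
    ('review', 'shares_user_with', 'review'): (tensor(rur_start, dtype=torch.int64), tensor(rur_end, dtype=torch.int64)),
    ('review', 'shares_restaurant_rating_with', 'review'): (tensor(rsr_start, dtype=torch.int64), tensor(rsr_end, dtype=torch.int64)),
    ('review', 'shares_restaurant_in_one_month_with', 'review'): (tensor(rtr_start, dtype=torch.int64), tensor(rtr_end, dtype=torch.int64)),
}

# construct a dgl_graph; num_nodes_dict keeps vertices that have no edges in the subgraph
dgl_graph: DGLHeteroGraph = heterograph(data_dict, num_nodes_dict={'review': node_idx})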

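In practice, I simply built data_dict as described in the DGL documentation at https://docs.dgl.ai/en/0.9.x/generated/dgl.heterograph.html and then used heterograph() to turn it into the desired dgl_graph. Here, node_id_map is a dictionary that maps each Vertex_ID in NebulaGraph to its node_id in this graph object.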
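To make the expected data_dict shape explicit: dgl.heterograph takes a dict keyed by canonical edge types (src_type, edge_type, dst_type). Each key maps to a pair of source and destination ID sequences, expressed in local, zero-based node IDs. The helper below is a hypothetical, pure-Python illustration. It builds that structure from NebulaGraph-style (src, dst, edge name) triples through node_id_map, and uses plain lists instead of tensors so it runs without DGL. Unlike the loop above, it skips edges whose endpoints are not among the returned vertices.

```python
from typing import Dict, Iterable, List, Tuple

CanonicalEtype = Tuple[str, str, str]


def build_data_dict(
    edges: Iterable[Tuple[int, int, str]],
    node_id_map: Dict[int, int],
    node_type: str = "review",
) -> Dict[CanonicalEtype, Tuple[List[int], List[int]]]:
    """Group (src_vid, dst_vid, edge_name) triples into DGL-style data_dict entries."""
    data_dict: Dict[CanonicalEtype, Tuple[List[int], List[int]]] = {}
    for src_vid, dst_vid, name in edges:
        if src_vid not in node_id_map or dst_vid not in node_id_map:
            continue  # endpoint outside the fetched vertex set
        srcs, dsts = data_dict.setdefault((node_type, name, node_type), ([], []))
        srcs.append(node_id_map[src_vid])
        dsts.append(node_id_map[dst_vid])
    return data_dict


node_id_map = {2048: 0, 10: 1, 20: 2}
edges = [
    (2048, 10, "shares_user_with"),
    (10, 20, "shares_user_with"),
    (2048, 20, "shares_restaurant_rating_with"),
    (2048, 99, "shares_user_with"),  # 99 was not returned, so this edge is skipped
]
print(build_data_dict(edges, node_id_map))
```

Before passing the result to heterograph(), each list would still need to be wrapped in a tensor, for example `tensor(srcs)`.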

Finally, we load the node features as well.

# load node features to dgl_graph
for i in range(32):
    dgl_graph.ndata[f"f{i}"] = tensor(features[i])
dgl_graph.ndata['label'] = tensor(features[32])

Before running inference, we still need to convert it to a homogeneous graph, exactly as before:

import dgl
import torch

# stack the 32 scalar features into one feature matrix, as during training
features = []
for i in range(32):
    features.append(dgl_graph.ndata[f"f{i}"])

dgl_graph.ndata['feat'] = torch.stack(features, dim=1)

# same per-relation edge codes as during training
dgl_graph.edges['shares_restaurant_in_one_month_with'].data['he'] = torch.ones(dgl_graph.number_of_edges('shares_restaurant_in_one_month_with'), dtype=torch.float32)
dgl_graph.edges['shares_restaurant_rating_with'].data['he'] = torch.full((dgl_graph.number_of_edges('shares_restaurant_rating_with'),), 2, dtype=torch.float32)
dgl_graph.edges['shares_user_with'].data['he'] = torch.full((dgl_graph.number_of_edges('shares_user_with'),), 4, dtype=torch.float32)

# heterogeneous graph to homogeneous graph, keep ndata and edata
hg = dgl.to_homogeneous(dgl_graph, edata=['he'], ndata=['feat', 'label'])

Finally, our inference API is:

def do_inference(device, graph, node_idx, model, batch_size):
    model.eval()
    with torch.no_grad():
        pred = model.inference(graph, device, batch_size) # pred in buffer_device
        return pred[node_idx]

Let's call it to run inference on our new vertex:

node_idx = node_id_map[vertex_id]
batch_size = 4096

result = do_inference(device, hg, node_idx, model, batch_size)
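do_inference returns the raw output row of the last SAGEConv layer for that node. Because the model was trained with F.cross_entropy, this row holds two unnormalized scores (logits): one for class 0 (not fraud) and one for class 1 (fraud). To turn it into a risk score, apply a softmax. The pure-Python sketch below does this for a two-element list such as `result.tolist()`. The 0.5 threshold is an illustrative assumption. In a real risk system it would be tuned to how many false positives the business can tolerate.

```python
import math
from typing import Sequence


def fraud_probability(logits: Sequence[float]) -> float:
    """Softmax over [not_fraud, fraud] logits, returning P(fraud)."""
    m = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(x - m) for x in logits]
    return exps[1] / sum(exps)


def is_risky(logits: Sequence[float], threshold: float = 0.5) -> bool:
    """Flag the transaction when P(fraud) reaches the (illustrative) threshold."""
    return fraud_probability(logits) >= threshold


print(fraud_probability([0.0, 0.0]))  # 0.5: equal logits mean no preference
```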

Of course, we can also compute the accuracy on this small subgraph:

def test_inference(device, graph, nid, model, batch_size):
    model.eval()
    with torch.no_grad():
        pred = model.inference(graph, device, batch_size) # pred in buffer_device
        pred = pred[nid]
        label = graph.ndata['label'][nid].to(pred.device)
        return MF.accuracy(pred, label)

node_idx = torch.tensor(list(node_id_map.values()))
acc = test_inference(device, hg, node_idx, model, batch_size=4096)
print("Test Accuracy {:.4f}".format(acc.item()))

Output:

100%|████████████████████████████████████████████████| 1/1 [00:00<00:00, 130.31it/s]
100%|████████████████████████████████████████████████| 1/1 [00:00<00:00, 152.29it/s]
100%|████████████████████████████████████████████████| 1/1 [00:00<00:00, 173.55it/s]
Test Accuracy 0.9688
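Accuracy can look very high on fraud data because the non-fraud class dominates: a model that never flags anything can still score well. The small pure-Python sketch below is an illustrative addition and not part of the original project. It reports precision and recall for the fraud class alongside accuracy, given predicted and true 0/1 labels. Suitable inputs would be the argmax of the model output and the labels used in test_inference, both converted with `.tolist()`.

```python
from typing import Dict, Sequence


def binary_report(pred: Sequence[int], true: Sequence[int]) -> Dict[str, float]:
    """Accuracy plus precision/recall for the positive (fraud = 1) class."""
    tp = sum(1 for p, t in zip(pred, true) if p == 1 and t == 1)
    fp = sum(1 for p, t in zip(pred, true) if p == 1 and t == 0)
    fn = sum(1 for p, t in zip(pred, true) if p == 0 and t == 1)
    correct = sum(1 for p, t in zip(pred, true) if p == t)
    return {
        "accuracy": correct / len(true),
        # precision/recall are reported as 0.0 when their denominators are empty
        "precision": tp / (tp + fp) if tp + fp else 0.0,
        "recall": tp / (tp + fn) if tp + fn else 0.0,
    }


true = [0] * 9 + [1]
print(binary_report([0] * 10, true))  # high accuracy, yet the one fraud case is missed
```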

The code for this example project is at github.com/wey-gu/NebulaGraph-Fraud-Detection-GNN. If you have questions, feel free to leave a comment or open an issue.

Reference solutions for production deployment on AWS

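The examples so far can all be run and reproduced in a minimal experimental environment. This section briefly discusses production deployment and gives example reference architectures on AWS.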

Note: you can learn more about NebulaGraph on AWS at https://aws.amazon.com/quickstart/architecture/vesoft-nebula-graph/

Approach based on the graph and expert graph pattern matching

This approach is essentially a graph OLTP application built on NebulaGraph. The recommended production deployment is based on NebulaGraph on AWS.

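As shown in the figure below, NebulaGraph on AWS runs on AWS infrastructure. It provides an out-of-the-box, production-grade deployment architecture from NebulaGraph's main supporting team (Vesoft Inc.), together with the enterprise graph database NebulaGraph Core Enterprise and its rich set of companion tools.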

The solution is shown in the figure:

Approach based on graph computing

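Above, we gave an example of using graph algorithms to expand fraud risk labels and demonstrated it on a single machine. In a production deployment, for example on AWS, we also need NebulaGraph Algorithm in addition to the NebulaGraph core cluster. NebulaGraph Algorithm can run on AWS EMR Spark, extract the full graph from NebulaGraph, and run graph algorithms efficiently in a distributed way on the Spark cluster.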

The solution is shown in the figure:

Approach based on graph neural networks

In addition, in production on AWS, the GNN-based real-time risk control project we built earlier with NebulaGraph, DGL and Nebula-DGL can:

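  • use AWS SageMaker and SageMaker Notebook for training and exploratory research
  • use AWS SageMaker Inference for online inference. For workloads such as financial risk control, where traffic varies a lot across different times of day, consider AWS SageMaker Inference Serverless. It can scale to zero and also scale out on demand, so you pay only for the compute you actually use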
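With the serverless option, scale-to-zero behaviour is requested in the endpoint configuration. The hedged sketch below only builds the request parameters for the SageMaker create_endpoint_config call in boto3, using a production variant that carries a ServerlessConfig. The endpoint-config name, model name, memory size and concurrency are placeholders. The ranges checked here are our understanding of AWS limits: 1024 to 6144 MB in 1 GB steps, and a per-endpoint MaxConcurrency of at most 200. Please verify them against current AWS documentation. MaxConcurrency caps how far a single serverless endpoint scales out.

```python
from typing import Any, Dict

# Memory sizes we understand serverless inference to accept (verify against AWS docs).
ALLOWED_MEMORY_MB = {1024, 2048, 3072, 4096, 5120, 6144}


def serverless_endpoint_config(config_name: str, model_name: str,
                               memory_mb: int = 2048,
                               max_concurrency: int = 20) -> Dict[str, Any]:
    """Build kwargs for sagemaker_client.create_endpoint_config(**kwargs)."""
    if memory_mb not in ALLOWED_MEMORY_MB:
        raise ValueError(f"unsupported serverless memory size: {memory_mb}")
    if not 1 <= max_concurrency <= 200:
        raise ValueError(f"max_concurrency out of range: {max_concurrency}")
    return {
        "EndpointConfigName": config_name,
        "ProductionVariants": [{
            "VariantName": "AllTraffic",
            "ModelName": model_name,
            # a serverless variant needs no instance type or instance count
            "ServerlessConfig": {
                "MemorySizeInMB": memory_mb,
                "MaxConcurrency": max_concurrency,
            },
        }],
    }
```

With boto3 installed and AWS credentials configured, the result would be passed as `boto3.client("sagemaker").create_endpoint_config(**serverless_endpoint_config("fraud-gnn-serverless", "fraud-gnn-model"))`. Both names are hypothetical.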

The solution is shown in the figure:

Summary

To sum up, the fraud detection approaches covered here are:

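  • on a graph of transaction history and risk control data, get risk signals directly through graph pattern queries
  • periodically expand risk labels with graph algorithms and write them back to the graph database
  • periodically compute graph features from the graph and use them, together with other features, to predict risk offline with traditional machine learning
  • turn the properties in the graph into vertex and edge features and predict risk offline with graph neural networks; methods that support inductive learning can be combined with the graph database to predict risk online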

Further reading

  • DGL documentation on loading external data: https://docs.dgl.ai/guide/graph-external.html
  • How to convert a heterogeneous graph to a homogeneous graph with data: https://discuss.dgl.ai/t/how-to-convert-from-a-heterogeneous-graph-to-a-homogeneous-graph-with-data/2764
  • https://docs.dgl.ai/en/latest/guide/graph-heterogeneous.html?highlight=to_homogeneous#converting-heterogeneous-graphs-to-homogeneous-graphs
  • DGL FAQ, question 13: https://discuss.dgl.ai/t/frequently-asked-questions-faq/1681
  • https://discuss.dgl.ai/t/using-node-and-edge-features-in-message-passing/762

Thanks for reading to the end (///▽///)

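Want to try a graph database up close? You can now use NebulaGraph Cloud to build your own graph data system and save a lot of deployment and installation time, so you can focus on your business~ NebulaGraph is currently free to use for 30 days on Alibaba Cloud Compute Nest; click the link to give the graph database a try~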

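If you want to read the source code, head over to GitHub to read, use and (^з^)-☆ star it -> GitHub. You are also welcome to exchange graph database techniques and application tips with other NebulaGraph users; leave "your business card" and come hang out~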
