关于数据库:Nebula-Graph-源码解读系列|客户端的通信秘密fbthrift

3次阅读

共计 6770 个字符,预计需要花费 17 分钟才能阅读完成。

概述

Nebula Clients 给用户提供了多种编程语言的 API 用于和 Nebula Graph 交互,并且对服务端返回的数据结构进行了从新封装,便于用户应用。

目前 Nebula Clients 反对的语言有 C++、Java、Python、Golang 和 Rust。

通信框架

Nebula Clients 应用了 fbthrift https://github.com/facebook/fbthrift 作为服务端和客户端之间的 RPC 通信框架,实现了跨语言的交互。

fbthrift 提供了三方面的性能:

  1. 生成代码:fbthrift 可将不同语言序列化成数据结构
  2. 序列化:将生成的数据结构序列化
  3. 通信交互:在客户端、服务端之间传输音讯,收到不同语言的客户端的申请时,调用相应的服务端函数

例子

这里以 Golang 客户端为例,展现 fbthrift 在 Nebula Graph 中的利用。

  1. Vertex 构造在服务端的定义:

    struct Vertex {
     Value vid;
     std::vector<Tag> tags;
    
     Vertex() = default;};
  2. 首先, 在 src/interface/common.thrift 中定义一些数据结构:
struct Tag {
        1: binary name,
        // List of <prop_name, prop_value>
        2: map<binary, Value> (cpp.template = "std::unordered_map") props,
} (cpp.type = "nebula::Tag")

struct Vertex {
        1: Value     vid,
        2: list<Tag> tags,
} (cpp.type = "nebula::Vertex")

在这里咱们定义了一个 Vertex 的构造,其中 (cpp.type = "nebula::Vertex") 标注出了这个构造对应了服务端的 nebula::Vertex

  1. fbthrift 会主动为咱们生成 Golang 的数据结构:

    // Attributes:
    //  - Vid
    //  - Tags
    type Vertex struct {
     Vid *Value `thrift:"vid,1" db:"vid" json:"vid"`
     Tags []*Tag `thrift:"tags,2" db:"tags" json:"tags"`}
    
    func NewVertex() *Vertex {return &Vertex{}
    }
    
    ...
    
    func (p *Vertex) Read(iprot thrift.Protocol) error { // 反序列化
     ...
    }
    
    func (p *Vertex) Write(oprot thrift.Protocol) error { // 序列化
     ...
    }
  2. MATCH (v:Person) WHERE id(v) == "ABC" RETURN v 这条语句中:客户端向服务端申请了一个顶点 (nebula::Vertex),服务端找到这个顶点后会进行 序列化 ,通过 RPC 通信框架的 transport 发送到客户端,在客户端收到这份数据时,会进行 反序列化,生成对应客户端中定义的数据结构(type Vertex struct)。

客户端模块

在这个章节会以 nebula-go 为例,介绍客户端的各个模块和其次要接口。

  1. 配置类 Configs,提供全局的配置选项。

    type PoolConfig struct {
     // 设置超时工夫,0 代表不超时,单位 ms。默认是 0
     TimeOut time.Duration
     // 每个连贯最大闲暇工夫,当连贯超过该工夫没有被应用将会被断开和删除,0 示意永恒 idle,连贯不会敞开。默认是 0
     IdleTime time.Duration
     // max_connection_pool_size: 设置最大连接池连贯数量,默认 10
     MaxConnPoolSize int
     // 最小闲暇连接数,默认 0
     MinConnPoolSize int
    }
  2. 客户端会话 Session,提供用户间接调用的接口。

    // 治理 Session 特有的信息
    type Session struct {
     // 用于执行命令的时候的身份校验或者音讯重试
     sessionID  int64
     // 以后持有的连贯
     connection *connection
     // 以后应用的连接池
     connPool   *ConnectionPool
     // 日志工具
     log        Logger
     // 用于保留以后 Session 所用的时区
     timezoneInfo
    }
  3. 接口定义有以下

     // 执行 nGQL,返回的数据类型为 ResultSet,该接口是非线程平安的。func (session *Session) Execute(stmt string) (*ResultSet, error) {...}
     // 从新为以后 Session 从连接池中获取连贯
     func (session *Session) reConnect() error {...}
     // 做 signout,开释 Session ID,偿还 connection 到 pool
     func (session *Session) Release() {
  4. 连接池 ConnectionPool,治理所有的连贯,次要接口有以下

    // 创立新的连接池, 并用输出的服务地址实现初始化
    func NewConnectionPool(addresses []HostAddress, conf PoolConfig, log Logger) (*ConnectionPool, error) {...}
    // 验证并获取 Session 实例
    func (pool *ConnectionPool) GetSession(username, password string) (*Session, error) {...}
  5. 连贯 Connection,封装 thrift 的网络,提供以下接口

    // 和指定的 ip 和端口的建设连贯
    func (cn *connection) open(hostAddress HostAddress, timeout time.Duration) error {...}
    // 验证用户名和明码
    func (cn *connection) authenticate(username, password string) (*graph.AuthResponse, error) {
    // 执行 query
    func (cn *connection) execute(sessionID int64, stmt string) (*graph.ExecutionResponse, error) {...}
    // 通过 SessionId 为 0 发送 "YIELD 1" 来判断连贯是否是可用的
    func (cn *connection) ping() bool {...}
    // 向 graphd 开释 sessionId
    func (cn *connection) signOut(sessionID int64) error {...}
    // 断开连接
    func (cn *connection) close() {...}
  6. 负载平衡 LoadBalance,在连接池外面应用该模块

    • 策略:轮询策略

模块交互解析

  1. 连接池

    • 初始化:

      • 在应用时用户须要先创立并初始化一个 连接池 ConnectionPool,连接池会在初始化时会对用户指定的 Nebula 服务所在地址建设 连贯 Connection,如果在用集群部署形式部署了多个 Graph 服务,连接池会采纳 轮询的策略 来均衡负载,对每个地址建设近乎等量的连贯。
    • 治理连贯:

      • 连接池内保护了两个队列,闲暇连贯队列 idleConnectionQueue 和 应用中的连贯队列 idleConnectionQueue,连接池会定期检测过期闲暇的连贯并将其敞开。这两个队列在增删元素的时候会通过 读写锁 来确保多线程执行的正确性。
      • 当 Session 向连接池申请连贯时,会查看闲暇连贯队列中是否有可用的连贯,如果有则间接返回给 Session 供用户应用;如果没有可用连贯并且以后的总连接数没有超过配置中限定的最大连接数,则新建一个连贯给 Session;如果曾经达到了最大连接数的限度,返回谬误。
    • 个别只有在程序退出时才须要敞开连接池, 在敞开时池中所有的连贯都会被断开。
  2. 客户端会话

    • 客户端会话 Session 通过连接池生成,用户须要提供用户明码进行校验,在校验胜利后用户会取得一个 Session 实例,并通过 Session 中的连贯与服务端进行通信。最罕用的接口是 execute(),如果在执行时产生谬误,客户端会查看谬误的类型,如果是网络起因则会 主动重连 并尝试再次执行语句。
    • 须要留神,一个 Session 不反对被多个线程同时应用,正确的形式是用多个线程申请多个 Session,每个线程应用一个 Session。
    • Session 被开释时,其持有的连贯会被放回到连接池的 闲暇连贯队列 中,以便于之后被其余 Session 复用。
  3. 连贯

    • 每个连贯实例都是等价的,能够被任意 Session 持有,这样设计的目标是这些连贯能够被不同的 Session 复用,缩小重复开关 Transport 的开销。
    • 连贯会将客户端的申请发送到服务端并将其后果返回给 Session。
  4. 用户应用示例

    // Initialize connection pool
    pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
    if err != nil {log.Fatal(fmt.Sprintf("Fail to initialize the connection pool, host: %s, port: %d, %s", address, port, err.Error()))
    }
    // Close all connections in the pool when program exits
    defer pool.Close()
    
    // Create session
    session, err := pool.GetSession(username, password)
    if err != nil {
     log.Fatal(fmt.Sprintf("Fail to create a new session from connection pool, username: %s, password: %s, %s",
         username, password, err.Error()))
    }
    // Release session and return connection back to connection pool when program exits
    defer session.Release()
    
    // Excute a query
    resultSet, err := session.Execute(query)
    if err != nil {fmt.Print(err.Error())
    }

返回数据结构

客户端对局部简单的服务端返回的查问后果进行了封装并增加了接口,以便于用户应用。

查问后果根本类型 封装后的类型
Null
Bool
Int64
Double
String
Time TimeWrapper
Date
DateTime DateTimeWrapper
List
Set
Map
Vertex Node
Edge Relationship
Path PathWrraper
DateSet ResultSet
Record(用于 ResultSet 的行操作)

对于 nebula::Value,在客户端会被包装成 ValueWrapper,并通过接口转换成其余构造。(i.g. node = ValueWrapper.asNode())

数据结构的解析

对于语句 MATCH p= (v:player{name:"Tim Duncan"})-[]->(v2) RETURN p,返回后果为:

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| p                                                                                                                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| <("Tim Duncan" :bachelor{name: "Tim Duncan", speciality: "psychology"} :player{age: 42, name: "Tim Duncan"})<-[:teammate@0 {end_year: 2016, start_year: 2002}]-("Manu Ginobili" :player{age: 41, name: "Manu Ginobili"})> |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Got 1 rows (time spent 11550/12009 us)

咱们能够看到返回的后果蕴含了一行,类型是一条门路. 此时如果须要获得门路起点 (v2) 的属性,能够通过如下操作实现:

// Excute a query
resultSet, _ := session.Execute("MATCH p= (v:player{name:"\"Tim Duncan"\"})-[]->(v2) RETURN p")

// 获取后果的第一行, 第一行的 index 为 0
record, err := resultSet.GetRowValuesByIndex(0)
if err != nil {t.Fatalf(err.Error())
}

// 从第一行中取第一列那个 cell 的值
// 此时 valInCol0 的类型为 ValueWrapper 
valInCol0, err := record.GetValueByIndex(0)

// 将 ValueWrapper 转化成 PathWrapper 对象
pathWrap, err = valInCol0.AsPath()

// 通过 PathWrapper 的 GetEndNode() 接口间接失去起点
node, err = pathWrap.GetEndNode()

// 通过 node 的 Properties() 失去所有属性
// props 的类型为 map[string]*ValueWrapper
props, err = node.Properties()

客户端地址

各语言客户端 GitHub 地址:

  • https://github.com/vesoft-inc/nebula-cpp
  • https://github.com/vesoft-inc/nebula-java
  • https://github.com/vesoft-inc/nebula-python
  • https://github.com/vesoft-inc/nebula-go
  • https://github.com/vesoft-inc/nebula-rust

举荐浏览

  • Nebula Graph 源码解读系列 | Vol.00 序言
  • Nebula Graph 源码解读系列 | Vol.01 Nebula Graph Overview
  • Nebula Graph 源码解读系列 |Vol.02 详解 Validator
  • Nebula Graph 源码解读系列 |Vol.03 Planner 的实现
  • Nebula Graph 源码解读系列 |Vol.04 基于 RBO 的 Optimizer 实现
  • Nebula Graph 源码解读系列 |Vol.05 Scheduler 和 Executor 两兄弟
  • Nebula Graph 源码解读 |Vol.06 MATCH 中变长 Pattern 的实现

《开源分布式图数据库 Nebula Graph 齐全指南》,又名:Nebula 小书,外面具体记录了图数据库以及图数据库 Nebula Graph 的知识点以及具体的用法,浏览传送门:https://docs.nebula-graph.com.cn/site/pdf/NebulaGraph-book.pdf

交换图数据库技术?退出 Nebula 交换群请先填写下你的 Nebula 名片,Nebula 小助手会拉你进群~~

正文完
 0