关于flink:Nebula-Flink-Connector-的原理和实践

59次阅读

共计 13398 个字符,预计需要花费 34 分钟才能阅读完成。

摘要:本文所介绍 Nebula Graph 连接器 Nebula Flink Connector,采纳相似 Flink 提供的 Flink Connector 模式,反对 Flink 读写分布式图数据库 Nebula Graph。

文章首发 Nebula Graph 官网博客:https://nebula-graph.com.cn/posts/nebula-flink-connector/

在关系网络分析、关系建模、实时举荐等场景中利用图数据库作为后盾数据撑持已绝对遍及,且局部利用场景对图数据的实时性要求较高,如举荐零碎、搜索引擎。为了晋升数据的实时性,业界广泛应用流式计算对更新的数据进行增量实时处理。为了反对对图数据的流式计算,Nebula Graph 团队开发了 Nebula Flink Connector,反对利用 Flink 进行 Nebula Graph 图数据的流式解决和计算。

Flink 是新一代流批对立的计算引擎,它从不同的第三方存储引擎中读取数据,并进行解决,再写入另外的存储引擎中。Flink Connector 的作用就相当于一个连接器,连贯 Flink 计算引擎跟外界存储系统。

与外界进行数据交换时,Flink 反对以下 4 种形式:

  • Flink 源码外部预约义 Source 和 Sink 的 API;
  • Flink 外部提供了 Bundled Connectors,如 JDBC Connector。
  • Apache Bahir 我的项目中提供连接器
    Apache Bahir 最后是从 Apache Spark 中独立进去的我的项目,以提供不限于 Spark 相干的扩大 / 插件、连接器和其余可插入组件的实现。
  • 通过异步 I/O 形式。

流计算中常常须要与内部存储系统交互,比方须要关联 MySQL 中的某个表。一般来说,如果用同步 I/O 的形式,会造成零碎中呈现大的等待时间,影响吞吐和提早。异步 I/O 则能够并发解决多个申请,进步吞吐,缩小提早。

本文所介绍 Nebula Graph 连接器 Nebula Flink Connector,采纳相似 Flink 提供的 Flink Connector 模式,反对 Flink 读写分布式图数据库 Nebula Graph。

一、Connector Source

Flink 作为一款流式计算框架,它可解决有界数据,也可解决无界数据。所谓无界,即源源不断的数据,不会有终止,实时流解决所解决的数据便是无界数据;批处理的数据,即有界数据。而 Source 便是 Flink 解决数据的数据起源。

Nebula Flink Connector 中的 Source 便是图数据库 Nebula Graph。Flink 提供了丰盛的 Connector 组件容许用户自定义数据源来连贯内部数据存储系统。

1.1 Source 简介

Flink 的 Source 次要负责内部数据源的接入,Flink 的 Source 能力次要是通过 read 相干的 API 和 addSource 办法这 2 种形式来实现数据源的读取,应用 addSource 办法对接内部数据源时,能够应用 Flink Bundled Connector,也能够自定义 Source。

Flink Source 的几种应用形式如下:

本章次要介绍如何通过自定义 Source 形式实现 Nebula Graph Source。

1.2 自定义 Source

在 Flink 中能够应用 StreamExecutionEnvironment.addSource(sourceFunction)ExecutionEnvironment.createInput(inputFormat) 两种形式来为你的程序增加数据起源。

Flink 曾经提供多个内置的 source functions,开发者能够通过继承 RichSourceFunction 来自定义非并行的 source,通过继承 RichParallelSourceFunction 来自定义并行的 SourceRichSourceFunction 和 RichParallelSourceFunction 是 SourceFunction 和 RichFunction 个性的联合。其中 SourceFunction 负责数据的生成,RichFunction 负责资源的治理。当然,也能够只实现 SourceFunction 接口来定义最简略的只具备获取数据性能的 dataSource

通常自定义一个欠缺的 Source 节点是通过实现 RichSourceFunction 类来实现的,该类兼具 RichFunctionSourceFunction 的能力,因而自定义 Flink 的 Nebula Graph Source 性能咱们须要实现 RichSourceFunction 中提供的办法。

1.3 自定义 Nebula Graph Source 实现原理

Nebula Flink Connector 中实现的自定义 Nebula Graph Source 数据源提供了两种应用形式,别离是 addSource 和 createInput 形式。

Nebula Graph Source 实现类图如下:

(1)addSource

该形式是通过 NebulaSourceFunction 类实现的,该类继承自 RichSourceFunction 并实现了以下办法:

  • open

筹备 Nebula Graph 连贯信息,并获取 Nebula Graph Meta 服务和 Storage 服务的连贯。

  • close

数据读取实现,开释资源。敞开 Nebula Graph 服务的连贯。

  • run

开始读取数据,并将数据填充到 sourceContext。

  • cancel

勾销 Flink 作业时调用,敞开资源。

(2)createInput

该形式是通过 NebulaInputFormat 类实现的,该类继承自 RichInputFormat 并实现了以下办法:

  • openInputFormat

筹备 inputFormat,获取连贯。

  • closeInputFormat

数据读取实现,开释资源,敞开 Nebula Graph 服务的连贯。

  • getStatistics

获取数据源的根本统计信息。

  • createInputSplits

基于配置的 partition 参数创立 GenericInputSplit。

  • getInputSplitAssigner

返回输出的 split 分配器,按原始计算的程序返回 Source 的所有 split。

  • open

开始 inputFormat 的数据读取,将读取的数据转换 Flink 的数据格式,结构迭代器。

  • close

数据读取实现,打印读取日志。

  • reachedEnd

是否读取实现

  • nextRecord

通过迭代器获取下一条数据

通过 addSource 读取 Source 数据失去的是 Flink 的 DataStreamSource,示意 DataStream 的终点。

通过 createInput 读取数据失去的是 Flink 的 DataSource,DataSource 是一个创立新数据集的 Operator,这个 Operator 可作为进一步转换的数据集。DataSource 能够通过 withParameters 封装配置参数进行其余的操作。

1.4 自定义 Nebula Graph Source 利用实际

应用 Flink 读取 Nebula Graph 图数据时,须要结构 NebulaSourceFunction 和 NebulaOutputFormat,并通过 Flink 的 addSource 或 createInput 办法注册数据源进行 Nebula Graph 数据读取。

结构 NebulaSourceFunction 和 NebulaOutputFormat 时须要进行客户端参数的配置和执行参数的配置,阐明如下:

配置项阐明:

  • NebulaClientOptions

    • 配置 address,NebulaSource 须要配置 Nebula Graph Metad 服务的地址。
    • 配置 username
    • 配置 password
  • VertexExecutionOptions

    • 配置 GraphSpace
    • 配置要读取的 tag
    • 配置要读取的字段集
    • 配置是否读取所有字段,默认为 false,若配置为 true 则字段集配置有效
    • 配置每次读取的数据量 limit,默认 2000
  • EdgeExecutionOptions

    • 配置 GraphSpace
    • 配置要读取的 edge
    • 配置要读取的字段集
    • 配置是否读取所有字段,默认为 false,若配置为 true 则字段集配置有效
    • 配置每次读取的数据量 limit,默认 2000
// 结构 Nebula Graph 客户端连贯须要的参数
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions
                .NebulaClientOptionsBuilder()
                .setAddress("127.0.0.1:45500")
                .build();
// 创立 connectionProvider
NebulaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);

// 结构 Nebula Graph 数据读取须要的参数
List<String> cols = Arrays.asList("name", "age");
VertexExecutionOptions sourceExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSource")
                .setTag(tag)
                .setFields(cols)
                .setLimit(100)
                .builder();

// 结构 NebulaInputFormat
NebulaInputFormat inputFormat = new NebulaInputFormat(metaConnectionProvider)
                .setExecutionOptions(sourceExecutionOptions);

// 形式 1 应用 createInput 形式注册 Nebula Graph 数据源
DataSource<Row> dataSource1 = ExecutionEnvironment.getExecutionEnvironment()
                              .createInput(inputFormat);
 
// 形式 2 应用 addSource 形式注册 Nebula Graph 数据源
NebulaSourceFunction sourceFunction = new NebulaSourceFunction(metaConnectionProvider)
                .setExecutionOptions(sourceExecutionOptions);
 DataStreamSource<Row> dataSource2 = StreamExecutionEnvironment.getExecutionEnvironment()
                              .addSource(sourceFunction);

Nebula Source Demo 编写实现后能够打包提交到 Flink 集群执行。

示例程序读取 Nebula Graph 的点数据并打印,该作业以 Nebula Graph 作为 Source,以 print 作为 Sink,执行后果如下:

Source sent 数据为 59,671,064 条,Sink received 数据为 59,671,064 条。

二、Connector Sink

Nebula Flink Connector 中的 Sink 即 Nebula Graph 图数据库。Flink 提供了丰盛的 Connector 组件容许用户自定义数据池来接管 Flink 所解决的数据流。

2.1 Sink 简介

Sink 是 Flink 解决完 Source 后数据的输入,次要负责实时计算结果的输入和长久化。比方:将数据流写入规范输入、写入文件、写入 Sockets、写入内部零碎等。

Flink 的 Sink 能力次要是通过调用数据流的 write 相干 API 和 DataStream.addSink 两种形式来实现数据流的内部存储。

相似于 Flink Connector 的 Source,Sink 也容许用户自定义来反对丰盛的内部数据系统作为 Flink 的数据池。

Flink Sink 的应用形式如下:

本章次要介绍如何通过自定义 Sink 的形式实现 Nebula Graph Sink。

2.2 自定义 Sink

在 Flink 中能够应用 DataStream.addSinkDataStream.writeUsingOutputFormat 的形式将 Flink 数据流写入内部自定义数据池。

Flink 曾经提供了若干实现好了的 Sink Functions,也能够通过实现 SinkFunction 以及继承 RichOutputFormat 来实现自定义的 Sink。

2.3 自定义 Nebula Graph Sink 实现原理

Nebula Flink Connector 中实现了自定义的 NebulaSinkFunction,开发者通过调用 DataSource.addSink 办法并将 NebulaSinkFunction 对象作为参数传入即可实现将 Flink 数据流写入 Nebula Graph。

Nebula Flink Connector 应用的是 Flink 的 1.11-SNAPSHOT 版本,该版本中曾经废除了应用 writeUsingOutputFormat 办法来定义输入端的接口。

源码如下,所以请留神在应用自定义 Nebula Graph Sink 时请采纳 DataStream.addSink 的形式。

    /** @deprecated */
    @Deprecated
    @PublicEvolving
    public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {return this.addSink(new OutputFormatSinkFunction(format));
    }

Nebula Graph Sink 实现类图如下:

其中最重要的两个类是 NebulaSinkFunction 和 NebulaBatchOutputFormat。

NebulaSinkFunction 继承自 AbstractRichFunction 并实现了以下办法:

  • open

调用 NebulaBatchOutputFormat 的 open 办法,进行资源筹备。

  • close

调用 NebulaBatchOutputFormat 的 close 办法,进行资源开释。

  • invoke

是 Sink 中的外围办法,调用 NebulaBatchOutputFormat 中的 write 办法进行数据写入。

  • flush

调用 NebulaBatchOutputFormat 的 flush 办法进行数据的提交。

NebulaBatchOutputFormat 继承自 AbstractNebulaOutPutFormat,AbstractNebulaOutPutFormat 继承自 RichOutputFormat,次要实现的办法有:

  • open

筹备图数据库 Nebula Graph 的 Graphd 服务的连贯,并初始化数据写入执行器 nebulaBatchExecutor

  • close

提交最初批次数据,期待最初提交的回调后果并敞开服务连贯等资源。

  • writeRecord

外围办法,将数据写入 nebulaBufferedRow 中,并在达到配置的批量写入 Nebula Graph 下限时提交写入。Nebula Graph Sink 的写入操作是异步的,所以须要执行回调来获取执行后果。

  • flush

当 bufferRow 存在数据时,将数据提交到 Nebula Graph 中。

在 AbstractNebulaOutputFormat 中调用了 NebulaBatchExecutor 进行数据的批量治理和批量提交,并通过定义回调函数接管批量提交的后果,代码如下:

    /**
     * write one record to buffer
     */
    @Override
    public final synchronized void writeRecord(T row) throws IOException {nebulaBatchExecutor.addToBatch(row);

        if (numPendingRow.incrementAndGet() >= executionOptions.getBatch()) {commit();
        }
    }

    /**
     * put record into buffer
     *
     * @param record represent vertex or edge
     */
    void addToBatch(T record) {boolean isVertex = executionOptions.getDataType().isVertex();

        NebulaOutputFormatConverter converter;
        if (isVertex) {converter = new NebulaRowVertexOutputFormatConverter((VertexExecutionOptions) executionOptions);
        } else {converter = new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions);
        }
        String value = converter.createValue(record, executionOptions.getPolicy());
        if (value == null) {return;}
        nebulaBufferedRow.putRow(value);
    }

    /**
     * commit batch insert statements
     */
    private synchronized void commit() throws IOException {graphClient.switchSpace(executionOptions.getGraphSpace());
        future = nebulaBatchExecutor.executeBatch(graphClient);
        // clear waiting rows
        numPendingRow.compareAndSet(executionOptions.getBatch(),0);
    }

    /**
     * execute the insert statement
     *
     * @param client Asynchronous graph client
     */
    ListenableFuture executeBatch(AsyncGraphClientImpl client) {String propNames = String.join(NebulaConstant.COMMA, executionOptions.getFields());
        String values = String.join(NebulaConstant.COMMA, nebulaBufferedRow.getRows());
        // construct insert statement
        String exec = String.format(NebulaConstant.BATCH_INSERT_TEMPLATE, executionOptions.getDataType(), executionOptions.getLabel(), propNames, values);
        // execute insert statement
        ListenableFuture<Optional<Integer>> execResult = client.execute(exec);
        // define callback function
        Futures.addCallback(execResult, new FutureCallback<Optional<Integer>>() {
            @Override
            public void onSuccess(Optional<Integer> integerOptional) {if (integerOptional.isPresent()) {if (integerOptional.get() == ErrorCode.SUCCEEDED) {LOG.info("batch insert Succeed");
                    } else {
                        LOG.error(String.format("batch insert Error: %d",
                                integerOptional.get()));
                    }
                } else {LOG.error("batch insert Error");
                }
            }

            @Override
            public void onFailure(Throwable throwable) {LOG.error("batch insert Error");
            }
        });
        nebulaBufferedRow.clean();
        return execResult;
    }

因为 Nebula Graph Sink 的写入是批量、异步的,所以在最初业务完结 close 资源之前须要将缓存中的批量数据提交且期待写入操作的实现,以防在写入提交之前提前把 Nebula Graph Client 敞开,代码如下:

    /**
     * commit the batch write operator before release connection
     */
    @Override
    public  final synchronized void close() throws IOException {if(numPendingRow.get() > 0){commit();
        }
        while(!future.isDone()){
            try {Thread.sleep(10);
            } catch (InterruptedException e) {LOG.error("sleep interrupted,", e);
            }
        }

        super.close();}

2.4 自定义 Nebula Graph Sink 利用实际

Flink 将解决实现的数据 Sink 到 Nebula Graph 时,须要将 Flink 数据流进行 map 转换成 Nebula Graph Sink 可接管的数据格式。自定义 Nebula Graph Sink 的应用形式是通过 addSink 模式,将 NebulaSinkFunction 作为参数传给 addSink 办法来实现 Flink 数据流的写入。

  • NebulaClientOptions

    • 配置 address,NebulaSource 须要配置 Nebula Graph Graphd 服务的地址。
    • 配置 username
    • 配置 password
  • VertexExecutionOptions

    • 配置 GraphSpace
    • 配置要写入的 tag
    • 配置要写入的字段集
    • 配置写入的点 ID 所在 Flink 数据流 Row 中的索引
    • 配置批量写入 Nebula Graph 的数量,默认 2000
  • EdgeExecutionOptions

    • 配置 GraphSpace
    • 配置要写入的 edge
    • 配置要写入的字段集
    • 配置写入的边 src-id 所在 Flink 数据流 Row 中的索引
    • 配置写入的边 dst-id 所在 Flink 数据流 Row 中的索引
    • 配置写入的边 rank 所在 Flink 数据流 Row 中的索引,不配则无 rank
    • 配置批量写入 Nebula Graph 的数量,默认 2000
/// 结构 Nebula Graphd 客户端连贯须要的参数
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions
                .NebulaClientOptionsBuilder()
                .setAddress("127.0.0.1:3699")
                .build();
NebulaConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);


// 结构 Nebula Graph 写入操作参数
List<String> cols = Arrays.asList("name", "age")
ExecutionOptions sinkExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setTag(tag)
                .setFields(cols)
                .setIdIndex(0)
                .setBatch(20)
                .builder();
  
// 写入 Nebula Graph
dataSource.addSink(nebulaSinkFunction);

Nebula Graph Sink 的 Demo 程序以 Nebula Graph 的 space:flinkSource 作为 Source 读取数据,进行 map 类型转换后 Sink 入 Nebula Graph 的 space:flinkSink,对应的利用场景为将 Nebula Graph 中一个 space 的数据流入另一个 space 中。

三、Catalog

Flink 1.11.0 之前,用户如果依赖 Flink 的 Source/Sink 读写内部数据源时,必须要手动读取对应数据系统的 Schema。比方,要读写 Nebula Graph,则必须先保障明确地通晓在 Nebula Graph 中的 Schema 信息。然而这样会有一个问题,当 Nebula Graph 中的 Schema 发生变化时,也须要手动更新对应的 Flink 工作以放弃类型匹配,任何不匹配都会造成运行时报错使作业失败。这个操作冗余且繁琐,体验极差。

1.11.0 版本后,用户应用 Flink Connector 时能够主动获取表的 Schema。能够在不理解内部零碎数据 Schema 的状况下进行数据匹配。

目前 Nebula Flink Connector 中已反对数据的读写,要实现 Schema 的匹配则须要为 Flink Connector 实现 Catalog 的治理。但为了确保 Nebula Graph 中数据的安全性,Nebula Flink Connector 只反对 Catalog 的读操作,不容许进行 Catalog 的批改和写入。

拜访 Nebula Graph 指定类型的数据时,残缺门路应该是以下格局:<graphSpace>.<VERTEX.tag> 或者 <graphSpace>.<EDGE.edge>

具体应用形式如下:

String catalogName  = "testCatalog";
String defaultSpace = "flinkSink";
String username     = "root";
String password     = "nebula";
String address      = "127.0.0.1:45500";
String table        = "VERTEX.player"

// define Nebula catalog
Catalog catalog = NebulaCatalogUtils.createNebulaCatalog(catalogName,defaultSpace, address, username, password);
// define Flink table environment
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(bsEnv);
// register customed nebula catalog
tEnv.registerCatalog(catalogName, catalog);
// use customed nebula catalog
tEnv.useCatalog(catalogName);

// show graph spaces of Nebula Graph
String[] spaces = tEnv.listDatabases();

// show tags and edges of Nebula Graph
tEnv.useDatabase(defaultSpace);
String[] tables = tEnv.listTables();

// check tag player exist in defaultSpace
ObjectPath path = new ObjectPath(defaultSpace, table);
assert catalog.tableExists(path) == true
    
// get nebula tag schema
CatalogBaseTable table = catalog.getTable(new ObjectPath(defaultSpace, table));
table.getSchema();

Nebula Flink Connector 反对的其余 Catalog 接口请查看 GitHub 代码 NebulaCatalog.java。

四、Exactly-once

Flink Connector 的 Exactly-once 是指 Flink 借助于 checkpoint 机制保障每个输出事件只对最终后果影响一次,在数据处理过程中即便呈现故障,也不会存在数据反复和失落的状况。

为了提供端到端的 Exactly-once 语义,Flink 的内部数据系统也必须提供提交或回滚的办法,而后通过 Flink 的 checkpoint 机制协调。Flink 提供了实现端到端的 Exactly-once 的形象,即实现二阶段提交的抽象类 TwoPhaseCommitSinkFunction。

想为数据输入端实现 Exactly-once,则须要实现四个函数:

  • beginTransaction

在事务开始前,在指标文件系统的长期目录创立一个临时文件,随后能够在数据处理时将数据写入此文件。

  • preCommit

在预提交阶段,敞开文件不再写入。为下一个 checkpoint 的任何后续文件写入启动一个新事务。

  • commit

在提交阶段,将预提交阶段的文件原子地挪动到真正的目标目录。二阶段提交过程会减少输入数据可见性的提早。

  • abort

在终止阶段,删除临时文件。

根据上述函数可看出,Flink 的二阶段提交对外部数据源有要求,即 Source 数据源必须具备重发性能,Sink 数据池必须反对事务提交和幂等写。

Nebula Graph v1.1.0 尽管不反对事务,但其写入操作是幂等的,即同一条数据的屡次写入后果是统一的。因而能够通过 checkpoint 机制实现 Nebula Flink Connector 的 At-least-Once 机制,依据屡次写入的幂等性能够间接实现 Sink 的 Exactly-once。

要应用 Nebula Graph Sink 的容错性,请确保在 Flink 的执行环境中开启了 checkpoint 配置:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000) // checkpoint every 10000 msecs
   .getCheckpointConfig()
   .setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

Reference

  • Nebula Source Demo [testNebulaSource]:https://github.com/vesoft-inc/nebula-java/blob/master/examples/src/main/java/org/apache/flink/FlinkDemo.java
  • Nebula Sink Demo [testSourceSink]:https://github.com/vesoft-inc/nebula-java/blob/master/examples/src/main/java/org/apache/flink/FlinkDemo.java
  • Apache Flink 源码:https://github.com/apache/flink
  • ApacheFlink 零根底入门:https://www.infoq.cn/theme/28
  • Flink 文档:https://flink.apache.org/flink-architecture.html
  • Flink 实际文档:https://ci.apache.org/projects/flink/flink-docs-release-1.12/
  • flink-connector-jdbc 源码:https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-jdbc
  • Flink JDBC Catalog 详解:https://cloud.tencent.com/developer/article/1697913

喜爱这篇文章?来来来,给咱们的 GitHub 点个 star 表激励啦~~ ????‍♂️????‍♀️ [手动跪谢]

交换图数据库技术?交个敌人,Nebula Graph 官网小助手微信:NebulaGraphbot 拉你进交换群~~

正文完
 0