关于后端:如何快速实现-BitSail-Connector

32次阅读

共计 3496 个字符,预计需要花费 9 分钟才能阅读完成。

  1. 目录构造
    首先开发者须要通过 git 下载最新代码到本地,并导入到 IDE 中。同时创立本人的工作分支,应用该分支开发本人的 Connector。
    我的项目地址:github.com/bytedance/b…
    我的项目构造如下:
  1. 开发流程
    BitSail 是一款基于分布式架构的数据集成引擎,Connector 会并发执行。并由 BitSail 框架来负责工作的调度、并发执行、脏数据处理等,开发者只须要实现对应接口即可,具体开发流程如下:

工程配置,开发者须要在 bitsail/bitsail-connectors/pom.xml 模块中注册本人的 Connector,同时在 bitsail/bitsail-dist/pom.xml 减少本人的 Connector 模块,同时为你的连接器注册配置文件,来使得框架能够在运行时动静发现它。

Connector 开发,实现 Source、Sink 提供的形象办法,具体细节参考后续介绍。
数据输入类型,目前反对的数据类型为 BitSail Row 类型,无论是 Source 在 Reader 中传递给上游的数据类型,还是 Sink 从上游生产的数据类型,都应该是 BitSail Row 类型。

  1. Architecture
    以后 Source API 的设计同时兼容了流批一批的场景,换言之就是同时反对 pull & push 的场景。在此之前,咱们须要首先再过一遍传统流批场景中各组件的交互模型。
    3.1 Batch Model
    传统批式场景中,数据的读取个别分为如下几步:

createSplits:个别在 client 端或者核心节点执行,目标是将残缺的数据依照指定的规定尽可能拆分为较多的 rangeSplits,createSplits 在作业生命周期内有且执行一次。
runWithSplit: 个别在执行节点节点执行,执行节点启动后会向核心节点申请存在的 rangeSplit,而后再本地进行执行;执行实现后会再次向核心节点申请直到所有 splits 执行实现。
commit:全副的 split 的执行实现后,个别会在核心节点执行 commit 的操作,用于将数据对外可见。

3.2 Stream Model
传统流式场景中,数据的读取个别分为如下几步:

createSplits:个别在 client 端或者核心节点执行,目标是依据滑动窗口或者滚动窗口的策略将数据流划分为 rangeSplits,createSplits 在流式作业的生命周期中依照划分窗口的会始终执行。
runWithSplit: 个别在执行节点节点执行,核心节点会向可执行节点发送 rangeSplit,而后在可执行节点本地进行执行;执行实现后会将解决完的 splits 数据向上游发送。
commit:全副的 split 的执行实现后,个别会向指标数据源发送 retract message,实时动静展示后果。

3.3 BitSail Model

createSplits:BitSail 通过 SplitCoordinator 模块划分 rangeSplits,在流式作业中的生命周期中 createSplits 会周期性执行,而在批式作业中仅仅会执行一次。
runWithSplit: 在执行节点节点执行,BitSail 中执行节点包含 Reader 和 Writer 模块,核心节点会向可执行节点发送 rangeSplit,而后在可执行节点本地进行执行;执行实现后会将解决完的 splits 数据向上游发送。
commit:writer 在实现数据写入后,committer 来实现提交。在不开启 checkpoint 时,commit 会在所有 writer 都完结后执行一次;在开启 checkpoint 时,commit 会在每次 checkpoint 的时候都会执行一次。

  1. Source Connector

Source: 数据读取组件的生命周期治理,次要负责和框架的交互,构架作业,不参加作业真正的执行。
SourceSplit: 数据读取分片;大数据处理框架的外围目标就是将大规模的数据拆分成为多个正当的 Split。
State:作业状态快照,当开启 checkpoint 之后,会保留以后执行状态。
SplitCoordinator: 既然提到了 Split,就须要有相应的组件去创立、治理 Split;SplitCoordinator 承当了这样的角色。
SourceReader: 真正负责数据读取的组件,在接管到 Split 后会对其进行数据读取,而后将数据传输给下一个算子。

Source Connector 开发流程如下

首先须要创立 Source 类,须要实现 Source 和 ParallelismComputable 接口,次要负责和框架的交互,构架作业,它不参加作业真正的执行。
BitSail 的 Source 采纳流批一体的设计思维,通过 getSourceBoundedness 办法设置作业的解决形式,通过 configure 办法定义 readerConfiguration 的配置,通过 createTypeInfoConverter 办法来进行数据类型转换,能够通过 FileMappingTypeInfoConverter 失去用户在 yaml 文件中自定义的数据源类型和 BitSail 类型的转换,实现自定义化的类型转换。
最初,定义数据源的数据分片格局 SourceSplit 类和闯将治理 Split 的角色 SourceSplitCoordinator 类。
最初实现 SourceReader 实现从 Split 中进行数据的读取。

每个 SourceReader 都在独立的线程中执行,并保障 SourceSplitCoordinator 调配给不同 SourceReader 的切片没有交加。
在 SourceReader 的执行周期中,开发者只须要关注如何从结构好的切片中去读取数据,之后实现数据类型对转换,将内部数据类型转换成 BitSail 的 Row 类型传递给上游即可。

4.1 Reader 示例
public class FakeSourceReader extends SimpleSourceReaderBase<Row> {

  private final BitSailConfiguration readerConfiguration;
  private final TypeInfo<?>[] typeInfos;

  private final transient int totalCount;
  private final transient RateLimiter fakeGenerateRate;
  private final transient AtomicLong counter;

  private final FakeRowGenerator fakeRowGenerator;

  public FakeSourceReader(BitSailConfiguration readerConfiguration, Context context) {
    this.readerConfiguration = readerConfiguration;
    this.typeInfos = context.getTypeInfos();
    this.totalCount = readerConfiguration.get(FakeReaderOptions.TOTAL_COUNT);
    this.fakeGenerateRate = RateLimiter.create(readerConfiguration.get(FakeReaderOptions.RATE));
    this.counter = new AtomicLong();
    this.fakeRowGenerator = new FakeRowGenerator(readerConfiguration, context.getIndexOfSubtask());
  }

  @Override
  public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
    fakeGenerateRate.acquire();
    pipeline.output(fakeRowGenerator.fakeOneRecord(typeInfos));
  }

  @Override
  public boolean hasMoreElements() {
    return counter.incrementAndGet() <= totalCount;
  }
}

正文完
 0