关于Flink:伴鱼基于-Flink-构建数据集成平台的设计与实现

40次阅读

共计 6405 个字符,预计需要花费 17 分钟才能阅读完成。

数据仓库有四个根本的特色:面向主题的、集成的、绝对稳固的、反映历史变动的。其中数据集成是数据仓库构建的首要前提,指将多个扩散的、异构的数据源整合在一起以便于后续的数据分析。将数据集成过程平台化,将极大晋升数据开发人员的效率。本文次要内容为:

  1. 数据集成 VS 数据同步
  2. 集成需要
  3. 数据集成 V1
  4. 数据集成 V2
  5. 线上成果
  6. 总结

Flink 中文学习网站
https://flink-learning.org.cn

A data warehouse is a subject-oriented, integrated, nonvolatile, and time-variant collection of data in support of management’s decisions.

—— Bill Inmon

一、数据集成 VS 数据同步

「数据集成」往往和「数据同步」在概念上存在肯定的混同,为此咱们对这二者进行了辨别。

  • 「数据集成」特指面向数据仓库 ODS 层的数据同步过程,
  • 「数据同步」面向的是一般化的 Source 到 Sink 的数据传输过程。

二者的关系如下图所示:

「数据同步平台」提供根底能力,不掺杂具体的业务逻辑。「数据集成平台」是构建在「数据同步平台」之上的,除了将原始数据同步之外还蕴含了一些聚合的逻辑(如通过数据库的日志数据对快照数据进行复原,下文将会具体开展)以及数仓标准相干的内容(如数仓 ODS 层库表命名标准)等。目前「数据同步平台」的建设正在咱们的布局之中,但这并不影响「数据集成平台」的搭建,一些同步的需要可提前在「实时计算平台」创立,以「约定」的形式解耦。

值得一提的是「数据集成」也该当涵盖「数据采集」(由特定的工具反对)和「数据荡涤」(由采集粒度、日志标准等因素决定)两局部内容,这两局部内容各个公司都有本人的实现,本文将不做具体介绍。

二、集成需要

目前伴鱼外部数据的集成需要次要体现在三块:Stat Log (业务标准化日志或称统计日志)、TiDB 及 MongoDB。除此之外还有一些 Service Log、Nginx Log 等,此类不具备代表性不在本文介绍。另外,因为实时数仓正处于建设过程中,目前「数据集成平台」只涵盖离线数仓(Hive)。

  • Stat Log:业务落盘的日志将由 FileBeat 组件收集至 Kafka。因为日志为 Append Only 类型,因而 Stat Log 集成绝对简略,只需将 Kafka 数据同步至 Hive 即可。
  • DB(TiDB、MongoDB):DB 数据绝对麻烦,外围诉求是数仓中可能存在业务数据库的镜像,即存在业务数据库中某一时刻(天级 or 小时级)的数据快照,当然有时也有对数据变更过程的剖析需要。因而 DB 数据集成须要将这两个方面都思考进去。

因为以上两种类型的数据集成形式差别较大,下文将别离予以探讨。

三、数据集成 V1

伴鱼晚期「数据集成平台」已具备雏形,这个阶段次要是借助一系列开源的工具实现。随着工夫推动,这个版本裸露的问题也逐步增多,接下来将次要从数据流的角度对 V1 进行论述,更多的细节问题将在 V2 版本的设计中体现。

3.1 Stat Log

日志的集成并未接入平台,而是烟囱式的开发方式,数据集成的链路如下图所示:

Kafka 中的数据先通过 Flume 同步至 HDFS,再由 Spark 工作将数据从 HDFS 导入至 Hive 并创立分区。整体链路较长且引入了第三方组件(Flume)减少了运维的老本,另外 Kafka 的原始数据在 HDFS 冗余存储也减少了存储的开销。

3.2 DB

DB 数据的集成次要是基于查问的形式(批的形式,通过 Select 查问进行全表扫描失去快照数据)实现,其链路如下图所示:

用户通过平台提交集成工作,由 Airflow 定时工作扫描集成平台元数据库,生成对应的取数工作(TiDB 的数据通过 Sqoop 工具,MongoDB 的数据则通过 Mongoexport 工具)。能够看到 V1 版本并没有获取数据库的变更的日志数据,不能满足对数据变更过程的剖析诉求。

因为 Sqoop 工作最终要从 TiDB 生产环境的业务数据库获取数据,数据量大的状况下势必对业务数据库造成肯定的影响。Mongoexport 工作间接作用在 MongoDB 的暗藏节点(无业务数据申请),对于线上业务的影响能够忽略不计。基于此,DBA 独自搭建了一套 TiDB 大数据集群,用于将体量较大的业务数据库同步至此(基于 TiDB Pump 和 Drainer 组件),因而局部 Sqoop 工作能够从此集群拉群数据以打消对业务数据库的影响。从数据流的角度,整个过程如下图所示:

是否将生产环境 TiDB 业务数据库同步至 TiDB 大数据集群由数仓的需要以及 DBA 对于数据量评估决定。能够看出,这种模式也存在着大量数据的冗余,集群的资源随着同步工作的减少时长达到瓶颈。并且随着后续的演进,TiDB 大数据集群也涵盖一部分数据利用生产环境的业务数据库,集群作用域逐步含糊。

四、数据集成 V2

V2 版本咱们引入了 Flink,将同步的链路进行了简化,DB 数据集成从之前的基于查问的形式改成了基于日志的形式(流的形式),大大降低了冗余的存储。

4.1 Stat Log

借助 Flink 1.11 版本后对于 Hive Integration 的反对,咱们能够轻松的将 Kafka 的数据写入 Hive,因而 Stat Log 的集成也就变得异样简略(相比 V1 版本,去除了对 Flume 组件的依赖,数据冗余也打消了),同时 Flink Exactly-Once 的语义也确保了数据的准确性。从数据流的角度,整个过程如下图所示:

目前依照小时粒度生成日志分区,几项 Flink 工作配置参数如下:

checkpoint: 10 min
watermark: 1 min
partition.time-extractor.kind:‘custom’
sink.partition-commit.delay:‘3600s’
sink.partition-commit.policy.kind:‘metastore,success-file’
sink.partition-commit.trigger:‘partition-time’

4.2 DB

基于日志的形式对 DB 数据进行集成,意味着须要采集 DB 的日志数据,在咱们目前的实现中 TiDB 基于 Pump 和 Drainer 组件(目前生产环境数据库集群版本暂不反对开启 TICDC),MongoDB 基于 MongoShake 组件,采集的数据将输送至 Kafka。采纳这种形式,一方面升高了业务数据库的查问压力,另一方面能够捕获数据的变更过程,同时冗余的数据存储也打消了。不过因为原始数据是日志数据,须要通过肯定的伎俩还原出快照数据。新的链路如下图所示:

用户提交集成工作后将同步创立三个工作:

  • 增量工作(流):「增量工作」将 DB 日志数据由 Kafka 同步至 Hive。因为采集组件都是依照集群粒度进行采集,且集群数量无限,目前都是手动的形式将同步的工作在「实时计算平台」创立,集成工作创立时默认假设同步工作曾经 ready,待「数据同步平台」落地后能够同步做更多的自动化操作和校验。
  • 存量工作(批):要想还原出快照数据则至多须要一份初始的快照数据,因而「存量工作」的目标是从业务数据库拉取集成时数据的初始快照数据。
  • Merge 工作(批):「Merge 工作」将存量数据和增量数据进行聚合以还原快照数据。还原后的快照数据可作为下一日的存量,因而「存量工作」只需调度执行一次,获取初始快照数据即可。

「存量工作」和「Merge 工作」由离线调度平台 Dolphinscheduler(简称 DS)调度执行,工作执行过程中将从集成工作的元数据库中获取所需的信息。目前「Merge 工作」按小时粒度调度,即每小时还原快照数据。

从数据流的角度,整个过程如下图所示:

DB 的数据集成相较于 Stat Log 复杂性高,接下来以 TiDB 的数据集成为例讲述设计过程中的一些要点(MongoDB 流程相似,区别在于存量同步工具及数据解析)。

4.2.1 需要表白

对于用户而言,集成工作须要提供以下两类信息:

  • TiDB 源信息:包含集群、库、表
  • 集成形式:集成形式示意的是快照数据的聚合粒度,包含全量和增量。全量示意须要将存量的快照数据与今日的增量日志数据聚合,而增量示意只须要将今日的增量日志数据聚合(即使增量形式无需和存量的快照数据聚合,但初始存量的获取仍旧是有必要的,具体的应用模式由数仓人员自行决定)。

4.2.2 存量工作

存量工作尽管有且仅执行一次,但为了齐全打消数据集成对业务数据库的影响,咱们抉择数据库的备份 - 复原机制来实现。公司外部数据库的备份和复原操作曾经平台化,集群将定期进行备份(天粒度),通过平台能够查问到集群的最新备份,并且可由接口触发备份复原操作,故存量的获取可间接作用于复原的数据库。

因为数据库备份的工夫点与集成工作提交的工夫点并不一定是同一天,这之间存在着肯定的时间差将导致存量快照数据不合乎咱们的预期,各工夫点的关系如下图所示:

依照咱们的设定,存量快照数据该当是蕴含 T4 之前的全副数据,而理论备份的快照数据仅蕴含 T1 之前的全副数据,这之间存在这 N 天的数据差。

注:这里之所以不说数据差集为 T1 至 T4 区间的数据,是因为增量的 Binlog 数据是以整点为分区的,在 Merge 的时候也是将整点的分区数据与存量数据进行聚合,并反对了数据去重。因而 T1 时刻的存量数据与 T0-T3 之间的增量数据的 Merge 后果等效于 T0 时刻的存量数据与 T0-T3 之间的增量数据的 Merge 后果。所以 T1 至 T4 的数据差集等效 T0 至 T3 的数据差集,即图示中的 N 天数据。

对于缺失的这部分数据实则是能够在「存量工作」中进行补全,仔细分析这其实是能够通过执行的「Merge 工作」的补数操作实现。

整个「存量工作」的工作流如下图所示:

  • 同步触发数据库平台进行备份复原,产生回执 ID。
  • 通过回执 ID 轮训备份复原状态,复原失败须要 DBA 定位异样,故将下线整个工作流,待复原胜利可在平台从新复原执行「存量工作」。复原进行中,工作流间接退出,借助 DS 定时调度期待下次唤醒。复原胜利,进入后续逻辑。
  • 从复原库中拉取存量,断定存量是否存在数据差,若存在则执行 Merge 工作的补数操作,整个操作可幂等执行,如若失败退出此次工作流,期待下次调度。
  • 胜利,下线整个工作流,工作实现。

4.2.3 Merge 工作

Merge 工作的前提是存量数据与增量数据都曾经 ready,咱们通过 _SUCCESS 文件进行标记。整个「Merge 工作」的工作流如下图所示:

  • 校验文件标记是否存在,若不存在阐明数据未 ready,进行报警并退出工作流期待下次调度。
  • 执行 Merge 操作,失败报警并退出工作流期待下次调度。
  • 胜利,退出工作流期待下次调度。

Merge 操作通过 Flink DataSet API 实现。外围逻辑如下:

  • 加载存量、增量数据,对立数据格式(外围字段:主键 Key 作为同一条数据的聚合字段;CommitTs 标识 binlog 的提交工夫,存量数据默认为 0 早于增量数据;OpType 标识数据操作类型,包含:Insert、Update、Delete,存量数据默认为 Insert 类型),将两份数据进行 union。
  • 依照主键聚合。
  • 保留聚合后 CommitTs 最大的数据条目,其余抛弃。
  • 过滤 OpType 为 Delete 类型的数据条目。
  • 输入聚合后果。

外围代码:

allMergedData.groupBy(x -> x.getKeyCols())
             .reduce(new ReduceFunction<MergeTransform>() {public MergeTransform reduce(MergeTransform value1, MergeTransform value2) throws Exception {if (value1.getCommitTS() > value2.getCommitTS()){return value1;}
                     return value2;
                 }
             })
             .filter(new FilterFunction<MergeTransform>() { // 增量:过滤掉 op=delete
                 
                 public boolean filter(MergeTransform merge) throws Exception {if (merge.getOpType().equals(OPType.DELETE)){return false;}
                     return true;
                 }
             })
             .map(x -> x.getHiveColsText())
             .writeAsText(outPath);

次要思维为「后来者居上」,针对于 Insert、Update 操作,最新值间接笼罩旧值,针对 Delete 操作,间接抛弃。这种形式也人造的实现了数据去重操作。

4.2.4 容错性与数据一致性保障

咱们大体能够从三个工作故障场景下的解决形式来验证计划的容错性。

  • 「存量工作」异样失败:通常是备份复原失败导致,DS 工作将发送失败报警,因「数据库平台」暂不反对复原重试,需人工染指解决。同时「Merge 工作」检测不到存量的 _SUCCESS 标记,工作流不会向后推动。
  • 「增量工作」异样失败:Flink 本身的容错机制以及「实时计算平台」的内部检测机制保障「增量工作」的容错性。若在「Merge 工作」调度执行期间「增量工作」尚未复原,将误以为该小时无增量数据跳过执行,此时相当于快照更新提早(Merge 是将全天的增量数据与存量聚合,在之后的调度工夫点如果「增量工作」复原又能够聚合失去最新的快照),或者在「增量工作」复原后可人为触发「Merge 工作」补数。
  • 「Merge 工作」异样失败:工作具备幂等性,通过设置 DS 工作失败后的重试机制保障容错性,同时发送失败报警。

以上,通过主动复原机制和报警机制确保了整个工作流的正确执行。接下来咱们能够从数据的角度看一下计划对于一致性的保障。

数据的一致性体现在 Merge 操作。两份数据聚合,从代码层面肯定能够确保算法的正确性(这是可验证的、可测试的),那么惟一可能导致数据不统一的状况呈现在两份输出的数据上,即存量和增量,存在两种状况:

  • 存量和增量数据有交叠:体现在初始存量与整点的增量数据聚合场景,因为算法人造的去重性能够保证数据的统一。
  • 存量和增量数据有缺失:体现在增量数据的缺失上,而增量数据是由 Flink 将 Kafka 数据写入 Hive 的,这个过程中是有肯定的可能性造成数据的不统一,即分区提交后的乱序数据。尽管说乱序数据到来后的下一次 checkpoint 工夫点分区将再次提交,但上游工作个别是检测到首次分区提交就会触发执行,造成上游工作的数据不统一。

针对 Flink 流式写 Hive 过程中的乱序数据处理能够采取两种伎俩:一是 Kafka 设置单分区,多分区是产生导致乱序的根因,通过防止多分区打消数据乱序。二是报警弥补,乱序一旦产生流式工作是无奈完全避免的(可通过 watermark 设置乱序容忍工夫,但终有一个界线),那么只能通过报警做事后弥补。问题转换成了如何感知到乱序,咱们能够进一步剖析,既然乱序数据会触发前一个分区的二次提交,那么只须要在提交分区的时候检测前一个分区是否存在 _SUCCESS 标记便能够通晓是否是乱序数据以及触发报警。

五、线上成果

总览

存量工作

Merge 工作

六、总结

本文论述了伴鱼「数据集成平台」外围设计思路,整个计划还有一些细节未在文章中体现,如数据 Schema 的变更、DB 日志数据的解析等,这些细节对于平台构建也至关重要。目前伴鱼绝大部分的集成工作已切换至新的形式并稳固运行。咱们也正在推动实时数仓集成工作的接入,以提供更对立的体验。

原文:伴鱼数据集成平台的设计与实现


近期热点

  • Flink Forward Asia 2021 延期,线上相见
  • 奖金翻倍!Flink Forward Asia Hackathon 最新参赛指南请查收


更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

正文完
 0