乐趣区

关于后端:基于-Flink-的小米数据集成实践

摘要:本文整顿自小米计算平台高级工程师胡焕,在 FFA 数据集成专场的分享。本篇内容次要分为四个局部:

  1. 倒退现状
  2. 思考实际
  3. 引擎设计
  4. 将来布局

点击查看直播回放 & 演讲 PPT

一、倒退现状

首先介绍一下小米计算平台,小米计算平台次要负责小米团体的数据开发平台的建设,体现在产品上是小米数据工场,底层引擎上常见的 Flink、Spark、Iceberg、Hive 等等都是由计算平台在负责。

上图是小米数据工场的技术架构图。

正中间的蓝色高亮框是小米自研的消息中间件 Talos,能够把它替换成大家比拟相熟的 Kafka,这对明天的分享内容来说简直没有任何差异。

Talos 右下方的蓝色高亮框是表格存储的技术选型,小米的数据湖技术选型抉择了 Iceberg,Iceberg 是小米数据集成的次要场景之一。

右下角的红色高亮框就是数据集成。小米数据集成目前的次要场景还是数据的入湖入仓,但数据出湖出仓的场景也在快速增长。咱们的最终建设指标是建设各种异构数据系统之间对接的能力,所以目前咱们把数据集成作为一种根底服务在建设。

在产品层面,咱们将数据集成划分为四个次要场景。

  1. 第一个次要场景是数据采集,次要是一些工夫敏感数据的采集,比方客户端和服务端的埋点数据的采集、日志文件采集、物联网数据采集等等。这些场景须要一些专用的采集技术来反对,基本上没有方法整合到一个引擎中,咱们将这些数据集成场景拆分为独立的采集和集成两个局部,两者通过音讯队列进行对接。

    采集的局部设计了独自的数据采集核心,用来配置各种采集服务。这些采集服务对立都输入到音讯队列,这样咱们能够把集成的局部对立起来,通过组合不同的实时集成作业来反对各种数据集成的场景。

  2. 第二个次要场景是实时集成作业,目前只反对音讯队列和数据库作为上游数据源。音讯队列场景里很大一部分的数据起源就是来源于数据采集核心,不同的采集场景对应的支流实时集成场景略有不同,例如埋点场景通常写入到 Iceberg 进行进一步的加工和剖析,日志场景通常不做额定的加工间接写入到 ES 里,还有一些比拟非凡的是 Schema On Read 场景,它们间接写入 HDFS 文件。

    数据库场景与音讯队列场景略有不同,音讯队列场景里的采集和集成这两个局部是齐全互相独立的,在数据库场景里,这两个局部则是紧密结合的。

  3. 第三个次要场景是离线集成作业,离线场景相对来说简略很多,稍微麻烦一点的就是 MySQL 的分库分表和 Doris 这两个场景,前面为大家做具体介绍。
  4. 第四个次要场景是跨集群同步作业,目前咱们只反对 Hive 和 Iceberg 做跨集群的同步,这是因为会波及到跨国的数据传输,在合规和网络方面都会存在一些额定要求,所以咱们尽量避免在采集或集成的环节做跨集群同步。

这四个场景除了数据采集核心无奈整合到一个引擎里,其余三个场景都通过小米数据集成引擎反对了。

上图是集成作业的次要场景的作业数量,红色是实时作业,蓝色是离线作业,灰色字体的是存量作业。

咱们将实时集成作业分为音讯队列场景和数据库场景,再加上离线集成作业、跨集群同步作业,形成了四个次要场景。

留神到每个场景里还有相当多的存量作业,在孵化小米数据集成引擎以前,这些场景都是用不同的引擎来反对的。

上图展现的是小米数据集成的演进。

音讯队列场景过来是由 Spark Streaming 反对的,当初换成了 Flink SQL。

数据库实时场景过来是由自研采集服务反对的,且只反对 MySQL 数据库,当初整体降级到了 Flink CDC + Flink SQL,并借助 Flink CDC 反对了更多的数据库。

离线场景里过来是由 DataX 反对的,当初咱们也都换成了 Flink SQL。

跨集群同步场景过来由 Hadoop 的 DISTCP 命令来实现的,这个命令只能拷贝底层的文件,在拷贝 Hive 表的时候须要配合另外的命令来增加分区,体验十分蹩脚,换到了 Flink SQL 就没有这个问题了,还减少了对 Iceberg 表的跨集群实时拷贝的反对。

直到现在小米还有大量的存量作业在期待迁徙,这些存量作业的各种引擎给咱们带来了十分大的保护累赘。一方面占用了大量人力去相熟和保护这些引擎,另一方面是咱们很难在这些差别微小的引擎上构建出对立的产品,这须要减少很多分支判断,也很容易出错。所以咱们十分迫切地心愿把这些场景由同一个引擎里反对起来,目前来看,Flink 在数据集成畛域的劣势极其显著,小米的数据集成引擎就是基于 Flink 构建的。

二、思考实际

首先咱们尝试形象出几个外围概念,并通过外围问题来界定每个外围概念的范畴和边界。

咱们将数据集成畛域的生产实践都归类到这三个层级上,别离是数据集成畛域、数据集成产品、数据集成引擎,这三个层级解决的外围问题是一层一层递进的。

数据集成畛域的外围问题是连贯。

数据集成的概念是绝对于数据开发的概念来定义的,上图的右侧展现的就是数据开发畛域最常见的技术栈,左侧展现的则是数据集成畛域的范畴。

在数据开发畛域,离线数仓通常应用 Spark+Hive 的技术栈,实时数仓的最新的技术栈是 Flink+ 数据湖,这里的 Hive 和数据湖都能够用 Flink 或 Spark 间接拜访。咱们通常所说的数据开发的工作次要就是在离线数仓或数据湖内进行的,咱们能够认为数据开发畛域次要是基于现成的能够间接拜访的数据进行开发。

在数据集成畛域,关注的重点则是:数据在哪里,怎么拜访这些数据,怎么让数据正确的参加到计算中。图中最左侧列出来的是最常见的几个数据集成场景,在这些场景里咱们不能像惯例的数据开发一样间接拜访这些数据。比方咱们必定不会在要进行数据计算的时候才实时的连贯到服务器甚至客户端读取数据,即便是最惯例的数据库场景,咱们通常也须要用一张 ODS 表来代替数据库参加计算,在作业中直连数据库很容易会让数据库压力过大,产生各种异样。

咱们心愿在数据开发过程中可能以一种对立的形式拜访上游的各种数据源,这个处理过程就是数据集成畛域里要解决的外围问题:连贯。

留神这里强调的是连贯,数据集成常常与数据导入的概念混同,把数据导入到 ODS 表之后再去做数据开发是数据集成畛域的一个优良实际,但不是所有场景都必须要有导入这个步骤,在 Flink 中只有有 Connector 的反对,在简略场景中咱们齐全能够间接连贯数据源做数据开发。

数据集成产品的外围问题是效率。

上图展现了最常见的数仓建模标准,咱们次要关注其中的 ODS 表。

数据开发过程中常常须要屡次读取原始数据,将原始数据导入到 ODS 表,再用 ODS 表代替原始数据参加数据开发,就能够防止反复连贯上游的数据源。

当初常常提到的用 ELT 代替 ETL 的做法,代表的是 ODS 表设计的一个优良实际,咱们称之为镜像同步。镜像同步要求 ODS 表构造与上游数据的构造尽可能放弃完全一致,并尽可能的保留上游数据的所有细节,数据荡涤的步骤则往后放,改为基于 ODS 表施行,这样如果荡涤逻辑存在问题,咱们基于 ODS 表进行修复的代价也十分小。

有镜像同步这个优良实际作为根底,咱们就可能将数据集成的过程标准化了。将镜像同步的整个流程中的各种反复工作固化下来,就造成了数据集成产品,从而能够大幅提高数据集成工作的开发效率。咱们能够认为数据集成产品就是从数据集成畛域的各种优良实际上倒退而来的。

这里须要关注一个细节,将数据开发的后果导出到数据利用的过程中,同样存在很多优良实际,而且这些优良实际的展示模式、底层技术与数据集成产品的类似度都十分高。很多状况下咱们会把这两种场景整合到一个产品里,在技术上这是十分正当的决策,但实质上数据导出场景更适合的名称是数据系统集成,这种做法在某种意义上是拓展了数据集成畛域的边界。

数据集成引擎的外围问题是异构。

在底层技术上,咱们须要有相应的数据集成引擎来反对咱们的数据集成产品。在引擎的设计中,最外围的问题是解决异构数据系统对接带来的各种问题,引擎波及到的数据系统越多,碰到的问题和解决方案也就越简单。

上图展现的是其中的一个例子,不同数据系统反对的字段类型在语义上有细微差别,这些语义差异是数据集成引擎的次要问题起源之一。

举个例子,MySQL 没有布尔类型,通常咱们用 tinyint(1) 来实现布尔类型,但转换为布尔类型还须要配置 JDBC 参数对 Connector 行为做动静的调整。当镜像同步到 Hive 或者 Iceberg 的时候,字段类型如果没有匹配上,就可能会呈现类型转换的异样。

Unsigned 也是一个十分经典的问题,其中最容易出错的是 Bigint Unsigned。Flink、Hive、Iceberg 都没有无符号类型,如果思考到精度问题,咱们就只能应用 decimal(20,0) 来保留 Bigint Unsigned。但很多字段设计成 Unsigned 只是心愿保障这个字段没有负数值,并不会产生有符号数值溢出的状况,所以很多用户依然心愿在 ODS 表中持续应用 Bigint,这里就很容易导致作业呈现各种问题。

直到现在咱们还有很多问题在解决中,对于数据集成引擎来说,解决异构数据系统对接带来的各种问题,屏蔽这些数据系统之间的差别,是最外围也是最有挑战的问题。

总结一下关键词,数据集成畛域的外围是连贯,数据集成产品的外围问题是效率,数据集成引擎的外围问题是异构。各种生产实践根本都能够对应到这三个层级。

Auto Catalog 个性是小米在数据集成畛域的一个外围实际,通过 Auto Catalog 个性能够大幅度晋升波及异构数据系统的作业的开发效率。

正中间的 SQL 就是通过 Auto Catalog 的形式,实现的 MySQL 写入 Iceberg 的作业。右上角的语句是通过 Create Table 语法来援用 MySQL 表的形式,咱们应用 Catalog 语法,也就是上面 SQL 中的“mysql_order.order.orders”三层构造来援用 MySQL 表的时候,就能够省略掉 Create Table 语句,在列比拟多的状况下,这个 Create Table 语句结构很繁琐也很容易出错。

惯例状况下应用 Catalog 语法,咱们须要提前应用 Create Catalog 语句对 Catalog 进行注册,左上角和下方的两条语句就是 Create Catalog 语句。Auto Catalog 个性是在引擎中主动解析 SQL 中的 Catalog 进行主动注册,这样咱们就能够省略这两个 Create Catalog 语句。这不仅能够进步咱们的开发效率,最次要还能够防止一些敏感信息的泄露。

省略掉这三个 DDL 语句之后,整个 SQL 就变得十分简洁了,对数据开发和数据查问的效率晋升都十分显著。

Auto Catalog 个性的两个环节都须要配合底层技术的反对。

第一个是在应用 Catalog 语法援用库表的环节。Catalog 语法尽管很简洁,但目前只有多数 Connector 原生提供了相应的 Catalog 实现。

这里咱们用了两个措施:

  1. 基于 Netflix Metacat 建设了对立的元数据服务,咱们把连贯信息和账号密码等敏感信息都保留到 Metacat 里,这就防止了对外裸露。
  2. 利用 HiveCatalog 的兼容表个性,在 Flink 里变相实现其余数据系统的 Catalog。次要做法就是用 Metacat 实现 HiveMetaStore 接口,这个做法有个毛病,就是减少了类型转换的复杂度。比方原生提供的 Iceberg Catalog,它只须要关注 Iceberg 类型与 Flink 类型之间的双向类型转换,但如果用 Metacat,类型转换过程就变成了原生类型、Metacat 类型、Hive 类型到最初的 Flink 类型的四重类型转换,复杂度显著晋升。所以倡议有原生 Catalog 的状况下,尽量应用原生 Catalog。

第二个是主动注册 Catalog 的环节。手工结构 DDL 语句比拟繁琐,因为它须要连贯信息和账号密码,但正因如此结构 DDL 语句的过程自身就蕴含了受权步骤。而主动注册 Catalog 就躲避掉了这个鉴权过程,所以咱们引入了 Apache Ranger,它是一个平安治理的框架。咱们基于 Apache Ranger 建设了对立权限机制,在 Flink SQL 中做了一个插件,通过在 SQL 优化器中减少规定的形式,来施行表级别的鉴权,这样咱们就能够防止用户去拜访无权限的 Catalog 或者库表了。

集成作业是咱们最次要的数据集成产品,用于提供异构的数据源导入到 ODS 表的最佳实际。集成作业底层引擎是基于 Flink SQL 的,但与惯例的 Flink SQL 作业相比,它额定提供了三个个性:

  1. 镜像同步,即在集成作业中整合了主动创立指标表的逻辑。在表的列十分多,或者蕴含一些很简单的嵌套构造类型的状况下,这个个性能够节俭很多工作量。
  2. 主动同步,即在源表的表构造产生变动的时候,主动将表构造的改变同步到指标表中。既能够保证数据的完整性,同时也缩小了人工染指。
  3. 流批一体,即保障一次作业提交就能够实现整体同步,主动无缝连接全量同步的批作业步骤和增量同步的流作业步骤。

这三个个性与惯例的 Flink SQL 作业存在较大差别,所以咱们在 Flink SQL 作业的根底上整合成了一个集成作业的产品,并进而倒退出了数据集成引擎。

MySQL 实时集成作业里咱们也多加了三个额定的个性。

第一个,专用采集账号。MySQL 的 Binlog 采集权限是整个 MySQL 实例级别的。也就是说只有有了采集权限,就相当于有了该实例所有 DB 的读权限。在小米 MySQL 实例部署多个 DB 的状况十分广泛,对用户账号间接凋谢采集权限的话,DBA 那边是齐全没有方法承受的。所以咱们依然沿用采集作业和集成作业离开的一个架构,两头通过音讯队列对接。

这样采集账号就只在采集作业中应用,采集作业里咱们须要去做一个管制,即咱们只向 Topic 输入单个 DB 的 Binlog。通过这个形式,将采集权限给限度在了 DB 的级别。

第二个,主动断点续传。咱们与 DBA 平台买通了获取主从库拓扑构造的接口。这样咱们就能够优先连贯从库进行采集,在从库生效的状况下,咱们还能够尝试获取其余可用的从库做主动重连。但这里有个前提是须要 MySQL 开启了 GTID,GTID 是 MySQL 的全局事务标记,在主从库中都能放弃惟一,GTID 是主动断点续传的根底。

第三个,千表同步连贯问题。MySQL 实例上,如果建设的采集作业太多,就会给咱们的服务造成压力,所以咱们须要尽可能复用采集作业。因为后面咱们就提供了 DB 级别的 Binlog Topic,咱们就间接共享了 DB 级别的 Binlog Topic。同一个 DB 上的所有表都会复用同一个 Binlog Topic。

在表特地多的状况下,Binlog Topic 的生产速度依然有可能会成为瓶颈,因而咱们在音讯队列上还减少了按表过滤的环节,把音讯过滤的逻辑下推到音讯队列的服务端执行,这样可能无效缩小网络流量、进步生产速度。

整个架构里,咱们把采集作业的局部换成了 Flink CDC,但整体依然是以音讯队列为外围的架构。

分库分表中间件次要有两种实现。一种是将分片规定间接下发到 Client 端,这种状况中间件对外会间接裸露分表或者分库的名称。另外一种是基于代理的,对外展现为单库单表,实际上是由代理服务去转发申请到各个分库或者分表里。

第一种分库分表的反对绝对简略一点,咱们在 Catalog 语法中拓展了正则匹配的反对,能够显著的晋升这种场景的开发效率。

第二种基于代理的中间件就会麻烦很多,代理中间件不太敌对的中央,一个是在实现细节上和 MySQL 服务端有很多差别,另外一个是通过代理服务也无奈采集它的 Binlog。所以关键点还是依赖于后面提到的元数据管理服务,就是咱们须要通过 Metacat 去获取它实在的拓扑构造。

咱们当初的实现是减少一个非凡的后缀,把它真正的分库分表的名称裸露进去,在理论执行的时候,SQL 语句会被转换成 UNION ALL 的模式后再执行。

上图是 Doris 写入反对分区笼罩语义的案例。

Doris 自身不反对 OVERWRITE 这个语义,但在理论场景中,咱们有很多用户心愿应用这个个性,而 Doris 自身又有相似的机制能够实现类似成果,只是目前的 Connector 还没有反对。

咱们在数据集成引擎里加了一个解决,将 OVERWRITE 这个语句转换成等价于左边的三个 SQL 语句的操作,用 Doris 的长期分区个性来实现了 OVERWRITE 的语义。

后面两个例子都是把输出的 SQL 做了一些解决,理论执行的 SQL 是在数据集成引擎内生成的。这个机制,咱们也同样用在了主动同步的个性上。

这里咱们形象出了一个叫 Schema Job 的概念。Schema Job 总是基于源表的表构造,依照最佳实际生成一个指标表的表构造,再把指标表的表构造替换掉,跑完了 Schema Job 咱们就能够认为源表和指标表的表构造曾经保持一致了。

离线集成作业反对主动同步非常简单,只须要在跑 Batch Job 之前执行一次 Schema Job 就能够了。

实时集成作业反对主动同步会略微麻烦一点,依然是先跑一个 Schema Job 把源表和指标表的表构造变成统一的,而后再跑起 Stream Job。当 Stream Job 退出的时候,咱们须要做一个额定的判断,如果 Stream Job 是因为 Schema 变更而退出的,咱们就再调度一个 Schema Job 去放弃表构造统一,而后再尝试依照新的表构造跑起 Stream Job,就这样始终循环上来。

这里有一个细节,对于数据库场景,在产生 DDL 变更时,通常在 CDC Connector 里能够采集到一条 DDL 音讯,咱们能够用这个 DDL 音讯触发 Stream Job 的退出。但在音讯队列场景里,音讯体的构造变更是不会产生相似 DDL 的音讯的,这个时候如果咱们不做任何解决,这个作业会始终失常的执行上来,但这些新的字段可能就被遗漏掉或者抛弃掉了。

这个时候咱们就依赖一个叫做 fail-on-unknown-field 的个性,设置了这个个性之后,咱们会实时查看音讯构造体中是否有 SQL 中没有定义的字段。当检测到未知字段后,咱们就会令以后的 Stream Job 失败,尝试触发 Schema Job 的循环。

咱们在半结构化的数据接入场景上十分依赖这个个性,举例一个十分经典的场景:

很多业务的后端团队和数据团队在组织架构上是离开的,两头通过音讯队列做数据对接。这种场景里,音讯的生产端是后端团队在负责,生产端是数据团队本人建作业去生产,音讯体很多状况下就是某个外围畛域模型的 JSON。

在很多状况下,后端团队更新畛域模型后,数据团队是不晓得的。不做额定解决的状况下,Flink SQL 作业会始终失常执行并疏忽音讯体中的新字段,甚至在开启了 ignore-parse-errors 个性时可能导致整个音讯都被抛弃。在这个场景里咱们就能够用 fail-on-unknown-field 个性将作业被动失败掉,而后提醒用户更新音讯体的 Schema。

基于 fail-on-unknown-field 个性施行 Schema Evolution 有两个前提,第一个是音讯体构造变更不会特地频繁,第二个是音讯体构造变更自身是可能向前兼容的。如果不满足这两个前提,这套计划的可靠性就有很大的隐患。

这种状况下咱们须要回归到 Schema On Read 的高可靠性计划,也就是基于 Hive/HDFS 的计划。Hive 自身有一套十分成熟的 Schema On Read 的工具包,Schema On Read 在写链路上不须要解析音讯的构造,间接把整个音讯体按行存的格局写入到 HDFS 文件上,只在读这些文件的时候才须要用到 Schema 去尝试解析。这样即便咱们的 Schema 与音讯体不匹配,也只是影响解析进去的数据,原始数据自身是不会失落的。

比拟惋惜的是,目前的几个支流数据湖技术都是基于列式存储的,没有现成可用的 Schema On Read 计划,这也是咱们前期可能要去拓展的一个方向。

这里再分享一下 TiDB 百亿级单表实时集成的案例。

TIDB 是一款十分优良的国产分布式开源数据库,从数据集成的角度来看,它有两个十分显著的特点:第一是单表的数据量可能反对十分大的规模,能够上到百亿行 / 数十 TB 的规模;第二是反对快照机制,这对流批一体是十分敌对的个性。

咱们在 TIDB 实时集成的开发过程中,碰到的次要艰难都是在全量同步步骤中写入 Iceberg 的过程产生的。这里最次要的问题是,Iceberg 的 Flink Connector 实现只提供了 Stream Writer,Stream Writer 在数据量微小的批处理场景下的性能比拟差,咱们次要做了两个优化。

上图展现的是 write-distribution-mode 的优化,从上图能够看到集成作业的逻辑非常简单,作业通过 TableSourceScan 从 TIDB 读数据,再通过 IcebergStreamWriter 往 Iceberg 里写数据。TableSourceScan 在读到数据之后,怎么把数据发送给 IcebergStreamWriter 呢?这里就是 Iceberg 的 write-distribution-mode 的配置。

目前有两种模式,左上方是 None 模式,这个模式里 Writer 不占用独自的 stage,而是间接在 TableSourceScan 的 TaskManager 上写入 Iceberg 中。这个模式少了一个 shuffle 阶段,如果 TableSourceScan 的数据分布比拟平均,它的入湖速度就会十分快。但因为 Iceberg 每个 Writer 写入每个分区的时候都会产生一批写入文件,这样写入文件数量就等于 Partition 数量乘以 Writer 的数量。当表规模很大的时候就会产生大量的小文件,对 Compaction 和 HDFS NameNode 造成很大的压力。

左下方是 Hash 模式,这个模式专门为小文件数量做优化,保障每个 Partition 只能由一个 Writer 写入。但调配 Partition 到 Writer 时是用的哈希算法进行调配,因为 Partition 的数量自身就非常少,用哈希算法去调配的时候,简直无奈防止的会产生数据歪斜的问题。

这两种模式在流场景下体现都很不错,然而在 TIDB 的全量同步的过程中,它的问题就会被放大到令咱们无奈承受。所以咱们就引入了 RoundRobin 模式,次要还是在哈希模式的根底下来解决数据歪斜的问题。

咱们剖析了几个最常见的分区函数,通过设定一个非凡排序,依照程序一一把 Partition 调配到 Writer,来确保 Partition 与 Writer 的平衡。这里用到了 PartitionCustom 的分区办法,通过自定义的 Partitioner 对分区进行匹配,目前实现适配的分区函数如下:

  1. Bucket 分区函数,只须要将 bucket_id 按 Writer 数量取模就能够达到实践上最好的平衡成果。
  2. Truncate 分区函数,只能反对数值型,用分区名称除以分区函数的宽度,就能够失去一个间断的整数值,再按 Writer 数量取模即可,这个形式在常见场景和合理配置下能够达到最优的平衡成果,但如果宽度设置过大,反而可能导致数据被集中在多数 Writer 中。
  3. Identity 分区函数,只能反对数值型,将分区名称代表的数值取整,再按 Writer 数量取模,当分区名称间断变动时成果比拟好,分区名称是离散值时成果较差。

RoundRobin 模式在常见场景的成果十分显著,理论测进去的性能相比前两者能有三倍的晋升。

上图右边展现的是 Iceberg 实现 Row Level Delete 的外围逻辑。Iceberg 有个 Delete Storage 来缓存 Checkpoint 期间的所有新增操作,更新和删除操作会依据 Delete Storage 中是否有相应的记录,决定是写入到 eq-delete-file 还是 pos-delete-file,失常状况下 Iceberg 一次 Checkpoint 会提交三个数据文件。

在做大表的全量同步时,Delete Storage 常常缓存了太多数据触发 OOM,咱们最终决定在全量同步的阶段跳过 Delete Storage 的步骤,因为全量同步阶段只有新增,没有更新和删除,实际上用不到 Delete Storage。

这个成果十分显著,咱们终于胜利的反对了百亿级别单表的全量同步。

但跳过 Delete Storage 会带来一个问题,增量同步过程是必须依赖 Delete Storage 的,这就导致全量同步和增量同步无奈一起执行,基于 HybridSource 的计划就不适宜应用这个优化措施了。

咱们只能将 TIDB 实时集成作业拆分成两个独自的作业:Batch Job 和 Stream Job。

Batch Job 基于快照读取 TiDB 的全量数据,它会配置后面说的各个优化项,并行度设置也会大一点。

Batch Job 执行实现后,再执行 Stream Job,Stream Job 从音讯队列中依照快照工夫点接上全量同步的进度,持续生产 CDC 事件执行实时同步的步骤,这个阶段的配置是独自优化的,并行度也会设置的绝对小一些。

咱们将这两个作业的调动逻辑放在 Flink Application 中施行,这样在用户层面看起来就只调度了一个作业,但在理论执行的过程中,Flink Application 会按需调度不同的 Flink Job。

这里重温一下 Flink Application 这个概念,在 Flink on Yarn 模式中,Flink 作业的 jar 包提交到 Yarn 集群后,在 main 办法中跑的逻辑就是 Flink Application。Flink Application 跑在 JobManager 的节点上,但逻辑上依然是两个独立的模块。而且咱们能够在 Flink Application 中提交多个 Flink Job。

目前咱们在 Flink Application 中是串行调度各个 Flink Job,这样在状态复原的时候就会比较简单,因为每次只有一个 Flink Job 须要复原。从实质上来说,内部调度 Flink Job 也能达到完全一致的成果,只是 Flink Application 中刚好有一个比拟适合的中央能够放这些逻辑。

小米数据集成引擎的外围逻辑就是跑在 Flink Application 中的。

三、引擎设计

上图是小米数据集成引擎的总体架构图,目前称这个引擎为 MIDI(Mi Data Integration)。MIDI 的外围逻辑都跑在 Flink Application 中,Flink Application 会在适当的工夫调度三种作业:Batch Job、Stream Job、Schema Job。

MIDI 的输出咱们称之为 MIDI SQL,是在 Flink SQL 的根底上减少了一些自定义语法,MIDI SQL 目前只反对简略的数据集成场景。

从 MIDI SQL 中咱们会解析出三张表,Source Table 是上图最右边源数据系统中的表,Sink Table 是左边的指标数据系统中的表,Middle Table 是下方的长条,代表的是蕴含源表 CDC 事件的 Topic。此外还有一个叫做 Application State Backend 的概念,次要用来记录 Flink Job 的执行状况。

上图是一个典型的数据库实时集成作业的时序图,蕴含了主动同步和流批一体的个性。

MIDI 首先会调度一个 Schema Job 去保障源表和指标表的表构造完全一致,而后生成 Batch SQL 并调度一个 Batch Job 来执行全量同步的步骤。全量同步步骤采纳批模式执行,并行度设置绝对高一些。

之后 MIDI 会获取全量同步的进度点,而后按进度点生成对应的 Stream SQL,并调度 Stream Job 接上之前的进度继续执行实时的增量同步的步骤。实时同步阶段采纳流模式执行,并行度设置绝对会低一点。

MIDI 执行完每一个 Flink Job 都会记录一个执行日志,也就是 Flink Application State Backend 的作用。它实际上就是一个文本文件,与 Checkpoint 目录放在一起。当作业中断后再复原的时候,MIDI 会先从执行日志里找到过后正在跑的那个 Flink Job,再去执行相应的复原动作。

再回到时序图里。如果源表 Schema 没有任何变更,Stream Job 跑起来之后会始终执行。当源表产生了 Schema 变更,就会触发作业退出,而后进入一个循环。咱们会尝试调度 Schema Job 来实现表构造同步,再基于新的表构造调度 Stream Job。这样咱们就能始终保持以后正在跑的 Stream Job,肯定是以最新的表构造在进行同步。

在这个设计思路下,Stream Job 成为了一种非凡的有界流,Stream Job 的生命周期与它执行的 Stream SQL 的 Schema 是强绑定的,Schema 生效后,Stream Job 也就相应的完结退出了。

这是咱们定义的 MIDI-SQL,次要引入三个自定义语法:Auto、Stream、Stream With。

Auto 语法是用来开启主动同步的,它必须与“Select ”独特应用。在理论执行的时候,“Auto ”会被替换为以后最新的表构造字段。

Stream 语法其实早就被 Flink 摈弃了,当初 Flink SQL 语法上并不辨别批作业和流作业,次要以 Source 是否为有界流来确定执行模式。但 MIDI 因为应用了 Catalog 语法来援用库表,同样的 SQL 语句,应用 CDC Connector 时就是流模式,应用 JDBC Connector 时就是批模式,咱们无奈通过 SQL 语句辨别两种状况,所以就把 Stream 语法又加回来了,用来判断执行模式,在 Catalog 中抉择不同的 Connector。

Stream With 语法是用来施行流批一体的,MIDI 的流批一体计划还是基于音讯队列的,咱们用这个语法将音讯队列与源表关联起来。

四、将来布局

将来咱们将对上图提到的几点进行摸索,这里重点提三个点:

  1. Schema On Read 场景反对:基于 Flink 和数据湖的计划更适宜的是结构化数据集成的场景,在半结构化和非结构化场景里,Schema On Read 依然是一个最佳实际,将来咱们心愿持续摸索如何在数据湖技术上提供 Schema On Read 的反对。
  2. 智能数据弥补:这是咱们尝试减少的第四种 Flink Job,咱们心愿定时执行这个步骤,主动的增量的对源表和指标表的数据做比对和弥补。
  3. 引擎个性打磨:MIDI 目前依然在比拟初期的阶段,很多个性还须要打磨,目前正在整顿局部个性反馈到社区独特建设。

点击查看直播回放 & 演讲 PPT


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…

退出移动版