关于数据库:Flink-on-TiDB-便捷可靠的实时数据业务支撑

1次阅读

共计 5395 个字符,预计需要花费 14 分钟才能阅读完成。

作者介绍: 林佳,网易互娱计费数据核心实时业务负责人,实时开发框架 JFlink-SDK 和实时业务平台 JFlink 的主程,Flink Code Contributor。

本文由网易互娱计费数据核心实时业务负责人林佳老师分享,次要介绍网易数据中心在解决实时业务时为什么抉择 Flink 和 TiDB,以及两者的联合利用状况。

明天次要从开发的角度来跟大家聊一聊为什么网易数据中心在解决实时业务时,抉择 Flink 和 TiDB。

首先,TiDB 是一个混合型的 HTAP 分布式数据库,具备一键程度伸缩、强一致性的多正本数据安全、分布式事务、实时 OLAP 等重要个性,同时兼容 MySQL 协定和生态,迁徙便捷,运维老本极低。而 Flink 是目前最热门的开源计算框架,在解决实时数据方面,其高吞吐量、低提早的优异性能以及对 Exactly Once 语义的保障为网易游戏实时业务解决提供了便捷反对。

Flink on TiDB 到底能够发明怎么的业务价值? 本文将从一个实时累加值的故事来跟大家分享。

从一个实时累加值的故事说起

接触过线上业务的同学应该对上述数据十分相熟,这是一张经典的线上实时业务表,也能够了解为日志或某种枯燥递增的数据,蕴含了事实产生的工夫戳、账户、购买物品、购买数量等。针对这类数据的剖析,假如应用 Flink 等实时计算框架,能够通过分桶解决,如 groupby 用户 ID,groupby 道具,再对工夫进行分桶,最终将产生如下的继续数据。

如果将上述继续数据落入 TiDB,与此同时 TiDB 仍放弃已有的线上维度表,如账户信息、道具信息等, 通过对表做一个 JOIN 操作就能疾速从事实的统计数据中剖析出时序数据所代表的价值 ,再对接到可视化利用,能发现很多不一样的货色。

整个过程看起来非常简单又完满,Flink 解决计算问题,TiDB 解决海量存储问题。但,事实真的如此吗?

理论接触线上数据的同学可能会遇到相似的问题,如:

  • 多种数据源 :各个业务方的内部系统日志,并且存在有的数据存储在数据库,有的须要以日志的形式调用,还有以 rest 接口调用的形式。
  • 数据格式多样 :各个业务或渠道打的数据格式齐全不同,有的是 JSON,有的是 Encoded URL。
  • 乱序达到 :数据达到程序被打乱。

基于上述问题,咱们引入了 Flink。在数据中心外部,咱们封装了一套称之为 JFlink – SDK 的框架,次要基于 Flink 对 ETL、乱序解决、分组聚合以及一些罕用需要进行模块化、配置化,而后通过线上数据源的配置,计算失去一些事实的统计或事实数据,最初入到能够包容海量数据的 TiDB 中。

然而,Flink 在解决这批数据时,为了故障复原,会通过 CheckPoint 保留数据以后的计算状态。如果在两次保留期间,产生了数据计算的 commit,即这部分计算结果曾经刷出 TiDB 了,而后产生了故障,那么 Flink 会主动回退到上一个 CheckPoint 的地位,即回退到上一次正确的状态。此时,如图的 4 笔数据就会被重算,重算之后可能会被更新到 TiDB 中。

如果数据是个累加值的话,能够看到其累加值被谬误地累加了两遍,这是应用 Flink on TiDB 可能呈现的问题之一。

Flink 的精确保障

Flink 的精确保障

Flink 如何提供准确性保障?首先,须要理解 Flink 的 CheckPoint 机制。CheckPoint 相似于 MySQL 的事务保留点,指在做实时数据处理时,对长期状态的保留。

CheckPoint 分为 At least Once 和 Exactly Once,但即便抉择应用 Exactly Once 也无奈解决下面累加值反复计算的问题。比方从 Kafka 读了数据,以上述事实表为根底 account 是 1000、购买物品为 a、购买数量别离为 1 件和 2 件,此时 Flink 解决数据就会被分到分桶里。与此同时,另一种 Key 会被 Keyby,相当于 MySQL 的 groupby 分到另一个桶里去计算,而后通过聚合函数刷到 TiDB Sink 中。

计算状态的保留

Flink 通过 CheckPoint 机制来保证数据的 Exactly Once。假如须要进行一个比较简单的执行打算 DAG,只有一个 source,而后通过 MAP 刷 TiDB sink。在这个过程中,Flink 是线性的,通过在数据流外面插入 CheckPoint barrier 机制来实现,相当于 CheckPoint barrier 走到哪里,哪里就触发线性执行打算中的算子保留点。

假如从 source 开始,那么会保留 source,如果是 Kafka,须要存一下 Kafka 的以后生产地位。在节点保留结束之后,须要做下一个算子的状态保留,此处的 MAP 假如是分桶计算,那么它其实就曾经存了桶里的累积数据。

在此之后,CheckPoint barrier 就达到了 sink,此时 sink 也去做相应的状态贮存。当相应的状态存储别离做完之后,总的 Job Manager(相当于 Master)汇报状态存储的 CheckPoint 曾经实现了。

而当 Master 确认了所有的子工作都曾经实现了分布式工作的 CheckPoint 之后,会散发一个 Complete 的信息。如上图模型所示,能够联想到它其实就是 2PC,分布式二阶段提交协定,每个分布式子工作别离提交本人的事务,而后再整体提交整个事务。被存下来的状态将存储在 RocksDB 中,当呈现故障时,能够从 RocksDB 复原数据,而后从断点从新计算整个流程。

Exactly Once 语义反对

回看 Exactly Once,上述形式真的能实现 Exactly Once 吗?其实不能,但为何 Flink 官网称这是 Exactly Once 呢?以下将详述其中原因。

从上图的代码能够看出,Exactly Once CheckPoint 是无奈保障端到端的,只能保障 Flink 外部算子的 Exactly Once。因而,将计算数据去写入 TiDB 时,如果 TiDB 无奈与 Flink 联动,就无奈保障端到端的 Exactly Once 了。

类比一下什么是端到端,其实 Kafka 就反对这种语义,因为 Kafka 对外裸露了 2PC 的接口,容许用户手动调整接口来管制 Kafka 事务的 2PC 过程,也因而能够利用 CheckPoint 机制来防止算错的状况。

但如果不能手动管制,那会怎么样呢?

咱们来看看如下实例,假如依然将用户设置为 1000,购买道具为 A 的数据写入到 TiDB 的累加表,会生成如下 SQL:INSERT VALUES ON DUPLICATE UPDATE。当 CheckPoint 产生时,是否保障该语句被执行到 TiDB?

如果不加非凡解决,简略执行这条 SQL 的话,其实不能保障这条 SQL 到底有没有被执行,如未执行,则会报错,退回到上一个 CheckPoint,大快人心。因为它实际上没有计算,没有累加,也不会反复计算一遍,所以是对的。但如果曾经写出,再去反复的退回上一个 CheckPoint,那么将会呈现反复累加 3 的状况。

Flink 为了解决这个问题,提供了一种接口,能够手动实现 SinkFunction,管制事务的开始,Pre Commit、Commit、Rollback。

而 CheckPoint 机制实质是一种 2PC,当分布式算子在执行内部事务时,其实算子关联到 Pre Commit。同理,假如在 Kafka 中,能够通过 Pre Commit 事务将 Kafka 事务预提交。当算子收到 Job Manager(即 Master)同步的所有算子 CheckPoint 的状态保留都已实现时,此时 Commit,事务是必然胜利的。

如果其余算子失败了,则须要进行 Rollback,确保事务没有被胜利地提交到远端。这里如果有 2PC SinkFunction 加上 XA 全 section 语义的话,其实就能够做到严格意义的 Exactly Once。

但不是所有的 sink 都反对二阶段提交协定,比方 TiDB 外部是二阶段提交来治理协调其事务,然而目前来说,并没有把二阶段提交协定提供给用户手动管制。

幂等计算

那么,如何做到保障业务的 Exactly Once 后果落到 TiDB?其实也很简略,采纳 At Least Once 语义加上一个 Unique Key,即幂等计算。

如何抉择 Unique Key? 如果一份数据有一个惟一标记,咱们天然会抉择其惟一标记。比方一份数据有惟一 ID,当一张表通过 Flink 同步到另一张表的时候,这就是很经典的利用其 Primary key 做 insert ignore 或者 replace into 的语义去重。如果是日志,能够抉择日志文件特有的属性。而如果通过 Flink 去计算聚合后果,则能够用聚合的 Key 加上窗口边界值,或者其余的幂等形式来计算出数值,作为最终计算的惟一键。

如此,就能够实现后果是可重入的。既然可重入,再加上 CheckPoint 的可回退个性,就能够把 Flink 跟 TiDB 联合起来,做到精准的 Exactly Once 后果写入。

Flink on TiDB

在 Flink on TiDB 局部,咱们外部的 JFlink 框架对 Flink 进行封装,而后在与 TiDB 联动上又做了什么?以下将详述。

数据连接器的设计

首先,是数据连接器的设计。因为 Flink 对于 TiDB 的反对或者说对关系型数据库的反对都比较慢,Flink Conector JDBC 在 Flink 1.11 版本才呈现,工夫还不太长。

目前,咱们将 TiDB 作为数据源,把数据放在 Flink 解决,次要是通过 TiDB 官网提供的 CDC 工具,相当于通过监听 TiDB 的变更,将数据落到 Kafka。而 Kafka 又是十分经典的流式数据管道,所以通过 Kafka 将数据进行生产解决,而后再通过 Flink 进行解决。

但不是所有业务都能够用 CDC 模式,比方落数据时要减少一些比较复杂的过滤条件,或者落数据时须要定期读取某些配置表,亦或者先须要理解内部的一些配置项能力晓得切分状况时,可能就须要手动的自定义 source。

而 JFlink 在封装时,其实是封装了业务字段的枯燥表来进行切片读取。枯燥是指某张表肯定会有某个字段,枯燥变动的,或者是 append only。

在实现上,TiDB 和 Flink 之间,封装了 JFlink TiDB Connect,通过一个连接词去创立跟 TiDB 的链接。而后通过异步线程来捞数据,再通过阻塞队列进行阻塞。阻塞队列的作用次要是为了流控。

对于 Flink 的主线程,次要通过监听阻塞队列上的有非空信号。当收到非空信号时,就把数据拉进去,通过反序列化器作为整个实时处理框架的流转对象,而后能够对接前面各种模块化了的 UDF。在实现 source 的 At Least Once 语义时,如果借助 Flink 的 CheckPoint 机制,就变得非常简单了。

因为咱们曾经有个大前提,即这张表是一张由某个字段组成的枯燥表,在枯燥表上进行数据切分时,就能够记下以后的切分地位。如果产生故障,让整条流回退到上一个 CheckPoint,source 也会回退到上一个保留的切片地位,此时就可能保障不漏数据的生产,即实现了 source 的 At Least Once。

对于 sink,其实 Flink 官网是提供了 JDBC sink,当然 source 也提供了 JDBC sink,然而 Flink 官网提供的 JDBC sink 实现比拟奢侈,应用同步批量插入的语义。

其实同步批量插入是比拟激进的,当数据量比拟大时,且没有严格的先来先提交的语义,此时应用同步提交相对来说性能不是很高,如果应用异步提交的话,性能就会晋升很多,相当于充分利用了 TiDB 分布式数据库的个性,反对小事务高并发,有助于晋升 QPS。

当咱们实现 sink 时,实际上原理也非常简单。咱们这里先讲讲 Flink 官网是怎么实现。Flink 官网是通过将 Flink 的主线程写到一张 buffer 中,当 buffer 写满时进行换页,同时拉起一条线程将数据同步到 TiDB。

而咱们的改良是通过一个阻塞队列来进行流控,而后把数据写到某个 buffer 页,当 buffer 页写满时,马上拉起一条异步线程去刷出,这样就能够保障在非 FIFO 语义下晋升 QPS 的性能。实践证明,通过这种形式,咱们能够把官网写出的 QPS 从大略 3 万多晋升到靠近 10 万。

不过在实现 sink 的 At Least Once 语义的时候就相对来说简单一点。回忆 CheckPoint 机制,如果咱们要实现 sink 的 At Least Once,就必须保障 CheckPoint 实现时,sink 是洁净的,即所有数据都刷出了,这样能力保障其 At Least Once。在这种状况下,可能就须要将 CheckPoint 的线程、一般刷出的主线程以及其余的换页线程等都加上。当触发 CheckPoint 的时候,同步把所有数据都保障刷洁净之后,才去实现 CheckPoint。如此,一旦 CheckPoint 实现,sink 必然是洁净的,也意味着后面流过来的所有数据都正确更新到 TiDB 了。

在咱们优化结束之后,实现了 100k 左右的 OPS。在咱们测试环境上,大略是三台物理机混布 PD、TiKV、TiDB 这些节点。

业务场景

咱们目前技术核心计费数据核心应用 TiDB 跟 Flink 联合的利用场景十分多。如:

  • 海量业务日志数据的实时格式化入库;
  • 基于海量数据的剖析统计;
  • 实时 TiDB / Kafka 双流连贯的领取链路剖析;
  • 对通数据地图;
  • 时序数据。

所以,能够看到其实 Flink on TiDB 在网易数据中心业务层的利用是遍地开花的,此处援用一句,“桃李不言,下自成蹊”,既然能用到这么宽泛,也就证实了这条路其实是十分有价值的。

正文完
 0