关于数据库:TiFlink使用-TiKV-和-Flink-实现强一致的物化视图丨TiDB-Hackathon-项目分享

52次阅读

共计 9674 个字符,预计需要花费 25 分钟才能阅读完成。

编者按:

本文为 TiDB Hackathon 2020 较量中 TiFlink 我的项目最新进展的介绍,应用 TiKV 和 Flink 实现了强统一的物化视图的性能。

作者张茄子,算法、分布式技术和函数式编程爱好者。集体博客:https://io-meter.com/

在本年初的 TiDB Hackathon 上,我和一众队友尝试应用 Flink 为 TiDB 添加物化视图性能,并摘得了“最佳人气奖”。能够说,物化视图在这届较量中堪称是一个热点。单单是联合 Flink 实现相干性能的队伍就有三四个。必须抵赖的是,在较量完结时咱们我的项目的完成度很低,尽管基本思路曾经定型,最终出现的后果却远没达到预期。通过半年多断断续续的修补,在明天终于能够公布一个预览版本给大家试用。这篇文章就是对咱们思路和成绩的一个介绍。
相比其余队伍,咱们的次要指标是实现强统一的物化视图构建。也就是保障查问时的物化视图能够达到靠近快照隔离(Snapshot Isolation)的隔离级别,而不是个别流解决零碎的最终一致性(Eventual Consistency)。对于实现一致性的探讨在下文有具体介绍。

应用简介

只管是一个实验性的我的项目,咱们依然摸索了一些不便实用的个性,包含:

零内部依赖:除了 TiDB 集群和 Flink 部署环境之外,无需保护任何其余组件(包含 Kafka 集群和 TiCDC)。这是因为 TiFlink 间接从 TiKV 读写数据,不通过任何中间层,为更高吞吐、更低提早和更易保护发明了可能。

易用的接口:只管为了实现强一致性 TiFlink 引进了一些新的概念,然而通过特地编写的 TiFlinkApp 接口,用户能够疾速启动一个工作,也无需手动创立写入指标表。

批流联合:工作启动后会先批量生产源表以后已有的数据,随后主动切换到 CDC 日志生产。这个过程也会确保视图的一致性。

对于 TiFlink 实用的详细信息,请参考 README。上面是疾速启动一个工作的代码片段:

TiFlinkApp.newBuilder()   .setJdbcUrl("jdbc:mysql://root@localhost:4000/test") // Please make sure the user has correct permission   .setQuery("select id,"           + "first_name,"           + "last_name,"           + "email,"           + "(select count(*) from posts where author_id = authors.id) as posts"           + "from authors")   // .setColumnNames("a", "b", "c", "d") // Override column names inferred from the query   // .setPrimaryKeys("a") // Specify the primary key columns, defaults to the first column   // .setDefaultDatabase("test") // Default TiDB database to use, defaults to that specified by JDBC URL   .setTargetTable("author_posts") // TiFlink will automatically create the table if not exist   // .setTargetTable("test", "author_posts") // It is possible to sepecify the full table path   .setParallelism(3) // Parallelism of the Flink Job   .setCheckpointInterval(1000) // Checkpoint interval in milliseconds. This interval determines data refresh rate   .setDropOldTable(true) // If TiFlink should drop old target table on start   .setForceNewTable(true) // If to throw an error if the target table already exists   .build()   .start(); // Start the app

物化视图(流解决零碎)的一致性

目前支流的物化视图(流解决)零碎次要应用最终一致性。也就是说只管最终后果会收敛到统一的状态,但在解决期间终端用户仍可能查问到一些不统一的后果。最终一致性在很多利用中被证实是足够的,那么更强的一致性是否真的须要呢?这里的一致性和 Flink 的 Exact Once 语义又有什么关系呢?有必要进行一些介绍。

ACID

ACID 是数据库的一个根本的概念。一般来说,作为 CDC 日志起源的数据库曾经保障了这四条要求。然而在应用 CDC 数据进行流式解决的时候,其中的某些束缚却有可能被毁坏。
最典型的状况是失去 Atomic 个性。这是因为在 CDC 日志中,一个事务的批改可能笼罩多条记录,流解决零碎如果以行为单位进行解决,就有可能毁坏原子性。也就是说,在后果集上进行查问的用户看到的事务是不残缺的。
一个典型的案例如下:


Change Log 与事务的原子性

在上述案例中,咱们有一个账户表,账户表之间会有转账操作,因为转账操作波及多行批改,因而往往会产生多条记录。假如咱们有如下一条 SQL 定义的物化视图,计算所有账户余额的总和:

SELECT SUM(balance) FROM ACCOUNTS;

显然,如果咱们只存在表内账户之间的转账,这个查问返回的后果应该恒为某一常数。然而因为目前个别的流解决零碎不能处理事务的原子性,这条查问产生的后果却可能是一直稳定的。实际上,在一个一直并发批改的源表上,其稳定甚至可能是无界的。

只管在最终统一的模型下,上述查问的后果在通过一段时间之后将会收敛到正确值,但没有原子性保障的物化视图依然限度的利用场景:假如我想实现一个当上述查问后果偏差过大时进行报警的工具,我就有可能会接管到很多虚伪报警。也就是说此时在数据库端并没有任何异样,数值的偏差只是来源于流解决零碎外部。
在分布式系统中,还有另一种毁坏原子性的状况,就是当一个事务批改产生的副作用散布在多个不同的节点处。如果在这时不应用 2PC 等办法进行分布式提交,则也会毁坏原子性:局部节点(分区)上的批改先于其余节点失效,从而呈现不统一。

线性一致性

不同于由单机数据库产生的 CDC 日志(如 MySQL 的 Binlog),TiDB 这类分布式数据库产生的日志会有线性一致性的问题。在咱们的场景下,线性一致性的问题能够形容为:从用户的角度先后执行的一些操作,其产生的副作用(日志)因为音讯零碎传递的提早,以不同的先后顺序被流解决零碎解决。
假如咱们有订单表(ORDERS)和付款信息表(PAYMENTS)两个表,用户必须先创立订单能力进行领取,因而下列查问的后果必然是负数:

WITH order_amount AS (SELECT SUM(amount) AS total FROM ORDERS),WITH payment_amount AS (SELECT SUM(amount) AS total FROM PAYMENTS)SELECT order_amount.total - payment_amount.totalFROM order_amount, payment_amount;

然而因为 ORDERS 表和 PAYMENTS 表在别离存储在不同的节点上,因而流解决零碎生产他们的速度可能是不统一的。也就是说,流解决零碎可能曾经看到了领取信息的记录,然而其对应的订单信息还没达到。因而就可能察看到上述查问呈现正数的后果。

在流解决零碎中,有一个 Watermark 的概念能够用来同步不同表的数据的解决进度,然而它并不能防止上述线性一致性问题。这是因为 Watermark 只要求工夫戳小于其的所有记录都曾经达到,不要求工夫戳大于其的记录都没有达到。也就是说,只管 ORDERS 表和 PAYMENTS 体现在领有雷同的 Watermark,后者依然可能会有一些先到的记录曾经失效。
由此可见,单纯依附 Watermark 自身是无奈解决线性一致性问题的,必须和源数据库的工夫产生零碎和音讯零碎配合。

更强一致性的需要

只管最终一致性在很多场景下是够用的,但其仍然存在很多问题:

  1. 误导用户:因为很多用户并不理解一致性相干的常识,或者对其存在肯定的误会,导致其依据尚未收敛的查问后果做出了决策。这种状况在大部分关系型数据库都默认较强一致性的状况下是应该防止的。
  2. 可观测性差:因为最终一致性并没有收敛工夫的保障,再思考到线性一致性问题的存在,很难对流解决零碎的提早、数据新鲜度、吞吐量等指标进行定义。比如说用户看到的 JOIN 的后果可能是表 A 以后的快照和表 B 十分钟前的快照联接的后果,此时应如何定义查问后果的提早度呢?
  3. 限度了局部需要的实现:正如上文所提到的,因为不统一的外部状态,导致某些告警需要要么无奈实现,要么须要提早期待一段时间。否则用户就不得不承受较高的误报率。

实际上,更强一致性的不足还导致了一些运维操作,特地是 DDL 类的操作难以利用之前计算好的后果。参考关系型数据库和 NoSQL 数据库的倒退历史,咱们置信目前支流的最终一致性只是受限于技术倒退的权宜之计,随着相干实践和技术钻研的提高,更强的一致性将会缓缓成为流解决零碎的支流。

技术计划简介

这里具体介绍一下 TiFlink 在技术计划上的思考,以及如何实现了强统一的物化视图(StreamSQL)保护。

TiKV 和 Flink

只管这是一个 TiDB Hackthon 我的项目,因而必然会抉择 TiDB/TiKV 相干的组件,然而在我看来 TiKV 作为物化视图零碎的两头存储计划具备很多突出的劣势:

  1. TiKV 是一个比拟成熟分布式 KV 存储,而分布式环境是下一代物化视图零碎必须要反对的场景。利用 TiKV 配套的 Java Client,咱们能够不便的对其进行操作。同时 TiDB 自身作为一个 HTAP 零碎,正好为物化视图这个需要提供了一个 Playground。
  2. TiKV 提供了基于 Percolator 模型的事务反对和 MVCC,这是 TiFlink 实现强统一流解决的根底。在下文中能够看到,TiFlink 对 TiKV 的写入次要是以接连不断的事务的模式进行的。
  3. TiKV 原生提供了对 CDC 日志输入的反对。实际上 TiCDC 组件正是利用这一个性实现的 CDC 日志导出性能。在 TiFlink 中,为了实现批流一体并简化零碎流程,咱们抉择间接调用 TiKV 的 CDC GRPC 接口,因而也放弃了 TiCDC 提供的一些个性。

咱们最后的想法原本是间接将计算性能集成进 TiKV,抉择 Flink 则是在较量过程中进一步思考后失去的论断。抉择 Flink 的次要劣势有:

  1. Flink 是目前市面上最成熟的 Stateful 流解决零碎,其对解决工作的表达能力强,反对的语义丰盛,特地是反对批流一体的 StreamSQL 实现,是咱们能够分心于摸索咱们比拟关注的性能,如强一致性等。
  2. Flink 比拟残缺的 Watermark,而咱们发现其基于 Checkpoint 实现的 Exactly Once Delivery 语义能够很不便地和 TiKV 联合来实现事务处理。实际上,Flink 本人提供的一些反对 Two Phase Commit 的 Sink 就是联合 Checkpoint 来进行提交的。
  3. Flink 的流解决(特地是 StreamSQL)自身就基于物化视图的实践,在比拟新的版本开始提供的 DynamicTable 接口,就是为了不便将内部的 Change Log 引入零碎。它曾经提供了对 INSERT、DELETE、UPDATE 等多种 CDC 操作的反对。

当然,抉择 TiKV+Flink 这样的异构架构也会引入一些问题,比方 SQL 语法的不匹配,UDF 无奈共享等问题。在 TiFlink 中,咱们以 Flink 的 SQL 零碎和 UDF 为准,将其作为 TiKV 的一个外挂零碎应用,但同时提供了不便的建表性能。
强统一的物化视图的实现思路

这一部分将介绍 TiFlink 如何在 TiDB/TiKV 的根底上实现一个比拟强的一致性级别:提早快照隔离(Stale Snapshot Isolation)。在这种隔离级别下,查问者总是查问到历史上一个统一的快照状态。在传统的快照隔离中,要求查问者在 T 工夫能且只能察看到 Commit 工夫小于 T 的所有事务。而提早快照隔离只能保障察看到 T−Δt 之前所有已提交的事务。
在 TiDB 这样反对事务的分布式数据库上实现强统一的物化视图,最简略的思路就是应用一个接一个的事务来更新视图。事务在开始时读取到的是一个统一的快照,而应用分布式事务对物化视图进行更新,自身也是一个强统一的操作,且具备 ACID 的个性,因而得以保障一致性。

应用间断事务更新物化视图

为了将 Flink 和这样的机制联合起来且实现增量保护,咱们利用了 TiKV 自身曾经提供的一些个性:

  1. TiKV 应用 Time Oracle 为所有的操作调配工夫戳,因而尽管是一个分布式系统,其产生的 CDC 日志中的事务的工夫戳实际上是有序的。
  2. TiKV 的节点(Region)能够产生连续不断的增量日志(Change Log),这些日志蕴含了事务的各种原始信息并蕴含工夫戳信息。
  3. TiKV 的增量日志会定期产生 Resolved Timestamp,申明以后 Region 不再会产生工夫戳更老的音讯。因而很适宜用来做 Watermark。
  4. TiKV 提供了分布式事务,容许咱们管制一批批改的可见性。

因而 TiFlink 的根本实现思路就是:

  1. 利用流批一体的个性,以某全局工夫戳对源表进行快照读取,此时能够取得所有源表的一个一致性视图。
  2. 切换到增量日志生产,利用 Flink 的 DynamicTable 相干接口,实现物化视图的增量保护和输入。
  3. 以肯定的节奏 Commit 批改,使得所有的批改以原子的事务形式写入指标表,从而为物化视图提供一个又一个更新视图。

以上几点的关键在于协调各个节点一起实现分布式事务,因而有必要介绍一下 TiKV 的分布式事务执行原理。

TiKV 的分布式事务

TiKV 的分布式事务基于驰名的 Percolator 模型。Percolator 模型自身要求存储层的 KV Store 有 MVCC 的反对和单行读写的原子性和乐观锁(OCC)。在此基础上它采纳以下步骤实现一次事务:

  1. 指定一个事务主键(Primary Key)和一个开始工夫戳并写入主键。
  2. 其余行在 Prewrite 时以副键(Secondary Key)的模式写入,副键会指向主键并具备上述开始工夫戳。
  3. 在所有节点 Prewrite 实现后,能够提交事务,此时应先 Commit 主键,并给定一个 Commit 工夫戳。
  4. 主键 Commit 胜利后事务实际上曾经提交胜利,但此时为了不便读取,能够多节点并发地对副键进行 Commit 并执行清理工作,之后写入的行都将变为可见。

上述分布式事务之所以可行,是因为对主键的 Commit 是原子的,散布在不同节点的副键是否提交胜利齐全依赖于主键,因而其余的读取者在读到 Prewrite 后但还没 Commit 的行时,会去查看主键是否已 Commit。读取者也会依据 Commit 工夫戳判断某一行数据是否可见。Cleanup 操作如果中途故障,在之后的读取者也能够代行。
为了实现快照隔离,Percolator 要求写入者在写入时查看并发的 Prewrite 记录,保障他们的工夫戳合乎肯定的要求能力提交事务。实质上是要求写入集重叠的事务不能同时提交。在咱们的场景中假如物化视图只有一个写入者且事务是间断的,因而无需放心这点。
在理解了 TiKV 的分布式事务原理之后,要思考的就是如何将其与 Flink 联合起来。在 TiFlink 里,咱们利用 Checkpoint 的机制来实现全局统一的事务提交。

应用 Flink 进行分布式事务提交

从下面的介绍能够看出,TiKV 的分布式事务提交能够形象为一次 2PC。Flink 自身有提供实现 2PC 的 Sink,然而并不能间接用在咱们的场景下。起因是 Percolator 模型在提交时须要有全局统一的事务开始工夫戳和提交工夫戳。而且仅仅是在 Sink 端实现 2PC 是不足以实现强统一隔离级别的:咱们还须要在 Source 端配合,使得每个事务恰好读入所需的增量日志。
侥幸的是,Flink 的 2PC 提交机制实际上是由 Checkpoint 驱动的:当 Sink 接管到 Checkpoint 申请时,会实现必要的工作以进行提交。受此启发,咱们能够实现一对 Source 和 Sink,让他们应用 Checkpoint 的 ID 共享 Transaction 的信息,并配合 Checkpoint 的过程实现 2PC。而为了使不同节点能够对事务的信息(工夫戳,主键)等达成统一,须要引入一个全局协调器。事务和全局协调器的接口定义如下:

public interface Transaction {public enum Status {    NEW,    PREWRITE,    COMMITTED,    ABORTED;};  long getCheckpointId();  long getStartTs();  default long getCommitTs();  default byte[] getPrimaryKey();  default Status getStatus();}public interface Coordinator extends AutoCloseable, Serializable {Transaction openTransaction(long checkpointId);  Transaction prewriteTransaction(long checkpointId, long tableId);  Transaction commitTransaction(long checkpointId);  Transaction abortTransaction(long checkpointId);}

应用上述接口,各个 Source 和 Sink 节点能够应用 CheckpointID 开启事务或取得事务 ID,协调器会负责调配主键并保护事务的状态。为了不便起见,事务 Commit 时对主键的提交操作也放在协调器中执行。协调器的实现有很多办法,目前 TiFlink 应用最简略的实现:在 JobManager 所在过程中启动一个 GRPC 服务。基于 TiKV 的 PD(ETCD)或 TiKV 自身实现分布式的协调器也是可能的。


事务与 Checkpoint 的协调执行

上图展现了在 Flink 中执行分布式事务和 Checkpoint 之间的协调关系。一次事务的具体过程如下:

  1. Source 先从 TiKV 接管到增量日志,将他们依照工夫戳 Cache 起来,期待事务的开始。
  2. 当 Checkpoint 过程开始时,Source 会先接管到信号。在 Source 端的 Checkpoint 与日志接管服务运行在不同的线程中。
  3. Checkpoint 线程先通过全局协调器取得以后事务的信息(或开启一个新事务),分布式状况下一个 CheckpointID 对应的事务只会开启一次。
  4. 失去事务的开始工夫戳后,Source 节点开始将 Cache 中小于此工夫戳的已提交批改 Emit 到上游计算节点进行生产。此时 Source 节点也会 Emit 一些 Watermark。
  5. 当所有 Source 节点实现上述操作后,Checkpoint 在 Source 节点胜利实现,尔后会向后持续流传,依据 Flink 的机制,Checkpoint 在每个节点都会保障其达到之前的所有 Event 都已被生产。
  6. 当 Checkpoint 达到 Sink 时,之前流传到 Sink 的 Event 都曾经被 Prewrite 过了,此时能够开始事务的提交过程。Sink 在外部状态中长久化事务的信息,以便于谬误时复原,在所有 Sink 节点实现此操作后,会在回调中调用协调器的 Commit 办法从而提交事务。
  7. 提交事务后,Sink 会启动线程进行 Secondary Key 的清理工作,同时开启一个新的事务。

留神到,在第一个 Checkpoint 开始前,Sink 可能曾经开始接管到写入的数据了,而此时它还没有事务的信息。为了解决这一问题,TiFlink 在工作开始时会间接启动一个初始事务,其对应的 CheckpointID 是 0,用于提交最后的一些写入。这样的话,在 CheckpointID=1 的 Checkpoint 实现时,实际上提交的是这个 0 事务。事务和 Checkpoint 以这样的一种错位的形式协调执行。
下图展现了蕴含协调器在内的整个 TiFlink 工作的架构:

TiFlink 的零碎架构

基于以上的零碎设计,咱们就失去了一个在 TiKV 上实现提早快照隔离的物化视图。

其余设计思考

家喻户晓,KSQL 是 Flink 之外另一个风行的流解决零碎,它间接与 Kafka 音讯队列零碎联合,用户无需部署两套解决零碎,因而受到一些用户的青眼。很多用户也应用 KSQL 实现相似物化视图这样的需要。然而在我看来,这种强耦合于音讯队列的流解决零碎并不适宜物化视图的应用场景。
KSQL 能够说是 Log Oriented 数据处理系统的的代表,在这种零碎中,数据的根源在于日志信息,所有的表都是为了不便查问而生产日志信息从而构建进去的视图。这种零碎具备模型简略、容易实现、能够长时间保留日志记录等长处。
与之绝对是 Table Oriented 数据处理系统,MySQL、TiDB/TiKV 都属于这一类零碎。这一类零碎的所有批改操作都作用于表数据结构,尽管期间也会有日志生成,但往往对表数据结构和日志的批改是一起协调进行的。这里日志的次要是为长久化和事务服务,往往不会留存太长时间。相比于 Log Oriented 数据处理系统,这类系统对写入和事务的解决都更为简单一点,然而却领有更强可扩展性的要求。
归根结底,这是因为 Log Oriented 零碎中的数据是以日志的模式存储,因而在扩大时往往须要进行老本较高的 Rehash,也更难实现再均衡。而 Table Oriented 的零碎,数据次要以表的模式存储,因而能够以某些列进行有序排列,从而不便在一致性 Hash 的反对下实现 Range 的切分、合并和再均衡。
集体认为,在批流一体的物化视图场景下,长时间保留日志并无太大的意义(因为总是能够从源表的快照复原数据)。相同,随着业务的倒退一直扩大数据处理工作和视图是一件比拟重要的事。从这个角度来看 Table Oriented 零碎仿佛更适宜作为物化视图需要的存储承载介质。
当然,在实时生产增量 Log 时产生的分区合并或决裂是一个比拟难解决的问题。TiKV 在这种状况下会抛出一个 GRPC 谬误。TiFlink 目前应用的是比较简单的动态映射办法解决工作和分区之间的关系,在将来能够思考更为正当的解决方案。

总结

本文介绍了应用 Flink 在 TiKV 上实现强统一的物化视图的基本原理。以上原理曾经基本上在 TiFlink 零碎中实现,欢送各位读者试用。以上所有的探讨都基于 Flink 的最终统一模型的保障,即:流计算的后果只与生产的 Event 和他们在本人流中的程序无关,与他们达到零碎的程序以及不同流之间的绝对程序无关。

目前的 TiFlink 零碎还有很多值得进步的点,如:

  • 反对非 Integer 型主键和联结主键
  • 更好的 TiKV Region 到 Flink 工作的映射
  • 更好的 Fault Tolerance 和工作中断时 TiKV 事务的清理工作
  • 欠缺的单元测试

如果各位读者对 TiFlink 感兴趣的话,欢送试用并提出反馈意见,如果可能奉献代码帮忙欠缺这个零碎那就再好不过了。

对于物化视图零碎一致性的思考是我往年最次要的播种之一。实际上,最后咱们并没有器重这一方面,而是在一直地交换当中才意识到这是一个有价值且很有挑战性的问题。通过 TiFlink 的实现,能够说是基本上验证了上述办法实现提早快照一致性的可行性。当然,因为集体的能力程度无限,如果存在什么纰漏,也欢送各位提出探讨。

最初,如果咱们假如上述提早快照一致性的阐述是正确的,那么实现真正的快照隔离的办法也就跃然纸上。不晓得各位读者是否想到呢?

正文完
 0