乐趣区

关于数据库:基于-TiSpark-的海量数据批量处理技术

作者介绍:杨哲轩,PingCAP 资深解决方案架构师。

相熟 TiSpark 的人都晓得,TiSpark 是 Spark 的一个插件,它其实就是给予了 Spark 可能去拜访 TiDB 底层分布式存储引擎 TiKV 或者 TiFlash 的能力。之前咱们始终在解决读的问题,写问题并没有付出太多的工夫去解决。明天就给大家揭秘,咱们是怎么应用 TiSpark 去实现海量数据批处理,而后写入到 TiDB 外面去的。

传统批处理 vs TiSpark 的批处理

在介绍 TiSpark 之前,咱们首先来回顾一下传统批处理架构。

对于传统批处理架构而言,首先要有一个数据,这个数据能够是用户的 CSV 文件,也能够是用户从 TiDB 或 MySQL,或者是其它异构数据库外面读出来的数据。在拿到这些数据之后,它首先须要做的是工作切分,对于每一个批次的工作,每一个小批的工作,别离去进行数据处理,而后再进行分批提交,最初再去写入到 TiDB 外面。
相熟数据库的人可能都晓得,这一套架构有一个致命的问题:就是它没有方法通过数据库来保障事务的 ACID 个性。传统的批处理架构,都须要引入一些工作表的机制,来追踪每一个子工作的胜利状态。如果说子工作表中有一个状态是失败的,那可能就须要把整个工作全副回滚。甚至在一些状况下,都须要人工去染指。对于 TiSpark 来说,则不须要这样。

TiSpark 拿到读取结束的数据当前,首先把它当做一个整体去进行数据处理,毋庸分片分批解决。数据处理之后造成的新数据,是间接通过两阶段协定,并发的写入到 TiKV 里,不通过 TiDB Server。如果拿 TiSpark 的批处理技术和传统批处理架构来进行比照,会发现传统批处理架构有着两个致命的缺点。第一点是慢,在一些商业银行,它的日中批处理工作,都是有肯定的时效性的。如果说你的处理速度特地慢,是会影响到第二天的停业。第二,传统批处理架构也是没有方法可能保障事务。为了解决事务的问题须要引入很多机制,业务侧去做这种事务的保障,会特地的简单,难用,也会影响到整体的解决和写读的速度。

TiSpark 解读

架构

接下来大家可能对 TiSpark 的整体架构会比拟感兴趣,下图应该就说的比较清楚了。

首先,左侧蓝色局部是 TiDB 的分布式存储引擎,包含 TiKV、TiFlash;粉色局部是 PD;右侧绿色局部是 TiDB Server;上方黄色局部是 Spark 集群。

在一个工作提交到 TiSpark,TiSpark 在解决完数据之后,开始写入数据之前,会先进行一个锁表的解决。锁表的意义是避免其它的事务与 TiSpark 正在写入的事务发生冲突,导致 TiSpark 的事务进行回滚。大家晓得,TiSpark 批处理它所波及到的数据量都会特地大,可能是成千万,甚至上亿的数据量,如果因为这样而回滚是咱们不想看到的事件,所以咱们须要事后做一个锁表。

这里须要强调一下,这个锁表只针对于 3.0.14 以上的版本。 在 4.0 版本中 TiDB 曾经原生反对了 10GB 的大事务,它对事务的协定做了肯定的批改,这也意味着如果 TiSpark 可能兼容这种协定上的批改,是能够不须要去锁表的。

第二步就是 TiSpark 会对它将要写入的数据去定型、统计、抽样、计算,算进去它这一次批量的写入,大略会生成多少个新的 Region,而后把这些信息传递给 TiDB,由 TiDB 跟其它的组件去进行交互,把新生成的 Region 事后切分进去。Region 预切分的益处:

  • 第一,是躲避了热点问题。
  • 第二,如果说在 TiSpark 写入过程中,因为 Region 产生决裂,可能会导致一些写入性能的降级,通过这种形式,就可能无效的去躲避。

另外,TiSpark 在写入过程中,也会跟 PD 去进行互动,这个互动次要是两个方面。第一个方面,是一些元信息。要晓得 TiKV 底层数据是一个键值对,TiSpark 在写入之前也会把所有的行数据,转换成为键值对。既然是一个键值对,就须要晓得我这个键值对须要去哪一个 Region,这就是 Region 具体地址的获取。另外一方面,TiSpark 在写入也是保障事务的,它须要向 PD 申请一个工夫戳。不相熟 TiDB 的同学,能够简略把这个工夫戳了解为事务的 ID 号,接下来就非常简单了,筹备工作都曾经做完,TiSpark 会间接把它生成的键值对,通过 Spark Worker 去并发的多对多的写入到 TiKV 外面。

原理

接下来讲一讲 TiSpark 的原理。原理能够具体分为两大块。

第一块是 TiSpark 实现了一个 Java 版本的 TiKV 客户端。 这个客户端的性能是比拟多,也比拟丰盛,齐全能够独自剥离,而后拿去给用 Java 实现的业务去应用,就是跟 TiKV 去进行交互。首先,它实现了 Coprocessor 的接口。这也意味着它能够跟 TiKV 或者是 TiFlash 去进行互动,能够把一些合计进行下推,比如说 Limit、Order,或者是聚合等等。它也会做一些谓词、索引、键值域的解决。比方我有一个查问,它用了索引当前,或者说用了主键当前,它的查问范畴可能是 10 到 100,如果我还持续用全表查的话,速度会特地慢。所以,这时全表查会被优化成为 10 到 100 的范畴查。

另外,Java 版本的 TiKV 客户端也实现了两阶段的协定。这也是 TiSpark 可能保障写入合乎 ACID 的外围性能。简略来说,这个协定在 TiDB 那边也有一样的实现。只不过它是用 Golang 实现的。咱们所做的工作,就是把这个协定用 Java 从新实现了一遍。此外,这个客户端也会去保护一些统计信息,索引信息。这样的益处就是在 Spark 做执行打算的时候,可能无效的利用到这些信息,去抉择一条更优的执行门路。方才提到的 Java 版本的 TiKV 的客户端,只是通知你,你能够通过这个去跟 TiKV、TiFlash 去进行交互,然而,并没有解决另外一个很要害的问题,就是我怎么样把这个货色通知 Spark。那这个问题的答案就是 TiSpark。

第二块是 TiSpark 利用了 Spark 的 Extensions Point。 咱们之所以抉择 Spark Extensions Point 作为一个入口是因为这样做能够缩小保护老本,咱们没有必要独自去保护一套 Spark 的代码。假如说咱们当初抉择的是保护 Spark 代码,当初去实现拜访 TiKV 或是 TiFlash 的逻辑,那这意味着咱们势必要跟骨干进行分叉。相熟开发的同学可能都晓得,如果你跟骨干离的太远,骨干的后续更新你是很难再合回来的。所以基于这种考量,咱们过后是采纳了 Spark Extensions Point 作为计划。这个计划除了下面说的这个益处以外,还有别的益处吗?答案是有的。它能够劫持 Spark Catalyst 的优化器,可能将怎么样去拜访 TiKV 或者是 TiFlash 的逻辑注入到 Spark 的执行打算,或者是去进行一些相应的改写。

并且 TiSpark 无论是单表写入,还是多表写入,它都是可能保障事务的 ACID 个性的。只不过单表写入,它是齐全兼容 Spark DataSource API。因为 Spark 外面,DataFrame 就是一个单表的概念,如果说你想要去做多表的写入的话,你须要应用的是 TiSpark 额定保护的接口(后文会举例介绍)。咱们可能保障不论是 TiSpark 当前更新多少个版本,这个接口是不会变的。

交融

方才是讲了一些原理,可能大家会有一个疑难,你这个货色很好,那它怎么样可能跟现有的分布式业务零碎去交融呢?答案是,它是能够交融的。

举一个简略的例子。咱们当初有一个分布式业务零碎,它分为三个局部。第一局部是有一个服务利用框架;第二局部是有一个异步工作利用框架;而后接下来就是批量利用框架。

服务利用框架是面向业务开发人员的,业务开发人员去实现这个批工作的逻辑,而后提交给服务利用框架,服务利用框架再把这个工作提交给异步工作框架去进行调度,最终把它放到咱们的批量利用框架外面去真正的执行。有了 TiSpark 当前,TiSpark 能够比拟好的融入到咱们的批量利用框架外面,方才说的那个流程,只不过是异步工作利用框架在提交给批量利用框架之后,整个的执行门路是由 TiSpark 来管制。就是 TiSpark 来去管制它的整体的调度和解决,而不是由原来的批量调度框架,或者是批量解决框架去进行调度或者是解决。

利用

方才始终在说原理和交融,上面就是讲一讲,TiSpark 到底是怎么样能够利用在这个批工作上?我集体认为,批工作最重要的其实是数据处理,在 TiSpark 外面,数据处理是能够通过 Data Frame 中的接口来实现的。当然如果不那么相熟 Data Frame 接口的同学,也能够采纳 Spark SQL 的形式来实现。下图是一个比较简单的例子:

比如说我有一张用户表,这个用户表外面有它的贷款和利率,而后我想依据这个贷款和利率去计算当月所须要还的利息。那其实很简略,你能够通过 DataFrame 中的接口,通过列的名字去定位到它的贷款,和它的利率,而后通过一个简略的算术运算,加减乘除,把它的当月所需还的利息算进去。而后最终是通过 DF 的另外一个接口 withColumn,把它新重建一个列,这个列的名字就叫做 toBeDeducted。而后就会生成一个新的 DataFrame。这个新的 DataFrame 与原来的 DataFrame 惟一的区别就是它多了一列,叫做 toBeDeducted。

接下来咱们能够把这个 toBededucted 跟它原来的余额去进行减法操作,减法操作结束之后,这个新的余额,就是通过批量工作当前真正的余额。此时能够看到方才算进去的 toBeDeducted 是一个冗余列,咱们是能够把它看作一个抛弃操作。实际上也很简略,你能够通过一个 job 去解决。另外,方才这个计算应该是比较简单,那同时,用这个 DatoFrame 的接口,也能够去实现一些比较复杂的批处理逻辑。

举个例子,信用卡的积分我的项目。一个信用卡积分的计算,它可能有三张表,别离是积分信息、生产信息、规定信息。积分信息是用户以后的积分;生产信息就是每个月的生产金额;规定信息是我在不同的商户外面,他的生产的返比是不一样的。可能在珠宝类的商户外面,它的返比是 1:2,也就说,1 块钱等于 2 积分。那咱们能够把这三张表,在 Spark 外面进行 join,而后生成一个新的 DataFrame。而后 DataFrame 再通过相应的列的名字,去进行一些算术计算,比方把生产金额乘以规定信息外面的系数,而后把这个批工作去执行结束。执行结束当前,能够去依照不同的表的构造,去对 DataFrame 进行相应的解决和操作。最终,通过咱们的写入接口,可能又快又好的写入到 TiDB 外面。这里要强调的就是,TiSpark 的写入是间接去写 TiKV,不通过 TiDB 的,它绕过了 TiDB。

而后讲一下单表写入。单表写入就比较简单,它是齐全兼容了 Spark DataSource 的 API,应用上也是十分不便的。如果说你之前没有 Spark DataSource 的教训,我感觉学习上也肯定会特地的快,因为 Spark 它也兼容 Java 语言,而后你也能够用 Java 的接口去实现整个 DF 的写入。对于写入我想强调两点,第一点就是 format 和 options。format 是因为 Spark 外面的 format 有很多种。比如说,如果你想用 JDBC 写,那你可能就要用 JDBC。如果你的写入对象是一个 Parquet,那你可能须要用 parquet。那这边因为咱们的写入对象是 TiDB 外面的 TiKV,那咱们这边就要用 TiDB 这个字符串。而后这个 options 外面其实就是一个 TiDB 的 options。这外面保护的是 TiDB server 的一些相干信息。包含了它的 IT 地址、端口、用户名、明码等等。因为后面也提到了,咱们的 TiSpark 在写入的时候,也是须要跟 TiDB 去进行交互的,有了这些信息,它就能够比拟好的去跟 TiDB 进行交互。

上面讲一下多表写入。方才的单表写入,可能齐全兼容 DataSource API,是因为在 Spark 外面,DataFrame 就是一个单表的概念。那如果说你想再写入多表,如果多表写入也可能保障 ACID 的个性的话,是没有方法去持续应用 Spark DataSoucre 的 API,你必须去应用 TiSpark 提供的接口。这边它有一个 DBTable 和 DataFrame 的映射关系。就是一张表你是须要通过它的库名,它的表名,来对应到它具体的 DataFrame,而后把它放在一个 Map 外面。假如你当初有三张表须要去同时写入的话,那它这个 Map 外面的元素应该是三个。接下来就是进入到咱们 TiBatchWrite.write 这外面的接口,它这外面就会有一个 data 和 TiDB options。TiDB options 我在单表写入的时候,也曾经具体介绍过了,这边就不再赘述。

相熟 Spark 的同学,可能会问一个问题,因为 Spark 中的 DataFrame 是一个单表的概念,如果说你做一个合并,很可能会有表构造不兼容的问题。我是感觉这个问题十分好。然而,你认真想一想,在 TiKV 外面,它这个数据格式是什么?是一个一个的键值对,那其实在咱们反对多表写入的时候,后面的逻辑都是独自的,只有在 DataFrame 转换成为了键值对当前,咱们才会去把它合并。

举个例子,假如说我当初有一张表它是有 100 行数据,另外一张表是 200 行数据,转化成为键值对当前,能够因为有索引,组件等,扩张了两倍,就是 200 行变成 400 个键值对,100 行变成了 200 个键值对。合并完了当前,它是 600 个键值对。在合并完之后,我才去做两阶段协定的提交。因为两阶段协定的提交可能保障你这 600 个键值对的提交,要么是胜利,要么是失败的。这也意味着,如果说我的 600 个键值对提交胜利了,我的两张表写入是胜利的,如果它没有胜利,那么咱们两张表的写入是失败的。

所以说,通过咱们的接口写入的多表的写入,也是合乎 ACID 个性的。

另外一个大家可能会比拟好奇的就是我有一个工作提交到了 TiSpark 外面,我有没有方法去看的到它这个工作的进度?答案是能够的。下图是我在提交了一个 4 百万行的数据写入的一个截图。

从图中能够看到,差不多靠近 5 分钟左右应该就能写完。在 0 到 6 job ID 这边,其实做的都是筹备工作。7 到 10 做的是两阶段提交外面选一个主键 (在两阶段协定提交过程中保障事务的原子性) 的步骤。而后 job ID 11 是真正的写入的工作。通过这个监控,能够比较清楚的看到,目前这个批处理工作以后正在解决什么。

长处

其实回过头来看,咱们这个 TiSpark 的批处理,有着什么样的长处?

第一个长处就是快,快,快。重要的事件说三遍,因为它真的特地快。 快的起因是什么?是因为 TiSpark 绕过了 TiDB,能够多对多的并行写入 TiKV。这意味着什么?意味着它能够程度横向扩大。如果我的资源限度在 TiKV 那边,我能够简略加一个 TiKV 节点,就是扩大它的磁盘资源。如果我的瓶颈在 Spark 这边,我能够加一个 Spark 的计算节点。通过这种形式,能够又快又好的把这个数据写到 TiKV 外面。

第二个长处,是配置比较简单。相熟 spring batch 的同学可能都晓得,spring batch 须要配置一大堆的 Item reader 和 Iitem writer,特地简单,特地难用。 对于 TiSpark 来说,你可能惟一须要配置的就是通知 Spark 怎么样去用这个 TiSpark。而且 TiSpark 所有的批处理逻辑,基本上 99% 都是兼容 Spark 的 DataSource API。只有你相熟了 DataSource API,包含 DataFrame API,那你的批处理逻辑的书写、写入逻辑的书写,都会十分的不便。

第三个长处是不仅快,它还能保障事务。 就是写入要么胜利,要么失败。小孩子才做抉择,成年人全都要,就是又快又可能保障事务。目前 TiSpark 的写入性能能够在 8 分钟内写完 6000 万行 TPCH 的 lineitem 的数据。

如果大家对 TiSpark 批处理计划有趣味的话,也欢送邮件与我分割(邮箱:<yangzhexuan@pingcap.com>),咱们一起看看怎么样可能把它融入到现有的分布式业务零碎,更好的为用户带来价值、谋福利。

退出移动版