关于数据库:Tapdata-肖贝贝实时数据引擎系列一-新鲜的数据流

8次阅读

共计 4888 个字符,预计需要花费 13 分钟才能阅读完成。

前言

2006 年诞生的 hadoop 和 她周边的生态, 在过来的这些年里为大数据的炽热提供了足够的能量, 十几年过来了, 场景在变动, 技术在演变, 大家对数据的认知曾经不再局限于 T+1 与 高吞吐高提早 为次要特色的上一代框架理念, 在实在的场景里, 实时, 精确, 多变 的数据也施展着越来越重要的作用

为满足这些新的需要, 各种框架和中间件如雨后春笋般一直涌出

hive 的呈现让这头大象有了一个粗劣但僵滞的面庞, hbase 与 impala 开始尝试将其提速, spark/flink 作为新的流解决框架, 尝试通过实时计算的形式, 将数据更快地输送到业务方面前, presto/dremio 从数据模型动手, 尝试通过虚拟化实时汇合来自不同数据源的数据, 变相达到实时的目标, 而各种新型的 OLAP 数据库, 以 clickhouse 为代表, 试图提供近实时的海量数据统计分析计划, 在不同的细分畛域, 比方 时序 / 特色 等畛域, 也各自涌现了富裕特色的产品进去

与传统的商业软件倒退形式不同, 这个实时数据相干的赛道中, 开源曾经逐步成为不谋而合的抉择, talk is cheap, show me the code, 大家各凭本事谈话

而根底框架就像是可爱的姑娘, 每个人都感觉本人的才是最好的, TAPDATA 在实时数据计划的落地过程中, 也逐步感觉到了现有的各种技术产品总是在什么中央差点货色, 一个个场景做下来, 一个个客户谈下来, 去实现一个属于本人的流计算框架的想法在脑海中越来越明确

在给客户产生间接价值的同时, 把这些教训累积起来, 去做一个能够影响更多人的技术产品, 可能是一件更有意思的事件

为此, 我前几天登录了好久没用的知乎账号, 在这个人均百万的平台下, 开始了这个系列的分享, 去把 TAPDATA 对于实时计算引擎的一些思考整顿成文字, 大家看了如果感觉有用, 能够默默珍藏, 如果感觉哪里写得不对, 能够评论或者私信我, 如果感觉这个货色方向有问题, 或者说就是一些没有价值的垃圾, 也欢送揭示我, 咱们共同进步

陈腐的, 才是最好的

实现一个实时的数据计算, 第一步是数据起源怎么获得, 基于 JDBC 或者各个数据库驱动的 Query, 能够很不便拿到批量的数据, 然而更实时的数据拿起来, 就不是那么的不言而喻和标准化

实时数据的获取, 有一个名词叫 CDC, 全称是 change data capture, 能够想见一个场景如果有一个专门的名词缩写来形容, 个别都不会很简略

CDC 的实现个别有以下几种形式

轮询

最间接的想法是通过 Query, 定期轮询最新的数据, 这么做的益处是简直全副的数据库都能够间接反对, 开发起来老本也低, 然而问题也很显著, 次要有:

轮询须要有条件, 这个条件个别是递增字段, 或者工夫属性, 对业务上有 入侵
最小 延时 为轮询距离
轮询对数据库造成了额定的查问 压力
最致命的是, 轮询 无奈获取被删除的数据, 也无奈得悉更新的数据更新了哪些内容, 这些尽管在工程上能够通过各种伎俩去找一个折衷方案, 但终究会存在各种各样的问题
因为实现容易, 轮询是最早也是目前最宽泛被利用于理论场景的计划, 然而也因为毛病很多, 在最近呈现的各种计算框架中, 轮询个别作为保底而不是首选计划呈现

触发器

不少数据库都有触发器(Trigger) 的设计, 在对数据行列进行读写时, 能够触发一个存储过程, 实现一系列的操作, 基于这个前提, 能够对数据库的写操作编写一个自定义触发器, 实现数据获取, 常见的计划有:

  1. 数据触发保留到独自的一张表, 典型的产品化实现有 SQL Server, 其余的数据库也能够本人实现相似的逻辑, 而后通过轮询这张表取得变更
  2. 数据触发到内部音讯队列, 消费者通过音讯队列获取数据
  3. 通过 api 间接发送到指标端

    相比轮询, 触发器能够更全面地获取更具体的实时数据, 不过问题也有很多, 次要是的问题有:

没有规范: 用户须要依据每种数据库的触发器去设计本人的数据获取计划

  1. 通用性不够: 局部数据库没有触发器设计
  2. 影响性能: 触发器在数据写入的时候, 在数据处理逻辑里减少了一段逻辑, 尽管有些触发器的设计是异步的, 不影响延时, 然而因为占用了数据库自身的计算资源, 对吞吐有一些影响
  3. 相比轮询, 触发器子计划在延时和数据准确性上有了一些冲破, 是一种计划的提高

数据库日志

绝大数据数据库都有各种各样的日志, 其中一种日志用来记录每个操作产生的数据变更, 很多数据库都用这份日志来做多正本同步, 或者用来做数据恢复

而内部服务也能够通过这种形式拿到最新的实时变更, 相比轮询, 通过日志拿到的数据延时个别在亚秒内, 而且对数据库的性能影响非常低, 同时反对的数据库类型相比触发器更多, 只有存在正本, 就存在相似的日志设计

因为基于数据库日志的计划具备其余两种计划不可比较的劣势, 曾经逐步成为实时计算框架首选的数据获取计划, 然而这种计划因为应用了数据库外部的设计, 开发难度和实现老本是最高的, 这个也限度了计划的应用

音讯队列

除此之外, 还有一些来自利用的音讯, 或者一些其余的业务自定义数据, 大多数都通过各种音讯队列来直达, 典型的有 kafka 和 各种名字的 MQ, 因为更多是业务定制在外面, 这里各家都有各家的场景, 对立来做是比拟艰难的

数据库日志的难题

在之前提到的各种 CDC 计划中, 数据库日志具备非常明显的后果劣势, 然而因为开发艰难, 目前利用范畴也不是特地宽泛, 数据库日志计划的问题次要有以下几种

数据库品种繁多

数据库日志属于数据库外部实现逻辑, 除了特意为兼容去设计之外, 很少有雷同或者类似的对外接口, 不论是从 API, 还是日志格局上来说, 根本是各家有各家的做法, 对流计算框架来说, 适配起来要一个个做, 没有捷径能够走, 老本很高

以后市面上用的比拟多的数据库少说有几十种, 如果想笼罩全, 大略有两百种左右的适配工作量, 放眼看去目前并没有哪个开源或者闭源的计划, 在这方面做得比拟全面, 除了开源数据库之外, 还有一些商业数据库, 比方 db2, gaussdb, hana, 文档的缺失, 开源计划的缺失, 导致这些计划实现起来很麻烦

不兼容的版本

即便是同一种数据库, 不同的版本之间也往往有不兼容的状况, 极少有数据库能够在一个正本内运行不同的大版本, 比方 oracle 的 8 到 20 之间的版本, mongodb 的 2 到 5 之间的版本, 会存在很多细节和设计的不同

数据库品种曾经很多, 加上版本的不兼容, 要残缺解决这些场景, 适配的数量一下子减少到五百种以上, 艰难成倍晋升

部署架构多种多样

第三种多样性来自于部署架构, 即便是同一个数据库的同一个版本, 也存在各种各样的部署架构, 比方对 Mysql, 有包含 PXC, Myshard, Mycat 在内的各种集群计划, PG 也有 GP, XL, XC, Citus 在内的各种计划, oracle 有 DG, RAC, mongodb 有 正本, 分片

这些多样性与前几种互相组合, 最初的残缺的工作量曾经达到简直人力不可为的水平

不规范的格局

如果说多样性只是工作量上的问题, 数据库日志的一些设计, 则从理念上造成了一些艰难

因为数据库的日志更多是为了主从同步设计, 次要是保证数据的最终统一, 这个与实时计算的场景需要存在一些差别, 比方咱们以 MongoDB 的一个删除日志来做示例

rs0:PRIMARY> use mock 
switched to db mock 
rs0:PRIMARY> db.t.insert({a:1, b:1}) 
WriteResult({"nInserted" : 1}) 
rs0:PRIMARY> db.t.remove({}) 
WriteResult({"nRemoved" : 1}) 
rs0:PRIMARY> use local
switched to db local 
rs0:PRIMARY> db.oplog.rs.find({ns:"mock.t"}).pretty() 
{ 
        "op" : "i", 
        "ns" : "mock.t", 
        "ui" : UUID("9bf0197e-0e59-45d6-b5a1-21726c281afd"), 
        "o" : {"_id" : ObjectId("610eba317d24f05b0e9fdb3b"), 
                "a" : 1, 
                "b" : 1 
        }, 
        "ts" : Timestamp(1628355121, 2), 
        "t" : NumberLong(1), 
        "wall" : ISODate("2021-08-07T16:52:01.890Z"), 
        "v" : NumberLong(2) 
} 
{ 
        "op" : "d", 
        "ns" : "mock.t", 
        "ui" : UUID("9bf0197e-0e59-45d6-b5a1-21726c281afd"), 
        "o" : {"_id" : ObjectId("610eba317d24f05b0e9fdb3b") 
        }, 
        "ts" : Timestamp(1628355126, 1), 
        "t" : NumberLong(1), 
        "wall" : ISODate("2021-08-07T16:52:06.191Z"), 
        "v" : NumberLong(2) 
}

插入一条数据, 将其删除, 查问一下数据库日志, 关注删除那条记录, 外面只记录将主键删除的信息, 并无奈失去原始字段的值

实时计算一个比拟典型的场景是多表 JOIN, 如果咱们以 a 为字段进行 JOIN, 来自数据源为 MongoDB 的实时流因为无奈拿到被删除的数据中 a 字段的值是多少, 这个会导致实时的 JOIN 无奈获取最新的后果

为了实现残缺的流计算的需要, 只保证数据同步一致性的日志是不足够的, 咱们往往须要残缺的数据库变更数据

一些现存的解决方案

尽管数据库日志有着各种各样的问题, 然而因为其过于显著的劣势, 越来越成为实时流框架的当红炸子鸡选型, 那下面的问题, 也逐步有理解法

针对实现工作量的问题, 当初呈现了三种流派 :

一个是专精派, 每个计划只解决一个数据库, 或者只专一解决一个数据库, 比方 oracle 的 ogg, mysql 的 canal, 都专一在本人的畛域去做到很高的深度

一个是容纳万象派, 典型的有 debezium, 通过插件的模式去兼容各个数据库的规范

最初一个是交融派, 他们本人不做实现, 只是将来自一和二的计划再通过一次形象, 做成交融的一个解决方案 (没错的, 说的就是 https://github.com/ververica/…)
而针对数据日志不规范的问题, 在技术上个别是通过一个残缺数据的缓存层来实现日志的二次加工, 尽管在性能上实现了较好的补充, 然而因为残缺保留了数据, 资源耗费也比拟高, 而且目前没有看到对立的产品呈现, 更多是停留在一些场景里做计划补充

TAPDATA 的解决方案

在咱们的计划里, 是依照 容纳万象 + 必要的数据缓存 联合的形式去解决的这个问题

相比与 debezium, 咱们在性能上做了大量的优化, 在 解析速度上有数倍晋升, 同时, 反对的数据库品种曾经扩大到 三十种以上

对数据库日志不规范的问题, 也实现了必要的存储形象, 一个典型的用法如下:

CacheConfig cacheConfig = TapCache.config("source-cache"). 
        .setSize("1g") 
        .setTtl("3d"); 
 
DataSource<Record> source = TapSources.mongodb("mongodb-source") 
        .setHost("127.0.0.1") 
        .setPort(27017) 
        .setUser("root") 
        .setPassword("xxx") 
        .withCdc() 
        .formatCdc(cacheConfig) 
        .build()

来构建一个残缺的实时数据流, 其中流出的数据, 蕴含了残缺的 全量 + 增量数据, 并应用了内存缓存对增量日志做了规整化

对上游来讲, 这就是陈腐的, 实时的数据流了

留一个小问题

仔细的敌人曾经曾经发现了, 这里的数据蕴含了全量与增量, 然而咱们的数据格式, 并没有像 flink 或者 hazelcast jet 这些通用的做法一样, 分成了 BatchSource, Record, ChangeRecord 这些类别, 是出于什么思考呢?

关注咱们 (https://tapdata.net), 关注我, 带给你最新的实时计算引擎的思考。
关注 Tapdata 微信公众号, 带给你最新的实时计算引擎的思考。本文作者为 tapdata 技术合伙人 肖贝贝,更多技术博客:https://tapdata.net/blog.html

正文完
 0