如何基于日志同步实现数据的一致性和实时抽取

42次阅读

共计 8092 个字符,预计需要花费 21 分钟才能阅读完成。

一、背景

事情是从公司前段时间的需求说起,大家知道宜信是一家金融科技公司,我们的很多数据与标准互联网企业不同,大致来说就是:

玩数据的人都知道数据是非常有价值的,然后这些数据是保存在各个系统的数据库中,如何让需要数据的使用方得到一致性、实时的数据呢?

过去的通用做法有几种,分别是:

  • DBA 开放各个系统的备库,在业务低峰期(比如夜间),使用方各自抽取所需数据。由于抽取时间不同,各个数据使用方数据不一致,数据发生冲突,而且重复抽取,相信不少 DBA 很头疼这个事情。
  • 公司统一的大数据平台,通过 Sqoop 在业务低峰期到各个系统统一抽取数据,并保存到 Hive 表中, 然后为其他数据使用方提供数据服务。这种做法解决了一致性问题,但时效性差,基本是 T + 1 的时效。
  • 基于 trigger 的方式获取增量变更,主要问题是业务方侵入性大,而且 trigger 也带来性能损失。

这些方案都不算完美。我们在了解和考虑了不同实现方式后,最后借鉴了 linkedin 的思想,认为要想同时解决数据一致性和实时性,比较合理的方法应该是来自于 log。

(此图来自:https://www.confluent.io/blog…)

把增量的 Log 作为一切系统的基础。后续的数据使用方,通过订阅 kafka 来消费 log。

比如:

  • 大数据的使用方可以将数据保存到 Hive 表或者 Parquet 文件给 Hive 或 Spark 查询;
  • 提供搜索服务的使用方可以保存到 Elasticsearch 或 HBase 中;
  • 提供缓存服务的使用方可以将日志缓存到 Redis 或 alluxio 中;
  • 数据同步的使用方可以将数据保存到自己的数据库中;
  • 由于 kafka 的日志是可以重复消费的,并且缓存一段时间,各个使用方可以通过消费 kafka 的日志来达到既能保持与数据库的一致性,也能保证实时性;

为什么使用 log 和 kafka 作为基础,而不使用 Sqoop 进行抽取呢?因为:

为什么不使用 dual write(双写)呢?,请参考 https://www.confluent.io/blog…

这里就不多做解释了。

二、总体架构

于是我们提出了构建一个基于 log 的公司级的平台的想法。

下面解释一下 DWS 平台,DWS 平台是有 3 个子项目组成:

  • Dbus(数据总线):负责实时将数据从源端实时抽出,并转换为约定的自带 schema 的 json 格式数据 (UMS 数据),放入 kafka 中;
  • Wormhole(数据交换平台):负责从 kafka 读出数据 将数据写入到目标中;
  • Swifts(实时计算平台):负责从 kafka 中读出数据,实时计算,并将数据写回 kafka 中。

图中:

  • Log extractor 和 dbus 共同完成数据抽取和数据转换,抽取包括全量和增量抽取。
  • Wormhole 可以将所有日志数据保存到 HDFS 中;还可以将数据落地到所有支持 jdbc 的数据库,落地到 HBash,Elasticsearch,Cassandra 等;
  • Swifts 支持以配置和 SQL 的方式实现对进行流式计算,包括支持流式 join,look up,filter,window aggregation 等功能;
  • Dbus web 是 dbus 的配置管理端,rider 除了配置管理以外,还包括对 Wormhole 和 Swifts 运行时管理,数据质量校验等。

由于时间关系,我今天主要介绍 DWS 中的 Dbus 和 Wormhole,在需要的时候附带介绍一下 Swifts。

三、dbus 解决方案

3.1 日志解析

如前面所说,Dbus 主要解决的是将日志从源端实时的抽出。这里我们以 MySQL 为例子,简单说明如何实现。

我们知道,虽然 MySQL InnoDB 有自己的 log,MySQL 主备同步是通过 binlog 来实现的。如下图:

图片来自:https://github.com/alibaba/canal

而 binlog 有三种模式:

  • Row 模式:日志中会记录成每一行数据被修改的形式,然后在 slave 端再对相同的数据进行修改。
  • Statement 模式: 每一条会修改数据的 sql 都会记录到 master 的 bin-log 中。slave 在复制的时候 SQL 进程会解析成和原来 master 端执行过的相同的 SQL 来再次执行。
  • Mixed 模式:MySQL 会根据执行的每一条具体的 sql 语句来区分对待记录的日志形式,也就是在 Statement 和 Row 之间选择一种。

他们各自的优缺点如下:

此处来自:http://www.jquerycn.cn/a_13625

由于 statement 模式的缺点,在与我们的 DBA 沟通过程中了解到,实际生产过程中都使用 row 模式进行复制。这使得读取全量日志成为可能。

通常我们的 MySQL 布局是采用 2 个 master 主库(vip)+ 1 个 slave 从库 + 1 个 backup 容灾库 的解决方案,由于容灾库通常是用于异地容灾,实时性不高也不便于部署。

为了最小化对源端产生影响,显然我们读取 binlog 日志应该从 slave 从库读取。

读取 binlog 的方案比较多,github 上不少,参考 https://github.com/search?utf…。最终我们选用了阿里的 canal 做位日志抽取方。

Canal 最早被用于阿里中美机房同步,canal 原理相对比较简单:

  • Canal 模拟 MySQL Slave 的交互协议,伪装自己为 MySQL Slave,向 MySQL Slave 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 Slave(也就是 canal)
  • Canal 解析 binary log 对象 (原始为 byte 流)


图片来自:https://github.com/alibaba/canal

3.2 解决方案

Dbus 的 MySQL 版主要解决方案如下:

对于增量的 log,通过订阅 Canal Server 的方式,我们得到了 MySQL 的增量日志:

  • 按照 Canal 的输出,日志是 protobuf 格式,开发增量 Storm 程序,将数据实时转换为我们定义的 UMS 格式 (json 格式, 稍后我会介绍),并保存到 kafka 中;
  • 增量 Storm 程序还负责捕获 schema 变化,以控制版本号;
  • 增量 Storm 的配置信息保存在 Zookeeper 中,以满足高可用需求。
  • Kafka 既作为输出结果也作为处理过程中的缓冲器和消息解构区。

在考虑使用 Storm 作为解决方案的时候,我们主要是认为 Storm 有以下优点:

  • 技术相对成熟,比较稳定,与 kafka 搭配也算标准组合;
  • 实时性比较高,能够满足实时性需求;
  • 满足高可用需求;
  • 通过配置 Storm 并发度,可以活动性能扩展的能力;

3.3 全量抽取

对于流水表,有增量部分就够了,但是许多表需要知道最初(已存在)的信息。这时候我们需要 initial load(第一次加载)。

对于 initial load(第一次加载),同样开发了全量抽取 Storm 程序通过 jdbc 连接的方式,从源端数据库的备库进行拉取。initial load 是拉全部数据,所以我们推荐在业务低峰期进行。好在只做一次,不需要每天都做。

全量抽取,我们借鉴了 Sqoop 的思想。将全量抽取 Storm 分为了 2 个部分:

  • 数据分片
  • 实际抽取

数据分片需要考虑分片列,按照配置和自动选择列将数据按照范围来分片,并将分片信息保存到 kafka 中。

下面是具体的分片策略:

全量抽取的 Storm 程序是读取 kafka 的分片信息,采用多个并发度并行连接数据库备库进行拉取。因为抽取的时间可能很长。抽取过程中将实时状态写到 Zookeeper 中,便于心跳程序监控。

3.4 统一消息格式

无论是增量还是全量,最终输出到 kafka 中的消息都是我们约定的一个统一消息格式, 称为 UMS(unified message schema) 格式。

如下图所示:

消息中 schema 部分,定义了 namespace 是由 类型 + 数据源名 +schema 名 + 表名 + 版本号 + 分库号 + 分表号 能够描述整个公司的所有表,通过一个 namespace 就能唯一定位。

  • _ums_op_ 表明数据的类型是 I(insert),U(update),D(删除);
  • _ums_ts_ 发生增删改的事件的时间戳,显然新的数据发生的时间戳更新;
  • _ums_id_ 消息的唯一 id,保证消息是唯一的,但这里我们保证了消息的先后顺序(稍后解释);

payload 是指具体的数据,一个 json 包里面可以包含 1 条至多条数据,提高数据的有效载荷。

UMS 中支持的数据类型,参考了 Hive 类型并进行简化,基本上包含了所有数据类型。

3.5 全量和增量的一致性

在整个数据传输中,为了尽量的保证日志消息的顺序性,kafka 我们使用的是 1 个 partition 的方式。在一般情况下,基本上是顺序的和唯一的。

但是我们知道写 kafka 会失败,有可能重写,Storm 也用重做机制,因此,我们并不严格保证 exactly once 和完全的顺序性,但保证的是 at least once。

因此_ums_id_变得尤为重要。

对于全量抽取,_ums_id_是唯一的,从 zk 中每个并发度分别取不同的 id 片区,保证了唯一性和性能,填写负数,不会与增量数据冲突,也保证他们是早于增量消息的。

对于增量抽取,我们使用的是 MySQL 的日志文件号 + 日志偏移量作为唯一 id。Id 作为 64 位的 long 整数,高 7 位用于日志文件号,低 12 位作为日志偏移量。

例如:000103000012345678。103 是日志文件号,12345678 是日志偏移量。

这样,从日志层面保证了物理唯一性(即便重做也这个 id 号也不变),同时也保证了顺序性(还能定位日志)。通过比较_ums_id_ 消费日志就能通过比较_ums_id_知道哪条消息更新。

其实_ums_ts_与_ums_id_意图是类似的,只不过有时候_ums_ts_可能会重复, 即在 1 毫秒中发生了多个操作,这样就得靠比较_ums_id_了。

3.6 心跳监控和预警

整个系统涉及到数据库的主备同步,Canal Server,多个并发度 Storm 进程等各个环节。

因此对流程的监控和预警就尤为重要。

通过心跳模块,例如每分钟(可配置)对每个被抽取的表插入一条心态数据并保存发送时间,这个心跳表也被抽取,跟随着整个流程下来,与被同步表在实际上走相同的逻辑(因为多个并发的的 Storm 可能有不同的分支),当收到心跳包的时候,即便没有任何增删改的数据,也能证明整条链路是通的。

Storm 程序和心跳程序将数据发送公共的统计 topic,再由统计程序保存到 influxdb 中,使用 grafana 进行展示,就可以看到如下效果:

图中是某业务系统的实时监控信息。上面是实时流量情况,下面是实时延时情况。可以看到,实时性还是很不错的,基本上 1~2 秒数据就已经到末端 kafka 中。

Granfana 提供的是一种实时监控能力。

如果出现延时,则是通过 dbus 的心跳模块发送邮件报警或短信报警。

3.7 实时脱敏

考虑到数据安全性,对于有脱敏需求的场景,Dbus 的全量 storm 和增量 storm 程序也完成了实时脱敏的功能。脱敏方式有 3 种:

总结一下:简单的说,Dbus 就是将各种源的数据,实时的导出,并以 UMS 的方式提供订阅,支持实时脱敏,实际监控和报警。

四、Wormhole 解决方案

说完 Dbus,该说一下 Wormhole,为什么两个项目不是一个,而要通过 kafka 来对接呢?

其中很大一个原因就是解耦,kafka 具有天然的解耦能力,程序直接可以通过 kafka 做异步的消息传递。Dbus 和 Wornhole 内部也使用了 kafka 做消息传递和解耦。

另外一个原因就是,UMS 是自描述的,通过订阅 kafka,任何有能力的使用方来直接消费 UMS 来使用。

虽然 UMS 的结果可以直接订阅,但还需要开发的工作。Wormhole 解决的是:提供一键式的配置,将 kafka 中的数据落地到各种系统中,让没有开发能力的数据使用方通过 wormhole 来实现使用数据。

如图所示,Wormhole 可以将 kafka 中的 UMS 落地到各种系统,目前用的最多的 HDFS,JDBC 的数据库和 HBase。

在技术栈上,wormhole 选择使用 spark streaming 来进行。

在 Wormhole 中,一条 flow 是指从一个 namaspace 从源端到目标端。一个 spark streaming 服务于多条 flow。

选用 Spark 的理由是很充分的:

  • Spark 天然的支持各种异构存储系统;
  • 虽然 Spark Stream 比 Storm 延时稍差,但 Spark 有着更好的吞吐量和更好的计算性能;
  • Spark 在支持并行计算方面有更强的灵活性;
  • Spark 提供了一个技术栈内解决 Sparking Job,Spark Streaming,Spark SQL 的统一功能,便于后期开发;

这里补充说一下 Swifts 的作用:

  • Swifts 的本质是读取 kafka 中的 UMS 数据,进行实时计算,将结果写入到 kafka 的另外一个 topic。
  • 实时计算可以是很多种方式:比如过滤 filter,projection(投影),lookup,流式 join window aggregation,可以完成各种具有业务价值的流式实时计算。

Wormhole 和 Swifts 对比如下:

4.1 落 HDFS

通过 Wormhole Wpark Streaming 程序消费 kafka 的 UMS,首先 UMS log 可以被保存到 HDFS 上。

kafka 一般只保存若干天的信息,不会保存全部信息,而 HDFS 中可以保存所有的历史增删改的信息。这就使得很多事情变为可能:

  • 通过重放 HDFS 中的日志,我们能够还原任意时间的历史快照。
  • 可以做拉链表,还原每一条记录的历史信息,便于分析;
  • 当程序出现错误是,可以通过回灌(backfill),重新消费消息,重新形成新的快照。

可以说 HDFS 中的日志是很多的事情基础。

介于 Spark 原生对 parquet 支持的很好,Spark SQL 能够对 Parquet 提供很好的查询。UMS 落地到 HDFS 上是保存到 Parquet 文件中的。Parquet 的内容是所有 log 的增删改信息以及_ums_id_,_ums_ts_都存下来。

Wormhole spark streaming 根据 namespace 将数据分布存储到不同的目录中,即不同的表和版本放在不同目录中。

由于每次写的 Parquet 都是小文件,大家知道 HDFS 对于小文件性能并不好,因此另外还有一个 job,每天定时将这些的 Parquet 文件进行合并成大文件。

每个 Parquet 文件目录都带有文件数据的起始时间和结束时间。这样在回灌数据时,可以根据选取的时间范围来决定需要读取哪些 Parquet 文件,不必读取全部数据。

4.2 插入或更新数据的幂等性

常常我们遇到的需求是,将数据经过加工落地到数据库或 HBase 中。那么这里涉及到的一个问题就是,什么样的数据可以被更新到数据?

这里最重要的一个原则就是数据的幂等性。

无论是遇到增删改任何的数据,我们面临的问题都是:

  • 该更新哪一行;
  • 更新的策略是什么。

对于第一个问题,其实就需要定位数据要找一个唯一的键,常见的有:

  • 使用业务库的主键;
  • 由业务方指定几个列做联合唯一索引;

对于第二个问题,就涉及到_ums_id_了,因为我们已经保证了_ums_id_大的值更新,因此在找到对应数据行后,根据这个原则来进行替换更新。

之所以要软删除和加入_is_active_列,是为了这样一种情况:

如果已经插入的_ums_id_比较大,是删除的数据(表明这个数据已经删除了),如果不是软删除,此时插入一个_ums_id_小的数据(旧数据),就会真的插入进去。

这就导致旧数据被插入了。不幂等了。所以被删除的数据依然保留(软删除)是有价值的,它能被用于保证数据的幂等性。

4.3 HBase 的保存

插入数据到 Hbase 中,相当要简单一些。不同的是 HBase 可以保留多个版本的数据(当然也可以只保留一个版本)默认是保留 3 个版本;

因此插入数据到 HBase,需要解决的问题是:

  • 选择合适的 rowkey:Rowkey 的设计是可以选的,用户可以选择源表的主键,也可以选择若干列做联合主键。
  • 选择合适的 version:使用_ums_id_+ 较大的偏移量(比如 100 亿)作为 row 的 version。

Version 的选择很有意思,利用_ums_id_的唯一性和自增性,与 version 自身的比较关系一致:即 version 较大等价于_ums_id_较大,对应的版本较新。

从提高性能的角度,我们可以将整个 Spark Streaming 的 Dataset 集合直接插入到 HBase,不需要比较。让 HBase 基于 version 自动替我们判断哪些数据可以保留,哪些数据不需要保留。

Jdbc 的插入数据:插入数据到数据库中,保证幂等的原理虽然简单,要想提高性能在实现上就变得复杂很多,总不能一条一条的比较然后在插入或更新。

我们知道 Spark 的 RDD/dataset 都是以集合的方式来操作以提高性能,同样的我们需要以集合操作的方式实现幂等性。

具体思路是:

  • 首先根据集合中的主键到目标数据库中查询,得到一个已有数据集合;
  • 与 dataset 中的集合比较,分出两类:

A:不存在的数据,即这部分数据 insert 就可以;

B:存在的数据,比较_ums_id_,最终只将哪些_ums_id_更新较大 row 到目标数据库,小的直接抛弃。

使用 Spark 的同学都知道,RDD/dataset 都是可以 partition 的,可以使用多个 worker 并进行操作以提高效率。

在考虑并发情况下,插入和更新都可能出现失败,那么还有考虑失败后的策略。

比如:因为别的 worker 已经插入,那么因为唯一性约束插入失败,那么需要改为更新,还要比较_ums_id_看是否能够更新。

对于无法插入其他情况(比如目标系统有问题),Wormhole 还有重试机制。插入到其他存储中的就不多介绍了,总的原则是:根据各自存储自身特性,设计基于集合的,并发的插入数据实现。这些都是 Wormhole 为了性能而做的努力,使用Wormhole 的用户不必关心。

五、运用案例

5.1 实时营销

说了那么多,DWS 有什么实际运用呢?下面我来介绍某系统使用 DWS 实现了的实时营销。

如上图所示:

系统 A 的数据都保存到自己的数据库中,我们知道,宜信提供很多金融服务,其中包括借款,而借款过程中很重要的就是信用审核。

借款人需要提供证明具有信用价值的信息,比如央行征信报告,是具有最强信用数据的数据。而银行流水,网购流水也是具有较强的信用属性的数据。

借款人通过 Web 或手机 APP 在系统 A 中填写信用信息时,可能会某些原因无法继续,虽然可能这个借款人是一个优质潜在客户,但以前由于无法或很久才能知道这个信息,所以实际上这样的客户是流失了。

应用了 DWS 以后,借款人已经填写的信息已经记录到数据库中,并通过 DWS 实时的进行抽取、计算和落地到目标库中。根据对客户的打分,评价出优质客户。然后立刻将这个客户的信息输出到客服系统中。

客服人员在很短的时间(几分钟以内)就通过打电话的方式联系上这个借款人(潜客),进行客户关怀,将这个潜客转换为真正的客户。我们知道借款是有时效性的,如果时间太久就没有价值了。

如果没有实时抽取 / 计算 / 落库的能力,那么这一切都无法实现。

5.2 实时报表系统

另外一个实时报表的应用如下:

我们数据使用方的数据来自多个系统,以前是通过 T + 1 的方式获得报表信息,然后指导第二天的运营,这样时效性很差。

通过 DWS,将数据从多个系统中实时抽取,计算和落地,并提供报表展示,使得运营可以及时作出部署和调整,快速应对。

六、总结

  • DWS 技术上基于主流实时流式大数据技术框架,高可用大吞吐强水平扩容,低延迟高容错最终一致。
  • DWS 能力上支持异构多源多目标系统,支持多数据格式(结构化半结构化非结构化数据)和实时技术能力。
  • DWS 将三个子项目合并作为一个平台推出,使得我们具备了实时的能力,驱动各种实时场景应用。
  • 适合场景包括:实时同步/实时计算/实时监控/实时报表/实时分析/实时洞察/实时管理/实时运营/实时决策

作者:王东

7 月 25 日晚 8 点,线上直播,【AI 中台——智能聊天机器人平台】,点击了解详情。

来源:宜信技术学院

正文完
 0