关于canal:移山数据迁移平台实时数据同步服务是如何保证消息的顺序性

37次阅读

共计 4577 个字符,预计需要花费 12 分钟才能阅读完成。

上一篇介绍了移山 (数据迁徙平台) 实时数据同步的整体架构;
本文次要介绍移山 (数据迁徙平台) 实时数据同步是如何保障音讯的程序性。
能够拜访 这里 查看更多对于 大数据平台建设 的原创文章。

一. 什么是音讯的程序性?

  1. 音讯生产端将音讯发送给同一个 MQ 服务器的同一个分区,并且按程序发送;
  2. 生产生产端依照音讯发送的程序进行生产。

二. 为什么要保障音讯的程序性?

在某些业务性能场景下须要保障音讯的发送和接管程序是统一的,否则会影响数据的应用。

须要保障音讯有序的场景

移山的实时数据同步应用 canal 组件订阅 MySQL 数据库的日志,并将其投递至 kafka 中(想理解移山实时同步服务架构设计的能够点这里);
kafka 生产端再依据具体的数据应用场景去解决数据(存入 HBase、MySQL 或间接做实时剖析);
因为 binlog 自身是有序的,因而写入到 mq 之后也须要保障程序。

  1. 如果当初移山创立了一个实时同步工作,而后订阅了一个业务数据库的订单表;
  2. 上游业务,向订单表里插入了一个订单,而后对该订单又做了一个更新操作,则 binlog 里会主动写入插入操作和更新操作的数据,这些数据会被 canal server 投递至 kafka broker 外面;
  3. 如果 kafka 生产端先生产到了更新日志,后生产到插入日志,则在往指标表里做操作时就会因为数据缺失导致产生异样。

三. 移山实时同步服务是怎么保障音讯的程序性

实时同步服务音讯解决整体流程如下:

咱们次要通过以下两个方面去保障保障音讯的程序性。

1. 将须要保障程序的音讯发送到同一个 partition

1.1 kafka 的同一个 partition 内的音讯是有序的
  • kafka 的同一个 partition 用一个 write ahead log 组织,是一个有序的队列,所以能够保障 FIFO 的程序;
  • 因而生产者依照肯定的程序发送音讯,broker 就会依照这个程序把音讯写入 partition,消费者也会依照雷同的程序去读取音讯;
  • kafka 的每一个 partition 不会同时被两个消费者实例生产,由此能够保障音讯生产的程序性。
1.2 管制同一 key 散发到同一 partition

要保障同一个订单的屡次批改达到 kafka 里的程序不能乱,能够在 Producer 往 kafka 插入数据时,管制同一个 key(能够采纳订单主键 key-hash 算法来实现)发送到同一 partition,这样就能保障同一笔订单都会落到同一个 partition 内。

1.3 canal 须要做的配置

canal 目前反对的 mq 有kafka/rocketmq,实质上都是基于本地文件的形式来反对了分区级的程序音讯的能力。咱们只需在配置 instance 的时候开启如下配置即可:

1> canal.properties

# leader 节点会期待所有同步中的正本确认之后再确认这条记录是否发送实现
canal.mq.acks = all

备注:

  • 这样只有至多有一个同步正本存在,记录就不会失落。

2> instance.properties

# 散列模式的分区数
canal.mq.partitionsNum=2
# 散列规定定义 库名. 表名: 惟一主键,多个表之间用逗号分隔
canal.mq.partitionHash=test.lyf_canal_test:id

备注:

  • 同一条数据的增删改操作 产生的 binlog 数据都会写到同一个分区内;
  • 查看指定 topic 的指定分区的音讯,能够应用如下命令:

    bin/kafka-console-consumer.sh --bootstrap-server serverlist --topic topicname --from-beginning --partition 0

2. 通过日志工夫戳和日志偏移量进行乱序解决

将同一个订单数据通过指定 key 的形式发送到同一个 partition 能够解决大部分状况下的数据乱序问题。

2.1 非凡场景

对于一个有着先后顺序的音讯 A、B,失常状况下应该是 A 先发送实现后再发送 B。然而在异常情况下:

  • A 发送失败了,B 发送胜利,而 A 因为重试机制在 B 发送实现之后重试发送胜利了;
  • 这时对于自身程序为 AB 的音讯程序变成了 BA。

移山的实时同步服务会在将订阅到的数据存入 HBase 之前再加一层乱序解决。

2.2 binlog 里的两个重要信息

应用 mysqlbinlog 查看 binlog:

/usr/bin/mysqlbinlog --base64-output=decode-rows -v /var/lib/mysql/mysql-bin.000001

执行工夫和偏移量:

备注:

  1. 每条数据都会有执行工夫和偏移量这两个重要信息,下边的校验逻辑外围正是借助了这两个值
  2. 执行的 sql 语句在 binlog 中是以 base64 编码格局存储的,如果想查看 sql 语句,须要加上:--base64-output=decode-rows -v 参数来解码;
  3. 偏移量:

    • Position 就代表 binlog 写到这个偏移量的中央,也就是写了这么多字节,即以后 binlog 文件的大小;
    • 也就是说后写入数据的 Position 必定比先写入数据的 Position 大,因而能够依据 Position 大小来判断音讯的程序。

3. 音讯乱序解决演示

3.1 在订阅表里插入一条数据,而后再做两次更新操作:
MariaDB [test]> insert into lyf_canal_test (name,status,content) values('demo1',1,'demo1 test');
Query OK, 1 row affected (0.00 sec)

MariaDB [test]> update lyf_canal_test set name = 'demo update' where id = 13;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MariaDB [test]> update lyf_canal_test set name = 'demo update2',content='second update',status=2  where id = 13;
Query OK, 1 row affected (0.00 sec)
3.2 产生三条须要保障程序的音讯

插入,第一次更新,第二次更新 这三次操作产生的 binlog 被 canal server 推送至 kafka 中的音讯别离称为:音讯 A,音讯 B,音讯 C

  • 音讯 A:
  • 音讯 B:
  • 音讯 C:
3.3 网络起因造成音讯乱序

假如因为不可知的网络起因:

  • kafka broker 收到的三条音讯别离为:音讯 A,音讯 C,音讯 B
  • 则 kafka 生产端生产到的这三条音讯先后顺序就是:音讯 A,音讯 C,音讯 B
  • 这样就造成了音讯的乱序,因而 订阅到的数据在存入指标表前必须得加乱序校验解决
3.4 音讯乱序解决逻辑

咱们利用 HBase 的个性,将数据主键做为指标表的 rowkey。当 kafka 生产端生产到数据时,乱序解决次要流程(摘自禧云数芯大数据平台技术白皮书)如下:

demo 的三条音讯解决流程如下:
1> 判断音讯 A 的主键 id 做为 rowkey 在 hbase 的指标表中不存在,则将音讯 A 的数据直接插入 HBase:

2> 音讯 C 的主键 id 做为 rowkey,曾经在指标表中存在,则这时须要拿音讯 C 的执行工夫和表中存储的执行工夫去判断:

  • 如果音讯 C 中的执行工夫小于表中存储的执行工夫,则证实音讯 C 是反复音讯或乱序的音讯,间接抛弃;
  • 音讯 C 中的执行工夫大于表中存储的执行工夫,则间接更新表数据(本 demo 即合乎该种场景):
  • 音讯 C 中的执行工夫等于表中存储的执行工夫,则这时须要拿音讯 C 的偏移量和表中存储的偏移量去判断:

    • 音讯 C 中的偏移量小于表中存储的偏移量,则证实音讯 C 是反复音讯,间接抛弃;
    • 音讯 C 中的偏移量大于等于表中存储的偏移量,则间接更新表数据。

3> 音讯 B 的主键 id 做为 rowkey,曾经在指标表中存在,则这时须要拿音讯 B 的执行工夫和表中存储的执行工夫去判断:

  • 因为音讯 B 中的执行工夫小于表中存储的执行工夫(即音讯 C 的执行工夫),因而音讯 B 间接抛弃。
3.5 次要代码

kafka 生产端将生产到的音讯进行格式化解决和组装,并借助 HBase-client API 来实现对 HBase 表的操作。

1> 应用 Put 组装单行数据

/**
 * 包名:org.apache.hadoop.hbase.client.Put
 * hbaseData 为从 binlog 订阅到的数据,通过循环,为指标 HBase 表
 * 增加 rowkey、列簇、列数据。* 作用:用来对单个行执行退出操作。*/
Put put = new Put(Bytes.toBytes(hbaseData.get("id")));
// hbaseData 为从 binlog 订阅到的数据,通过循环,为指标 HBase 表增加列簇和列
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(mapKey), Bytes.toBytes(hbaseData.get(mapKey)));

2> 应用 checkAndMutate,更新 HBase 表的数据

只有服务端对应 rowkey 的列数据与预期的值合乎冀望条件(大于、小于、等于)时,才会将 put 操作提交至服务端。

 // 如果 update_info(列族)execute_time(列)不存在值就插入数据,如果存在则返回 false
boolean res1 = table.checkAndMutate(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"))                   .qualifier(Bytes.toBytes("execute_time")).ifNotExists().thenPut(put);

// 如果存在,则去比拟执行工夫
if (!res1) {
   // 如果本次传递的执行工夫大于 HBase 中的执行工夫,则插入 put
  boolean res2 =table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"),Bytes.toBytes("execute_time"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_time")),put);

// 执行工夫相等时,则去比拟偏移量,本次传递的值大于 HBase 中的值则插入 put
  if (!res2) {boolean res3  = table.checkAndPut(Bytes.toBytes(hbaseData.get("id")),
                  Bytes.toBytes("update_info"),       Bytes.toBytes("execute_position"),    CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_position")),put);
  }
}

四. 总结

  1. 目前移山的实时同步服务,kafka 生产端是应用一个线程去生产数据;
  2. 如果未来有版本升级需要,将生产端改为多个线程去生产数据时,要思考到多线程生产时有序的音讯会被打乱这种状况的解决办法。

关注微信公众号

欢送大家关注我的微信公众号浏览更多文章:

正文完
 0