上一篇介绍了移山(数据迁徙平台)实时数据同步的整体架构;
本文次要介绍移山(数据迁徙平台)实时数据同步是如何保障音讯的程序性。
能够拜访 这里 查看更多对于大数据平台建设的原创文章。
一. 什么是音讯的程序性?
- 音讯生产端将音讯发送给同一个MQ服务器的同一个分区,并且按程序发送;
- 生产生产端依照音讯发送的程序进行生产。
二. 为什么要保障音讯的程序性?
在某些业务性能场景下须要保障音讯的发送和接管程序是统一的,否则会影响数据的应用。
须要保障音讯有序的场景
移山的实时数据同步应用canal
组件订阅MySQL数据库的日志,并将其投递至 kafka 中(想理解移山实时同步服务架构设计的能够点这里);
kafka 生产端再依据具体的数据应用场景去解决数据(存入 HBase、MySQL 或间接做实时剖析);
因为binlog 自身是有序的,因而写入到mq之后也须要保障程序。
- 如果当初移山创立了一个实时同步工作,而后订阅了一个业务数据库的订单表;
- 上游业务,向订单表里插入了一个订单,而后对该订单又做了一个更新操作,则 binlog 里会主动写入插入操作和更新操作的数据,这些数据会被 canal server 投递至 kafka broker 外面;
- 如果 kafka 生产端先生产到了更新日志,后生产到插入日志,则在往指标表里做操作时就会因为数据缺失导致产生异样。
三. 移山实时同步服务是怎么保障音讯的程序性
实时同步服务音讯解决整体流程如下:
咱们次要通过以下两个方面去保障保障音讯的程序性。
1. 将须要保障程序的音讯发送到同一个partition
1.1 kafka的同一个partition内的音讯是有序的
- kafka 的同一个 partition 用一个write ahead log组织, 是一个有序的队列,所以能够保障FIFO的程序;
- 因而生产者依照肯定的程序发送音讯,broker 就会依照这个程序把音讯写入 partition,消费者也会依照雷同的程序去读取音讯;
- kafka 的每一个 partition 不会同时被两个消费者实例生产,由此能够保障音讯生产的程序性。
1.2 管制同一key散发到同一partition
要保障同一个订单的屡次批改达到 kafka 里的程序不能乱,能够在Producer 往 kafka 插入数据时,管制同一个key (能够采纳订单主键key-hash算法来实现)发送到同一 partition,这样就能保障同一笔订单都会落到同一个 partition 内。
1.3 canal 须要做的配置
canal 目前反对的mq有kafka/rocketmq
,实质上都是基于本地文件的形式来反对了分区级的程序音讯的能力。咱们只需在配置 instance 的时候开启如下配置即可:
1> canal.properties
# leader节点会期待所有同步中的正本确认之后再确认这条记录是否发送实现canal.mq.acks = all
备注:
- 这样只有至多有一个同步正本存在,记录就不会失落。
2> instance.properties
# 散列模式的分区数canal.mq.partitionsNum=2# 散列规定定义 库名.表名: 惟一主键,多个表之间用逗号分隔canal.mq.partitionHash=test.lyf_canal_test:id
备注:
- 同一条数据的增删改操作 产生的 binlog 数据都会写到同一个分区内;
查看指定topic的指定分区的音讯,能够应用如下命令:
bin/kafka-console-consumer.sh --bootstrap-server serverlist --topic topicname --from-beginning --partition 0
2. 通过日志工夫戳和日志偏移量进行乱序解决
将同一个订单数据通过指定key的形式发送到同一个 partition 能够解决大部分状况下的数据乱序问题。
2.1 非凡场景
对于一个有着先后顺序的音讯A、B,失常状况下应该是A先发送实现后再发送B。然而在异常情况下:
- A发送失败了,B发送胜利,而A因为重试机制在B发送实现之后重试发送胜利了;
- 这时对于自身程序为AB的音讯程序变成了BA。
移山的实时同步服务会在将订阅到的数据存入HBase之前再加一层乱序解决 。
2.2 binlog里的两个重要信息
应用 mysqlbinlog
查看 binlog:
/usr/bin/mysqlbinlog --base64-output=decode-rows -v /var/lib/mysql/mysql-bin.000001
执行工夫和偏移量:
备注:
- 每条数据都会有执行工夫和偏移量这两个重要信息,下边的校验逻辑外围正是借助了这两个值;
- 执行的sql 语句在 binlog 中是以base64编码格局存储的,如果想查看sql 语句,须要加上:
--base64-output=decode-rows -v
参数来解码; 偏移量:
- Position 就代表 binlog 写到这个偏移量的中央,也就是写了这么多字节,即以后 binlog 文件的大小;
- 也就是说后写入数据的 Position 必定比先写入数据的 Position 大,因而能够依据 Position 大小来判断音讯的程序。
3.音讯乱序解决演示
3.1 在订阅表里插入一条数据,而后再做两次更新操作:
MariaDB [test]> insert into lyf_canal_test (name,status,content) values('demo1',1,'demo1 test');Query OK, 1 row affected (0.00 sec)MariaDB [test]> update lyf_canal_test set name = 'demo update' where id = 13;Query OK, 1 row affected (0.00 sec)Rows matched: 1 Changed: 1 Warnings: 0MariaDB [test]> update lyf_canal_test set name = 'demo update2',content='second update',status=2 where id = 13;Query OK, 1 row affected (0.00 sec)
3.2 产生三条须要保障程序的音讯
把插入,第一次更新,第二次更新这三次操作产生的 binlog 被 canal server 推送至 kafka 中的音讯别离称为:音讯A,音讯B,音讯C。
- 音讯A:
- 音讯B:
- 音讯C:
3.3 网络起因造成音讯乱序
假如因为不可知的网络起因:
- kafka broker收到的三条音讯别离为:音讯A,音讯C,音讯B;
- 则 kafka 生产端生产到的这三条音讯先后顺序就是:音讯A,音讯C,音讯B
- 这样就造成了音讯的乱序,因而订阅到的数据在存入指标表前必须得加乱序校验解决。
3.4 音讯乱序解决逻辑
咱们利用HBase的个性,将数据主键做为指标表的 rowkey。当 kafka 生产端生产到数据时,乱序解决次要流程(摘自禧云数芯大数据平台技术白皮书)如下:
demo的三条音讯解决流程如下:
1> 判断音讯A 的主键id做为rowkey在hbase的指标表中不存在,则将音讯A的数据直接插入HBase:
2> 音讯C 的主键id做为rowkey,曾经在指标表中存在,则这时须要拿音讯C 的执行工夫和表中存储的执行工夫去判断:
- 如果音讯C 中的执行工夫小于表中存储的执行工夫,则证实音讯C 是反复音讯或乱序的音讯,间接抛弃;
- 音讯C 中的执行工夫大于表中存储的执行工夫,则间接更新表数据(本demo即合乎该种场景):
音讯C 中的执行工夫等于表中存储的执行工夫,则这时须要拿音讯C 的偏移量和表中存储的偏移量去判断:
- 音讯C 中的偏移量小于表中存储的偏移量,则证实音讯C 是反复音讯,间接抛弃;
- 音讯C 中的偏移量大于等于表中存储的偏移量,则间接更新表数据。
3> 音讯B 的主键id做为rowkey,曾经在指标表中存在,则这时须要拿音讯B 的执行工夫和表中存储的执行工夫去判断:
- 因为音讯B中的执行工夫小于表中存储的执行工夫(即音讯C 的执行工夫),因而音讯B 间接抛弃。
3.5 次要代码
kafka 生产端将生产到的音讯进行格式化解决和组装,并借助 HBase-client API
来实现对 HBase 表的操作。
1> 应用Put
组装单行数据
/** * 包名: org.apache.hadoop.hbase.client.Put * hbaseData 为从binlog订阅到的数据,通过循环,为指标HBase表 * 增加rowkey、列簇、列数据。 * 作用:用来对单个行执行退出操作。 */Put put = new Put(Bytes.toBytes(hbaseData.get("id")));// hbaseData 为从binlog订阅到的数据,通过循环,为指标HBase表增加列簇和列put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(mapKey), Bytes.toBytes(hbaseData.get(mapKey)));
2> 应用 checkAndMutate
,更新HBase
表的数据
只有服务端对应rowkey的列数据与预期的值合乎冀望条件(大于、小于、等于)时,才会将put操作提交至服务端。
// 如果 update_info(列族) execute_time(列) 不存在值就插入数据,如果存在则返回falseboolean res1 = table.checkAndMutate(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info")) .qualifier(Bytes.toBytes("execute_time")).ifNotExists().thenPut(put);// 如果存在,则去比拟执行工夫if (!res1) { // 如果本次传递的执行工夫大于HBase中的执行工夫,则插入put boolean res2 =table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"),Bytes.toBytes("execute_time"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_time")),put);// 执行工夫相等时,则去比拟偏移量,本次传递的值大于HBase中的值则插入put if (!res2) { boolean res3 = table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"), Bytes.toBytes("execute_position"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_position")),put); }}
四.总结
- 目前移山的实时同步服务,kafka 生产端是应用一个线程去生产数据;
- 如果未来有版本升级需要,将生产端改为多个线程去生产数据时,要思考到多线程生产时有序的音讯会被打乱这种状况的解决办法。
关注微信公众号
欢送大家关注我的微信公众号浏览更多文章: