写在后面
数据仓库的个性之一是集成,即首先把未通过加工解决的、不同起源的、不同模式的数据同步到 ODS 层,个别状况下,这些 ODS 层数据包含日志数据和业务 DB 数据。对于业务 DB 数据而言 (比方存储在 MySQL 中),将数据采集并导入到数仓中(通常是 Hive 或者 MaxCompute) 是十分重要的一个环节。
那么,该如何将业务 DB 数据高效精确地同步到数仓中呢?个别企业会应用两种计划:直连同步与实时增量同步(数据库日志解析)。其中直连同步的基本思路是直连数据库进行 SELECT,而后将查问的数据存储到本地文件作为两头存储,最初把文件 Load 到数仓中。这种形式十分的简略不便,然而随着业务的倒退,会遇到一些瓶颈,具体见下文剖析。
为了解决这些问题,个别会应用实时增量的形式进行数据同步,比方 DataWorks 提供的一键 TT 接入性能,其基本原理是 CDC (Change Data Capture) + Merge,即实时 Binlog 采集 + 离线解决 Binlog 还原业务数据这样一套解决方案。
本文次要包含以下内容,心愿对你有所帮忙
- 数据同步的形式
- 一键 TT 接入的流程与步骤
-
基于 Canal+Flink 模仿实现 TT 数据接入
数据同步的形式
直连同步
直连同步是指通过定义好的标准接口 API 和基于动态链接库的形式间接连贯业务库,比方 ODBC/JDBC 等规定了对立的标准接口,不同的数据库基于这套规范提供标准的驱动,从而反对完全相同的函数调用和 SQL 实现。比方常常应用的 Sqoop 就是采取这种形式进行批量数据同步的。
直连同步的形式配置非常简略,很容易上手操作,比拟适宜操作型业务零碎的数据同步,然而会存在以下问题:
- 数据同步工夫:随着业务规模的增长,数据同步破费的工夫会越来越长,无奈满足上游数仓生产的工夫要求。
-
性能瓶颈:直连数据库查问数据,对数据库影响十分大,容易造成慢查问,如果业务库没有采取主备策略,则会影响业务线上的失常服务,如果采取了主备策略,尽管能够防止对业务零碎的性能影响,但当数据量较大时,性能仍然会很差。
日志解析
所谓日志解析,即解析数据库的变更日志,比方 MySQL 的 Binlog 日志,Oracle 的归档日志文件。通过读取这些日志信息,收集变动的数据并将其解析到指标存储中即可实现数据的实时同步。这种读操作是在操作系统层面实现的,不须要通过数据库,因而不会给源数据库带来性能上的瓶颈。
数据库日志解析的同步形式能够实现实时与准实时的同步,提早能够管制在毫秒级别的,其最大的劣势就是性能好、效率高,不会对源数据库造成影响,目前,从业务零碎到数据仓库中的实时增量同步,宽泛采取这种形式,比方一键 TT 接入。当然,这种形式也会存在一些问题,比方批量补数时造成大量数据更新,日志解析会解决较慢,造成数据提早。除此之外,这种形式比较复杂,投入也较大,因为须要一个实时的抽取零碎去抽取并解析日志,下文会对此进行具体解释。
一键 TT 接入的根本步骤
根本流程
根本流程如下,首先是将数据的变更日志推送到 DataHub/TimeTunnel(TT, 一种基于生产者、消费者和 Topic 音讯标识的消息中间件,将音讯数据长久化到 HBase, 其底层基于 DataHub,类比 Kakfa),而后将表全量同步,且仅全量同步一次,接着将 TT 的数据订阅生产到 ODPS 的 TTSource 表,并将 TTSource 表合并到增量表中,最初将增量表与全量表进行 Merge 失去最新的全量表。当前的同步流程仅执行增量表的数据装载以及增量表与全量表的 Merge 合并,合并的形式是应用全外连贯 (full outer join)+ 数据笼罩(insert overwrite) 的形式,比方日调度,即是将当天的增量数据和前一天的全量数据做全外连贯,从新装载获取最新的一份全量数据。
步骤解释
以 WDK_MEMBER_PRO_APP.member_round_detail 为例
Step1. BPMS 审批
Step2. 创立 TT 表
这一步会在 TT(类比 Kafka)中生成名为 wdk_member_pro_app_member_round_detail 的 Topic,用于存储 Binlog 变更日志(比方 INSERT、UPDATE 数据),该 Topic 的个别命名格局是:${TDDL APP NAME}_${源表名}。值得注意的是,因为一行数据记录的变更日志是有严格程序的,而 DataHub/TimeTunnel 是分区有序的,所以要保障同一个 key 的数据进入到 Topic 的同一个分区,这样能力保障有序,这也是为什么要指定惟一主键的起因。
而后创立 TT Source 表,该表名为:s_tt_wdk_member_pro_app_member_round_detail_tt4,表名的个别格局是:s_tt_${TDDL APP NAME}_${源表名}_tt4,该表次要用于订阅 DataHub/TimeTunnel 中的 Topic,将 DataHub 的数据抽取到 ODPS 中存储(行将曾经采集到 TT 的数据存储到 ODPS 上)。该表的 content 字段存储了变更日志数据,包含一部分的元数据信息和理论的日志数据,对于元数据信息,具体如下:
- dbsync_ts: 同步机器的工夫戳(ms)+ 自增序列
- dbsync_db_name: 物理分库
- dbsync_table_name: 物理分表
- dbsync_modify_time: Binlog 变更日志工夫,即数据长久化到 DB 的工夫
- dbsync_operation: 数据的操作类型,比方 insert、update
-
dbsync_region_id: 标识本条数据的地区信息,实用于团体单元化场景。
Step3. 创立增量表
在 ODPS 中创立一张名为 s_tt_member_round_detail_delta 的表,该表的个别命名格局是:${指标表名}_delta。该表用于存储增量的数据,比方按天增量抽取,那么表中的数据就是当天的增量数据。次要解决逻辑是读取 TT Source 表的当天分区数据,将其写入到该增量表中。详见 TTMerge 节点工作。
Step4. 创立 TTMerge 节点
该节点的工作名称为 tt_${指标表名}_delta,次要作用是向 Step3 中创立的增量表中装载数据,值得注意的是:因为 TT 写入的数据是有时序的,例如一条记录在一天被更新 N 次,则在当日的表中,就有 N 条记录。对于按天增量的数据来说,只须要最新的数据即可,所以须要依照主键分区排序,分组取最新的数据,这里再次阐明一下主键的重要性,如果主键指定不正确,很容易造成脏数据的产生。装载数据的逻辑是:
INSERT OVERWRITE TABLE s_tt_member_round_detail_delta PARTITION(ds='20210304')
SELECT id
,gmt_create
,gmt_modified
,user_id
,start_date
,end_date
,STATUS
,TYPE
,attributes
,round
,merchant_code
,consume_discount
,card_id
,plan_id
,biz_id
FROM (
-- 因为 TT 写入的数据是有时序的,例如一条记录在一天被更新 N 次,则在当日的表中,就有 N 条记录。-- 对于按天增量的数据来说,只须要最新的数据即可
-- 依照主键分区排序,分组取最新的数据
SELECT row_number() OVER(PARTITION BY id,user_id,biz_id ORDER BY id,user_id,biz_id,dbsync_ts DESC) AS row_number
,dbsync_operation
,id
,gmt_create
,gmt_modified
,user_id
,start_date
,end_date
,STATUS
,TYPE
,attributes
,round
,merchant_code
,consume_discount
,card_id
,plan_id
,biz_id
FROM (
SELECT dbsync_ts
,dbsync_operation
,id
,gmt_create
,gmt_modified
,user_id
,start_date
,end_date
,status
,TYPE
,attributes
,round
,merchant_code
,consume_discount
,card_id
,plan_id
,biz_id
FROM ( -- 获取并解析当天的变更日志数据
SELECT tt_split(content, 21) AS (dbsync_ts,dbsync_db_name,dbsync_table_name,dbsync_modify_time,dbsync_operation,dbsync_change_fields,id,gmt_create,gmt_modified,user_id,start_date,end_date,status,TYPE,attributes,round,merchant_code,consume_discount,card_id,plan_id,biz_id)
FROM hm_ods.s_tt_wdk_member_pro_app_member_round_detail_tt4
WHERE (
ds > '20210304'
OR (ds = '20210304' AND (hh > '00' OR (hh = '00' AND mm >= '00')))
)
AND (
ds < '20210305'
OR (ds = '20210305' AND (hh < '00' OR (hh = '00' AND mm < '00')))
)
) t
WHERE t.dbsync_ts != ''
) b
) u
WHERE row_number = 1
;
Step5.TTMerge 节点公布
公布 TTMerge 的工作
Step6.TTMerge 节点冒烟
执行 TTMerge 工作的冒烟测试,在运维核心的测试实例中能够查看。
Step7. 创立全量表
创立全量表 s_tt_member_round_detail,该表名就是在一键 TT 接入时填写的指标表名。该表用于存储截止到当天的全量数据,解决逻辑是应用当天的增量数据与昨天的全量数据进行全外连贯,从而 merge 一份最新的全量数据。
Step8. 创立全量表同步节点
创立全量表的同步节点工作,该工作只执行一次,用于首次将源表全量同步至指标表,后续的操作应用增量表与全量表进行 Merge 的形式来获取最新的全量数据。
Step9. 全量表节点公布
公布全量表同步工作。
Step10. 全量表节点冒烟
全量表同步工作冒烟测试,能够在运维核心的测试实例中查看,具体的工作名为:imp_s_tt_member_round_detail,其个别格局为:imp_${指标表名}。
Step11.Merge 节点生成
生成增量表与全量表的 Merge 工作,当天的增量数据与昨天的全量数据进行全外连贯,该 Merge 工作的根本逻辑是:
INSERT OVERWRITE TABLE s_tt_member_round_detail PARTITION(ds='20210305')
SELECT CASE WHEN n.id IS NULL THEN o.id
ELSE n.id
END
,CASE WHEN n.id IS NULL THEN o.gmt_create
ELSE n.gmt_create
END
,CASE WHEN n.id IS NULL THEN o.gmt_modified
ELSE n.gmt_modified
END
,CASE WHEN n.id IS NULL THEN o.user_id
ELSE n.user_id
END
,CASE WHEN n.id IS NULL THEN o.start_date
ELSE n.start_date
END
,CASE WHEN n.id IS NULL THEN o.end_date
ELSE n.end_date
END
,CASE WHEN n.id IS NULL THEN o.STATUS
ELSE n.STATUS
END
,CASE WHEN n.id IS NULL THEN o.TYPE
ELSE n.TYPE
END
,CASE WHEN n.id IS NULL THEN o.attributes
ELSE n.attributes
END
,CASE WHEN n.id IS NULL THEN o.round
ELSE n.round
END
,CASE WHEN n.id IS NULL THEN o.merchant_code
ELSE n.merchant_code
END
,CASE WHEN n.id IS NULL THEN o.consume_discount
ELSE n.consume_discount
END
,CASE WHEN n.id IS NULL THEN o.card_id
ELSE n.card_id
END
,CASE WHEN n.id IS NULL THEN o.plan_id
ELSE n.plan_id
END
,CASE WHEN n.id IS NULL THEN o.biz_id
ELSE n.biz_id
END
FROM (
SELECT *
FROM s_tt_member_round_detail_delta
WHERE ds = '20210305'
AND id IS NOT NULL
AND user_id IS NOT NULL
AND biz_id IS NOT NULL
) n
FULL OUTER JOIN (-- 全外连贯进行数据 merge
SELECT *
FROM s_tt_member_round_detail
WHERE ds = '20210304'
AND id IS NOT NULL
AND user_id IS NOT NULL
AND biz_id IS NOT NULL
) o
ON o.id = n.id
AND o.user_id = n.user_id
AND o.biz_id = n.biz_id
;
Step12.Merge 节点公布
公布 Merge 工作
从内网做点笔记 哈哈