写在后面数据仓库的个性之一是集成,即首先把未通过加工解决的、不同起源的、不同模式的数据同步到ODS层,个别状况下,这些ODS层数据包含日志数据和业务DB数据。对于业务DB数据而言(比方存储在MySQL中),将数据采集并导入到数仓中(通常是Hive或者MaxCompute)是十分重要的一个环节。
那么,该如何将业务DB数据高效精确地同步到数仓中呢?个别企业会应用两种计划:直连同步与实时增量同步(数据库日志解析)。其中直连同步的基本思路是直连数据库进行SELECT,而后将查问的数据存储到本地文件作为两头存储,最初把文件Load到数仓中。这种形式十分的简略不便,然而随着业务的倒退,会遇到一些瓶颈,具体见下文剖析。
为了解决这些问题,个别会应用实时增量的形式进行数据同步,比方DataWorks提供的一键TT接入性能,其基本原理是CDC (Change Data Capture) + Merge,即实时Binlog采集 + 离线解决Binlog还原业务数据这样一套解决方案。本文次要包含以下内容,心愿对你有所帮忙
数据同步的形式一键TT接入的流程与步骤基于Canal+Flink模仿实现TT数据接入
数据同步的形式直连同步直连同步是指通过定义好的标准接口API和基于动态链接库的形式间接连贯业务库,比方ODBC/JDBC等规定了对立的标准接口,不同的数据库基于这套规范提供标准的驱动,从而反对完全相同的函数调用和SQL实现。比方常常应用的Sqoop就是采取这种形式进行批量数据同步的。
直连同步的形式配置非常简略,很容易上手操作,比拟适宜操作型业务零碎的数据同步,然而会存在以下问题:
数据同步工夫:随着业务规模的增长,数据同步破费的工夫会越来越长,无奈满足上游数仓生产的工夫要求。性能瓶颈:直连数据库查问数据,对数据库影响十分大,容易造成慢查问,如果业务库没有采取主备策略,则会影响业务线上的失常服务,如果采取了主备策略,尽管能够防止对业务零碎的性能影响,但当数据量较大时,性能仍然会很差。
日志解析所谓日志解析,即解析数据库的变更日志,比方MySQL的Binlog日志,Oracle的归档日志文件。通过读取这些日志信息,收集变动的数据并将其解析到指标存储中即可实现数据的实时同步。这种读操作是在操作系统层面实现的,不须要通过数据库,因而不会给源数据库带来性能上的瓶颈。
数据库日志解析的同步形式能够实现实时与准实时的同步,提早能够管制在毫秒级别的,其最大的劣势就是性能好、效率高,不会对源数据库造成影响,目前,从业务零碎到数据仓库中的实时增量同步,宽泛采取这种形式,比方一键TT接入。当然,这种形式也会存在一些问题,比方批量补数时造成大量数据更新,日志解析会解决较慢,造成数据提早。除此之外,这种形式比较复杂,投入也较大,因为须要一个实时的抽取零碎去抽取并解析日志,下文会对此进行具体解释。
一键TT接入的根本步骤根本流程根本流程如下,首先是将数据的变更日志推送到DataHub/TimeTunnel(TT,一种基于生产者、消费者和Topic音讯标识的消息中间件,将音讯数据长久化到HBase,其底层基于DataHub,类比Kakfa),而后将表全量同步,且仅全量同步一次,接着将TT的数据订阅生产到ODPS的TTSource表,并将TTSource表合并到增量表中,最初将增量表与全量表进行Merge失去最新的全量表。当前的同步流程仅执行增量表的数据装载以及增量表与全量表的Merge合并,合并的形式是应用全外连贯(full outer join)+数据笼罩(insert overwrite)的形式,比方日调度,即是将当天的增量数据和前一天的全量数据做全外连贯,从新装载获取最新的一份全量数据。
步骤解释以WDK_MEMBER_PRO_APP.member_round_detail为例
Step1. BPMS审批Step2.创立TT表这一步会在TT(类比Kafka)中生成名为wdk_member_pro_app_member_round_detail的Topic,用于存储Binlog变更日志(比方INSERT、UPDATE数据),该Topic的个别命名格局是:${TDDL APP NAME}_${源表名}。值得注意的是,因为一行数据记录的变更日志是有严格程序的,而DataHub/TimeTunnel是分区有序的,所以要保障同一个key的数据进入到Topic的同一个分区,这样能力保障有序,这也是为什么要指定惟一主键的起因。而后创立TT Source表,该表名为:s_tt_wdk_member_pro_app_member_round_detail_tt4,表名的个别格局是:s_tt_${TDDL APP NAME}_${源表名}_tt4,该表次要用于订阅DataHub/TimeTunnel中的Topic,将DataHub的数据抽取到ODPS中存储(行将曾经采集到TT的数据存储到ODPS上)。该表的content字段存储了变更日志数据,包含一部分的元数据信息和理论的日志数据,对于元数据信息,具体如下:
dbsync_ts: 同步机器的工夫戳(ms)+自增序列dbsync_db_name: 物理分库dbsync_table_name: 物理分表dbsync_modify_time: Binlog变更日志工夫,即数据长久化到DB的工夫dbsync_operation: 数据的操作类型,比方insert、updatedbsync_region_id: 标识本条数据的地区信息,实用于团体单元化场景。
Step3.创立增量表在ODPS中创立一张名为s_tt_member_round_detail_delta的表,该表的个别命名格局是:${指标表名}_delta。该表用于存储增量的数据,比方按天增量抽取,那么表中的数据就是当天的增量数据。次要解决逻辑是读取TT Source表的当天分区数据,将其写入到该增量表中。详见TTMerge节点工作。
Step4.创立TTMerge节点该节点的工作名称为tt_${指标表名}_delta,次要作用是向Step3中创立的增量表中装载数据,值得注意的是:因为TT写入的数据是有时序的,例如一条记录在一天被更新N次,则在当日的表中,就有N条记录。对于按天增量的数据来说,只须要最新的数据即可,所以须要依照主键分区排序,分组取最新的数据,这里再次阐明一下主键的重要性,如果主键指定不正确,很容易造成脏数据的产生。装载数据的逻辑是:
INSERT OVERWRITE TABLE s_tt_member_round_detail_delta PARTITION(ds='20210304')SELECT id ,gmt_create ,gmt_modified ,user_id ,start_date ,end_date ,STATUS ,TYPE ,attributes ,round ,merchant_code ,consume_discount ,card_id ,plan_id ,biz_idFROM ( -- 因为TT写入的数据是有时序的,例如一条记录在一天被更新N次,则在当日的表中,就有N条记录。 -- 对于按天增量的数据来说,只须要最新的数据即可 -- 依照主键分区排序,分组取最新的数据 SELECT row_number() OVER(PARTITION BY id,user_id,biz_id ORDER BY id,user_id,biz_id,dbsync_ts DESC) AS row_number ,dbsync_operation ,id ,gmt_create ,gmt_modified ,user_id ,start_date ,end_date ,STATUS ,TYPE ,attributes ,round ,merchant_code ,consume_discount ,card_id ,plan_id ,biz_id FROM ( SELECT dbsync_ts ,dbsync_operation ,id ,gmt_create ,gmt_modified ,user_id ,start_date ,end_date ,status ,TYPE ,attributes ,round ,merchant_code ,consume_discount ,card_id ,plan_id ,biz_id FROM ( -- 获取并解析当天的变更日志数据 SELECT tt_split(content, 21) AS (dbsync_ts,dbsync_db_name,dbsync_table_name,dbsync_modify_time,dbsync_operation,dbsync_change_fields,id,gmt_create,gmt_modified,user_id,start_date,end_date,status,TYPE,attributes,round,merchant_code,consume_discount,card_id,plan_id,biz_id) FROM hm_ods.s_tt_wdk_member_pro_app_member_round_detail_tt4 WHERE ( ds > '20210304' OR (ds = '20210304' AND (hh > '00' OR (hh = '00' AND mm >= '00'))) ) AND ( ds < '20210305' OR (ds = '20210305' AND (hh < '00' OR (hh = '00' AND mm < '00'))) ) ) t WHERE t.dbsync_ts != '' ) b ) uWHERE row_number = 1;Step5.TTMerge节点公布公布TTMerge的工作
...