关于数据仓库:阿里的一键TT

写在后面

数据仓库的个性之一是集成,即首先把未通过加工解决的、不同起源的、不同模式的数据同步到ODS层,个别状况下,这些ODS层数据包含日志数据和业务DB数据。对于业务DB数据而言(比方存储在MySQL中),将数据采集并导入到数仓中(通常是Hive或者MaxCompute)是十分重要的一个环节。

那么,该如何将业务DB数据高效精确地同步到数仓中呢?个别企业会应用两种计划:直连同步与实时增量同步(数据库日志解析)。其中直连同步的基本思路是直连数据库进行SELECT,而后将查问的数据存储到本地文件作为两头存储,最初把文件Load到数仓中。这种形式十分的简略不便,然而随着业务的倒退,会遇到一些瓶颈,具体见下文剖析。

为了解决这些问题,个别会应用实时增量的形式进行数据同步,比方DataWorks提供的一键TT接入性能,其基本原理是CDC (Change Data Capture) + Merge,即实时Binlog采集 + 离线解决Binlog还原业务数据这样一套解决方案。
本文次要包含以下内容,心愿对你有所帮忙

  • 数据同步的形式
  • 一键TT接入的流程与步骤
  • 基于Canal+Flink模仿实现TT数据接入

    数据同步的形式

    直连同步

    直连同步是指通过定义好的标准接口API和基于动态链接库的形式间接连贯业务库,比方ODBC/JDBC等规定了对立的标准接口,不同的数据库基于这套规范提供标准的驱动,从而反对完全相同的函数调用和SQL实现。比方常常应用的Sqoop就是采取这种形式进行批量数据同步的。

直连同步的形式配置非常简略,很容易上手操作,比拟适宜操作型业务零碎的数据同步,然而会存在以下问题:

  • 数据同步工夫:随着业务规模的增长,数据同步破费的工夫会越来越长,无奈满足上游数仓生产的工夫要求。
  • 性能瓶颈:直连数据库查问数据,对数据库影响十分大,容易造成慢查问,如果业务库没有采取主备策略,则会影响业务线上的失常服务,如果采取了主备策略,尽管能够防止对业务零碎的性能影响,但当数据量较大时,性能仍然会很差。

    日志解析

    所谓日志解析,即解析数据库的变更日志,比方MySQL的Binlog日志,Oracle的归档日志文件。通过读取这些日志信息,收集变动的数据并将其解析到指标存储中即可实现数据的实时同步。这种读操作是在操作系统层面实现的,不须要通过数据库,因而不会给源数据库带来性能上的瓶颈。

数据库日志解析的同步形式能够实现实时与准实时的同步,提早能够管制在毫秒级别的,其最大的劣势就是性能好、效率高,不会对源数据库造成影响,目前,从业务零碎到数据仓库中的实时增量同步,宽泛采取这种形式,比方一键TT接入。当然,这种形式也会存在一些问题,比方批量补数时造成大量数据更新,日志解析会解决较慢,造成数据提早。除此之外,这种形式比较复杂,投入也较大,因为须要一个实时的抽取零碎去抽取并解析日志,下文会对此进行具体解释。

一键TT接入的根本步骤

根本流程

根本流程如下,首先是将数据的变更日志推送到DataHub/TimeTunnel(TT,一种基于生产者、消费者和Topic音讯标识的消息中间件,将音讯数据长久化到HBase,其底层基于DataHub,类比Kakfa),而后将表全量同步,且仅全量同步一次,接着将TT的数据订阅生产到ODPS的TTSource表,并将TTSource表合并到增量表中,最初将增量表与全量表进行Merge失去最新的全量表。当前的同步流程仅执行增量表的数据装载以及增量表与全量表的Merge合并,合并的形式是应用全外连贯(full outer join)+数据笼罩(insert overwrite)的形式,比方日调度,即是将当天的增量数据和前一天的全量数据做全外连贯,从新装载获取最新的一份全量数据。

步骤解释

以WDK_MEMBER_PRO_APP.member_round_detail为例

Step1. BPMS审批

Step2.创立TT表

这一步会在TT(类比Kafka)中生成名为wdk_member_pro_app_member_round_detail的Topic,用于存储Binlog变更日志(比方INSERT、UPDATE数据),该Topic的个别命名格局是:${TDDL APP NAME}_${源表名}。值得注意的是,因为一行数据记录的变更日志是有严格程序的,而DataHub/TimeTunnel是分区有序的,所以要保障同一个key的数据进入到Topic的同一个分区,这样能力保障有序,这也是为什么要指定惟一主键的起因。
而后创立TT Source表,该表名为:s_tt_wdk_member_pro_app_member_round_detail_tt4,表名的个别格局是:s_tt_${TDDL APP NAME}_${源表名}_tt4,该表次要用于订阅DataHub/TimeTunnel中的Topic,将DataHub的数据抽取到ODPS中存储(行将曾经采集到TT的数据存储到ODPS上)。该表的content字段存储了变更日志数据,包含一部分的元数据信息和理论的日志数据,对于元数据信息,具体如下:

  • dbsync_ts: 同步机器的工夫戳(ms)+自增序列
  • dbsync_db_name: 物理分库
  • dbsync_table_name: 物理分表
  • dbsync_modify_time: Binlog变更日志工夫,即数据长久化到DB的工夫
  • dbsync_operation: 数据的操作类型,比方insert、update
  • dbsync_region_id: 标识本条数据的地区信息,实用于团体单元化场景。

    Step3.创立增量表

    在ODPS中创立一张名为s_tt_member_round_detail_delta的表,该表的个别命名格局是:${指标表名}_delta。该表用于存储增量的数据,比方按天增量抽取,那么表中的数据就是当天的增量数据。次要解决逻辑是读取TT Source表的当天分区数据,将其写入到该增量表中。详见TTMerge节点工作。

Step4.创立TTMerge节点

该节点的工作名称为tt_${指标表名}_delta,次要作用是向Step3中创立的增量表中装载数据,值得注意的是:因为TT写入的数据是有时序的,例如一条记录在一天被更新N次,则在当日的表中,就有N条记录。对于按天增量的数据来说,只须要最新的数据即可,所以须要依照主键分区排序,分组取最新的数据,这里再次阐明一下主键的重要性,如果主键指定不正确,很容易造成脏数据的产生。装载数据的逻辑是:

INSERT OVERWRITE TABLE s_tt_member_round_detail_delta PARTITION(ds='20210304')
SELECT  id
        ,gmt_create
        ,gmt_modified
        ,user_id
        ,start_date
        ,end_date
        ,STATUS
        ,TYPE
        ,attributes
        ,round
        ,merchant_code
        ,consume_discount
        ,card_id
        ,plan_id
        ,biz_id
FROM    (   
            -- 因为TT写入的数据是有时序的,例如一条记录在一天被更新N次,则在当日的表中,就有N条记录。
            -- 对于按天增量的数据来说,只须要最新的数据即可
            -- 依照主键分区排序,分组取最新的数据
            SELECT  row_number() OVER(PARTITION BY id,user_id,biz_id ORDER BY id,user_id,biz_id,dbsync_ts DESC) AS row_number
                    ,dbsync_operation
                    ,id
                    ,gmt_create
                    ,gmt_modified
                    ,user_id
                    ,start_date
                    ,end_date
                    ,STATUS
                    ,TYPE
                    ,attributes
                    ,round
                    ,merchant_code
                    ,consume_discount
                    ,card_id
                    ,plan_id
                    ,biz_id
            FROM    (
                        SELECT  dbsync_ts
                                ,dbsync_operation
                                ,id
                                ,gmt_create
                                ,gmt_modified
                                ,user_id
                                ,start_date
                                ,end_date
                                ,status
                                ,TYPE
                                ,attributes
                                ,round
                                ,merchant_code
                                ,consume_discount
                                ,card_id
                                ,plan_id
                                ,biz_id
                        FROM    (  -- 获取并解析当天的变更日志数据
                                    SELECT  tt_split(content, 21) AS (dbsync_ts,dbsync_db_name,dbsync_table_name,dbsync_modify_time,dbsync_operation,dbsync_change_fields,id,gmt_create,gmt_modified,user_id,start_date,end_date,status,TYPE,attributes,round,merchant_code,consume_discount,card_id,plan_id,biz_id)
                                    FROM    hm_ods.s_tt_wdk_member_pro_app_member_round_detail_tt4
                                    WHERE   (
                                                    ds > '20210304'
                                                OR  (ds = '20210304' AND (hh > '00' OR (hh = '00' AND mm >= '00')))
                                            )
                                    AND     (
                                                    ds < '20210305'
                                                OR  (ds = '20210305' AND (hh < '00' OR (hh = '00' AND mm < '00')))
                                            )
                                ) t
                        WHERE   t.dbsync_ts != ''
                    ) b
        ) u
WHERE   row_number = 1
;

Step5.TTMerge节点公布

公布TTMerge的工作

Step6.TTMerge节点冒烟

执行TTMerge工作的冒烟测试,在运维核心的测试实例中能够查看。

Step7.创立全量表

创立全量表s_tt_member_round_detail,该表名就是在一键TT接入时填写的指标表名。该表用于存储截止到当天的全量数据,解决逻辑是应用当天的增量数据与昨天的全量数据进行全外连贯,从而merge一份最新的全量数据。

Step8.创立全量表同步节点

创立全量表的同步节点工作,该工作只执行一次,用于首次将源表全量同步至指标表,后续的操作应用增量表与全量表进行Merge的形式来获取最新的全量数据。

Step9.全量表节点公布

公布全量表同步工作。

Step10.全量表节点冒烟

全量表同步工作冒烟测试,能够在运维核心的测试实例中查看,具体的工作名为:imp_s_tt_member_round_detail,其个别格局为:imp_${指标表名}。

Step11.Merge节点生成

生成增量表与全量表的Merge工作,当天的增量数据与昨天的全量数据进行全外连贯,该Merge工作的根本逻辑是:

INSERT OVERWRITE TABLE s_tt_member_round_detail PARTITION(ds='20210305')
SELECT  CASE    WHEN n.id IS NULL THEN o.id 
                ELSE n.id 
        END
        ,CASE    WHEN n.id IS NULL THEN o.gmt_create 
                 ELSE n.gmt_create 
         END
        ,CASE    WHEN n.id IS NULL THEN o.gmt_modified 
                 ELSE n.gmt_modified 
         END
        ,CASE    WHEN n.id IS NULL THEN o.user_id 
                 ELSE n.user_id 
         END
        ,CASE    WHEN n.id IS NULL THEN o.start_date 
                 ELSE n.start_date 
         END
        ,CASE    WHEN n.id IS NULL THEN o.end_date 
                 ELSE n.end_date 
         END
        ,CASE    WHEN n.id IS NULL THEN o.STATUS 
                 ELSE n.STATUS 
         END
        ,CASE    WHEN n.id IS NULL THEN o.TYPE 
                 ELSE n.TYPE 
         END
        ,CASE    WHEN n.id IS NULL THEN o.attributes 
                 ELSE n.attributes 
         END
        ,CASE    WHEN n.id IS NULL THEN o.round 
                 ELSE n.round 
         END
        ,CASE    WHEN n.id IS NULL THEN o.merchant_code 
                 ELSE n.merchant_code 
         END
        ,CASE    WHEN n.id IS NULL THEN o.consume_discount 
                 ELSE n.consume_discount 
         END
        ,CASE    WHEN n.id IS NULL THEN o.card_id 
                 ELSE n.card_id 
         END
        ,CASE    WHEN n.id IS NULL THEN o.plan_id 
                 ELSE n.plan_id 
         END
        ,CASE    WHEN n.id IS NULL THEN o.biz_id 
                 ELSE n.biz_id 
         END
FROM    (
            SELECT  *
            FROM    s_tt_member_round_detail_delta
            WHERE   ds = '20210305'
            AND     id IS NOT NULL
            AND     user_id IS NOT NULL
            AND     biz_id IS NOT NULL
        ) n
FULL OUTER JOIN (-- 全外连贯进行数据merge
                    SELECT  *
                    FROM    s_tt_member_round_detail
                    WHERE   ds = '20210304'
                    AND     id IS NOT NULL
                    AND     user_id IS NOT NULL
                    AND     biz_id IS NOT NULL
                ) o
ON      o.id = n.id
AND     o.user_id = n.user_id
AND     o.biz_id = n.biz_id
;

Step12.Merge节点公布

公布Merge工作

从内网做点笔记 哈哈

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理