On Databases: DataX-Based Data Synchronization (Part 2): Using DataX for Data Synchronization


Author: 烧鸡太子爷

Source: Hundsun LIGHT Cloud Community

Having covered DataX's introduction and installation in the previous article, we now walk through an example of running DataX, and summarize some points worth noting from practical use.

Running DataX

Run commands

(1) View a job template: python ${DATAX_HOME}\bin\datax.py -r <reader plugin> -w <writer plugin>

    Example (importing from MySQL into Oracle): python ${DATAX_HOME}\bin\datax.py -r mysqlreader -w oraclewriter

(2) Run a job (configured via a JSON config file): python ${DATAX_HOME}\bin\datax.py {json config file}
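The two invocations above can also be driven from a script. The sketch below only builds and launches the command lines; the `DATAX_PY` path is an assumed install location, not something DataX mandates.

```python
# Minimal sketch of driving datax.py from Python via subprocess.
import subprocess
import sys

DATAX_PY = "/opt/datax/bin/datax.py"  # hypothetical install path; adjust to ${DATAX_HOME}

def template_cmd(reader: str, writer: str) -> list:
    """Build the command that prints a job template for a reader/writer pair."""
    return [sys.executable, DATAX_PY, "-r", reader, "-w", writer]

def run_job_cmd(job_json: str) -> list:
    """Build the command that runs a job described by a JSON config file."""
    return [sys.executable, DATAX_PY, job_json]

def run_job(job_json: str) -> int:
    """Launch DataX and return its exit code (0 means the job succeeded)."""
    return subprocess.call(run_job_cmd(job_json))
```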

Example (transferring data from PostgreSQL to MySQL)

(1) View the template

    python datax.py -r postgresqlreader -w mysqlwriter

    DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
    Please refer to the postgresqlreader document:
    https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md
    Please refer to the mysqlwriter document:
    https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
    Please save the following configuration as a json file and  use
    python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
    to run the job.
    {
        "job": {
            "content": [
                {
                    "reader": {                          // source (read) configuration
                        "name": "postgresqlreader",      // reader plugin name
                        "parameter": {                   // source database settings
                            "column": [],
                            "connection": [
                                {
                                    "jdbcUrl": [],
                                    "table": []
                                }
                            ],
                            "password": "",
                            "username": "",
                            "where": ""                  // row filter condition
                        }
                    },
                    "writer": {                          // target (write) configuration
                        "name": "mysqlwriter",           // writer plugin name
                        "parameter": {                   // target database settings
                            "column": [],
                            "connection": [
                                {
                                    "jdbcUrl": "",
                                    "table": []
                                }
                            ],
                            "password": "",
                            "postSql": [],               // statements run after the job
                            "preSql": [],                // statements run before the job
                            "username": ""
                        }
                    }
                }
            ],
            "setting": {                                 // basic settings
                "speed": {                               // flow control
                    "channel": ""                        // concurrency; channel (concurrency), record and byte flow-control modes are available
                }
            }
        }
    }                                                    // dirty-data controls and other settings can also be configured
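Rather than hand-editing the template, the same skeleton can be assembled programmatically. The sketch below is our own helper (the function name, placeholder hosts, and table names are illustrative, not part of DataX); it builds the job dict and saves it as JSON.

```python
# Sketch: assemble the DataX job skeleton as a Python dict and save it as JSON.
import json

def build_job(reader_name, reader_param, writer_name, writer_param, channel=1):
    """Build the standard job structure shown in the template above."""
    return {
        "job": {
            "content": [{
                "reader": {"name": reader_name, "parameter": reader_param},
                "writer": {"name": writer_name, "parameter": writer_param},
            }],
            "setting": {"speed": {"channel": channel}},
        }
    }

job = build_job(
    "postgresqlreader",
    {"username": "", "password": "", "column": ["*"],
     "connection": [{"jdbcUrl": ["jdbc:postgresql://host:5432/db"],
                     "table": ["public.some_table"]}]},
    "mysqlwriter",
    {"username": "", "password": "", "column": ["*"],
     "connection": [{"jdbcUrl": "jdbc:mysql://host:3306/db",
                     "table": ["some_table"]}]},
    channel=3,
)
with open("job.json", "w") as f:
    json.dump(job, f, indent=4)
```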

(2) Edit the configuration based on the template

{
"job": {
    "setting": {
        "speed": {
            "channel": 3,
            "byte": 1048576
        },
        "errorLimit": {
            "record": 0,
            "percentage": 0.02
        }
    },
    "content": [{
        "reader": {
            "name": "postgresqlreader",
            "parameter": {
                "username": "authenticator",
                "password": "financegtn104",
                "column": [
                    "\"id\"",
                    "\"name\"",
                    "\"content_type_id\"",
                    "\"codename\""
                ],
                "splitPk": "",
                "connection": [{
                    "table": ["public.auth_permission"],
                    "jdbcUrl": ["jdbc:postgresql://10.20.64.241:5433/finance"]
                }]
            }
        },
        "writer": {
            "name": "mysqlwriter",
            "parameter": {
                "username": "root",
                "password": "hundsun1234",
                "column": [
                    "`id`",
                    "`name`",
                    "`content_type_id`",
                    "`codename`"
                ],
                "connection": [{
                    "table": ["auth_permission"],
                    "jdbcUrl": "jdbc:mysql://10.20.64.151:3306/datax_test"
                }]
            }
        }
    }]
}
}
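Before running a job it is worth sanity-checking the config. The snippet below is our own pre-flight helper, not a DataX feature; it verifies that reader and writer declare the same number of columns, a common source of runtime errors.

```python
# Sketch of a pre-flight check (our own helper, not part of DataX):
# reader and writer must declare the same number of columns.
def check_columns(job: dict) -> bool:
    """Return True when every content entry maps reader columns 1:1 to writer columns."""
    for item in job["job"]["content"]:
        reader_cols = item["reader"]["parameter"]["column"]
        writer_cols = item["writer"]["parameter"]["column"]
        if len(reader_cols) != len(writer_cols):
            return False
    return True

# Trimmed-down version of the job config above.
job = {"job": {"content": [{
    "reader": {"parameter": {"column": ['"id"', '"name"', '"content_type_id"', '"codename"']}},
    "writer": {"parameter": {"column": ["`id`", "`name`", "`content_type_id`", "`codename`"]}},
}]}}
```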

(3) Run the job

python ./bin/datax.py ./job/pgsql2mysql.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

2020-08-05 13:01:22.518 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2020-08-05 13:01:22.524 [main] INFO  Engine - the machine info  =>
    osInfo: Oracle Corporation 1.8 25.181-b13
    jvmInfo:        Linux amd64 3.10.0-862.11.6.el7.x86_64
    cpu num:        8
    totalPhysicalMemory:    -0.00G
    freePhysicalMemory:     -0.00G
    maxFileDescriptorCount: -1
    currentOpenFileDescriptorCount: -1
    GC Names        [PS MarkSweep, PS Scavenge]
    MEMORY_NAME                    | allocation_size                | init_size              
    PS Eden Space                  | 256.00MB                       | 256.00MB               
    Code Cache                     | 240.00MB                       | 2.44MB                 
    Compressed Class Space         | 1,024.00MB                     | 0.00MB                 
    PS Survivor Space              | 42.50MB                        | 42.50MB                
    PS Old Gen                     | 683.00MB                       | 683.00MB               
    Metaspace                      | -0.00MB                        | 0.00MB                 


2020-08-05 13:01:22.541 [main] INFO  Engine -
{
    "content":[
            {
                    "reader":{
                            "name":"postgresqlreader",
                            "parameter":{
                                    "column":[
                                            "\"id\"","\"name\"",
                                            "\"content_type_id\"","\"codename\""
                                    ],
                                    "connection":[
                                            {
                                                    "jdbcUrl":["jdbc:postgresql://10.20.64.241:5433/finance"],
                                                    "table":["public.auth_permission"]
                                            }
                                    ],
                                    "password":"*************",
                                    "splitPk":"","username":"authenticator"
                            }
                    },
                    "writer":{
                            "name":"mysqlwriter",
                            "parameter":{
                                    "column":[
                                            "`id`",
                                            "`name`",
                                            "`content_type_id`",
                                            "`codename`"
                                    ],
                                    "connection":[
                                            {
                                                    "jdbcUrl":"jdbc:mysql://10.20.64.151:3306/datax_test",
                                                    "table":["auth_permission"]
                                            }
                                    ],
                                    "password":"***********",
                                    "username":"root"
                            }
                    }
            }
    ],
    "setting":{
            "errorLimit":{
                    "percentage":0.02,
                    "record":0
            },
            "speed":{
                    "byte":1048576,
                    "channel":3
            }
    }
}
2020-08-05 13:01:22.557 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2020-08-05 13:01:22.559 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2020-08-05 13:01:22.559 [main] INFO  JobContainer - DataX jobContainer starts job.
2020-08-05 13:01:22.560 [main] INFO  JobContainer - Set jobId = 0
2020-08-05 13:01:22.676 [job-0] INFO  OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:postgresql://10.20.64.241:5433/finance.
2020-08-05 13:01:22.704 [job-0] INFO  OriginalConfPretreatmentUtil - table:[public.auth_permission] has columns:[id,name,content_type_id,codename].
2020-08-05 13:01:22.917 [job-0] INFO  OriginalConfPretreatmentUtil - table:[auth_permission] all columns:[id,name,content_type_id,codename].
2020-08-05 13:01:22.924 [job-0] INFO  OriginalConfPretreatmentUtil - Write data [INSERT INTO %s (`id`,`name`,`content_type_id`,`codename`) VALUES(?,?,?,?)
], which jdbcUrl like:[jdbc:mysql://10.20.64.151:3306/datax_test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true]
2020-08-05 13:01:22.925 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2020-08-05 13:01:22.925 [job-0] INFO  JobContainer - DataX Reader.Job [postgresqlreader] do prepare work .
2020-08-05 13:01:22.926 [job-0] INFO  JobContainer - DataX Writer.Job [mysqlwriter] do prepare work .
2020-08-05 13:01:22.926 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2020-08-05 13:01:22.927 [job-0] INFO  JobContainer - Job set Max-Byte-Speed to 1048576 bytes.
2020-08-05 13:01:22.929 [job-0] INFO  JobContainer - DataX Reader.Job [postgresqlreader] splits to [1] tasks.
2020-08-05 13:01:22.930 [job-0] INFO  JobContainer - DataX Writer.Job [mysqlwriter] splits to [1] tasks.
2020-08-05 13:01:22.944 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2020-08-05 13:01:22.947 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2020-08-05 13:01:22.948 [job-0] INFO  JobContainer - Running by standalone Mode.
2020-08-05 13:01:22.955 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2020-08-05 13:01:22.958 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2020-08-05 13:01:22.958 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2020-08-05 13:01:22.964 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2020-08-05 13:01:22.967 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Begin to read record by Sql: [select "id","name","content_type_id","codename" from public.auth_permission] jdbcUrl:[jdbc:postgresql://10.20.64.241:5433/finance].
2020-08-05 13:01:23.063 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Finished read record by Sql: [select "id","name","content_type_id","codename" from public.auth_permission] jdbcUrl:[jdbc:postgresql://10.20.64.241:5433/finance].
2020-08-05 13:01:23.365 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[402]ms
2020-08-05 13:01:23.366 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2020-08-05 13:01:32.966 [job-0] INFO  StandAloneJobContainerCommunicator - Total 24 records, 803 bytes | Speed 80B/s, 2 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2020-08-05 13:01:32.966 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2020-08-05 13:01:32.967 [job-0] INFO  JobContainer - DataX Writer.Job [mysqlwriter] do post work.
2020-08-05 13:01:32.967 [job-0] INFO  JobContainer - DataX Reader.Job [postgresqlreader] do post work.
2020-08-05 13:01:32.967 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2020-08-05 13:01:32.968 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /home/dataX/datax/hook
2020-08-05 13:01:32.968 [job-0] INFO  JobContainer -
     [total cpu info] =>
            averageCpu                     | maxDeltaCpu                    | minDeltaCpu            
            -1.00%                         | -1.00%                         | -1.00%

     [total gc info] =>
             NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime   
             PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s     
             PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s     
2020-08-05 13:01:32.969 [job-0] INFO  JobContainer - PerfTrace not enable!
2020-08-05 13:01:32.969 [job-0] INFO  StandAloneJobContainerCommunicator - Total 24 records, 803 bytes | Speed 80B/s, 2 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2020-08-05 13:01:32.969 [job-0] INFO  JobContainer -
Job start time                 : 2020-08-05 13:01:22
Job end time                   : 2020-08-05 13:01:32
Total elapsed time             :                 10s
Average traffic                :               80B/s
Record write speed             :              2rec/s
Total records read             :                  24
Total read/write failures      :                   0
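The summary figures follow directly from the totals in the log: 803 bytes and 24 records over a 10-second window, with integer division matching DataX's rounded report.

```python
# Cross-check the reported speeds against the reported totals.
total_bytes, total_records, elapsed_s = 803, 24, 10
byte_speed = total_bytes // elapsed_s      # 80 B/s, as logged
record_speed = total_records // elapsed_s  # 2 rec/s, as logged
```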

Running DataX-Web

Is hand-editing the config too cumbersome? Is there a graphical interface for producing config files, as Kettle provides, and unified job management?

DataX-Web is an integrated visual front end for DataX: pick a data source and a sync job is generated with one click. It supports batch creation of RDBMS sync jobs, integrates an open-source scheduler, and supports distributed execution, incremental synchronization, live run logs, executor resource monitoring, and killing running jobs. Compared with Kettle, DataX-Web's advantages are one-click job generation from the page, real-time run status, and faster synchronization.

(1) Environment preparation
*
    MySQL (5.5+), required; the matching client is optional. If the MySQL client is installed on the Linux server, the deployment script can be used to initialize the database quickly.
*
    JDK (1.8.0_xxx), required
*
    DataX, required
*
    Python (2.x), required (for Python 3 support, replace the three Python files under datax/bin with the ones under doc/datax-web/datax-python3). Python is mainly used by the scheduler to launch the underlying DataX start scripts; by default DataX runs as a Java child process, and users can customize this to launch it from Python instead.

(2) Installation
    1. Build from source, or 2. use the officially built package. After unpacking, enter the extracted directory and find install.sh under the bin directory; for an interactive installation, simply run ./bin/install.sh

Note:

The job_info table created by the official database script is missing the user_id column, so clicking around in the UI fails with a "user_id column not found" error. Add the column manually, as an int of length 11: ALTER TABLE job_info ADD user_id int(11);
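The fix can also be scripted so the column is only added when missing. The sketch below uses SQLite purely for illustration (the real job_info table lives in MySQL, where you would run the ALTER above via your client); the helper name is ours.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Stand-in for the official script's job_info table, which lacks user_id.
conn.execute("CREATE TABLE job_info (id INTEGER PRIMARY KEY, job_desc TEXT)")

def ensure_column(conn, table, column, decl):
    """Add `column` to `table` only if it is not already present."""
    cols = [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]
    if column not in cols:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")

ensure_column(conn, "job_info", "user_id", "INTEGER")
```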

Detailed tutorial: https://github.com/WeiYe-Jing…

Installation package notes

Finally, enter IP:port in a browser and you will see the DataX-Web page.

DataX vs. Kettle

1) Kettle has its own management console, and ETL jobs can be authored directly in the client; however, it is a C/S architecture and does not support a B/S browser mode. DataX itself has no interface (DataX-Web extends it with B/S support).

2) Both cover the common databases fairly completely, with Kettle likely supporting more. DataX is developed by Alibaba and better supports Alibaba's own database family, such as ODPS and ADS.

3) Kettle has joined the BI organization Pentaho, which has further raised its development pace and visibility.

4) DataX's open-source support is limited and it receives far less attention than Kettle; code commits are sparse.

5) According to tests reported online, Kettle takes a long time for full extraction of larger data volumes; in comparative tests, DataX was faster than Kettle.

Other notes

  • DataX's open-source plugin model supports further extensions; you can download the source and build them yourself:
    DataX Elasticsearch Reader (the official repo already provides a Writer, but no Reader)
    DataX incremental PostgreSQL writeMode update
    DataX Redis Writer
    Extension source download: https://github.com/WeiYe-Jing…
  • PostgreSQL incremental sync is not supported out of the box. For incremental data that includes updates, load the delta into a temporary table (in the same database as the target table), use a table comparison to delete the modified rows from the target and re-insert them, and insert brand-new rows directly.
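The temp-table approach from the last bullet reduces to a delete-then-insert merge. Below, SQLite stands in for the real databases, and the table and column names are illustrative only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE target (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE tmp_delta (id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO target VALUES (1, 'old'), (2, 'keep');
-- The delta holds one modified row (id=1) and one brand-new row (id=3).
INSERT INTO tmp_delta VALUES (1, 'new'), (3, 'added');
""")
# Step 1: remove target rows that the delta supersedes.
conn.execute("DELETE FROM target WHERE id IN (SELECT id FROM tmp_delta)")
# Step 2: insert every delta row (covers both modified and brand-new rows).
conn.execute("INSERT INTO target SELECT * FROM tmp_delta")
```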


Want advice from senior engineers? A place to discuss problems you hit during development? A way to access a wealth of fintech resources?

Hundsun LIGHT Cloud Community is a fintech community platform built by Hundsun Technologies, sharing practical technical content, data resources, and fintech industry trends, and welcoming all financial developers.

Scan the mini-program QR code below to join us!
