关于云计算:DolphinScheduler-调度-DataX-实现-MySQL-To-ElasticSearch-增量数据同步实践

7次阅读

共计 3445 个字符,预计需要花费 9 分钟才能阅读完成。

数据同步的形式

数据同步的 2 大形式

  • 基于 SQL 查问的 CDC(Change Data Capture):

    • 离线调度查问作业,批处理。把一张表同步到其余零碎,每次通过查问去获取表中最新的数据。也就是咱们说的基于 SQL 查问抽取;
    • 无奈保障数据一致性,查的过程中有可能数据曾经产生了屡次变更;
    • 不保障实时性,基于离线调度存在人造的提早;
    • 工具软件以 Kettle(Apache Hop 最新版)、DataX 为代表, 须要联合任务调度零碎应用。
  • 基于日志的 CDC:

    • 实时生产日志,流解决,例如 MySQL 的 binlog 日志残缺记录了数据库中的变更,能够把 binlog 文件当作流的数据源;
    • 保障数据一致性,因为 binlog 文件蕴含了所有历史变更明细;
    • 保障实时性,因为相似 binlog 的日志文件是能够流式生产的,提供的是实时数据;
    • 工具软件以 Flink CDC、阿里巴巴 Canal、Debezium 为代表。

基于 SQL 查问增量数据同步原理

咱们思考用 SQL 如何查问增量数据?数据有减少、批改、删除
删除数据采纳逻辑删除的形式,比方定义一个 is_deleted 字段标识逻辑删除
如果数据是 UPDATE 的,也就是会被批改的,那么 where update_datetime >= last_datetime(调度滚动工夫)就是增量数据
如果数据是 APPEND ONLY 的除了用更新工夫还能够用 where id >= 调度上次 last_id

联合任务调度零碎
调度工夫是每日调度执行一次,那么 last_datetime = 以后调度开始执行工夫 – 24 小时,提早就是 1 天
调度工夫是 15 分钟一次,那么 last_datetime = 以后调度开始执行工夫 – 15 分钟,提早就是 15 分钟

这样就实现了捕捉增量数据,从而实现增量同步

DolphinScheduler + Datax 构建离线增量数据同步平台

本实际应用
单机 8c16g
DataX 2022-03-01 官网下载
DolphinScheduler 2.0.3(DolphinScheduler 的装置过程略,请参考官网)

DolphinScheduler 中设置好 DataX 环境变量
DolphinScheduler 提供了可视化的作业流程定义,用来离线定时调度 DataX Job 作业,应用起来很是顺滑

基于 SQL 查问离线数据同步的用武之地
为什么不必基于日志实时的形式?不是不必,而是依据场合用。思考到业务理论需要状况,基于 SQL 查问这种离线的形式也并非齐全淘汰了
特地是业务上实时性要求不高,每次调度增量数据没那么大的状况下,不须要分布式架构来负载,这种状况下是比拟适合的抉择
场景举例:
网站、APP 的百万级、千万级的内容搜寻,每天几百篇内容新增 + 批改,搜寻上会用到 ES(ElasticSearch),那么就须要把 MySQL 内容数据增量同步到 ES
DataX 就能满足需要!

DolphinScheduler 中配置 DataX MySQL To ElasticSearch 工作流

工作流定义

工作流定义 > 创立工作流 > 拖入 1 个 SHELL 组件 > 拖入 1 个 DATAX 组件
SHELL 组件(文章)
脚本

echo '文章同步 MySQL To ElasticSearch'

DATAX 组件 (t_article)
用到 2 个插件 mysqlreader、elasticsearchwriter^[1]
选 自定义模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/ 你的数据库?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
                                ],
                                "querySql": ["select a.id as pk,a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >='${biz_update_dt}';"
                                ]
                            }
                        ],
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {"endpoint": "${biz_es_host}",
                        "accessId": "${biz_es_username}",
                        "accessKey": "${biz_es_password}",
                        "index": "t_article",
                        "type": "_doc",
                        "batchSize": 1000,
                        "cleanup": false,
                        "discovery": false,
                        "dynamic": true,
                        "settings": {
                            "index": {
                                "number_of_replicas": 0,
                                "number_of_shards": 1
                            }
                        },
                        "splitter": ",",
                        "column": [
                            {
                                "name": "pk",
                                "type": "id"
                            },
                            {
                                "name": "id",
                                "type": "long"
                            },
                            {
                                "name": "title",
                                "type": "text"
                            },
                            {
                                "name": "content",
                                "type": "text"
                            }
                            {
                                "name": "is_delete",
                                "type": "text"
                            },
                            {
                                "name": "delete_date",
                                "type": "date"
                            },
                            {
                                "name": "create_date",
                                "type": "date"
                            },
                            {
                                "name": "update_date",
                                "type": "date"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "percentage": 0,
                "record": 0
            },
            "speed": {
                "channel": 1,
                "record": 1000
            }
        }
    }
}

reader 和 writer 的字段配置需保持一致

自定义参数:

biz_update_dt: ${global_bizdate} 
biz_mysql_host: 你的 mysql ip
biz_mysql_port: 3306
biz_mysql_username: 你的 mysql 账号
biz_mysql_password: 你的 mysql 明码
biz_es_host: 你的 es 地址带协定和端口 http://127.0.0.1:9200
biz_es_username: 你的 es 账号
biz_es_password: 你的 es 明码

配置的自定义参数将会主动替换 json 模板中的同名变量

reader mysqlreader 插件中要害配置:a.update_date >= '${biz_update_dt}' 就是实现增量同步的要害配置
writer elasticsearchwriter 插件中要害配置:“

"column": [
    {
        "name": "pk",
        "type": "id"
    },
    ......
]

type = id 这样配置,就把文章主键映射到 es 主键 _id 从而实现雷同主键 id 反复写入数据,就会更新数据。如果不这样配置数据将会反复导入 es 中

保留工作流

全局变量设置
global_bizdate:$[yyyy-MM-dd 00:00:00-1]

global_bizdate 援用的变量为 DolphinScheduler 内置变量,具体参考官网文档 ^[2]
联合调度工夫设计好工夫滚动的窗口时长,比方按 1 天增量,那么这里工夫就是减 1 天

最终的工作流 DAG 图为:

by 流水理鱼 |wwek

参考

1. DataX ElasticSearchWriter 插件文档
2. Apache DolphinScheduler 内置参数
本文首发于流水理鱼博客,如要转载请注明出处。
欢送关注我的公众号:流水理鱼(liushuiliyu),全栈、云原生、Homelab 交换。
如果您对相干文章感兴趣,也能够关注我的博客:www.iamle.com 下面有更多内容

正文完
 0