DataX是阿里的异构数据源离线同步工具,具体的介绍和应用能够参考官网介绍和Quick Start。DataX系列次要是更具体的介绍整个运行的原理。

Configuration

DataX配置的解析包含job.json、core.json、plugin.json这三个文件。这三个json文件都是多层级的json配置的,比方一个a.b.c=d的json,咱们如果通过json获取,个别是先通过a这个key获取到b.c的json,而后再通过b这个key获取到c的josn,最初通过c这个key获取到d,这样代码写起来就很繁琐。

DataX提供了一个Configuration的类,能够间接把json进行压平,咱们通过上面的例子来看看。

public static String JSON = "{'a': {'b': {'c': 'd'}}}";public static void main(String[] args) {    Configuration configuration = Configuration.from(JSON);    System.out.println(configuration.get("a.b"));    System.out.println(configuration.get("a.b.c"));    System.out.println(configuration.get("a.b.d"));}

运行后果如下,能够看到通过Configuration能够很不便的获取到json的多层级的数据。除了get,还有合并merge、依据String类型取值getString、获取必填项getNecessaryValue等办法,这里就不一一介绍了。

{"c":"d"}dnull

job.json

job.json是作业的配置文件,在工作运行之前,通过参数把配置文件的全门路传入,所以名称是能够自定义的。

次要配置的内容包含job.content.reader、job.content.writer以及job.setting.speed,reader和writer能够参考每个对应模块里resources/plugin_job_template.json这个文件,也能够通过命令间接获取,这种形式做Quick Start里有示例。次要是指定用哪个reader进行读取数据,哪个writer进行写入数据,以及reader和writer的相干配置信息。

setting.speed次要的流速的管制,这个前面再来具体解说。

{  "job": {    "content": [      {        "reader": {          "name": "streamreader",          "parameter": {          }        },        "writer": {          "name": "streamwriter",          "parameter": {          }        }      }    ],    "setting": {      "speed": {        "channel": 5      }    }  }}

core.json

全门路是在DATAX_HOME/conf/core.json,配置一些全局的信息,比方taskGroup的channel个数,类型转换就是这里配置的。

{    "entry": {        "jvm": "-Xms1G -Xmx1G",        "environment": {}    },    "common": {        "column": {            "datetimeFormat": "yyyy-MM-dd HH:mm:ss",            "timeFormat": "HH:mm:ss",            "dateFormat": "yyyy-MM-dd",            "extraFormats":["yyyyMMdd"],            "timeZone": "GMT+8",            "encoding": "utf-8"        }    },    "core": {        "dataXServer": {            "address": "http://localhost:7001/api",            "timeout": 10000,            "reportDataxLog": false,            "reportPerfLog": false        },        "transport": {            "channel": {                "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",                "speed": {                    "byte": -1,                    "record": -1                },                "flowControlInterval": 20,                "capacity": 512,                "byteCapacity": 67108864            },            "exchanger": {                "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",                "bufferSize": 32            }        },        "container": {            "job": {                "reportInterval": 10000            },            "taskGroup": {                "channel": 5            },            "trace": {                "enable": "false"            }        },        "statistics": {            "collector": {                "plugin": {                    "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",                    "maxDirtyNumber": 10                }            }        }    }}

plugin.json

plugin.json的全门路是DATAX_HOME/plugin/reader/streamreader/plugin.json,这个streamreader和下面job.json里是对应的关系。

这个文件次要的内容就是name和class,class就是运行时要用到的插件类。因为会有reader和writer,所以这里会加载两个plugin.json。

{    "name": "streamreader",    "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader",    "description": {        "useScene": "only for developer test.",        "mechanism": "use datax framework to transport data from stream.",        "warn": "Never use it in your real job."    },    "developer": "alibaba"}

以上job.json、core.json、plugin.json这三个文件加载后会通过merge办法进行合并,所以最终失去的Configuration就是这些文件的合并信息,前面就是通过Configuration来启动插件。