乐趣区

关于java:DataX-配置解析

DataX 是阿里的异构数据源离线同步工具,具体的介绍和应用能够参考官网介绍和 Quick Start。DataX 系列次要是更具体的介绍整个运行的原理。

Configuration

DataX 配置的解析包含 job.json、core.json、plugin.json 这三个文件。这三个 json 文件都是多层级的 json 配置的,比方一个 a.b.c= d 的 json,咱们如果通过 json 获取,个别是先通过 a 这个 key 获取到 b.c 的 json,而后再通过 b 这个 key 获取到 c 的 josn,最初通过 c 这个 key 获取到 d,这样代码写起来就很繁琐。

DataX 提供了一个 Configuration 的类,能够间接把 json 进行压平,咱们通过上面的例子来看看。

public static String JSON = "{'a': {'b': {'c':'d'}}}";

public static void main(String[] args) {Configuration configuration = Configuration.from(JSON);
    System.out.println(configuration.get("a.b"));
    System.out.println(configuration.get("a.b.c"));
    System.out.println(configuration.get("a.b.d"));
}

运行后果如下,能够看到通过 Configuration 能够很不便的获取到 json 的多层级的数据。除了 get,还有合并 merge、依据 String 类型取值 getString、获取必填项 getNecessaryValue 等办法,这里就不一一介绍了。

{"c":"d"}
d
null

job.json

job.json 是作业的配置文件,在工作运行之前,通过参数把配置文件的全门路传入,所以名称是能够自定义的。

次要配置的内容包含 job.content.reader、job.content.writer 以及 job.setting.speed,reader 和 writer 能够参考每个对应模块里 resources/plugin_job_template.json 这个文件,也能够通过命令间接获取,这种形式做 Quick Start 里有示例。次要是指定用哪个 reader 进行读取数据,哪个 writer 进行写入数据,以及 reader 和 writer 的相干配置信息。

setting.speed 次要的流速的管制,这个前面再来具体解说。

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {}},
        "writer": {
          "name": "streamwriter",
          "parameter": {}}
      }
    ],
    "setting": {
      "speed": {"channel": 5}
    }
  }
}

core.json

全门路是在 DATAX_HOME/conf/core.json,配置一些全局的信息,比方 taskGroup 的 channel 个数,类型转换就是这里配置的。

{
    "entry": {
        "jvm": "-Xms1G -Xmx1G",
        "environment": {}},
    "common": {
        "column": {
            "datetimeFormat": "yyyy-MM-dd HH:mm:ss",
            "timeFormat": "HH:mm:ss",
            "dateFormat": "yyyy-MM-dd",
            "extraFormats":["yyyyMMdd"],
            "timeZone": "GMT+8",
            "encoding": "utf-8"
        }
    },
    "core": {
        "dataXServer": {
            "address": "http://localhost:7001/api",
            "timeout": 10000,
            "reportDataxLog": false,
            "reportPerfLog": false
        },
        "transport": {
            "channel": {
                "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
                "speed": {
                    "byte": -1,
                    "record": -1
                },
                "flowControlInterval": 20,
                "capacity": 512,
                "byteCapacity": 67108864
            },
            "exchanger": {
                "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
                "bufferSize": 32
            }
        },
        "container": {
            "job": {"reportInterval": 10000},
            "taskGroup": {"channel": 5},
            "trace": {"enable": "false"}

        },
        "statistics": {
            "collector": {
                "plugin": {
                    "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",
                    "maxDirtyNumber": 10
                }
            }
        }
    }
}

plugin.json

plugin.json 的全门路是 DATAX_HOME/plugin/reader/streamreader/plugin.json,这个 streamreader 和下面 job.json 里是对应的关系。

这个文件次要的内容就是 name 和 class,class 就是运行时要用到的插件类。因为会有 reader 和 writer,所以这里会加载两个 plugin.json。

{
    "name": "streamreader",
    "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader",
    "description": {
        "useScene": "only for developer test.",
        "mechanism": "use datax framework to transport data from stream.",
        "warn": "Never use it in your real job."
    },
    "developer": "alibaba"
}

以上 job.json、core.json、plugin.json 这三个文件加载后会通过 merge 办法进行合并,所以最终失去的 Configuration 就是这些文件的合并信息,前面就是通过 Configuration 来启动插件。

退出移动版