共计 2329 个字符,预计需要花费 6 分钟才能阅读完成。
本文为您介绍如何利用 DataWorks 数据集成将 JSON 数据从 OSS 迁移到 MaxCompute,并使用 MaxCompute 内置字符串函数 GET_JSON_OBJECT 提取 JSON 信息。
数据上传 OSS
将您的 JSON 文件重命名后缀为 TXT 文件,并上传到 OSS。本文中使用的 JSON 文件示例如下。
{
“store”: {
“book”: [
{
“category”: “reference”,
“author”: “Nigel Rees”,
“title”: “Sayings of the Century”,
“price”: 8.95
},
{
“category”: “fiction”,
“author”: “Evelyn Waugh”,
“title”: “Sword of Honour”,
“price”: 12.99
},
{
“category”: “fiction”,
“author”: “J. R. R. Tolkien”,
“title”: “The Lord of the Rings”,
“isbn”: “0-395-19395-8”,
“price”: 22.99
}
],
“bicycle”: {
“color”: “red”,
“price”: 19.95
}
},
“expensive”: 10
}
将 applog.txt 文件上传到 OSS,本文中 OSS Bucket 位于华东 2 区。
使用 DataWorks 导入数据到 MaxCompute
新增 OSS 数据源
进入 DataWorks 数据集成控制台,新增 OSS 类型数据源。
具体参数如下所示,测试数据源连通性通过即可点击完成。Endpoint 地址请参见 OSS 各区域的外网、内网地址,本例中为 http://oss-cn-shanghai.aliyun… http://oss-cn-shanghai-internal.aliyuncs.com(由于本文中 OSS 和 DataWorks 项目处于同一个 region 中,本文选用后者,通过内网连接)。
新建数据同步任务
在 DataWorks 上新建数据同步类型节点。
新建的同时,在 DataWorks 新建一个建表任务,用于存放 JSON 数据,本例中新建表名为 mqdata。
表参数可以通过图形化界面完成。本例中 mqdata 表仅有一列,类型为 string,列名为 MQ data。
完成上述新建后,您可以在图形化界面配置数据同步任务参数,如下图所示。选择目标数据源名称为 odps_first,选择目标表为刚建立的 mqdata。数据来源类型为 OSS,Object 前缀可填写文件路径及名称。列分隔符使用 TXT 文件中不存在的字符即可,本文中使用 ^(对于 OSS 中的 TXT 格式数据源,Dataworks 支持多字符分隔符,所以您可以使用例如 %&%#^$$^% 这样很难出现的字符作为列分隔符,保证分割为一列)。
映射方式选择默认的同行映射即可。
点击左上方的切换脚本按钮,切换为脚本模式。修改 fileFormat 参数为:“fileFormat”:”binary”。该步骤可以保证 OSS 中的 JSON 文件同步到 MaxCompute 之后存在同一行数据中,即为一个字段。其他参数保持不变,脚本模式代码示例如下。
{
“type”: “job”,
“steps”: [
{
“stepType”: “oss”,
“parameter”: {
“fieldDelimiterOrigin”: “^”,
“nullFormat”: “”,
“compress”: “”,
“datasource”: “OSS_userlog”,
“column”: [
{
“name”: 0,
“type”: “string”,
“index”: 0
}
],
“skipHeader”: “false”,
“encoding”: “UTF-8”,
“fieldDelimiter”: “^”,
“fileFormat”: “binary”,
“object”: [
“applog.txt”
]
},
“name”: “Reader”,
“category”: “reader”
},
{
“stepType”: “odps”,
“parameter”: {
“partition”: “”,
“isCompress”: false,
“truncate”: true,
“datasource”: “odps_first”,
“column”: [
“mqdata”
],
“emptyAsNull”: false,
“table”: “mqdata”
},
“name”: “Writer”,
“category”: “writer”
}
],
“version”: “2.0”,
“order”: {
“hops”: [
{
“from”: “Reader”,
“to”: “Writer”
}
]
},
“setting”: {
“errorLimit”: {
“record”: “”
},
“speed”: {
“concurrent”: 2,
“throttle”: false,
“dmu”: 1
}
}
}
完成上述配置后,点击运行接即可。运行成功日志示例如下所示。
获取 JSON 字段信息
在您的业务流程中新建一个 ODPS SQL 节点。
您可以首先输入 SELECT*from mqdata; 语句,查看当前 mqdata 表中数据。当然这一步及后续步骤,您也可以直接在 MaxCompute 客户端中输入命令运行。
确认导入表中的数据结果无误后,您可以使用 MaxCompute 内建字符串函数 GET_JSON_OBJECT 获取您想要的 JSON 数据。本例中使用 SELECT GET_JSON_OBJECT(mqdata.MQdata,’$.expensive’) FROM mqdata; 获取 JSON 文件中的 expensive 值。如下图所示,可以看到已成功获取数据。
本文作者:付帅阅读原文
本文为云栖社区原创内容,未经允许不得转载。