共计 2194 个字符,预计需要花费 6 分钟才能阅读完成。
摘要:本文为您介绍如何利用 DataWorks 数据集成直接从 MongoDB 提取 JSON 字段到 MaxCompute。
数据及账号准备
首先您需要将数据上传至您的 MongoDB 数据库。本例中使用阿里云的云数据库 MongoDB 版,网络类型为 VPC(需申请公网地址,否则无法与 DataWorks 默认资源组互通),测试数据如下。
{
“store”: {
“book”: [
{
“category”: “reference”,
“author”: “Nigel Rees”,
“title”: “Sayings of the Century”,
“price”: 8.95
},
{
“category”: “fiction”,
“author”: “Evelyn Waugh”,
“title”: “Sword of Honour”,
“price”: 12.99
},
{
“category”: “fiction”,
“author”: “J. R. R. Tolkien”,
“title”: “The Lord of the Rings”,
“isbn”: “0-395-19395-8”,
“price”: 22.99
}
],
“bicycle”: {
“color”: “red”,
“price”: 19.95
}
},
“expensive”: 10
}
登录 MongoDB 的 DMS 控制台,本例中使用的数据库为 admin,集合为 userlog,您可以在查询窗口使用 db.userlog.find().limit(10) 命令查看已上传好的数据,如下图所示。此外,需提前在数据库内新建用户,用于 DataWorks 添加数据源。本例中使用命令 db.createUser({user:”bookuser”,pwd:”123456″,roles:[“root”]}),新建用户名为 bookuser,密码为 123456,权限为 root。
使用 DataWorks 提取数据到 MaxCompute
新增 MongoDB 数据源进入 DataWorks 数据集成控制台,新增 MongoDB 类型数据源。
具体参数如下所示,测试数据源连通性通过即可点击完成。由于本文中 MongoDB 处于 VPC 环境下,因此 数据源类型需选择 有公网 IP。
访问地址及端口号可通过在 MongoDB 管理控制台点击实例名称获取,如下图所示。
新建数据同步任务在 DataWorks 上新建数据同步类型节点。
新建的同时,在 DataWorks 新建一个 [建表任务](https://help.aliyun.com/document_detail/85302.html#concept-qlb-gd4-q2b “ 本文向您介绍如何对数据表进行创建、提交和查询。”),用于存放 JSON 数据,本例中新建表名为 mqdata。
表参数可以通过图形化界面完成。本例中 mqdata 表仅有一列,类型为 string,列名为 MQ data。
完成上述新建后,您可以在图形化界面进行数据同步任务参数的初步配置,如下图所示。选择目标数据源名称为 odps_first,选择目标表为刚建立的 mqdata。数据来源类型为 MongoDB,选择我们刚创建的数据源 mongodb_userlog。完成上述配置后,点击转换为脚本,跳转到脚本模式。
脚本模式代码示例如下。
{
“type”: “job”,
“steps”: [
{
“stepType”: “mongodb”,
“parameter”: {
“datasource”: “mongodb_userlog”,
// 数据源名称
“column”: [
{
“name”: “store.bicycle.color”, //JSON 字段路径,本例中提取 color 值
“type”: “document.document.string” // 本栏目的字段数需和 name 一致。假如您选取的 JSON 字段为一级字段,如本例中的 expensive,则直接填写 string 即可。
}
],
“collectionName // 集合名称 ”: “userlog”
},
“name”: “Reader”,
“category”: “reader”
},
{
“stepType”: “odps”,
“parameter”: {
“partition”: “”,
“isCompress”: false,
“truncate”: true,
“datasource”: “odps_first”,
“column”: [
//MaxCompute 表列名 “mqdata”
],
“emptyAsNull”: false,
“table”: “mqdata”
},
“name”: “Writer”,
“category”: “writer”
}
],
“version”: “2.0”,
“order”: {
“hops”: [
{
“from”: “Reader”,
“to”: “Writer”
}
]
},
“setting”: {
“errorLimit”: {
“record”: “”
},
“speed”: {
“concurrent”: 2,
“throttle”: false,
“dmu”: 1
}
}
}
完成上述配置后,点击运行接即可。运行成功日志示例如下所示。
结果验证
在您的业务流程中新建一个 ODPS SQL 节点。
您可以输入 SELECT * from mqdata; 语句,查看当前 mqdata 表中数据。当然这一步您也可以直接在 MaxCompute 客户端中输入命令运行。
本文作者:付帅阅读原文
本文为云栖社区原创内容,未经允许不得转载。