关于数据采集:从实战中了解数据开发全流程DataWorks-OpenAPI实战

70次阅读

共计 12907 个字符,预计需要花费 33 分钟才能阅读完成。

简介:DataWorks 作为飞天大数据平台操作系统,历经 11 年倒退,造成了涵盖数据集成、数据开发、数据治理、数据服务的一站式大数据开发治理平台。很多企业用户在应用产品的过程中心愿他们的本地服务可能和阿里云上的 DataWorks 服务进行交互,从而晋升企业大数据处理的效率,缩小人工操作和运维工作,升高数据危险和企业老本,当初 DataWorks 凋谢 OpenAPI 能力满足企业的定制化需要。

DataWorks 作为飞天大数据平台操作系统,历经 11 年倒退,造成了涵盖数据集成、数据开发、数据治理、数据服务的一站式大数据开发治理平台。很多企业用户在应用产品的过程中心愿他们的本地服务可能和阿里云上的 DataWorks 服务进行交互,从而晋升企业大数据处理的效率,缩小人工操作和运维工作,升高数据危险和企业老本,当初 DataWorks 凋谢 OpenAPI 能力满足企业的定制化需要。
DataWorks OpenAPI 涵盖租户、元数据、数据开发、运维核心、数据品质、数据服务等 DataWorks 外围能力,企业版和旗舰版别离赠送 100 万次 / 月、1000 万次 / 月的收费调用额度。

对于 Dataworks OpenAPI 开明要求和凋谢地区可查阅 DataWorks OpenAPI 概述
限 DataWorks 企业版及以上应用立刻开明
开明 7 天试用请应用钉钉扫码分割

实战简介

咱们假如这样一个简略的场景,开发人员想把 RDS 库外面的数据同步到一张 MaxCompute 分区表中,而后在自建零碎的页面上展现通过数据分析后的报表数据,那么如何通过 DataWorks OpenAPI 去实现整个链路的实现呢?

实战筹备

一、引入 DataWorks OpenAPI SDK

这一部分可参考 装置 DataWorks OpenAPI Java SDK,除了 java 语言,咱们还反对 Python,PHP,C#,Go 等语言反对。默认状况下咱们不须要显式去指定 DataWorks OpenAPI 的 EndPoint,然而如果 aliyun-java-sdk-core 版本偏低的状况下可能会找不到 DataWorks OpenAPI 的 Endpoint,这时候可在不降级版本的状况下通过应用如下代码进行申请。

    IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");
    DefaultProfile.addEndpoint("cn-shanghai","dataworks-public", "dataworks.cn-shanghai.aliyuncs.com");
    IAcsClient client = new DefaultAcsClient(profile);

如上代码是显式地指定了 DataWorks OpenAPI 的 EndPoint,dataworks.${regionId}.aliyuncs.com 这样的域名格局在公网环境下可拜访,然而有些用户须要在 VPC 环境下调用 OpenAPI,那么则须要把域名 dataworks.${regionId}.aliyuncs.com 变更成 dataworks-vpc.${regionId}.aliyuncs.com,这样在 VPC 网络环境下即便不能拜访公网也能申请到 DataWorks OpenAPI。
如果您不分明 regionId(地区 ID)的概念,可参考地区和可用区。

二、理解 DataWorks OpenAPI 文档

具体浏览 DataWorks OpenAPI 文档对开发十分有帮忙,做 API 开发时如果对参数的束缚不太了解时可参考 DataWorks OpenAPI 文档,外面对每个出入参、参数示例、错误码形容都有具体的解释。点击查看 API 参考 >>

实战步骤

步骤一:创立 RDS 数据源

集成租户 API 可创立引擎、创立数据源、查看我的项目空间等信息。在咱们这个业务场景中,MaxCompute 分区表存在于 MaxCompute 引擎中,咱们在 DataWorks 管控台创立完 MaxCompute 工作空间后会主动创立好 MaxCompute 引擎的数据源,所以咱们只须要应用【CreateConnection】创立好 RDS 数据源即可:

        CreateConnectionRequest createRequest = new CreateConnectionRequest();

        createRequest.setProjectId(-1L);
        createRequest.setName("TEST_CONNECTION");
        createRequest.setConnectionType("MYSQL");
        createRequest.setEnvType(1);
        createRequest.setContent("{\"password\":\"12345\"}");
        Long connectionId;

        try {CreateConnectionResponse createResponse = client.getAcsResponse(createRequest);
            Assert.assertNotNull(createResponse.getData());
            connectionId = createResponse.getData();

            UpdateConnectionRequest updateRequest = new UpdateConnectionRequest();
            updateRequest.setConnectionId(connectionId);
            updateRequest.setDescription("1");
            UpdateConnectionResponse acsResponse = client.getAcsResponse(updateRequest);
            Assert.assertTrue(acsResponse.getData());

            DeleteConnectionRequest deleteRequest = new DeleteConnectionRequest();
            deleteRequest.setConnectionId(connectionId);
            DeleteConnectionResponse deleteResponse = client.getAcsResponse(deleteRequest);
            Assert.assertTrue(deleteResponse.getData());
        } catch (ClientException e) {e.printStackTrace();
            Assert.fail();}

UpdateConnection 和 DeleteConnection 可别离批改和删除数据源信息。另外对我的项目空间的成员进行治理的 API 集是 CreateProjectMember、DeleteProjectMember、RemoveProjectMemberFromRole、ListProjectMembers。

步骤二:表的创立

集成 DataWorks 元数据 OpenAPI 咱们能治理引擎侧的表信息,通过 DataWorks 管控台和租户 API 咱们实现了 MaxCompute 引擎和 RDS 数据源的创立工作,下一步须要实现表的创立,可通过元数据的【CreateTable】实现:

        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");
        IAcsClient client = new DefaultAcsClient(profile);
        CreateTableRequest request = new CreateTableRequest();
        request.setTableName("table_test");
        request.setColumnss(new ArrayList<>());
        request.setEndpoint("endpoint");
        CreateTableResponse response = client.getAcsResponse(request);
        String nextTaskId = response.getTaskInfo().getNextTaskId();
        System.out.println(nextTaskId);

对于表治理的 API 集是 CreateTable、UpdateTable、DeleteTable、GetMetaDBTableList、CheckMetaTable 等,除了可对表进行治理,元数据 API 还能对表元数据、表主题进行治理,更多详情可参考 DataWorks OpenAPI 文档。

步骤三:工作开发和公布调度

集成数据开发 API 可管理文件,并对文件进行提交和公布后生成周期工作,周期工作会定时调度运行,创立不同类型的文件是依据 FileType 这个字段决定的,目前咱们已反对十分多的 FileType,通过运维核心的 API【ListProgramTypeCount】可获取所有已反对的零碎节点以及自定义节点。

        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");
        IAcsClient client = new DefaultAcsClient(profile);
        CreateFileRequest createFileRequest = new CreateFileRequest();
        createFileRequest.setFileType(DefaultNodeType.ODPS_SQL.getCode());
        createFileRequest.setInputList(projectIdentifier+"_root");
        createFileRequest.setContent(content);
        createFileRequest.setFileName("create_file_" + caseId);
        createFileRequest.setFileFolderPath("业务流程 /POP 接口测试 /MaxCompute/test_folder_3");
        createFileRequest.setFileDescription("create file" + caseId);
        createFileRequest.setRerunMode("ALL_ALLOWED");
        CreateFileResponse createFileResponse = getAcsResponse(createFileRequest);

content 字段存储 SQL 脚本、Shell 脚本、数据集成的脚本代码,数据集成的脚本格局可参考通过脚本模式配置工作。
应用【CreateFile】创立完脚本后,如需批改可应用 UpdateFile、DeleteFile 进行治理。和页面上的操作流程统一的是实现文件开发后得提交和公布文件才会生成周期实例,这里要留神的是须要轮询 SubmitFile 返回的 DeploymentId,只有当 GetDeployment 返回的状态是实现时(status.finished())才示意部署胜利。

        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");
        IAcsClient client = new DefaultAcsClient(profile);
        SubmitFileRequest request = new SubmitFileRequest();
        request.setFileId(fileId);
        request.setComment("submit file");
        SubmitFileResponse submitFileResponse = getAcsResponse(submitFileRequest);

        // 查看提交后果
        DeploymentStatus status = null;
        GetDeploymentResponse.Data.Deployment deployment = null;
        int retryTimes = 0;
        while (retryTimes < 6) {GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(submitFileResponse.getData());
            GetDeploymentResponse getDeploymentResponse = getAcsResponse(getDeploymentRequest);
            LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());
            Assert.assertNotNull(getDeploymentResponse.getData());
            deployment = getDeploymentResponse.getData().getDeployment();
            Assert.assertNotNull(deployment);
            Assert.assertTrue(deployment.getName().equalsIgnoreCase(baseId));
            Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
            Assert.assertTrue(deployment.getHandlerId().equalsIgnoreCase(baseId));
            Assert.assertEquals((int) deployment.getFromEnvironment(), DatastudioEnvironment.LOCAL.value());
            Assert.assertEquals((int) deployment.getToEnvironment(), DatastudioEnvironment.DEV.value());
            Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
            status = Enums.find(DeploymentStatus.class, deployment.getStatus());
            Assert.assertNotNull(status);
            if (status.finished()) {LOGGER.info("Deployment finished - FinalStatus[{}]", status);
                break;
            }
            LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
            retryTimes++;
            SleepUtils.seconds(10L);
        }

如果是在规范模式的我的项目下开发,提交实现后,还须要公布文件能力最终提交到调度成为周期工作。公布文件应用 DeployFile,和提交文件一样,也须要应用 GetDeployment 轮询部署状态。

    DeployFileRequest request = new DeployFileRequest();
    request.setFileId(fileId);
    request.setComment("deploy file");
    DeployFileResponse deployFileResponse = getAcsResponse(deployFileRequest);
    // 查看公布部署后果
    DeploymentStatus status = null;
    GetDeploymentResponse.Data.Deployment deployment = null;
    int retryTimes = 0;
    while (retryTimes < 6) {GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(deploymentId);
            GetDeploymentResponse getDeploymentResponse = getAcsResponse(getDeploymentRequest);
            LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());
            Assert.assertNotNull(getDeploymentResponse.getData());
            deployment = getDeploymentResponse.getData().getDeployment();
            Assert.assertNotNull(deployment);
            LOGGER.info("Deployment information got - DeploymentId[{}] - DeploymentDetail[{}]",
                    deploymentId, new Gson().toJson(deployment));
            Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
            Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
            status = Enums.find(DeploymentStatus.class, deployment.getStatus());
            Assert.assertNotNull(status);
            if (status.finished()) {LOGGER.info("Deployment finished - FinalStatus[{}]", status);
                break;
            }
            LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
            retryTimes++;
            SleepUtils.seconds(10L);
    }

数据开发 API 除了可对文件治理外,还能治理文件夹、资源、函数,更多详情可参考 DataWorks OpenAPI 文档。

步骤四:配置运维监控

通过 API 实现周期工作的生产之后,会在 DataWorks 平台每天生成调度实例被定时调度运行,应用运维核心 API 可对周期工作和周期实例进行运维操作,可通过 GetNode、GetInstance、ListInstances 等 API 查看周期工作和周期实例,监控实例运行状况。

        GetInstanceRequest request = new GetInstanceRequest();
        request.setInstanceId(INSTANCE_ID);
        request.setProjectEnv(PROJECT_ENV);
        try {GetInstanceResponse response = client.getAcsResponse(request);
            Object data = ReturnModelParser.parse("getInstanceSuccess", gson.toJson(response));
            BizInstanceDto bizInstanceDto = GsonUtils.jsonToBean(data.toString(), BizInstanceDto.class);
            Assert.assertEquals("NOT_RUN", bizInstanceDto.getStatus().toString());
            Assert.assertEquals(1590416703313L, bizInstanceDto.getModifyTime().getTime());
            Assert.assertEquals(INSTANCE_ID, bizInstanceDto.getInstanceId());
            Assert.assertEquals("DAILY", bizInstanceDto.getDagType().toString());
            Assert.assertEquals("kzh", bizInstanceDto.getNodeName());
            Assert.assertEquals("", bizInstanceDto.getParamValues());
            Assert.assertEquals(1590416703313L, bizInstanceDto.getCreateTime().getTime());
            Assert.assertEquals(1590422400000L, bizInstanceDto.getCycTime().getTime());
            Assert.assertEquals(338450167L, bizInstanceDto.getDagId().longValue());
            Assert.assertEquals(1590336000000L, bizInstanceDto.getBizdate().getTime());
            Assert.assertEquals(33115L, bizInstanceDto.getNodeId().longValue());
        } catch (Exception e) {e.printStackTrace();
            Assert.fail();}

如果实例运行异样可通过 RestartInstance、SetSuccessInstance、SuspendInstance、ResumeInstance 解决。
应用 CreateRemind、UpdateRemind 等 API 可创立自定义报警规定,确保每天基线顺利产出,一旦异样可告警告诉到人工,而后染指。

        CreateRemindRequest createRemindRequest = new CreateRemindRequest();
        createRemindRequest.setRemindName("REMIND_CREATE_TEST");
        createRemindRequest.setRemindUnit(PopRemindUnit.NODE.name());
        createRemindRequest.setRemindType(RemindType.ERROR.name());
        createRemindRequest.setAlertUnit(PopAlertUnit.OTHER.name());
        createRemindRequest.setDndEnd("08:00");
        createRemindRequest.setNodeIds("-1");
        createRemindRequest.setMaxAlertTimes(1);
        createRemindRequest.setAlertInterval(1800);
        createRemindRequest.setAlertMethods(PopAlertMethod.MAIL.name());
        createRemindRequest.setAlertTargets(MosadConstants.POP_UID);
        try {CreateRemindResponse createResponse = client.getAcsResponse(createRemindRequest);
            MosadReturnModelParser.parse("createRemindTest", gson.toJson(createResponse));
            Assert.assertTrue(createResponse.getData() > 0);
        } catch (Exception ex) {ex.printStackTrace();
            return;
        }

运维核心次要提供周期工作、手动业务流程、基线查问、告警配置和查问等相干 API,可参考 DataWorks OpenAPI 文档。

步骤五:配置数据品质监控

在这个业务场景中,咱们通过后面介绍的 API 曾经能够每天定时把数据从 RDS 同步到 MaxCompute 的表中了。如果咱们放心产生脏数据或者数据缺失影响到线上业务,那么可通过数据品质 API 来集成 DataWorks 数据品质监控能力,当表数据产出异样时,能够立即触发给规定订阅人。

        CreateQualityRuleRequest request = new CreateQualityRuleRequest();
        request.setBlockType(0);
        request.setComment("test-createTemplateRuleSuccess");
        request.setCriticalThreshold("50");
        request.setEntityId(entityId);
        request.setOperator("abs");
        request.setPredictType(0);
        request.setProjectName(PROJECT_NAME);
        request.setProperty("table_count");
        request.setPropertyType("table");
        request.setRuleName("createTemplateRuleSuccess");
        request.setRuleType(0);
        request.setTemplateId(7);
        request.setWarningThreshold("10");
        try {CreateQualityRuleResponse response = client.getAcsResponse(request);
            Object data = ReturnModelParser.parse("createTemplateRuleSuccess", gson.toJson(response));
            Long templateRuleId = Long.parseLong(data.toString());
            Assert.assertTrue(templateRuleId > 0);
            return templateRuleId;
        } catch (Exception e) {e.printStackTrace();
            Assert.assertFalse(true);
            return null;
        }

CreateQualityRule、GetQualityFollower、CreateQualityRelativeNode 等数据品质 API 集可治理数据品质规定,更多数据品质 API 可参考 DataWorks OpenAPI 文档。

步骤六:生成数据服务 API

咱们通过元数据 API 实现了表创立,通过数据开发 API 实现文件和周期工作创立,通过数据品质和运维核心 API 配置好了监控规定,MaxCompute 分区表数据亦可顺利产生,这时候咱们还须要最初一个步骤把 MaxCompute 分区表的数据通过数据服务 OpenAPI 生成一个数据服务 API 向零碎提供数据服务。

        CreateDataServiceApiRequest createRequest = new CreateDataServiceApiRequest();
        createRequest.setTenantId(tenantId);
        createRequest.setProjectId(projectId);
        createRequest.setApiMode(apiMode);
        createRequest.setApiName(apiName);
        createRequest.setApiPath(apiPath);
        createRequest.setApiDescription("test");
        createRequest.setGroupId(groupId);
        createRequest.setVisibleRange(visibleRange);
        createRequest.setTimeout(10000);
        createRequest.setProtocols(protocols);
        createRequest.setRequestMethod(requestMethod);
        createRequest.setResponseContentType(responseType);

        CreateDataServiceApiResponse createResponse = client.getAcsResponse(createRequest);
        Long apiId = createResponse.getData();
        Assert.assertNotNull(apiId);

        GetDataServiceApiRequest getRequest = new GetDataServiceApiRequest();
        getRequest.setTenantId(tenantId);
        getRequest.setProjectId(projectId);
        getRequest.setApiId(apiId);
        GetDataServiceApiResponse getResponse = client.getAcsResponse(getRequest);
        GetDataServiceApiResponse.Data data = getResponse.getData();
        Assert.assertEquals(apiId, data.getApiId());
        Assert.assertEquals(0L, data.getFolderId().longValue());

应用 CreateDataServiceApi、PublishDataServiceApi 可把表数据转换成数据服务 API,那么整个数据生产链路就实现了,集成以上的 DataWorks OpenAPI 即实现了本地零碎和云上零碎的无缝对接。

API 调试小工具

DataWorks 公布的所有 API 全副可在线调试,并以可见即所得的形式产生源码,这样可大大提高 OpenAPI 的开发效率,强烈推荐应用。DataWorks OpenAPI 调试入口 >>

总结

工欲善其数,必先利其器!DataWorks OpenAPI 是 2020 年正式公布的企业数据开发提效神器。通过 OpenAPI 的形式,可能极大地提高企业应用 DataWorks 产品能力的灵活性。目前已公布 150+ 个 OpenAPI,并且还在继续减少中。本期实战旨在帮忙企业用户理解如何疾速上手 DataWorks OpenAPI 的实际利用,通过场景化的实战演练体验 DataWorks OpenAPI 的弱小能力。实战系列内容继续更新中,感激大家的关注!

对于 Dataworks OpenAPI 开明要求和凋谢地区可查阅 DataWorks OpenAPI 概述
限 DataWorks 企业版及以上应用立刻开明

开明 7 天试用与折扣 请应用钉钉扫码分割

版权申明:本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。

正文完
 0