关于数据采集:从实战中了解数据开发全流程DataWorks-OpenAPI实战

简介:DataWorks作为飞天大数据平台操作系统,历经11年倒退,造成了涵盖数据集成、数据开发、数据治理、数据服务的一站式大数据开发治理平台。很多企业用户在应用产品的过程中心愿他们的本地服务可能和阿里云上的DataWorks服务进行交互,从而晋升企业大数据处理的效率,缩小人工操作和运维工作,升高数据危险和企业老本,当初DataWorks凋谢OpenAPI能力满足企业的定制化需要。

DataWorks作为飞天大数据平台操作系统,历经11年倒退,造成了涵盖数据集成、数据开发、数据治理、数据服务的一站式大数据开发治理平台。很多企业用户在应用产品的过程中心愿他们的本地服务可能和阿里云上的DataWorks服务进行交互,从而晋升企业大数据处理的效率,缩小人工操作和运维工作,升高数据危险和企业老本,当初DataWorks凋谢OpenAPI能力满足企业的定制化需要。
DataWorks OpenAPI涵盖租户、元数据、数据开发、运维核心、数据品质、数据服务等DataWorks外围能力,企业版和旗舰版别离赠送100万次/月、1000万次/月的收费调用额度。

对于Dataworks OpenAPI开明要求和凋谢地区可查阅DataWorks OpenAPI概述
限DataWorks企业版及以上应用立刻开明
开明7天试用请应用钉钉扫码分割

实战简介

咱们假如这样一个简略的场景,开发人员想把RDS库外面的数据同步到一张MaxCompute分区表中,而后在自建零碎的页面上展现通过数据分析后的报表数据,那么如何通过DataWorks OpenAPI去实现整个链路的实现呢?

实战筹备

一、引入DataWorks OpenAPI SDK

这一部分可参考 装置 DataWorks OpenAPI Java SDK,除了java语言,咱们还反对Python,PHP,C#,Go 等语言反对。默认状况下咱们不须要显式去指定DataWorks OpenAPI的EndPoint,然而如果aliyun-java-sdk-core版本偏低的状况下可能会找不到DataWorks OpenAPI的Endpoint,这时候可在不降级版本的状况下通过应用如下代码进行申请。

    IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");
    DefaultProfile.addEndpoint("cn-shanghai","dataworks-public", "dataworks.cn-shanghai.aliyuncs.com");
    IAcsClient client = new DefaultAcsClient(profile);

如上代码是显式地指定了DataWorks OpenAPI的EndPoint,dataworks.${regionId}.aliyuncs.com这样的域名格局在公网环境下可拜访,然而有些用户须要在VPC环境下调用OpenAPI,那么则须要把域名dataworks.${regionId}.aliyuncs.com 变更成 dataworks-vpc.${regionId}.aliyuncs.com,这样在VPC网络环境下即便不能拜访公网也能申请到DataWorks OpenAPI。
如果您不分明regionId(地区ID)的概念,可参考地区和可用区。

二、理解DataWorks OpenAPI文档

具体浏览DataWorks OpenAPI文档对开发十分有帮忙,做API开发时如果对参数的束缚不太了解时可参考DataWorks OpenAPI文档,外面对每个出入参、参数示例、错误码形容都有具体的解释。点击查看API参考>>

实战步骤

步骤一:创立RDS数据源

集成租户API可创立引擎、创立数据源、查看我的项目空间等信息。在咱们这个业务场景中,MaxCompute分区表存在于MaxCompute引擎中,咱们在DataWorks管控台创立完MaxCompute工作空间后会主动创立好MaxCompute引擎的数据源,所以咱们只须要应用【CreateConnection】创立好RDS数据源即可:

        CreateConnectionRequest createRequest = new CreateConnectionRequest();

        createRequest.setProjectId(-1L);
        createRequest.setName("TEST_CONNECTION");
        createRequest.setConnectionType("MYSQL");
        createRequest.setEnvType(1);
        createRequest.setContent("{\"password\":\"12345\"}");
        Long connectionId;

        try {
            CreateConnectionResponse createResponse = client.getAcsResponse(createRequest);
            Assert.assertNotNull(createResponse.getData());
            connectionId = createResponse.getData();

            UpdateConnectionRequest updateRequest = new UpdateConnectionRequest();
            updateRequest.setConnectionId(connectionId);
            updateRequest.setDescription("1");
            UpdateConnectionResponse acsResponse = client.getAcsResponse(updateRequest);
            Assert.assertTrue(acsResponse.getData());

            DeleteConnectionRequest deleteRequest = new DeleteConnectionRequest();
            deleteRequest.setConnectionId(connectionId);
            DeleteConnectionResponse deleteResponse = client.getAcsResponse(deleteRequest);
            Assert.assertTrue(deleteResponse.getData());
        } catch (ClientException e) {
            e.printStackTrace();
            Assert.fail();
        }

UpdateConnection和DeleteConnection可别离批改和删除数据源信息。另外对我的项目空间的成员进行治理的API集是CreateProjectMember、DeleteProjectMember、RemoveProjectMemberFromRole、ListProjectMembers。

步骤二:表的创立

集成DataWorks元数据OpenAPI咱们能治理引擎侧的表信息,通过DataWorks管控台和租户API咱们实现了MaxCompute引擎和RDS数据源的创立工作,下一步须要实现表的创立,可通过元数据的【CreateTable】实现:

        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");
        IAcsClient client = new DefaultAcsClient(profile);
        CreateTableRequest request = new CreateTableRequest();
        request.setTableName("table_test");
        request.setColumnss(new ArrayList<>());
        request.setEndpoint("endpoint");
        CreateTableResponse response = client.getAcsResponse(request);
        String nextTaskId = response.getTaskInfo().getNextTaskId();
        System.out.println(nextTaskId);

对于表治理的API集是CreateTable、UpdateTable、DeleteTable、GetMetaDBTableList、CheckMetaTable等,除了可对表进行治理,元数据API还能对表元数据、表主题进行治理,更多详情可参考DataWorks OpenAPI文档。

步骤三:工作开发和公布调度

集成数据开发API可管理文件,并对文件进行提交和公布后生成周期工作,周期工作会定时调度运行,创立不同类型的文件是依据FileType这个字段决定的,目前咱们已反对十分多的FileType,通过运维核心的API【ListProgramTypeCount】可获取所有已反对的零碎节点以及自定义节点。

        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");
        IAcsClient client = new DefaultAcsClient(profile);
        CreateFileRequest createFileRequest = new CreateFileRequest();
        createFileRequest.setFileType(DefaultNodeType.ODPS_SQL.getCode());
        createFileRequest.setInputList(projectIdentifier+"_root");
        createFileRequest.setContent(content);
        createFileRequest.setFileName("create_file_" + caseId);
        createFileRequest.setFileFolderPath("业务流程/POP接口测试/MaxCompute/test_folder_3");
        createFileRequest.setFileDescription("create file " + caseId);
        createFileRequest.setRerunMode("ALL_ALLOWED");
        CreateFileResponse createFileResponse = getAcsResponse(createFileRequest);

content字段存储SQL脚本、Shell脚本、数据集成的脚本代码,数据集成的脚本格局可参考通过脚本模式配置工作。
应用【CreateFile】创立完脚本后,如需批改可应用UpdateFile、DeleteFile进行治理。和页面上的操作流程统一的是实现文件开发后得提交和公布文件才会生成周期实例,这里要留神的是须要轮询SubmitFile返回的 DeploymentId,只有当GetDeployment返回的状态是实现时(status.finished())才示意部署胜利。

        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");
        IAcsClient client = new DefaultAcsClient(profile);
        SubmitFileRequest request = new SubmitFileRequest();
        request.setFileId(fileId);
        request.setComment("submit file");
        SubmitFileResponse submitFileResponse = getAcsResponse(submitFileRequest);

        //查看提交后果
        DeploymentStatus status = null;
        GetDeploymentResponse.Data.Deployment deployment = null;
        int retryTimes = 0;
        while (retryTimes < 6) {
            GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(submitFileResponse.getData());
            GetDeploymentResponse getDeploymentResponse = getAcsResponse(getDeploymentRequest);
            LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());
            Assert.assertNotNull(getDeploymentResponse.getData());
            deployment = getDeploymentResponse.getData().getDeployment();
            Assert.assertNotNull(deployment);
            Assert.assertTrue(deployment.getName().equalsIgnoreCase(baseId));
            Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
            Assert.assertTrue(deployment.getHandlerId().equalsIgnoreCase(baseId));
            Assert.assertEquals((int) deployment.getFromEnvironment(), DatastudioEnvironment.LOCAL.value());
            Assert.assertEquals((int) deployment.getToEnvironment(), DatastudioEnvironment.DEV.value());
            Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
            status = Enums.find(DeploymentStatus.class, deployment.getStatus());
            Assert.assertNotNull(status);
            if (status.finished()) {
                LOGGER.info("Deployment finished - FinalStatus[{}]", status);
                break;
            }
            LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
            retryTimes++;
            SleepUtils.seconds(10L);
        }

如果是在规范模式的我的项目下开发,提交实现后,还须要公布文件能力最终提交到调度成为周期工作。公布文件应用DeployFile,和提交文件一样,也须要应用GetDeployment轮询部署状态。

    DeployFileRequest request = new DeployFileRequest();
    request.setFileId(fileId);
    request.setComment("deploy file");
    DeployFileResponse deployFileResponse = getAcsResponse(deployFileRequest);
    //查看公布部署后果
    DeploymentStatus status = null;
    GetDeploymentResponse.Data.Deployment deployment = null;
    int retryTimes = 0;
    while (retryTimes < 6) {
            GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(deploymentId);
            GetDeploymentResponse getDeploymentResponse = getAcsResponse(getDeploymentRequest);
            LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());
            Assert.assertNotNull(getDeploymentResponse.getData());
            deployment = getDeploymentResponse.getData().getDeployment();
            Assert.assertNotNull(deployment);
            LOGGER.info("Deployment information got - DeploymentId[{}] - DeploymentDetail[{}]",
                    deploymentId, new Gson().toJson(deployment));
            Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
            Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
            status = Enums.find(DeploymentStatus.class, deployment.getStatus());
            Assert.assertNotNull(status);
            if (status.finished()) {
                LOGGER.info("Deployment finished - FinalStatus[{}]", status);
                break;
            }
            LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
            retryTimes++;
            SleepUtils.seconds(10L);
    }

数据开发API除了可对文件治理外,还能治理文件夹、资源、函数,更多详情可参考DataWorks OpenAPI文档。

步骤四:配置运维监控

通过API实现周期工作的生产之后,会在DataWorks平台每天生成调度实例被定时调度运行,应用运维核心API可对周期工作和周期实例进行运维操作,可通过GetNode、GetInstance、ListInstances等API查看周期工作和周期实例,监控实例运行状况。

        GetInstanceRequest request = new GetInstanceRequest();
        request.setInstanceId(INSTANCE_ID);
        request.setProjectEnv(PROJECT_ENV);
        try {
            GetInstanceResponse response = client.getAcsResponse(request);
            Object data = ReturnModelParser.parse("getInstanceSuccess", gson.toJson(response));
            BizInstanceDto bizInstanceDto = GsonUtils.jsonToBean(data.toString(), BizInstanceDto.class);
            Assert.assertEquals("NOT_RUN", bizInstanceDto.getStatus().toString());
            Assert.assertEquals(1590416703313L, bizInstanceDto.getModifyTime().getTime());
            Assert.assertEquals(INSTANCE_ID, bizInstanceDto.getInstanceId());
            Assert.assertEquals("DAILY", bizInstanceDto.getDagType().toString());
            Assert.assertEquals("kzh", bizInstanceDto.getNodeName());
            Assert.assertEquals("", bizInstanceDto.getParamValues());
            Assert.assertEquals(1590416703313L, bizInstanceDto.getCreateTime().getTime());
            Assert.assertEquals(1590422400000L, bizInstanceDto.getCycTime().getTime());
            Assert.assertEquals(338450167L, bizInstanceDto.getDagId().longValue());
            Assert.assertEquals(1590336000000L, bizInstanceDto.getBizdate().getTime());
            Assert.assertEquals(33115L, bizInstanceDto.getNodeId().longValue());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail();
        }

如果实例运行异样可通过RestartInstance、SetSuccessInstance、SuspendInstance、ResumeInstance解决。
应用CreateRemind、UpdateRemind等API可创立自定义报警规定,确保每天基线顺利产出,一旦异样可告警告诉到人工,而后染指。

        CreateRemindRequest createRemindRequest = new CreateRemindRequest();
        createRemindRequest.setRemindName("REMIND_CREATE_TEST");
        createRemindRequest.setRemindUnit(PopRemindUnit.NODE.name());
        createRemindRequest.setRemindType(RemindType.ERROR.name());
        createRemindRequest.setAlertUnit(PopAlertUnit.OTHER.name());
        createRemindRequest.setDndEnd("08:00");
        createRemindRequest.setNodeIds("-1");
        createRemindRequest.setMaxAlertTimes(1);
        createRemindRequest.setAlertInterval(1800);
        createRemindRequest.setAlertMethods(PopAlertMethod.MAIL.name());
        createRemindRequest.setAlertTargets(MosadConstants.POP_UID);
        try { 
            CreateRemindResponse createResponse = client.getAcsResponse(createRemindRequest);
            MosadReturnModelParser.parse("createRemindTest", gson.toJson(createResponse));
            Assert.assertTrue(createResponse.getData() > 0);
        } catch (Exception ex) {
            ex.printStackTrace();
            return;
        }

运维核心次要提供周期工作、手动业务流程、基线查问、告警配置和查问等相干API,可参考DataWorks OpenAPI文档。

步骤五:配置数据品质监控

在这个业务场景中,咱们通过后面介绍的API曾经能够每天定时把数据从RDS同步到MaxCompute的表中了。如果咱们放心产生脏数据或者数据缺失影响到线上业务,那么可通过数据品质API来集成DataWorks数据品质监控能力,当表数据产出异样时,能够立即触发给规定订阅人。

        CreateQualityRuleRequest request = new CreateQualityRuleRequest();
        request.setBlockType(0);
        request.setComment("test-createTemplateRuleSuccess");
        request.setCriticalThreshold("50");
        request.setEntityId(entityId);
        request.setOperator("abs");
        request.setPredictType(0);
        request.setProjectName(PROJECT_NAME);
        request.setProperty("table_count");
        request.setPropertyType("table");
        request.setRuleName("createTemplateRuleSuccess");
        request.setRuleType(0);
        request.setTemplateId(7);
        request.setWarningThreshold("10");
        try {
            CreateQualityRuleResponse response = client.getAcsResponse(request);
            Object data = ReturnModelParser.parse("createTemplateRuleSuccess", gson.toJson(response));
            Long templateRuleId = Long.parseLong(data.toString());
            Assert.assertTrue(templateRuleId > 0);
            return templateRuleId;
        } catch (Exception e) {
            e.printStackTrace();
            Assert.assertFalse(true);
            return null;
        }

CreateQualityRule、GetQualityFollower、CreateQualityRelativeNode等数据品质API集可治理数据品质规定,更多数据品质API可参考DataWorks OpenAPI文档。

步骤六:生成数据服务API

咱们通过元数据API实现了表创立,通过数据开发API实现文件和周期工作创立,通过数据品质和运维核心API配置好了监控规定,MaxCompute分区表数据亦可顺利产生,这时候咱们还须要最初一个步骤把MaxCompute分区表的数据通过数据服务OpenAPI生成一个数据服务API向零碎提供数据服务。

        CreateDataServiceApiRequest createRequest = new CreateDataServiceApiRequest();
        createRequest.setTenantId(tenantId);
        createRequest.setProjectId(projectId);
        createRequest.setApiMode(apiMode);
        createRequest.setApiName(apiName);
        createRequest.setApiPath(apiPath);
        createRequest.setApiDescription("test");
        createRequest.setGroupId(groupId);
        createRequest.setVisibleRange(visibleRange);
        createRequest.setTimeout(10000);
        createRequest.setProtocols(protocols);
        createRequest.setRequestMethod(requestMethod);
        createRequest.setResponseContentType(responseType);

        CreateDataServiceApiResponse createResponse = client.getAcsResponse(createRequest);
        Long apiId = createResponse.getData();
        Assert.assertNotNull(apiId);

        GetDataServiceApiRequest getRequest = new GetDataServiceApiRequest();
        getRequest.setTenantId(tenantId);
        getRequest.setProjectId(projectId);
        getRequest.setApiId(apiId);
        GetDataServiceApiResponse getResponse = client.getAcsResponse(getRequest);
        GetDataServiceApiResponse.Data data = getResponse.getData();
        Assert.assertEquals(apiId, data.getApiId());
        Assert.assertEquals(0L, data.getFolderId().longValue());

应用CreateDataServiceApi、PublishDataServiceApi可把表数据转换成数据服务API,那么整个数据生产链路就实现了,集成以上的DataWorks OpenAPI即实现了本地零碎和云上零碎的无缝对接。

API调试小工具

DataWorks公布的所有API全副可在线调试,并以可见即所得的形式产生源码,这样可大大提高OpenAPI的开发效率,强烈推荐应用。DataWorks OpenAPI调试入口>>

总结

工欲善其数,必先利其器!DataWorks OpenAPI是2020年正式公布的企业数据开发提效神器。通过OpenAPI的形式,可能极大地提高企业应用DataWorks产品能力的灵活性。目前已公布150+个OpenAPI,并且还在继续减少中。本期实战旨在帮忙企业用户理解如何疾速上手DataWorks OpenAPI的实际利用,通过场景化的实战演练体验DataWorks OpenAPI的弱小能力。实战系列内容继续更新中,感激大家的关注!

对于Dataworks OpenAPI开明要求和凋谢地区可查阅DataWorks OpenAPI概述
限DataWorks企业版及以上应用立刻开明

开明7天试用与折扣请应用钉钉扫码分割

版权申明:本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理