关于程序员:华为云FusionInsight-MRS-FlinkSQL-复杂嵌套Json解析最佳实践

9次阅读

共计 5218 个字符,预计需要花费 14 分钟才能阅读完成。

背景阐明

随着流计算的倒退,挑战不再仅限于数据量和计算量,业务变得越来越简单,开发者可能是资深的大数据从业者、初学 Java 的爱好者,或是不懂代码的数据分析者。如何进步开发者的效率,升高流计算的门槛,对推广实时计算十分重要。

SQL 是数据处理中应用最宽泛的语言,它容许用户简明扼要地展现其业务逻辑。Flink 作为流批一体的计算引擎,致力于提供一套 SQL 反对全副利用场景,Flink SQL 的实现也齐全遵循 ANSI SQL 规范。之前,用户可能须要编写上百行业务代码,应用 SQL 后,可能只须要几行 SQL 就能够轻松搞定。

本文介绍如何应用华为 FusionInsight MRS FlinkServer 服务进行界面化的 FlinkSQL 编辑,从而解决简单的嵌套 Json 格局

Json 内容

上面以 cdl 新增数据的 json 为例

{
    "schema":{
        "type":"struct",
        "fields":[
            {
                "type":"string",
                "optional":false,
                "field":"DATA_STORE"
            },
            {
                "type":"string",
                "optional":false,
                "field":"SEG_OWNER"
            },
            {
                "type":"string",
                "optional":false,
                "field":"TABLE_NAME"
            },
            {
                "type":"int64",
                "optional":false,
                "name":"org.apache.kafka.connect.data.Timestamp",
                "version":1,
                "field":"TIMESTAMP"
            },
            {
                "type":"string",
                "optional":false,
                "field":"OPERATION"
            },
            {
                "type":"string",
                "optional":true,
                "field":"LOB_COLUMNS"
            },
            {
                "type":"struct",
                "fields":[
                    {
                        "type":"array",
                        "items":{
                            "type":"struct",
                            "fields":[
                                {
                                    "type":"string",
                                    "optional":false,
                                    "field":"name"
                                },
                                {
                                    "type":"string",
                                    "optional":true,
                                    "field":"value"
                                }
                            ],
                            "optional":false
                        },
                        "optional":false,
                        "field":"properties"
                    }
                ],
                "optional":false,
                "name":"transaction",
                "field":"transaction"
            },
            {
                "type":"struct",
                "fields":[
                    {
                        "type":"int64",
                        "optional":false,
                        "field":"uid"
                    }
                ],
                "optional":true,
                "name":"unique",
                "field":"unique"
            },
            {
                "type":"struct",
                "fields":[
                    {
                        "type":"int64",
                        "optional":false,
                        "field":"uid"
                    },
                    {
                        "type":"string",
                        "optional":true,
                        "default":"","field":"uname"
                    },
                    {
                        "type":"int64",
                        "optional":true,
                        "field":"age"
                    },
                    {
                        "type":"string",
                        "optional":true,
                        "field":"sex"
                    },
                    {
                        "type":"string",
                        "optional":true,
                        "field":"mostlike"
                    },
                    {
                        "type":"string",
                        "optional":true,
                        "field":"lastview"
                    },
                    {
                        "type":"int64",
                        "optional":true,
                        "field":"totalcost"
                    }
                ],
                "optional":true,
                "name":"data",
                "field":"data"
            },
            {
                "type":"struct",
                "fields":[ ],
                "optional":true,
                "name":"EMPTY",
                "field":"before"
            },
            {
                "type":"string",
                "optional":true,
                "field":"HEARTBEAT_IDENTIFIER"
            }
        ],
        "optional":false,
        "name":"hudi.hudisource"
    },
    "payload":{
        "DATA_STORE":"MYSQL",
        "SEG_OWNER":"hudi",
        "TABLE_NAME":"hudisource",
        "TIMESTAMP":1631070742000,
        "OPERATION":"INSERT",
        "LOB_COLUMNS":"","transaction":{"properties":[
                {
                    "name":"file",
                    "value":"mysql-bin.000005"
                },
                {
                    "name":"pos",
                    "value":"32307"
                },
                {
                    "name":"gtid",
                    "value":""
                }
            ]
        },
        "unique":{"uid":11},
        "data":{
            "uid":11,
            "uname":"蒋语堂",
            "age":38,
            "sex":"女",
            "mostlike":"图",
            "lastview":"播放器",
            "totalcost":28732
        },
        "before":null,
        "HEARTBEAT_IDENTIFIER":"998d66cc-1405-40e2-bbdc-41f2adf40724"
    }
}

下面的数据信息为简单的 json 嵌套构造,蕴含了 Map、Array、Row 等类型, 对于这样的简单格局须要有一种高效的形式进行解析,上面介绍如何实现。

华为 FusionInsight MRS Flink WebUI 介绍

Flink WebUI 提供基于 Web 的可视化开发平台,用户只须要编写 SQL 即可开发作业,极大升高作业开发门槛。同时通过作业平台能力凋谢,反对业务人员自行编写 SQL 开发作业来疾速应答需要,大大减少 Flink 作业开发工作量。

Flink WebUI 次要有以下特点:

  • 企业级可视化运维:运维治理界面化、作业监控、作业开发 Flink SQL 标准化等。
  • 疾速建设集群连贯:通过集群连接功能配置拜访一个集群,须要客户端配置、用户认证密钥文件。
  • 疾速建设数据连贯:通过数据连接功能配置拜访一个组件。创立“数据连贯类型”为“HDFS”类型时需创立集群连贯,其余数据连贯类型的“认证类型”为“KERBEROS”需创立集群连贯,“认证类型”为“SIMPLE”不需创立集群连贯。
  • 可视化开发平台:反对自定义输出 / 输入映射表,满足不同输出起源、不同输入指标端的需要。
  • 图形化作业管理:简略易用。

上面介绍如何应用 Flink WebUI 开发 FlinkSQL DDL 语句解析出无效信息

操作步骤

  • 登录华为 FusionInisght MRS Flink WebUI
  • 在作业管理抉择新建作业创立一个 FlinkSQL 工作
  • 编辑 Flink SQL 语句

SQL 阐明:创立两张 kafka 流表,起作用为从 kafka 源端读取 cdl 对应 topic,解析出须要的字段。并将后果写入另外一个 kafka topic

  1. Json 中的每个 {} 都须要用 Row 类型来示意
  2. Json 中的每个 [] 都须要用 Arrary 类型来示意
  3. 数组的下标是从 1 开始的不是 0 如上面 SQL 中的 schema.fields[1].type
  4. 关键字在任何中央都须要加反引号 如下面 SQL 中的 type
  5. select 语句中的字段类型和程序肯定要和后果表的字段类型和程序保持一致
  6. 可应用 flink 函数比方 LOCALTIMESTAMP 为获取 flink 零碎工夫

CREATE TABLE huditableout_source(
  `schema` ROW < `fields` ARRAY< ROW<type STRING, optional BOOLEAN, field STRING>> >,
  payload ROW < `TIMESTAMP` BIGINT, `data` ROW <  uid INT,
  uname VARCHAR(32),
  age INT,
  sex VARCHAR(30),
  mostlike VARCHAR(30),
  lastview VARCHAR(30),
  totalcost INT> >,
  type1 as `schema`.`fields`[1].type,
  optional1 as `schema`.`fields`[1].optional,
  field1 as `schema`.`fields`[1].field,
  type2 as `schema`.`fields`[2].type,
  optional2 as `schema`.`fields`[2].optional,
  field2 as `schema`.`fields`[2].field,
  ts as payload.`TIMESTAMP`,
  uid as payload.`data`.uid,
  uname as payload.`data`.uname,
  age as payload.`data`.age,
  sex as payload.`data`.sex,
  mostlike as payload.`data`.mostlike,
  lastview as payload.`data`.lastview,
  totalcost as payload.`data`.totalcost,
  localts as LOCALTIMESTAMP
) WITH(
  'connector' = 'kafka',
  'topic' = 'huditableout',
  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
  'properties.group.id' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',

  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',

  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
CREATE TABLE huditableout(type1 VARCHAR(32),
  optional1 BOOLEAN,
  field1 VARCHAR(32),
  type2 VARCHAR(32),
  optional2 BOOLEAN,
  field2 VARCHAR(32),
  ts BIGINT,
  uid INT,
  uname VARCHAR(32),
  age INT,
  sex VARCHAR(30),
  mostlike VARCHAR(30),
  lastview VARCHAR(30),
  totalcost INT,
  localts TIMESTAMP
) WITH(
  'connector' = 'kafka',
  'topic' = 'huditableout2',
  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
  'properties.group.id' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',

  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',

  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
insert into
  huditableout
  select
  type1,
  optional1,
  field1,
  type2,
  optional2,
  field2,
  ts,
  uid,
  uname,
  age,
  sex,
  mostlike,
  lastview,
  totalcost,
  localts
from
  huditableout_source;
  • 点击语义校验,确保语义校验通过
  • 启动该 Flink SQL 工作
  • 查看后果

源端 kafka 数据

指标端 kafka 数据

本文由华为云公布

正文完
 0