背景阐明

随着流计算的倒退,挑战不再仅限于数据量和计算量,业务变得越来越简单,开发者可能是资深的大数据从业者、初学 Java 的爱好者,或是不懂代码的数据分析者。如何进步开发者的效率,升高流计算的门槛,对推广实时计算十分重要。

SQL 是数据处理中应用最宽泛的语言,它容许用户简明扼要地展现其业务逻辑。Flink 作为流批一体的计算引擎,致力于提供一套 SQL 反对全副利用场景,Flink SQL 的实现也齐全遵循 ANSI SQL 规范。之前,用户可能须要编写上百行业务代码,应用 SQL 后,可能只须要几行 SQL 就能够轻松搞定。

本文介绍如何应用华为FusionInsight MRS FlinkServer服务进行界面化的FlinkSQL编辑,从而解决简单的嵌套Json格局

Json内容

上面以cdl新增数据的json为例

{    "schema":{        "type":"struct",        "fields":[            {                "type":"string",                "optional":false,                "field":"DATA_STORE"            },            {                "type":"string",                "optional":false,                "field":"SEG_OWNER"            },            {                "type":"string",                "optional":false,                "field":"TABLE_NAME"            },            {                "type":"int64",                "optional":false,                "name":"org.apache.kafka.connect.data.Timestamp",                "version":1,                "field":"TIMESTAMP"            },            {                "type":"string",                "optional":false,                "field":"OPERATION"            },            {                "type":"string",                "optional":true,                "field":"LOB_COLUMNS"            },            {                "type":"struct",                "fields":[                    {                        "type":"array",                        "items":{                            "type":"struct",                            "fields":[                                {                                    "type":"string",                                    "optional":false,                                    "field":"name"                                },                                {                                    "type":"string",                                    "optional":true,                                    "field":"value"                                }                            ],                            "optional":false                        },                        "optional":false,                        "field":"properties"                    }                ],                "optional":false,                "name":"transaction",                "field":"transaction"            },            {                "type":"struct",                "fields":[                    {                        "type":"int64",                        "optional":false,                        "field":"uid"                    }                ],                "optional":true,                "name":"unique",                "field":"unique"            },            {                "type":"struct",                "fields":[                    {                        "type":"int64",                        "optional":false,                        "field":"uid"                    },                    {                        "type":"string",                        "optional":true,                        "default":"",                        "field":"uname"                    },                    {                        "type":"int64",                        "optional":true,                        "field":"age"                    },                    {                        "type":"string",                        "optional":true,                        "field":"sex"                    },                    {                        "type":"string",                        "optional":true,                        "field":"mostlike"                    },                    {                        "type":"string",                        "optional":true,                        "field":"lastview"                    },                    {                        "type":"int64",                        "optional":true,                        "field":"totalcost"                    }                ],                "optional":true,                "name":"data",                "field":"data"            },            {                "type":"struct",                "fields":[                ],                "optional":true,                "name":"EMPTY",                "field":"before"            },            {                "type":"string",                "optional":true,                "field":"HEARTBEAT_IDENTIFIER"            }        ],        "optional":false,        "name":"hudi.hudisource"    },    "payload":{        "DATA_STORE":"MYSQL",        "SEG_OWNER":"hudi",        "TABLE_NAME":"hudisource",        "TIMESTAMP":1631070742000,        "OPERATION":"INSERT",        "LOB_COLUMNS":"",        "transaction":{            "properties":[                {                    "name":"file",                    "value":"mysql-bin.000005"                },                {                    "name":"pos",                    "value":"32307"                },                {                    "name":"gtid",                    "value":""                }            ]        },        "unique":{            "uid":11        },        "data":{            "uid":11,            "uname":"蒋语堂",            "age":38,            "sex":"女",            "mostlike":"图",            "lastview":"播放器",            "totalcost":28732        },        "before":null,        "HEARTBEAT_IDENTIFIER":"998d66cc-1405-40e2-bbdc-41f2adf40724"    }}

下面的数据信息为简单的json嵌套构造,蕴含了 Map、Array、Row 等类型, 对于这样的简单格局须要有一种高效的形式进行解析,上面介绍如何实现。

华为FusionInsight MRS Flink WebUI介绍

Flink WebUI提供基于Web的可视化开发平台,用户只须要编写SQL即可开发作业,极大升高作业开发门槛。同时通过作业平台能力凋谢,反对业务人员自行编写SQL开发作业来疾速应答需要,大大减少Flink作业开发工作量。

Flink WebUI次要有以下特点:

  • 企业级可视化运维:运维治理界面化、作业监控、作业开发Flink SQL标准化等。
  • 疾速建设集群连贯:通过集群连接功能配置拜访一个集群,须要客户端配置、用户认证密钥文件。
  • 疾速建设数据连贯:通过数据连接功能配置拜访一个组件。创立“数据连贯类型”为“HDFS”类型时需创立集群连贯,其余数据连贯类型的“认证类型”为“KERBEROS”需创立集群连贯,“认证类型”为“SIMPLE”不需创立集群连贯。
  • 可视化开发平台:反对自定义输出/输入映射表,满足不同输出起源、不同输入指标端的需要。
  • 图形化作业管理:简略易用。

上面介绍如何应用Flink WebUI开发FlinkSQL DDL语句解析出无效信息

操作步骤

  • 登录华为FusionInisght MRS Flink WebUI
  • 在作业管理抉择新建作业创立一个FlinkSQL工作
  • 编辑Flink SQL语句

SQL阐明:创立两张kafka流表,起作用为从kafka源端读取cdl对应topic,解析出须要的字段。并将后果写入另外一个kafka topic

  1. Json 中的每个 {} 都须要用 Row 类型来示意
  2. Json 中的每个 [] 都须要用 Arrary 类型来示意
  3. 数组的下标是从 1 开始的不是 0 如上面 SQL 中的 schema.fields[1].type
  4. 关键字在任何中央都须要加反引号 如下面 SQL 中的 type
  5. select 语句中的字段类型和程序肯定要和后果表的字段类型和程序保持一致
  6. 可应用flink函数比方LOCALTIMESTAMP为获取flink零碎工夫

CREATE TABLE huditableout_source(  `schema` ROW < `fields` ARRAY< ROW<type STRING, optional BOOLEAN, field STRING>> >,  payload ROW < `TIMESTAMP` BIGINT, `data` ROW <  uid INT,  uname VARCHAR(32),  age INT,  sex VARCHAR(30),  mostlike VARCHAR(30),  lastview VARCHAR(30),  totalcost INT> >,  type1 as `schema`.`fields`[1].type,  optional1 as `schema`.`fields`[1].optional,  field1 as `schema`.`fields`[1].field,  type2 as `schema`.`fields`[2].type,  optional2 as `schema`.`fields`[2].optional,  field2 as `schema`.`fields`[2].field,  ts as payload.`TIMESTAMP`,  uid as payload.`data`.uid,  uname as payload.`data`.uname,  age as payload.`data`.age,  sex as payload.`data`.sex,  mostlike as payload.`data`.mostlike,  lastview as payload.`data`.lastview,  totalcost as payload.`data`.totalcost,  localts as LOCALTIMESTAMP) WITH(  'connector' = 'kafka',  'topic' = 'huditableout',  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',  'properties.group.id' = 'example',  'scan.startup.mode' = 'latest-offset',  'format' = 'json',  'json.fail-on-missing-field' = 'false',  'json.ignore-parse-errors' = 'true',  'properties.sasl.kerberos.service.name' = 'kafka',  'properties.security.protocol' = 'SASL_PLAINTEXT',  'properties.kerberos.domain.name' = 'hadoop.hadoop.com');CREATE TABLE huditableout(  type1 VARCHAR(32),  optional1 BOOLEAN,  field1 VARCHAR(32),  type2 VARCHAR(32),  optional2 BOOLEAN,  field2 VARCHAR(32),  ts BIGINT,  uid INT,  uname VARCHAR(32),  age INT,  sex VARCHAR(30),  mostlike VARCHAR(30),  lastview VARCHAR(30),  totalcost INT,  localts TIMESTAMP) WITH(  'connector' = 'kafka',  'topic' = 'huditableout2',  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',  'properties.group.id' = 'example',  'scan.startup.mode' = 'latest-offset',  'format' = 'json',  'json.fail-on-missing-field' = 'false',  'json.ignore-parse-errors' = 'true',  'properties.sasl.kerberos.service.name' = 'kafka',  'properties.security.protocol' = 'SASL_PLAINTEXT',  'properties.kerberos.domain.name' = 'hadoop.hadoop.com');insert into  huditableout  select  type1,  optional1,  field1,  type2,  optional2,  field2,  ts,  uid,  uname,  age,  sex,  mostlike,  lastview,  totalcost,  localtsfrom  huditableout_source;
  • 点击语义校验,确保语义校验通过
  • 启动该Flink SQL工作
  • 查看后果

源端kafka 数据

指标端kafka 数据

本文由华为云公布