乐趣区

关于数据库:数据平台SQL开发详解与函数使用

云智慧 AIOps 社区是由云智慧发动,针对运维业务场景,提供算法、算力、数据集整体的服务体系及智能运维业务场景的解决方案交换社区。该社区致力于流传 AIOps 技术,旨在与各行业客户、用户、研究者和开发者们独特解决智能运维行业技术难题,推动 AIOps 技术在企业中落地,建设衰弱共赢的 AIOps 开发者生态。

Flink SQL 动静表

创立 Kafka 动静表

下图为在 Flink 里创立 kafka 动静表。晓得 kafka 的信息和数据格式信息后创立 kafka 表,在建表语句的最初一个字段,咱们增加了一个 kafka topic 元数据信息:create_time(数据写入 kafka 的工夫),基于以上操作便能够实现 kafka 动静表创立,后续便能够在 Flink SQL 里对 topic 进行数据读取或者写入。

  • Kafka 地址:10.2.3.14:9092
  • Topic 名称:r_01
  • 消费者组 id:8001
  • 样例数据:订单 1 购买了商品 1,生产金额 1 元

{“order_id”:1,”product_id”:1,”trans_amount”:1}

创立 Clickhouse 动静表

下图为在 Flink 里创立 Clickhouse 动静表。此时能够看到 Clickhouse 的表构造,蕴含相干字段的数据类型和主键信息,与 Flink SQL 建表语句中的字段、数据格式和主键也一一对应。WITH 外面是 Clickhouse 的连贯信息和数据操作的配置信息

  • Clickhouse 地址:10.2.3.14:8123
  • 数据库名称:default
  • 数据表:product_sale
  • 表构造及样例数据如下:

创立 Redis 动静表

下图为在 Flink 里创立 Redis 动静表。因为 Redis 表设计初衷是用于做维表,故必须蕴含可供数据关联的主键和用于补齐数据的一般字段,在建表语句里体现为必须设置一个或多个主键,还必须具备一个或多个的一般字段。数据在 Redis 中的存储是 HASH 格局,能够应用 HGETALL 查看数据内容。

  • redis 地址:10.2.3.39:3301
  • Key 前缀:index:product_sale
  • redis 例数据如下:

Flink SQL 连贯参数

连贯 Kafka

  • connector:指定连接器类型,固定值 kafka。
  • topic:指须要生产或写入数据的 topic。
  • bootstrap.servers:kafka 连贯地址,能够填写多个,以逗号宰割。
  • Broker 地址:在集群失常运行时填写一个或多个节点均可读取到数据。此外,当 kafka 节点较多,topic 分区较少时填写一个节点,当 topic 分区并不在该节点上时,也可能读取到数据。需注意,当 kafka 服务呈现问题时,如果个别节点服务中断,填写多个 broker 地址能够进步抗危险能力。
  • 生产模式

    • earliest-offset 对应平台中的从头开始生产。工作的每次启动都会依照从前往后的程序读取 topic 内现有的所有数据。然而这个程序是绝对的,如果 topic 有多个分区,存在肯定的数据歪斜,那数据较少的分区从数据工夫上来看会读的绝对快一些。kafka 数据的读取是依照分区来读的;
    • latest-offset 对应平台中的从以后开始生产。工作在启动时会从最新的数据开始读取;
    • group-offsets 对应平台中的依照 group offset 生产。这种模式下,工作在第一次启动时会读取最新的数据,在后续工作重启时,会接着上次运行完结时解决到的数据点位持续解决,这种模式也是 kafka 消费者的默认生产模式。该模式须要配合设置 group id,kafka 会依照 group id 把解决数据的偏移量记录下来。因为是 kafka 记录着偏移量,故 group id 能够跨平台、跨利用来应用。比方当有一个 java 工作须要做 kafka 数据的长久化且由 flink 来实现,此时 flink 工作应用即可用与 java 工作雷同的 group id 来实现工作平滑切换,做到无数据失落、无数据反复;
  • Group id:用于记录解决数据的偏移量,在工作重启或异样复原的时候持续从断点开始解决数据。
  • Value.format:kafka 音讯格局。罕用的为 csv、json、raw 以及多种 cdc 格局。
  • Ignore-parse-errors:解析失败的起因蕴含多种,比方局部数据格式不是 json,此时便会抛弃整条数据。需注意,如果其中一条 json 数据中的一个字段格局与建表语句中的格局不同,强转失败时只会影响这一个数据字段,不影响其它字段的解析。
  • Fail-on-missing-field:当 kafka 音讯中短少 create table 中定义的字段时是否终止 flink 工作。

连贯 Clickhouse

url:这里能够填写单个 jdbc url,示意以集群的形式写入逻辑表;也能够填写多个 jdbc url,多个 url 应用逗号距离,示意以轮询的形式写入 Clickhouse 本地表;

Table-name:表名只能有一个,在轮询写入本地表的时候,url 连贯和数据库能够雷同或不同,比方同一个 url 上的不同库,然而表名必须雷同;

Flush-max-rows:配合应用能够实现 flink 到 clickhouse 的同步输入、半同步输入、异步输入

连贯 Redis

仅反对 redis 的 hash 构造,具体数据结构如下:

hash key:{key-prefix}{key-spacer}{k1}{key-spacer}{k2}

hash field:schema 中除了 key 字段之外的其余字段

hash value:存储 key 之外其余字段对应的值,flink redis schema 反对的类型:STRING、BOOLEAN、TINYINT、SMALLINT、INT、BIGINT、DOUBLE

Flink SQL 函数

Flink SQL 函数分为内置函数和自定义函数

自定义函数:

ScalarFunction:行级数据处理,一行的一列或多列数据处理输入一个数据

TableFunction:也是行级数据处理,承受一个或多个参数输出,然而能够输入多行多列数据

AggregateFunction:聚合函数,配合 group by 应用,可依据多行多列数据计算输入一个指标值

TableAggregateFunction:表聚合函数,配合 group by 应用,依据多行多列数据计算输入多行多列数据,表聚合函数目前还不能应用在 FlinkSQL 中,实用于 Flink Table API

操作示例:

这里以水果的 ID 获取到水果名称,演示一下自定义函数的用法。Udf 能够应用 Java 代码来开发,具备 java 语言的个性,比方多态。示例里的类继承 Flink 的 ScalarFunction,实现了一个 eval 函数,自定义函数的编写较为简单。此外,还能够依据 Java 的多态个性编写多个 eval 函数,实现多类型数据的解决。编写好的自定义函数,打成 jar 包后通过平台的资源库页面上传至平台,在编写数据处理工作时,应用 create temporary function 的形式引入该自定义函数,即可应用。

Flink SQL 案例

需要形容

对 topic r_01 中的水果销售流水进行统计,失去每种水果每分钟的销售额,将计算结果别离输入到 kafka、clickhouse、redis。

topic 中数据格式:{“order_id”:1, “product_id”:1, “trans_amount”:1}

计算结果应蕴含如下字段:{“product_id”:1, “product_name”:” 苹果 ”, “create_minute”:’2021-12-02 12:00:00’, “trans_amount”:3}

具体操作

kafka to kafka

  1. 创立一个 kafka source
  2. 创立一个 kafka sink
  3. 编写数据处理 sql

kafka to clickhouse

  1. 创立一个 kafka source
  2. 创立一个 clickhouse sink
  3. 编写数据处理 sql

kafka to redis

  1. 创立一个 kafka source
  2. 创立一个 redis sink
  3. 编写数据处理 sql

Checkpoint

状态的作用

Checkpoint 是 Flink 存储工作运行状态的一个检查点,状态是 flink 的一等公民,能够让程序记住运行的两头后果,以便工作异样时的重启复原

Flink 利用异样示例

  • 实时统计当日订单总额的程序异常中断,从状态复原不须要从 0 点开始从新计算
  • 实时 ETL 同步 kafka 数据到内部存储异常中断,从状态复原则不须要从头生产

状态数据的存储

  • 能够保留在内存中,当状态数据过大,内存 Oom
  • 保留在长久化的文件系统中,比方本地或者 hdfs
  • 通过状态过期工夫管制 flink 利用的状态大小
  • 状态数据须要周期性地保留下来,用于故障复原

如何从检查点复原

  • 读取最近一次 checkpoint 中的状态数据,比方累计销售额 sum 值为 8000 元
  • 读取最近一次 checkpoint 中提交的 offset,比方 partition 0,offset 1000
  • 上述状态数据表明,flink 应用程序在生产到(0,1000)这个位点时统计的销售额为 8000 元
  • 利用恢复正常后,从(0,1001)开始生产,sum 从 8000 开始累加

检查点外部状态数据的一致性

  • 外部状态数据一致性语义:准确统一或者至多一次
  • 同样是上述样例,如果是准确一致性语义,则 sum 值对每条 kafka 音讯只统计一次,如果是至多一次,则 sum 值的统计后果有可能偏高
  • 如果 topic 只有一个分区,则是准确统一,因为 flink 连贯 kafka source 的并行度为分区数,在并行度为 1 的状况下不存在多流不同时达到的状况
  • Kafka 多分区状况下,flink 默认是多并行度,此时设置为 至多一次语义,再加上多流很大概率不会同时达到,会导致统计后果偏高。

检查点设置

建设 flink 流工作时可选抉择是否开启检查点并设置检查点周期。开启检查点有两个作用,一是在工作运行发生意外主动重启的时候会从检查点复原,能够确保工作从异样点持续计算,保持数据连贯性与准确性;另一个作用是在手动进行工作后,再次启动的时候,能够抉择是否从上一个检查点继续执行工作。

检查点复原

工作中断后再次启动时能够抉择是否从最近一个检查点复原状态数据。目前反对固定提早和失败比率的重启策略,别离对应固定重启次数和一段时间内失败次数超过阈值则不再重启。

福利放送

云智慧以开源集轻量级、聚合型、智能运维为一体的综合运维治理平台 OMP(Operation Management Platform),具备纳管、部署、监控、巡检、自愈、备份、复原等性能,可为用户提供便捷的运维能力和业务管理,在进步运维人员工作效率的同时,极大晋升业务的连续性和安全性。

点击下方地址链接,欢送大家给 OMP 点赞送 Star,理解更多相干内容~

GitHub 地址:https://github.com/CloudWise-OpenSource/OMP

Gitee 地址:https://gitee.com/CloudWise/OMP

微信扫描辨认下方二维码,备注【OMP】退出 AIOps 社区运维治理平台 OMP 开发者交换群,与 OMP 我的项目 PMC 面对面交换~

退出移动版