云智慧 AIOps 社区是由云智慧发动,针对运维业务场景,提供算法、算力、数据集整体的服务体系及智能运维业务场景的解决方案交换社区。该社区致力于流传 AIOps 技术,旨在与各行业客户、用户、研究者和开发者们独特解决智能运维行业技术难题,推动 AIOps 技术在企业中落地,建设衰弱共赢的AIOps 开发者生态。

Flink SQL动静表

创立Kafka动静表

下图为在Flink里创立kafka动静表。晓得kafka的信息和数据格式信息后创立kafka表,在建表语句的最初一个字段,咱们增加了一个kafka topic元数据信息:create_time(数据写入kafka的工夫),基于以上操作便能够实现kafka动静表创立,后续便能够在Flink SQL里对topic进行数据读取或者写入。

  • Kafka地址:10.2.3.14:9092
  • Topic名称:r_01
  • 消费者组id:8001
  • 样例数据:订单1购买了商品1,生产金额1元

{"order_id":1,"product_id":1,"trans_amount":1}

创立Clickhouse动静表

下图为在Flink里创立Clickhouse动静表。此时能够看到Clickhouse的表构造,蕴含相干字段的数据类型和主键信息,与Flink SQL建表语句中的字段、数据格式和主键也一一对应。 WITH外面是Clickhouse的连贯信息和数据操作的配置信息

  • Clickhouse地址:10.2.3.14:8123
  • 数据库名称:default
  • 数据表:product_sale
  • 表构造及样例数据如下:

创立Redis动静表

下图为在Flink里创立Redis动静表。因为Redis表设计初衷是用于做维表,故必须蕴含可供数据关联的主键和用于补齐数据的一般字段,在建表语句里体现为必须设置一个或多个主键,还必须具备一个或多个的一般字段。 数据在Redis中的存储是HASH格局,能够应用HGETALL查看数据内容。

  • redis地址:10.2.3.39:3301
  • Key前缀:index:product_sale
  • redis例数据如下:

Flink SQL连贯参数

连贯Kafka

  • connector:指定连接器类型,固定值kafka。
  • topic:指须要生产或写入数据的topic。
  • bootstrap.servers:kafka连贯地址,能够填写多个,以逗号宰割。
  • Broker地址:在集群失常运行时填写一个或多个节点均可读取到数据。此外,当kafka节点较多,topic分区较少时填写一个节点,当topic分区并不在该节点上时,也可能读取到数据。需注意,当kafka服务呈现问题时,如果个别节点服务中断,填写多个broker地址能够进步抗危险能力。
  • 生产模式

    • earliest-offset对应平台中的从头开始生产。工作的每次启动都会依照从前往后的程序读取topic内现有的所有数据。然而这个程序是绝对的,如果topic有多个分区,存在肯定的数据歪斜,那数据较少的分区从数据工夫上来看会读的绝对快一些。kafka数据的读取是依照分区来读的;
    • latest-offset对应平台中的从以后开始生产。工作在启动时会从最新的数据开始读取;
    • group-offsets对应平台中的依照group offset生产。这种模式下,工作在第一次启动时会读取最新的数据,在后续工作重启时,会接着上次运行完结时解决到的数据点位持续解决,这种模式也是kafka消费者的默认生产模式。该模式须要配合设置group id,kafka会依照group id把解决数据的偏移量记录下来。因为是kafka记录着偏移量,故group id能够跨平台、跨利用来应用。比方当有一个java工作须要做kafka数据的长久化且由flink来实现,此时flink工作应用即可用与java工作雷同的group id来实现工作平滑切换,做到无数据失落、无数据反复;
  • Group id:用于记录解决数据的偏移量,在工作重启或异样复原的时候持续从断点开始解决数据。
  • Value.format:kafka音讯格局。罕用的为csv、json、raw以及多种cdc格局。
  • Ignore-parse-errors:解析失败的起因蕴含多种,比方局部数据格式不是json,此时便会抛弃整条数据。需注意,如果其中一条json数据中的一个字段格局与建表语句中的格局不同,强转失败时只会影响这一个数据字段,不影响其它字段的解析。
  • Fail-on-missing-field:当kafka音讯中短少create table中定义的字段时是否终止flink工作。

连贯Clickhouse

url:这里能够填写单个jdbc url,示意以集群的形式写入逻辑表;也能够填写多个jdbc url,多个url应用逗号距离,示意以轮询的形式写入Clickhouse本地表;

Table-name:表名只能有一个,在轮询写入本地表的时候,url连贯和数据库能够雷同或不同,比方同一个url上的不同库,然而表名必须雷同;

Flush-max-rows:配合应用能够实现flink到clickhouse的同步输入、半同步输入、异步输入

连贯Redis

仅反对redis的hash构造,具体数据结构如下:

hash key:{key-prefix}{key-spacer}{k1}{key-spacer}{k2}

hash field:schema中除了key字段之外的其余字段

hash value:存储key之外其余字段对应的值,flink redis schema反对的类型:STRING、BOOLEAN、TINYINT、SMALLINT、INT、BIGINT、DOUBLE

Flink SQL函数

Flink SQL函数分为内置函数和自定义函数

自定义函数:

ScalarFunction:行级数据处理,一行的一列或多列数据处理输入一个数据

TableFunction:也是行级数据处理,承受一个或多个参数输出,然而能够输入多行多列数据

AggregateFunction:聚合函数,配合group by应用,可依据多行多列数据计算输入一个指标值

TableAggregateFunction:表聚合函数,配合group by应用,依据多行多列数据计算输入多行多列数据,表聚合函数目前还不能应用在FlinkSQL中,实用于Flink Table API

操作示例:

这里以水果的ID获取到水果名称,演示一下自定义函数的用法。Udf能够应用Java代码来开发,具备java语言的个性,比方多态。示例里的类继承Flink的ScalarFunction,实现了一个eval函数,自定义函数的编写较为简单。此外,还能够依据Java的多态个性编写多个eval函数,实现多类型数据的解决。 编写好的自定义函数,打成jar包后通过平台的资源库页面上传至平台,在编写数据处理工作时,应用 create temporary function的形式引入该自定义函数,即可应用。

Flink SQL案例

需要形容

对topic r_01 中的水果销售流水进行统计,失去每种水果每分钟的销售额,将计算结果别离输入到kafka、clickhouse、redis。

topic中数据格式: { "order_id":1, "product_id":1, "trans_amount":1 }

计算结果应蕴含如下字段: { "product_id":1, "product_name":"苹果", "create_minute":'2021-12-02 12:00:00’, "trans_amount":3 }

具体操作

kafka to kafka

  1. 创立一个kafka source
  2. 创立一个kafka sink
  3. 编写数据处理sql

kafka to clickhouse

  1. 创立一个kafka source
  2. 创立一个clickhouse sink
  3. 编写数据处理sql

kafka to redis

  1. 创立一个kafka source
  2. 创立一个redis sink
  3. 编写数据处理sql

Checkpoint

状态的作用

Checkpoint是Flink存储工作运行状态的一个检查点,状态是flink的一等公民,能够让程序记住运行的两头后果,以便工作异样时的重启复原

Flink利用异样示例

  • 实时统计当日订单总额的程序异常中断,从状态复原不须要从0点开始从新计算
  • 实时ETL同步kafka数据到内部存储异常中断,从状态复原则不须要从头生产

状态数据的存储

  • 能够保留在内存中,当状态数据过大,内存Oom
  • 保留在长久化的文件系统中,比方本地或者hdfs
  • 通过状态过期工夫管制flink利用的状态大小
  • 状态数据须要周期性地保留下来,用于故障复原

如何从检查点复原

  • 读取最近一次checkpoint中的状态数据,比方累计销售额sum值为8000元
  • 读取最近一次checkpoint中提交的offset,比方partition 0,offset 1000
  • 上述状态数据表明,flink应用程序在生产到(0,1000)这个位点时统计的销售额为8000元
  • 利用恢复正常后,从(0,1001)开始生产,sum从8000开始累加

检查点外部状态数据的一致性

  • 外部状态数据一致性语义:准确统一或者至多一次
  • 同样是上述样例,如果是准确一致性语义,则sum值对每条kafka音讯只统计一次,如果是至多一次,则sum值的统计后果有可能偏高
  • 如果topic只有一个分区,则是准确统一,因为flink连贯kafka source的并行度为分区数,在并行度为1的状况下不存在多流不同时达到的状况
  • Kafka多分区状况下,flink默认是多并行度,此时设置为 至多一次语义,再加上多流很大概率不会同时达到,会导致统计后果偏高。

检查点设置

建设flink流工作时可选抉择是否开启检查点并设置检查点周期 。开启检查点有两个作用,一是在工作运行发生意外主动重启的时候会从检查点复原,能够确保工作从异样点持续计算,保持数据连贯性与准确性;另一个作用是在手动进行工作后,再次启动的时候,能够抉择是否从上一个检查点继续执行工作。

检查点复原

工作中断后再次启动时能够抉择是否从最近一个检查点复原状态数据。目前反对固定提早和失败比率的重启策略,别离对应固定重启次数和一段时间内失败次数超过阈值则不再重启。

福利放送

云智慧以开源集轻量级、聚合型、智能运维为一体的综合运维治理平台 OMP(Operation Management Platform) ,具备纳管、部署、监控、巡检、自愈、备份、复原等性能,可为用户提供便捷的运维能力和业务管理,在进步运维人员工作效率的同时,极大晋升业务的连续性和安全性。

点击下方地址链接,欢送大家给 OMP 点赞送 Star,理解更多相干内容~

GitHub 地址:https://github.com/CloudWise-OpenSource/OMP

Gitee 地址:https://gitee.com/CloudWise/OMP

微信扫描辨认下方二维码,备注【OMP】退出AIOps社区运维治理平台 OMP 开发者交换群,与 OMP 我的项目 PMC 面对面交换~