乐趣区

关于Flink:一种基于Flink-Window的实时指标统计方法

作者:闻乃松

需要背景

假如有一种数据源(比方 CDC binlog Events、Kafka 实时数据流等),须要实时展现时序数据的摄入数量趋势,并能查看任意工夫范畴内的数据分布品质(比方每个字段的数据密度、总取值数量、去重后的数据取值总量等),最小工夫范畴距离为 1 分钟,最大范畴不限度。展现样例见下图,如何在计算资源无限的状况下(不应用分布式 MPP 查问引擎等),在可承受的工夫内给出响应。

解决方案

思考到查问工夫范畴不限,数据摄入速率和规模不限,在可承受的工夫内响应查问,必然须要提前物化查问后果。梳理指标,能够将这些指标分为两类:可物化的指标和不可物化的指标。

  • 可物化的指标 指能够提前预计算的指标,包含密度散布(字段不为空的数量占总记录 Events 的比例)、总 Events 数量、最早解决工夫、最晚解决工夫等
  • 不可物化的指标 指不能提前预计算的指标,这里次要是 distinct 去重值数量

先说可物化的指标,因为数据指标查问最小 1 分钟,因而能够应用 Flink Window 机制:先在内存窗口暂存 1 分钟数据,当工夫窗口过期,触发数据指标的统计和输入。大范畴工夫的指标基于分钟级的指标汇总得出。总体方案如下图:

整个过程包含以下几个外围局部:

  • 内部数据源的接入和数据解析 内部数据源反对多种,比方 mysql、Kafka、S3 等,数据格式也多种多样,比方 JSON、CSV、AVRO 等,数据解析将源格局解析成规范化的关系记录格局(含 Schema)。
  • 实时指标计算 基于 Flink 的 Window 实时统计是整个流程的外围,实现分布式并行统计和后果汇总。
  • 指标存储和查问 实践上每分钟 1 条统计后果,数据总量不算大,然而思考到这样的实时统计作业会很多,因而一种反对可扩大的分布式存储系统显得至关重要,因为 Iceberg 数据湖存储框架轻量,不引入新的存储系统,是本方案设计的选项之一。另外,Iceberg 反对 SDK 查问形式,在计算资源和查问资源无限,满足响应工夫的状况下,能够在单 JVM 线程中运行查问。

具体设计与实现

内部数据源的接入和数据解析

这里以 Kafka JSON 格局数据接入为例阐明:

KafkaSource<MetaAndValue> kafkaSource =
        KafkaSource.<ObjectNode>builder()
                .setBootstrapServers(servers)
                .setGroupId(DataSinkIcebergJob.class.getName())
                .setTopics(topic)
                .setDeserializer(recordDeserializer)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.latest())
                .setProperties(properties)
                .build();

示例中依照 String 反序列形式将 Kafka 字节数据反序列化为 String Json 格局,设置从分区起始地位拉取,并在达到最新地位进行。

数据解析将 JSON 的每个 Field 依照所在 path 展平成一维关系型记录数据,比方上面的一条 JSON 数据,通过展平后存储在 Map 中的成果:

将原始 JSON 字段 path 用下划线连接起来,同时增加一些 Kafka 的元数据字段(topic,offset,partition)和额定的解决工夫字段(processing),如下图所示:

实时指标计算

实时指标计算拓扑构造如下所示,分为并行计算局部和汇总计算两局部:

其中并行计算局部将 ParseResult 类型的数据流依照字段维度进行统计,每个字段别离统计最大值、最小值、不为空的记录数量、字段类型。另外分钟级的统计指标还包含整体性的指标,如本统计周期的开始工夫、完结工夫、胜利解析的事件数量和解析失败的事件数量等。

// 将原始 JSON 数据流转换为解析后果流
DataStream<ParseResult> parseResultStream = sourceStream.transform(“ParseResultStreamOperator”, TypeInformation.of(ParseResult.class), new ParseResultStreamOperator()).rebalance();

// 将解析后果流依照分钟级窗口划分
ProcessWindowFunction processWindowFunction = new StatProcessWindowsFunction();
SingleOutputStreamOperator<Map<String, Object>> wndStream = parseResultStream.keyBy(pr -> pr.getProcessingTime() % 1000)
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // 设置工夫窗口
        .process(processWindowFunction);

计算结果款式:

图中的 failEventCount 示意本统计周期内的解析失败数量,statPeriodEnd 示意本统计周期的开始工夫(窗口开始工夫),其余 Map 类型的字段存储本字段的统计信息,如本统计周期内的最大、最小值。

上一步骤中的并行窗口计算结果有两个维度:窗口工夫和 key,同一个统计窗口会输入多个 key 的子后果,汇总计算就是将这些子统计后果合并。这里的难点有三个:

  1. 窗口计算基于 Processing Time,每个 Task 运行在分布式环境下,无奈保证系统工夫的准确同步和零碎解决能力统一。上游可能接管到不同窗口周期的子计算结果。
  2. 同一窗口周期的子计算结果按 key 维度有多条记录,但数量不确定,上游不晓得什么时候才能够触发合并动作。
  3. 合并算子可能并行,合并不能成为影响性能的瓶颈。

咱们的解决办法是将并行统计的窗口按窗口工夫再次聚合后,在一个同样工夫大小的窗口内合并后果,窗口工夫属性也是基于解决工夫的:

// 将并行统计的窗口按窗口工夫再次聚合后再合并后果
wndStream.keyBy(new KeySelector<Map<String, Object>,String>(){
                    @Override
                    public String getKey(Map<String, Object> value) {return ((LocalDateTime) value.get("statPeriodBegin")).toString();}
                })
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(winTimeMills)))
.process(mergeStatProcessWindowFunction)

上面用一张图来看,是否能解决下面提到的问题,首先基于工夫窗口,保障上游的后果数据在最长 1 分钟内全副达到上游窗口,在达到上游窗口之前是通过工夫聚合后的,这保障同一个窗口周期的计算结果不会落到上游不同窗口内,而上游不同窗口的数据即便在上游同一个窗口解决,然而因为 key 隔离到不同的窗口解决函数调用,所以不会后果混在一起。另外,假如 TM1 和 TM2 主机工夫严格统一,因为上游达到窗口计算右边际,触发后果计算,因为计算自身有肯定时延,实践上上游第一个工夫窗口的数据肯定会落到上游的第二个工夫窗口,顺次类推,然而因为上游合并后果时,只有应用上游的工夫窗口属性,就能够保障后果数据的正确性不受影响。

指标存储和查问

基于 Iceberg 存储指标,可能以不引入内部存储系统,免保护的形式反对可扩大,同时 Iceberg 表可能对计算引擎和查问引擎凋谢。这部分包含存储表 Schema 的动态创建、数据实时存储和和指标查问。

首先是 TableSchema 的动态创建,基于 JSON 解析成的 Schema 创立 Iceberg TableSchema:

private static TableSchema getStatTableSchema(Schema schema) {TableSchema.Builder schemaBuilder = TableSchema.builder();
  List<Types.NestedField> fieldList = schema.columns();
  for (Map.Entry<String, DataType> entry : dataTypeByName.entrySet()) {schemaBuilder.field(entry.getKey(), entry.getValue());
  }
  for (Types.NestedField field : fieldList) {DataType dataType = DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING());
    schemaBuilder.field(field.name(), dataType);
  }
  return schemaBuilder.build();}

其次,数据实时存储基于 Flink 实时流:

TableLoader statTableLoader = TableLoader.fromCatalog(catalogLoader, statIdentifier);
FlinkSink.forRow(statStream, statTableSchema)
        .tableLoader(statTableLoader)
        .tableSchema(statTableSchema)
        .build();

最初是指标查问,基于 SDK 即可实现数据的便当,此外,SDK 还反对工夫范畴和字段级别的过滤,借助 Iceberg 自身存储的统计元数据信息,查问过程还是很快的。

TableLoader statTableLoader = TableLoader.fromHadoopTable(statWarehouseDir.getAbsolutePath());
statTableLoader.open();
Table statTable = statTableLoader.loadTable();
CloseableIterable<Record> statIterable = IcebergGenerics.read(statTable)
        //.where(Expressions.equal()
        .build();
statIterable.forEach(record -> {System.out.println(record.toString());
});

总结

本文介绍了一种基于 Flink Window 实现实时数据指标统计的办法,内容包含数据源的解析、实时指标统计和存储查问,解决了在大数据集和资源受限状况下,疾速查问数据品质的问题。另外,内容还波及到了存储表和源数据 Schema 的联动,然而因为 Iceberg SDK 的限度,TableSchema 一开始就固定下来了,无奈实现在数据解析过程中动静批改 Schema。最初须要一提的是,不可预计算指标,因为本文篇幅限度,此处不再开展,然而不可预计算指标的确是一大难点,为了解决疾速查问,可能须要占用更多的存储和计算资源。

退出移动版