作者:闻乃松

需要背景

假如有一种数据源(比方CDC binlog Events、Kafka 实时数据流等),须要实时展现时序数据的摄入数量趋势,并能查看任意工夫范畴内的数据分布品质(比方每个字段的数据密度、总取值数量、去重后的数据取值总量等),最小工夫范畴距离为1分钟,最大范畴不限度。展现样例见下图,如何在计算资源无限的状况下(不应用分布式MPP查问引擎等),在可承受的工夫内给出响应。

解决方案

思考到查问工夫范畴不限,数据摄入速率和规模不限,在可承受的工夫内响应查问,必然须要提前物化查问后果。梳理指标,能够将这些指标分为两类:可物化的指标和不可物化的指标。

  • 可物化的指标 指能够提前预计算的指标,包含密度散布(字段不为空的数量占总记录Events的比例)、总Events数量、最早解决工夫、最晚解决工夫等
  • 不可物化的指标 指不能提前预计算的指标,这里次要是distinct去重值数量

先说可物化的指标,因为数据指标查问最小1分钟,因而能够应用Flink Window机制:先在内存窗口暂存1分钟数据,当工夫窗口过期,触发数据指标的统计和输入。大范畴工夫的指标基于分钟级的指标汇总得出。总体方案如下图:

整个过程包含以下几个外围局部:

  • 内部数据源的接入和数据解析 内部数据源反对多种,比方mysql、Kafka、S3等,数据格式也多种多样,比方JSON、CSV、AVRO等,数据解析将源格局解析成规范化的关系记录格局(含Schema)。
  • 实时指标计算 基于Flink 的Window实时统计是整个流程的外围,实现分布式并行统计和后果汇总。
  • 指标存储和查问 实践上每分钟1条统计后果,数据总量不算大,然而思考到这样的实时统计作业会很多,因而一种反对可扩大的分布式存储系统显得至关重要,因为Iceberg数据湖存储框架轻量,不引入新的存储系统,是本方案设计的选项之一。另外,Iceberg反对SDK查问形式,在计算资源和查问资源无限,满足响应工夫的状况下,能够在单JVM线程中运行查问。

具体设计与实现

内部数据源的接入和数据解析

这里以Kafka JSON格局数据接入为例阐明:

KafkaSource<MetaAndValue> kafkaSource =        KafkaSource.<ObjectNode>builder()                .setBootstrapServers(servers)                .setGroupId(DataSinkIcebergJob.class.getName())                .setTopics(topic)                .setDeserializer(recordDeserializer)                .setStartingOffsets(OffsetsInitializer.earliest())                .setBounded(OffsetsInitializer.latest())                .setProperties(properties)                .build();

示例中依照String反序列形式将Kafka字节数据反序列化为String Json格局,设置从分区起始地位拉取,并在达到最新地位进行。

数据解析将JSON的每个Field依照所在path展平成一维关系型记录数据,比方上面的一条JSON数据,通过展平后存储在Map中的成果:

将原始JSON字段path用下划线连接起来,同时增加一些Kafka的元数据字段(topic,offset,partition)和额定的解决工夫字段(processing),如下图所示:

实时指标计算

实时指标计算拓扑构造如下所示,分为并行计算局部和汇总计算两局部:

其中并行计算局部将ParseResult类型的数据流依照字段维度进行统计,每个字段别离统计最大值、最小值、不为空的记录数量、字段类型。另外分钟级的统计指标还包含整体性的指标,如本统计周期的开始工夫、完结工夫、胜利解析的事件数量和解析失败的事件数量等。

//将原始JSON数据流转换为解析后果流
DataStream<ParseResult> parseResultStream = sourceStream.transform("ParseResultStreamOperator", TypeInformation.of(ParseResult.class), new ParseResultStreamOperator()).rebalance();

//将解析后果流依照分钟级窗口划分ProcessWindowFunction processWindowFunction = new StatProcessWindowsFunction();SingleOutputStreamOperator<Map<String, Object>> wndStream = parseResultStream.keyBy(pr -> pr.getProcessingTime() % 1000)        .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) //设置工夫窗口        .process(processWindowFunction);

计算结果款式:

图中的failEventCount示意本统计周期内的解析失败数量,statPeriodEnd示意本统计周期的开始工夫(窗口开始工夫),其余Map类型的字段存储本字段的统计信息,如本统计周期内的最大、最小值。

上一步骤中的并行窗口计算结果有两个维度:窗口工夫和key,同一个统计窗口会输入多个key的子后果,汇总计算就是将这些子统计后果合并。这里的难点有三个:

  1. 窗口计算基于Processing Time,每个Task运行在分布式环境下,无奈保证系统工夫的准确同步和零碎解决能力统一。上游可能接管到不同窗口周期的子计算结果。
  2. 同一窗口周期的子计算结果按key维度有多条记录,但数量不确定,上游不晓得什么时候才能够触发合并动作。
  3. 合并算子可能并行,合并不能成为影响性能的瓶颈。

咱们的解决办法是将并行统计的窗口按窗口工夫再次聚合后,在一个同样工夫大小的窗口内合并后果,窗口工夫属性也是基于解决工夫的:

//将并行统计的窗口按窗口工夫再次聚合后再合并后果wndStream.keyBy(new KeySelector<Map<String, Object>,String>(){                    @Override                    public String getKey(Map<String, Object> value) {                        return ((LocalDateTime) value.get("statPeriodBegin")).toString();                    }                }).window(TumblingProcessingTimeWindows.of(Time.milliseconds(winTimeMills))).process(mergeStatProcessWindowFunction)

上面用一张图来看,是否能解决下面提到的问题,首先基于工夫窗口,保障上游的后果数据在最长1分钟内全副达到上游窗口,在达到上游窗口之前是通过工夫聚合后的,这保障同一个窗口周期的计算结果不会落到上游不同窗口内,而上游不同窗口的数据即便在上游同一个窗口解决,然而因为key隔离到不同的窗口解决函数调用,所以不会后果混在一起。另外,假如TM1和TM2主机工夫严格统一,因为上游达到窗口计算右边际,触发后果计算,因为计算自身有肯定时延,实践上上游第一个工夫窗口的数据肯定会落到上游的第二个工夫窗口,顺次类推,然而因为上游合并后果时,只有应用上游的工夫窗口属性,就能够保障后果数据的正确性不受影响。

指标存储和查问

基于Iceberg存储指标,可能以不引入内部存储系统,免保护的形式反对可扩大,同时Iceberg表可能对计算引擎和查问引擎凋谢。这部分包含存储表Schema的动态创建、数据实时存储和和指标查问。

首先是TableSchema的动态创建,基于JSON解析成的Schema创立Iceberg TableSchema:

private static TableSchema getStatTableSchema(Schema schema) {  TableSchema.Builder schemaBuilder = TableSchema.builder();  List<Types.NestedField> fieldList = schema.columns();  for (Map.Entry<String, DataType> entry : dataTypeByName.entrySet()) {    schemaBuilder.field(entry.getKey(), entry.getValue());  }  for (Types.NestedField field : fieldList) {    DataType dataType = DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING());    schemaBuilder.field(field.name(), dataType);  }  return schemaBuilder.build();}

其次,数据实时存储基于Flink实时流:

TableLoader statTableLoader = TableLoader.fromCatalog(catalogLoader, statIdentifier);FlinkSink.forRow(statStream, statTableSchema)        .tableLoader(statTableLoader)        .tableSchema(statTableSchema)        .build();

最初是指标查问,基于SDK即可实现数据的便当,此外,SDK还反对工夫范畴和字段级别的过滤,借助Iceberg自身存储的统计元数据信息,查问过程还是很快的。

TableLoader statTableLoader = TableLoader.fromHadoopTable(statWarehouseDir.getAbsolutePath());statTableLoader.open();Table statTable = statTableLoader.loadTable();CloseableIterable<Record> statIterable = IcebergGenerics.read(statTable)        //.where(Expressions.equal()        .build();statIterable.forEach(record -> {    System.out.println(record.toString());});

总结

本文介绍了一种基于Flink Window实现实时数据指标统计的办法,内容包含数据源的解析、实时指标统计和存储查问,解决了在大数据集和资源受限状况下,疾速查问数据品质的问题。另外,内容还波及到了存储表和源数据Schema的联动,然而因为Iceberg SDK的限度,TableSchema一开始就固定下来了,无奈实现在数据解析过程中动静批改Schema。最初须要一提的是,不可预计算指标,因为本文篇幅限度,此处不再开展,然而不可预计算指标的确是一大难点,为了解决疾速查问,可能须要占用更多的存储和计算资源。