乐趣区

关于框架:技术实践-如何基于-Flink-实现通用的聚合指标计算框架

1 引言

网易云信作为一个 PaaS 服务,须要对线上业务进行实时监控,实时感知服务的“心跳”、“脉搏”、“血压”等健康状况。通过采集服务拿到 SDK、服务器等端的心跳埋点日志,是一个十分宏大且杂乱无序的数据集,而如何能力无效利用这些数据?服务监控平台要做的事件就是对海量数据进行实时剖析,聚合出表征服务的“心跳”、“脉搏”、“血压”的外围指标,并将其直观的展现给相干同学。这其中外围的能力便是:实时剖析和实时聚合

在之前的《网易云信服务监控平台实际》一文中,咱们围绕数据采集、数据处理、监控告警、数据利用 4 个环节,介绍了网易云信服务监控平台的整体框架。本文是对网易云信在聚合指标计算逻辑上的进一步详述。

基于明细数据集进行实时聚合,生产一个聚合指标,业界罕用的实现形式是 Spark Streaming、Flink SQL / Stream API。不论是何种形式,咱们都须要通过写代码来指定数据起源、数据荡涤逻辑、聚合维度、聚合窗口大小、聚合算子等。如此繁冗的逻辑和代码,无论是开发、测试,还是后续工作的保护,都须要投入大量的人力 / 物力老本。而咱们程序员要做的便是化繁为简、实现大巧不工。

本文将论述网易云信是如何基于 Flink 的 Stream API,实现一套通用的聚合指标计算框架。

2 整体架构

如上图所示,是咱们基于 Flink 自研的聚合指标残缺加工链路,其中波及到的模块包含:

  • source:定期加载聚合规定,并依据聚合规定按需创立 Kafka 的 Consumer,并继续生产数据。
  • process:包含分组逻辑、窗口逻辑、聚合逻辑、环比计算逻辑等。从图中能够看到,咱们在聚合阶段分成了两个,这样做的目标是什么?其中的益处是什么呢?做过分布式和并发计算的,都会遇到一个独特的敌人:数据歪斜。在咱们 PaaS 服务中头部客户会更加显著,所以歪斜十分重大,分成两个阶段进行聚合的奥秘下文中会具体阐明。
  • sink:是数据输入层,目前默认输入到 Kafka 和 InfluxDB,前者用于驱动后续计算(如告警告诉等),后者用于数据展现以及查问服务等。
  • reporter:全链路统计各个环节的运行状况,如输出 / 输入 QPS、计算耗时、生产沉积、早退数据量等。

下文将具体介绍这几个模块的设计和实现思路。

3 source

规定配置

为了便于聚合指标的生产和保护,咱们将指标计算过程中波及到的要害参数进行了形象提炼,提供了可视化配置页面,如下图所示。下文会联合具体场景介绍各个参数的用处。

规定加载

在聚合工作运行过程中,咱们会定期加载配置。如果检测到有 新增 的 Topic,咱们会创立 kafka-consumer 线程,接管上游实时数据流。同理,对于曾经 生效 的配置,咱们会敞开生产线程,并清理相干的 reporter。

数据生产

对于数据源雷同的聚合指标,咱们共用一个 kafka-consumer,拉取到记录并解析后,对每个聚合指标别离调用 collect() 进行数据散发。如果指标的数据筛选规定(配置项)非空,在数据散发前须要进行数据过滤,不满足条件的数据间接抛弃。

4 process

整体计算流程

基于 Flink 的 Stream API 实现聚合计算的外围代码如下所示:

SingleOutputStreamOperator<MetricContext> aggResult = src
        .assignTimestampsAndWatermarks(new MetricWatermark())
        .keyBy(new MetricKeyBy())
        .window(new MetricTimeWindow())
        .aggregate(new MetricAggFuction());
  • MetricWatermark():依据指定的工夫字段(配置项⑧)获取输出数据的 timestamp,并驱动计算流的 watermark 往前推动。
  • MetricKeyBy():指定聚合维度,相似于 MySQL 中 groupby,依据分组字段(配置项⑥),从数据中获取聚合维度的取值,拼接成分组 key。
  • MetricTimeWindow():配置项⑧中指定了聚合计算的窗口大小。如果配置了定时输入,咱们就创立滑动窗口,否则就创立滚动窗口。
  • MetricAggFuction():实现配置项②指定的各种算子的计算,下文将具体介绍各个算子的实现原理。

二次聚合

对于大数据量的聚合计算,数据歪斜 是不得不思考的问题,数据歪斜意味着规定中配置的分组字段(配置项⑥)指定的聚合 key 存在热点。咱们的计算框架在设计之初就思考了如何解决数据歪斜问题,就是将聚合过程拆分成 2 阶段:

  • 第 1 阶段:将数据随机打散,进行预聚合。
  • 第 2 阶段:将第 1 阶段的预聚合后果作为输出,进行最终的聚合。

具体实现:判断并发度参数 parallelism(配置项⑦)是否大于 1,如果 parallelism 大于 1,生成一个 [0, parallelism) 之间的随机数作为 randomKey,在第 1 阶段聚合 keyBy() 中,将根据分组字段(配置项⑥)获取的 key 与 randomKey 拼接,生成最终的聚合 key,从而实现了数据随机打散。

聚合算子

作为一个平台型的产品,咱们提供了如下常见的聚合算子。因为采纳了二次聚合逻辑,各个算子在第 1 阶段和第 2 阶段采纳了相应的计算策略。

算子 第 1 阶段聚合 第 2 阶段聚合
min/max/sum/count 间接对输出数据进行预聚合计算,输入预聚合后果 对第 1 阶段预聚合后果进行二次聚合计算,输入最终后果
first/last 对输出数据的 timestamp 进行比拟,记录最小 / 最大的 timestamp 以及对应的 value 值,输入 <timestamp,value> 数据对 对 <timestamp,value> 数据对进行二次计算,输入最终的 first/last
avg 计算该分组的和值和记录数,输入 <sum,cnt> 数据对 对 <sum,cnt> 数据对别离求和,而后输入:总 sum / 总 cntcount
median/tp90/tp95 统计输出数据的散布,输入 NumericHistogram 对输出的 NumericHistogram 做 merge 操作,最终输入中位数 /tp90/tp95
count-distinct 输入记录桶信息和位图的 RoaringArray 对 RoaringArray 进行 merge 操作,最终输入准确的去重计数后果
count-distinct(近似) 输入基数计数对象 HyperLoglog 对 HyperLoglog 进行 merge 操作,最终输入近似的去重计数后果

对于计算结果受全副数据影响的算子,如 count-distinct(去重计数),惯例思路是利用 set 的去重个性,将所有统计数据放在一个 Set 中,最终在聚合函数的 getResult 中输入 Set 的 size。如果统计数据量十分大,这个 Set 对象就会十分大,对这个 Set 的 I/O 操作所耗费的工夫将不能承受。

对于类 MapReduce 的大数据计算框架,性能的瓶颈往往呈现在 shuffle 阶段大对象的 I/O 上,因为数据须要序列化 / 传输 / 反序列化,Flink 也不例外。相似的算子还有 median 和 tp95。

为此,须要对这些算子做专门的优化,优化的思路就是尽量减少计算过程中应用的数据对象的大小,其中:

  • median/tp90/tp95:参考了 hive percentile_approx 的近似算法,该算法通过 NumericHistogram(一种非等距直方图)记录数据分布,而后通过插值的形式失去相应的 tp 值(median 是 tp50)。
  • count-distinct:采纳 RoaringBitmap 算法,通过压缩位图的形式标记输出样本,最终失去 准确 的去重计数后果。
  • count-distinct(近似):采纳 HyperLoglog 算法,通过基数计数的形式,失去 近似 的去重计数后果。该算法实用于大数据集的去重计数。

后处理

后处理模块,是对第 2 阶段聚合计算输入数据进行再加工,次要有 2 个性能:

  • 复合指标计算:对原始统计指标进行组合计算,失去新的组合指标。例如,要统计登录成功率,咱们能够先别离统计出分母(登录次数)和分子(登录胜利的次数),而后将分子除以分母,从而失去一个新的组合指标。配置项③就是用来配置组合指标的计算规定。
  • 绝对指标计算:告警规定中常常要判断某个指标的绝对变动状况(同比 / 环比)。咱们利用 Flink 的 state,可能不便的计算出同比 / 环比指标,配置项④就是用来配置绝对指标规定。

异样数据的解决

这里所说的异样数据,分为两类:早退的数据和提前到的数据。

  • 早退数据

    • 对于重大早退的数据(大于聚合窗口的 allowedLateness),通过 sideOutputLateData 进行收集,并通过 reporter 统计上报,从而可能在监控页面进行可视化监控。
    • 对于轻微早退的数据(小于聚合窗口的 allowedLateness),会触发窗口的重计算。如果每来一条早退数据就触发一次第 1 阶段窗口的重计算,重计算结果传导到第 2 阶段聚合计算,就会导致局部数据的反复统计。为了解决反复统计的问题,咱们在第 1 阶段聚合 Trigger 中进行了非凡解决:窗口触发采纳 FIRE_AND_PURGE(计算并清理),及时清理曾经参加过计算的数据。
  • 提前到的数据:这部分数据往往是数据上报端的时钟不准导致。在计算这些数据的 timestamp 时要人为干涉,防止影响整个计算流的 watermark。

5 sink

聚合计算失去的指标,默认输入到 Kafka 和时序数据库 InfluxDB。

  • kafka-sink:将指标标识(配置项①)作为 Kafka 的 topic,将聚合后果发送进来,上游接管到该数据流后能够进一步解决加工,如告警事件的生产等。
  • InfluxDB-sink:将指标标识(配置项①)作为时序数据库的表名,将聚合后果长久化下来,用于 API 的数据查问、以及可视化报表展现等。

6 reporter

为了实时监控各个数据源和聚合指标的运行状况,咱们通过 InfluxDB+Grafana 组合,实现了聚合计算全链路监控:如各环节的输出 / 输入 QPS、计算耗时、生产沉积、早退数据量等。

7 结语

目前,通过该通用聚合框架,承载了网易云信 100+ 个不同维度的指标计算,带来的收益也是比拟可观的:

  • 提效:采纳了页面配置化形式实现聚合指标的生产,开发周期从天级缩短到分钟级。没有数据开发教训的同学也可能本人入手实现指标的配置。
  • 保护简略,资源利用率高:100+ 个指标只需保护 1 个 flink-job,资源耗费也从 300+ 个 CU 缩小到 40CU。
  • 运行过程通明:借助于全链路监控,哪个计算环节有瓶颈,哪个数据源有问题,高深莫测。

作者介绍

圣少友,网易云信数据平台资深开发工程师,从事数据平台相干工作,负责服务监控平台、数据利用平台、品质服务平台的设计开发工作。

更多技术干货,欢送关注【网易智企技术 +】微信公众号

退出移动版