1 引言
网易云信作为一个 PaaS 服务,须要对线上业务进行实时监控,实时感知服务的“心跳”、“脉搏”、“血压”等健康状况。通过采集服务拿到 SDK、服务器等端的心跳埋点日志,是一个十分宏大且杂乱无序的数据集,而如何能力无效利用这些数据?服务监控平台要做的事件就是对海量数据进行实时剖析,聚合出表征服务的“心跳”、“脉搏”、“血压”的外围指标,并将其直观的展现给相干同学。这其中外围的能力便是:实时剖析和实时聚合。
在之前的《网易云信服务监控平台实际》一文中,咱们围绕数据采集、数据处理、监控告警、数据利用 4 个环节,介绍了网易云信服务监控平台的整体框架。本文是对网易云信在聚合指标计算逻辑上的进一步详述。
基于明细数据集进行实时聚合,生产一个聚合指标,业界罕用的实现形式是 Spark Streaming、Flink SQL / Stream API。不论是何种形式,咱们都须要通过写代码来指定数据起源、数据荡涤逻辑、聚合维度、聚合窗口大小、聚合算子等。如此繁冗的逻辑和代码,无论是开发、测试,还是后续工作的保护,都须要投入大量的人力 / 物力老本。而咱们程序员要做的便是化繁为简、实现大巧不工。
本文将论述网易云信是如何基于 Flink 的 Stream API,实现一套通用的聚合指标计算框架。
2 整体架构
如上图所示,是咱们基于 Flink 自研的聚合指标残缺加工链路,其中波及到的模块包含:
- source:定期加载聚合规定,并依据聚合规定按需创立 Kafka 的 Consumer,并继续生产数据。
- process:包含分组逻辑、窗口逻辑、聚合逻辑、环比计算逻辑等。从图中能够看到,咱们在聚合阶段分成了两个,这样做的目标是什么?其中的益处是什么呢?做过分布式和并发计算的,都会遇到一个独特的敌人:数据歪斜。在咱们 PaaS 服务中头部客户会更加显著,所以歪斜十分重大,分成两个阶段进行聚合的奥秘下文中会具体阐明。
- sink:是数据输入层,目前默认输入到 Kafka 和 InfluxDB,前者用于驱动后续计算(如告警告诉等),后者用于数据展现以及查问服务等。
- reporter:全链路统计各个环节的运行状况,如输出 / 输入 QPS、计算耗时、生产沉积、早退数据量等。
下文将具体介绍这几个模块的设计和实现思路。
3 source
规定配置
为了便于聚合指标的生产和保护,咱们将指标计算过程中波及到的要害参数进行了形象提炼,提供了可视化配置页面,如下图所示。下文会联合具体场景介绍各个参数的用处。
规定加载
在聚合工作运行过程中,咱们会定期加载配置。如果检测到有 新增 的 Topic,咱们会创立 kafka-consumer 线程,接管上游实时数据流。同理,对于曾经 生效 的配置,咱们会敞开生产线程,并清理相干的 reporter。
数据生产
对于数据源雷同的聚合指标,咱们共用一个 kafka-consumer,拉取到记录并解析后,对每个聚合指标别离调用 collect() 进行数据散发。如果指标的数据筛选规定(配置项⑤)非空,在数据散发前须要进行数据过滤,不满足条件的数据间接抛弃。
4 process
整体计算流程
基于 Flink 的 Stream API 实现聚合计算的外围代码如下所示:
SingleOutputStreamOperator<MetricContext> aggResult = src
.assignTimestampsAndWatermarks(new MetricWatermark())
.keyBy(new MetricKeyBy())
.window(new MetricTimeWindow())
.aggregate(new MetricAggFuction());
- MetricWatermark():依据指定的工夫字段(配置项⑧)获取输出数据的 timestamp,并驱动计算流的 watermark 往前推动。
- MetricKeyBy():指定聚合维度,相似于 MySQL 中 groupby,依据分组字段(配置项⑥),从数据中获取聚合维度的取值,拼接成分组 key。
- MetricTimeWindow():配置项⑧中指定了聚合计算的窗口大小。如果配置了定时输入,咱们就创立滑动窗口,否则就创立滚动窗口。
- MetricAggFuction():实现配置项②指定的各种算子的计算,下文将具体介绍各个算子的实现原理。
二次聚合
对于大数据量的聚合计算,数据歪斜 是不得不思考的问题,数据歪斜意味着规定中配置的分组字段(配置项⑥)指定的聚合 key 存在热点。咱们的计算框架在设计之初就思考了如何解决数据歪斜问题,就是将聚合过程拆分成 2 阶段:
- 第 1 阶段:将数据随机打散,进行预聚合。
- 第 2 阶段:将第 1 阶段的预聚合后果作为输出,进行最终的聚合。
具体实现:判断并发度参数 parallelism(配置项⑦)是否大于 1,如果 parallelism 大于 1,生成一个 [0, parallelism) 之间的随机数作为 randomKey,在第 1 阶段聚合 keyBy() 中,将根据分组字段(配置项⑥)获取的 key 与 randomKey 拼接,生成最终的聚合 key,从而实现了数据随机打散。
聚合算子
作为一个平台型的产品,咱们提供了如下常见的聚合算子。因为采纳了二次聚合逻辑,各个算子在第 1 阶段和第 2 阶段采纳了相应的计算策略。
算子 | 第 1 阶段聚合 | 第 2 阶段聚合 |
---|---|---|
min/max/sum/count | 间接对输出数据进行预聚合计算,输入预聚合后果 | 对第 1 阶段预聚合后果进行二次聚合计算,输入最终后果 |
first/last | 对输出数据的 timestamp 进行比拟,记录最小 / 最大的 timestamp 以及对应的 value 值,输入 <timestamp,value> 数据对 | 对 <timestamp,value> 数据对进行二次计算,输入最终的 first/last |
avg | 计算该分组的和值和记录数,输入 <sum,cnt> 数据对 | 对 <sum,cnt> 数据对别离求和,而后输入:总 sum / 总 cntcount |
median/tp90/tp95 | 统计输出数据的散布,输入 NumericHistogram | 对输出的 NumericHistogram 做 merge 操作,最终输入中位数 /tp90/tp95 |
count-distinct | 输入记录桶信息和位图的 RoaringArray | 对 RoaringArray 进行 merge 操作,最终输入准确的去重计数后果 |
count-distinct(近似) | 输入基数计数对象 HyperLoglog | 对 HyperLoglog 进行 merge 操作,最终输入近似的去重计数后果 |
对于计算结果受全副数据影响的算子,如 count-distinct(去重计数),惯例思路是利用 set 的去重个性,将所有统计数据放在一个 Set 中,最终在聚合函数的 getResult 中输入 Set 的 size。如果统计数据量十分大,这个 Set 对象就会十分大,对这个 Set 的 I/O 操作所耗费的工夫将不能承受。
对于类 MapReduce 的大数据计算框架,性能的瓶颈往往呈现在 shuffle 阶段大对象的 I/O 上,因为数据须要序列化 / 传输 / 反序列化,Flink 也不例外。相似的算子还有 median 和 tp95。
为此,须要对这些算子做专门的优化,优化的思路就是尽量减少计算过程中应用的数据对象的大小,其中:
- median/tp90/tp95:参考了 hive percentile_approx 的近似算法,该算法通过 NumericHistogram(一种非等距直方图)记录数据分布,而后通过插值的形式失去相应的 tp 值(median 是 tp50)。
- count-distinct:采纳 RoaringBitmap 算法,通过压缩位图的形式标记输出样本,最终失去 准确 的去重计数后果。
- count-distinct(近似):采纳 HyperLoglog 算法,通过基数计数的形式,失去 近似 的去重计数后果。该算法实用于大数据集的去重计数。
后处理
后处理模块,是对第 2 阶段聚合计算输入数据进行再加工,次要有 2 个性能:
- 复合指标计算:对原始统计指标进行组合计算,失去新的组合指标。例如,要统计登录成功率,咱们能够先别离统计出分母(登录次数)和分子(登录胜利的次数),而后将分子除以分母,从而失去一个新的组合指标。配置项③就是用来配置组合指标的计算规定。
- 绝对指标计算:告警规定中常常要判断某个指标的绝对变动状况(同比 / 环比)。咱们利用 Flink 的 state,可能不便的计算出同比 / 环比指标,配置项④就是用来配置绝对指标规定。
异样数据的解决
这里所说的异样数据,分为两类:早退的数据和提前到的数据。
-
早退数据:
- 对于重大早退的数据(大于聚合窗口的 allowedLateness),通过 sideOutputLateData 进行收集,并通过 reporter 统计上报,从而可能在监控页面进行可视化监控。
- 对于轻微早退的数据(小于聚合窗口的 allowedLateness),会触发窗口的重计算。如果每来一条早退数据就触发一次第 1 阶段窗口的重计算,重计算结果传导到第 2 阶段聚合计算,就会导致局部数据的反复统计。为了解决反复统计的问题,咱们在第 1 阶段聚合 Trigger 中进行了非凡解决:窗口触发采纳 FIRE_AND_PURGE(计算并清理),及时清理曾经参加过计算的数据。
- 提前到的数据:这部分数据往往是数据上报端的时钟不准导致。在计算这些数据的 timestamp 时要人为干涉,防止影响整个计算流的 watermark。
5 sink
聚合计算失去的指标,默认输入到 Kafka 和时序数据库 InfluxDB。
- kafka-sink:将指标标识(配置项①)作为 Kafka 的 topic,将聚合后果发送进来,上游接管到该数据流后能够进一步解决加工,如告警事件的生产等。
- InfluxDB-sink:将指标标识(配置项①)作为时序数据库的表名,将聚合后果长久化下来,用于 API 的数据查问、以及可视化报表展现等。
6 reporter
为了实时监控各个数据源和聚合指标的运行状况,咱们通过 InfluxDB+Grafana 组合,实现了聚合计算全链路监控:如各环节的输出 / 输入 QPS、计算耗时、生产沉积、早退数据量等。
7 结语
目前,通过该通用聚合框架,承载了网易云信 100+ 个不同维度的指标计算,带来的收益也是比拟可观的:
- 提效:采纳了页面配置化形式实现聚合指标的生产,开发周期从天级缩短到分钟级。没有数据开发教训的同学也可能本人入手实现指标的配置。
- 保护简略,资源利用率高:100+ 个指标只需保护 1 个 flink-job,资源耗费也从 300+ 个 CU 缩小到 40CU。
- 运行过程通明:借助于全链路监控,哪个计算环节有瓶颈,哪个数据源有问题,高深莫测。
作者介绍
圣少友,网易云信数据平台资深开发工程师,从事数据平台相干工作,负责服务监控平台、数据利用平台、品质服务平台的设计开发工作。
更多技术干货,欢送关注【网易智企技术 +】微信公众号