如何抉择适宜本人业务的流计算引擎?除了比拟各自的性能矩阵外,基准测试(benchmark)便是用来评估零碎性能的一个重要和常见的办法。
然而在流计算畛域,目前还没有一个行业标准的基准测试。本文将探讨流计算基准测试设计上的难点,分享如何设计流计算基准测试框架——Nexmark,以及未来的布局。
背景
随着数据时效性对企业的精细化经营越来越重要,“实时即将来”、“实时数仓”、“数据湖”成为了近几年煊赫一时的词。流计算畛域的格局也在这几年产生了微小的变动,Apache Flink 在流批一体的方向上一直深耕,Apache Spark 的近实时处理有着肯定的受众,Apache Kafka 也有了 ksqlDB 高调地进军流计算,而 Apache Storm 却开始逐步地退出历史的舞台。
每一种引擎有其劣势的中央,如何抉择适宜本人业务的流计算引擎成了一个由来已久的话题。除了比拟各个引擎提供的不同的性能矩阵之外,性能是一个无奈绕开的评估因素。基准测试(benchmark)就是用来评估零碎性能的一个重要和常见的过程。
二 现有流计算基准测试的问题
目前在流计算畛域中,还没有一个行业标准的基准测试。目前业界较为人知的流计算 benchmark 是五年前雅虎 Storm 团队公布的 Yahoo Streaming Benchmarks[4]。雅虎的原意是因为业界短少反映实在场景的 benchmark,模仿了一个简略的广告场景来比拟各个流计算框架,起初被宽泛援用。具体场景是从 Kafka 生产的广告的点击流,关联 Redis 中的广告所属的 campaign 信息,而后做工夫窗口聚合计数。
然而,正是因为雅虎团队太过于谋求还原实在的生产环境,导致这些内部零碎服务(Kafka, Redis)成为了作业的瓶颈。Ververica 曾在这篇文章 [5] 中做过一个扩大试验,将数据源从 Kafka 替换成了一个内置的 datagen source,性能晋升了 37 倍!由此可见,引入的 Kafka 组件导致了无奈精确反映引擎实在的性能。更重要的一个问题是,Yahoo Benchmark 只蕴含一个非常简单的,相似“Word Count”的作业,它无奈全面地反映当今简单的流计算零碎和业务。试想,谁会用一个简略的“Word Count”去掂量比拟各个数据库之间的性能差别呢?正是这些起因使得 Yahoo Benchmark 无奈成为一个行业标准的基准测试。这也正是咱们想要解决的问题。
因而,咱们认为一个行业标准的基准测试应该具备以下几个特点:
1. 可复现性
可复现性是使得 benchmark 被信赖的一个重要条件。许多 benchmark 的后果是难以重现的。有的是因为只摆了个 benchmark 后果图,用于生成这些后果的代码并没有公开。有的是因为用于 benchmark 的硬件不容易被他人获取到。有的是因为 benchmark 依赖的服务太多,以致测试后果不稳固。
2. 能代表和笼罩行业实在的业务场景(query 量)
例如数据库畛域十分驰名的 TPC-H、TPC-DS 涵盖了大量的 query 汇合,来捕捉查问引擎之间轻微的差异。而且这些 query 汇合都立于实在业务场景之上(商品批发行业),数据规模大,因而也很受一些大数据系统的青眼。
3. 能调整作业的负载(数据量、数据分布)
在大数据畛域,不同的数据规模对于引擎来说可能会是齐全不同的事件。例如 Yahoo Benchmark 中应用的 campaign id 只有 100 个,使得状态十分小,内存都能够装的下。这样使得同步 IO 和 checkpoint 等的影响能够忽略不计。而实在的场景往往要面对大状态,面临的挑战要简单艰难的多。像 TPC-DS 的数据生成工具会提供 scalar factor 的参数来控制数据量。其次在数据分布上最好也能贴近真实世界的数据,如有数据歪斜,及调整歪斜比例。从而能全面、综合地反映业务场景和引擎之间的差别。
4. 有对立的性能掂量指标和采集汇总工具
基准测试的性能指标的定义须要清晰、统一,且能实用于各种计算引擎。然而流计算的性能指标要比传统批处理的更难定义、更难采集。是流计算 benchmark 最具挑战性的一个问题,这也会在下文开展形容。
咱们也钻研了很多其余的流计算相干的基准测试,包含:StreamBench、HiBench、BigDataBench,然而它们都在上述几个基本面有所欠缺。基准测试的行业标杆无疑是 TPC 公布的一系列 benchmark,如 TPC-H,TPC-DS。然而这些 benchmark 是面向传统数据库、传统数仓而设计的,并不适用于明天的流计算零碎。例如 benchmark 中没有思考事件工夫、数据的乱序、窗口等流计算中常见的场景。因而咱们不得不思考从新设计并开源一个流计算基准测试框架——Nexmark。
地址:https://github.com/nexmark/ne…。
三 Nexmark 基准测试框架的设计
为了提供一个满足以上几个基本面的流计算基准测试,咱们设计和开发了 Nexmark 基准测试框架,并致力让其成为流计算畛域的规范 benchmark。
Nexmark 基准测试框架来源于 NEXMark 钻研论文[1],以及 Apache Beam Nexmark Suite[6],并在其之上进行了扩大和欠缺。Nexmark 基准测试框架不依赖任何第三方服务,只须要部署好引擎和 Nexmark,通过脚本 nexmark/bin/run_query.sh all 即可期待并取得所有 query 下的 benchmark 后果。上面咱们将探讨 Nexmark 基准测试在设计上的一些决策。
移除内部 source、sink 依赖
如上所述,Yahoo Benchmark 应用了 Kafka 数据源,却使得最终后果无奈精确反映引擎的真实性能。此外,咱们还发现,在 benchmark 快慢流双流 JOIN 的场景时,如果应用了 Kafka 数据源,慢流会超前消费(快流易被反压),导致 JOIN 节点的状态会缓存大量超前的数据。这其实不能反映实在的场景,因为在实在的场景下,慢流是无奈被超前消费的(数据还未产生)。所以咱们在 Nexmark 中应用了 datagen source,数据间接在内存中生成,数据不落地,间接向上游节点发送。多个事件流都由繁多的数据生成器生成,所以当快流被反压时,也能克制慢流的生成,较好地反映了实在场景。
与之类似的,咱们也移除了内部 sink 的依赖,不再输入到 Kafka/Redis,而是输入到一个空 sink 中,即 sink 会抛弃收到的所有数据。
通过这种形式,咱们保障了瓶颈只会在引擎本身,从而能准确地测量出引擎之间轻微的差别。
Metrics
批处理零碎 benchmark 的 metric 通常采纳总体耗时来掂量。然而流计算零碎解决的数据是源源不断的,无奈统计 query 耗时。因而,咱们提出三个次要的 metric:吞吐、提早、CPU。Nexmark 测试框架会主动帮咱们采集 metric,并做汇总,不须要部署任何第三方的 metric 服务。
■ 吞吐
吞吐(throughput)也常被称作 TPS,形容流计算零碎每秒能解决多少条数据。因为咱们有多个事件流,所有事件流都由一个数据生成器生成,为了对立观测角度,咱们采纳数据生成器的 TPS,而非繁多事件流的 TPS。咱们将一个 query 能达到的最大吞吐,作为其吞吐指标。例如,针对 Flink 引擎,咱们通过 Flink REST API 裸露的 <source_operator_name>.numRecordsOutPerSecond metric 来获取以后吞吐量。
■ 提早
提早(Latency)形容了从数据进入流计算零碎,到它的后果被输入的工夫距离。对于窗口聚合,Yahoo Benchmark 中应用 output_system_time – window_end 作为提早指标,这其实并没有思考数据在窗口输入前的等待时间,这种计算结果也会极大地受到反压的影响,所以其计算结果是不精确的。一种更精确的计算形式应为 output_system_time – max(ingest_time)。然而在非窗口聚合,或双流 JOIN 中,提早又会有不同的计算形式。
所以提早的定义和采集在流计算零碎中有很多现实存在的问题,须要依据具体 query 具体分析,这在参考文献 [2] 中有具体的探讨,这也是咱们目前还未在 Nexmark 中实现提早 metric 的起因。
■ CPU
资源使用率是很多流计算 benchmark 中漠视的一个指标。因为在实在生产环境,咱们并不会限度流计算引擎所能应用的核数,从而给零碎更大的弹性。所以咱们引入了 CPU 使用率,作为辅助指标,即作业一共耗费了多少核。通过吞吐 /cores,能够计算出均匀每个核对于吞吐的奉献。对于过程的 CPU 使用率的采集,咱们没有应用 JVM CPU load,而是借鉴了 YARN 中的实现,通过采样 /proc/<pid>/stat 并计算取得,该形式能够取得较为实在的过程 CPU 使用率。因而咱们的 Nexmark 测试框架须要在测试开始前,先在每台机器上部署 CPU 采集过程。
Query 与 Schema
Nexmark 的业务模型基于一个实在的在线拍卖零碎。所有的 query 都基于雷同的三个数据流,三个数据流会有一个数据生成器生成,来管制他们之间的比例、数据偏斜、关联关系等等。这三个数据流别离是:
- 用户(Person):代表一个提交拍卖,或参加竞标的用户。
- 拍卖(Auction):代表一个拍卖品。
- 竞标(Bid):代表一个对拍卖品的出价。
咱们一共定义了 16 个 query,所有的 query 都应用 ANSI SQL 规范语法。基于 SQL,咱们能够更容易地扩大 query 测试集,反对更多的引擎。然而,因为 Spark 在流计算性能上的限度,大部分的 query 都无奈通过 Structured Streaming 来实现。因而咱们目前只反对测试 Flink SQL 引擎。
作业负载的配置化
咱们也反对配置调整作业的负载,包含数据生成器的吞吐量以及吞吐曲线、各个数据流之间的数据量比例、每个数据流的数据均匀大小以及数据歪斜比例等等。具体的能够参考 Source DDL 参数。
四 试验后果
咱们在阿里云的三台机器上进行了 Nexmark 针对 Flink 的基准测试。每台机器均为 ecs.i2g.2xlarge 规格,配有 Xeon 2.5 GHz CPU (8 vCores) 以及 32 GB 内存,800 GB SSD 本地磁盘。机器之间的带宽为 2 Gbps。
测试了 Flink-1.11 版本,咱们在这 3 台机器上部署了 Flink standalone 集群,由 1 个 JobManager,8 个 TaskManager(每个只有 1 slot)组成,都是 4 GB 内存。集群默认并行度为 8。开启 checkpoint 以及 exactly once 模式,checkpoint 距离 3 分钟。应用 RocksDB 状态后端。测试发现,对于有状态的 query,每次 checkpoint 的大小在 GB 级以上,所以无效地测试的大状态的场景。
Datagen source 放弃 1000 万每秒的速率生成数据,三个数据流的数据比例别离是 Bid: 92%,Auction: 6%,Person: 2%。每个 query 都先运行 3 分钟热身,之后 3 分钟采集性能指标。
运行 nexmark/bin/run_query.sh all 后,打印测试后果如下:
五 总结
咱们开发和设计 Nexmark 的初衷是为了推出一套规范的流计算 benchmark 测试集,以及测试流程。尽管目前仅反对了 Flink 引擎,但在以后也具备肯定的意义,例如:
- 推动流计算 benchmark 的倒退和标准化。
- 作为 Flink 引擎版本迭代之间的性能测试工具,甚至是日常回归工具,及时发现性能回退的问题。
- 在开发 Flink 性能优化的性能时,能够用来验证性能优化的成果。
- 局部公司可能会有 Flink 的外部版本,能够用作外部版本与开源版本之间的性能比照工具。
当然,咱们也打算继续改良和欠缺 Nexmark 测试框架,例如反对 Latency metric,反对更多的引擎,如 Spark Structured Streaming, Spark Streaming, ksqlDB, Flink DataStream 等等。也欢送有志之士一起退出奉献和扩大。
参考及援用:
[1]Pete Tucker and Kristin Tufte. “NEXMark – A Benchmark for Queries over Data Streams”. June 2010.
[2]Jeyhun Karimov and Tilmann Rabl. “Benchmarking Distributed Stream Data Processing Systems”. arXiv:1802.08496v2 [cs.DB] Jun 2019
[3]Yangjun Wang. “Stream Processing Systems Benchmark: StreamBench”. May 2016.
[4]https://github.com/yahoo/stre…
[5]https://www.ververica.com/blo…
[6]https://beam.apache.org/docum…