关于Flink:从-Spark-做批处理到-Flink-做流批一体

32次阅读

共计 9411 个字符,预计需要花费 24 分钟才能阅读完成。

本⽂由社区志愿者苗文婷整顿,内容起源⾃ LinkedIn 大数据高级开发工程师张晨娅在 Flink Forward Asia 2020 分享的《从 Spark 做批处理到 Flink 做流批一体》,分享的主题是如何从 Spark 做批处理到 Flink 做流批一体,在 LinkedIn 的一些摸索实践经验。次要内容为:

  1. 为什么要做流批一体?
  2. 以后行业已有的解决方案和现状,劣势和劣势
  3. 摸索生产实践场景的教训
  4. Shuflle Service 在 Spark 和 Flink 上的比照,以及 Flink 社区前面能够思考做的工作
  5. 总结

1. 为什么要做流批一体

做流批一体到底有哪些好处,尤其是在 BI/AI/ETL 的场景下。整体来看,如果能帮忙用户做到流批一体,会有以上 4 个比拟显著的好处:

  • 能够防止代码反复,复用代码外围解决逻辑

代码逻辑能完全一致是最好的,但这会有肯定的难度。但整体来讲,当初的商业逻辑越来越长,越来越简单,要求也很多,如果咱们应用不同的框架,不同的引擎,用户每次都要从新写一遍逻辑,压力很大并且难以保护。所以整体来讲,尽量避免代码反复,帮忙用户复用代码逻辑,就显得尤为重要。

  • 流批一体有两个方向

这两个方向要思考的问题很不一样,目前 Flink 做 Streaming、Spark 做 Batch 等等一些框架在批处理或流解决上都比拟成熟,都曾经产生了很多的单方面用户。当咱们想帮忙用户移到另外一个方向上时,比方一些商业需要,通常会分成两类,是先从流解决开始到批处理,还是从批处理开始到流解决。之后介绍的两个生产实践场景案例,正好对应这两个方向。

  • 缩小保护工作量

防止保护多套零碎,零碎之间的差别可能十分大,框架和引擎都不一样,会带来比拟多的问题。如果公司外部有多条 pipeline,一个实时一个离线,会造成数据不一致性,因而会在数据验证、数据准确性查问、数据存储等方面做很多工作,尽量去保护数据的一致性。

  • 学习更多

框架和引擎很多,商业逻辑既要跑实时,也要跑离线,所以,反对用户时须要学习很多货色。

2. 以后行业现状

Flink 和 Spark 都是同时反对流解决和批处理的引擎。咱们统一认为 Flink 的流解决做的比拟好,那么它的批处理能做到多好?同时,Spark 的批处理做的比拟好,那么它的流解决能不能足够帮忙用户解决现有的需要?

当初有各种各样的引擎框架,能不能在它们之上有一个对立的框架,相似于联邦解决或者是一些简略的 physical API,比方 Beam API 或者是自定义接口。

Beam 方面须要思考的问题,是它在批处理和流解决上的优化能做到多好?Beam 目前还是偏物理执行,之后的打算是咱们须要讲究的。

LinkedIn,包含其余公司,会思考做一些自定义接口的解决方案,思考有一个共通的 SQL 层,通用的 SQL 或 API 层,底下跑不同的框架引擎。这里须要思考的问题是,像 Spark、Flink 都是比拟成熟的框架了,曾经领有大量的用户群体。当咱们提出一个新的 API,一个新的解决方案,用户的接受度如何? 在公司外部应该如何保护一套新的解决方案?

3. 生产案例场景

前面内容次要聚焦在 Flink 做 batch 的成果,Flink 和 Spark 的简略比照,以及 LinkedIn 外部的一些解决方案。分享两个生产上的实例场景,一个是在机器学习特色工程生成时如何做流批一体,另一个是简单的 ETL 数据流中如何做流批一体。

3.1 案例 A – 机器学习特色工程

第一类方向,流解决 -> 批处理,归类为流批一体。

案例 A 的主体逻辑是在机器学习中做特色生成时,如何从流解决到批处理的流批一体。外围的业务逻辑就是特色转换,转化的过程和逻辑比较复杂,用它做一些标准化。

比方在 LinkedIn 的页面上输出的一些会员信息背景等,须要将这些信息提取进去标准化掉,能力进行一些举荐,帮你找一些工作等等。当会员的身份信息有更新时,会有过滤、预处理的逻辑、包含读取 Kafka 的过程,做特色转换的过程中,可能会有一些小表查问。这个逻辑是十分间接的,没有简单的 join 操作及其他的数据处理过程。

以前它的 pipeline 是实时的,须要定期从离线 pipeline 中读取补充信息来更新流。这种 backfill 对实时集群的压力是很大的,在 backfill 时,须要期待 backfill 工作起来,须要监控工作流不让实时集群宕掉。所以,用户提出能不能做离线的 backfill,不想通过实时流解决做 backfill。

以后咱们的用户是应用 Beam on Samza 做流解决,他们十分相熟 Beam API 和 Spark Dataset API,也会用 Dataset API 去做除了 backfill 之外的一些其余业务解决。

须要特别强调的是,Dataset API 很多都是间接对 Object 操作,对 type 安全性要求很高,如果倡议这些用户间接改成 SQL 或者 DataFrame 等 workflow 是不切实际的,因为他们已有的业务逻辑都是对 Object 进行间接操作和转化等。

在这个案例下,咱们能提供给用户一些计划抉择,Imperative API。看下业界提供的计划:

第一个抉择是行将要统一化的 Flink DataStream API,此前咱们在做计划评估时也有调研 Flink DataSet API(deprecated),DataStream API 能够做到对立,并且在流解决和批处理方面的反对都是比较完善的。但毛病是,毕竟是 Imperative API,可能没有较多的优化,后续应该会继续优化。能够看下 FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API) 和 FLIP-134: Batch execution for the DataStream API。

第二个抉择是 Spark Dataset,也是用户一个比拟天然的抉择。能够用 Dataset API 做 Streaming,区别于 Flink 的 Dataset、DataStream API 等物理 API,它是基于 Spark Dataframe SQL engine 做一些 type safety,优化水平绝对好一些。能够看下文章 Databricks: Introducing Apache Spark Datasets 和 Spark Structured Streaming Programming Guide: Unsupported-operations。

第三个抉择是 Beam On Spark,它目前次要还是用 RDD runner,目前反对带 optimization 的 runner 还是有肯定难度的。之后会具体说下 Beam 在案例 B 中的一些 ongoing 的工作。能够看下 Beam Documentation – Using the Apache Spark Runner 和 BEAM-8470 Create a new Spark runner based on Spark Structured streaming framework。

从用户的反馈上来说,Flink 的 DataStream (DataSet) API 和 Spark 的 Dataset API 在用户 interface 上是十分靠近的。作为 Infra 工程师来说,想要帮用户解决问题,对 API 的相熟水平就比拟重要了。

然而 Beam 和 Flink、Spark 的 API 是十分不一样的,它是 Google 的一条生态系统,咱们之前也帮忙用户解决了一些问题,他们的 workflow 是在 Beam on Samza 上,他们用 p collections 或者 p transformation 写了一些业务逻辑,output、input 办法的 signature 都很不一样,咱们开发了一些轻量级 converter 帮忙用户复用已有的业务逻辑,可能更好的用在从新写的 Flink 或 Spark 作业里。

从 DAG 上来看,案例 A 是一个非常简单的业务流程,就是简略间接的对 Object 进行转换。Flink 和 Spark 在这个案例下,性能体现上是十分靠近的。

通常,咱们会用 Flink Dashboard UI 看一些异样、业务流程等,相比 Spark 来说是一个比拟显著的劣势。Spark 去查问 Driver log,查问异样是比拟麻烦的。然而 Flink 仍旧有几个须要晋升的中央:

  • History Server – 反对更丰盛的 Metrics 等

Spark History Server UI 出现的 metrics 比拟丰盛的,对用户做性能剖析的帮忙是比拟大的。Flink 做批处理的中央是否也能让 Spark 用户能看到等同的 metrics 信息量,来升高用户的开发难度,进步用户的开发效率。

  • 更好的批处理运维工具

分享一个 LinkedIn 从两三年前就在做的事件。LinkedIn 每天有 200000 的作业跑在集群上,须要更好的工具反对批处理用户运维本人的作业,咱们提供了 Dr. Elephant 和 GridBench 来帮忙用户调试和运维本人的作业。

Dr. Elephant 已开源,能帮忙用户更好的调试作业,发现问题并提供倡议。另外,从测试集群到生产集群之前,会依据 Dr. Elephant 生成的报告里评估后果的分数来决定是否容许投产。

GridBench 次要是做一些数据统计分析,包含 CPU 的办法热点剖析等,帮忙用户优化晋升本人的作业。GridBench 后续也有打算开源,能够反对各种引擎框架,包含能够把 Flink 加进来,Flink job 能够用 GridBench 更好的做评估。GridBench Talk: Project Optimum: Spark Performance at LinkedIn Scale。

用户不仅能够看到 GridBench 生成的报告,Dr. Elephant 生成的报告,也能够通过命令行看到 job 的一些最根本信息,利用 CPU 工夫、资源耗费等,还能够对不同 Spark job 和 Flink job 之间进行比照剖析。

以上就是 Flink 批处理须要晋升的两块中央。

3.2 案例 B – 简单的 ETL 数据流

第二类方向,批处理 -> 流解决,归类为流批一体。

ETL 数据流的外围逻辑绝对简单一些,比方包含 session window 聚合窗口,每个小时计算一次页面的用户浏览量,分不同的作业,两头共享 metadata table 中的 page key,第一个作业处理 00 工夫点,第二个作业处理 01 工夫点,做一些 sessionize 的操作,最初输入后果,分 open session、close session,以此来做增量解决每个小时的数据。

这个 workflow 原先是通过 Spark SQL 做的离线增量解决,是纯离线的增量解决。当用户想把作业移到线上做一些实时处理,须要从新搭建一个比方 Beam On Samza 的实时的 workflow,在搭建过程中咱们和用户有十分严密的分割和沟通,用户是遇到十分多的问题的,包含整个开发逻辑的复用,确保两条业务逻辑产生雷同的后果,以及数据最终存储的中央等等,花了很长时间迁徙,最终成果是不太好的。

另外,用户的作业逻辑里同时用 Hive 和 Spark 写了十分多很大很简单的 UDF,这块迁徙也是十分大的工作量。用户对 Spark SQL 和 Spark DataFrame API 是比拟相熟的。

上图中的彩色实线是实时处理的过程,灰色箭头次要是批处理的过程,相当于是一个 Lambda 的构造。

针对案例 B,作业中包含很多 join 和 session window,他们之前也是用 Spark SQL 开发作业的。很显著我 们要从 Declartive API 动手,以后提供了 3 种计划:

第一个抉择是 Flink Table API/SQL,流解决批处理都能够做,同样的 SQL,性能反对很全面,流解决和批处理也都有优化。能够看下文章 Alibaba Cloud Blog: What’s All Involved with Blink Merging with Apache Flink? 和 FLINK-11439 INSERT INTO flink_sql SELECT * FROM blink_sql。

第二个抉择是 Spark DataFrame API/SQL,也是能够用雷同的 interface 做批处理和流解决,然而 Spark 的流解决反对力度还是不够的。能够看下文章 Databricks Blog: Deep Dive into Spark SQL’s Catalyst Optimizer 和 Databricks Blog: Project Tungsten: Bringing Apache Spark Closer to Bare Metal。

第三个抉择是 Beam Schema Aware API/SQL,Beam 更多的是物理的 API,在 Schema Aware API/SQL 上目前都在发展比拟晚期的工作,暂不思考。所以,之后的次要剖析后果和教训都是从 Flink Table API/SQL 和 Spark DataFrame API/SQL 的之间的比照得进去的。能够看下文章 Beam Design Document – Schema-Aware PCollections 和 Beam User Guide – Beam SQL overview。

从用户的角度来说,Flink Table API/SQL 和 Spark DataFrame API/SQL 是十分靠近的,有一些比拟小的差异,比方 keywords、rules、join 具体怎么写等等,也会给用户带来肯定的困扰,会狐疑本人是不是用错了。

Flink 和 Spark 都很好的集成了 Hive,比方 HIve UDF 复用等,对案例 B 中的 UDF 迁徙,加重了一半的迁徙压力。

Flink 在 pipeline 模式下的性能是显著优于 Spark 的,可想而知,要不要落盘对性能影响必定是比拟大的,如果须要大量落盘,每个 stage 都要把数据落到磁盘上,再从新读出来,必定是要比不落盘的 pipeline 模式的解决性能要差的。pipeline 比拟适宜短小的解决,在 20 分钟 40 分钟还是有比拟大的劣势的,如果再长的 pipeline 的容错性必定不能和 batch 模式相比。Spark 的 batch 性能还是要比 Flink 好一些的。这一块须要依据本人公司外部的案例进行评估。

Flink 对 window 的反对显著比其余引擎要丰盛的多,比方 session window,用户用起来十分不便。咱们用户为了实现 session window,特意写了十分多的 UDF,包含做增量解决,把 session 全副 build 起来,把 record 拿进去做解决等等。当初间接用 session window operator,省了大量的开发耗费。同时 group 聚合等 window 操作也都是流批同时反对的。

Session Window:

// Session Event-time Window
.window(Session withGap 10.minutes on $"rowtime" as $"w")
    
// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session withGap 10.minutes on $"proctime" as $"w")

Slide Window:

// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"w")
    
// Sliding Processing-time Window (assuming a processing-time attribute "proctime")
.window(Slide over 10.minutes every 5.minutes on $"proctime" as $"w")

// Sliding Row-count Window (assuming a processing-time attribute "proctime")
.window(Slide over 10.rows every 5.rows on $"proctime" as $"w")

UDF 是在引擎框架之间迁徙时最大的阻碍。如果 UDF 是用 Hive 写的,那是不便迁徙的,因为不论是 Flink 还是 Spark 对 Hive UDF 的反对都是很好的,但如果 UDF 是用 Flink 或者 Spark 写的,迁徙到任何一个引擎框架,都会遇到十分大的问题,比方迁徙到 Presto 做 OLAP 近实时查问。

为了实现 UDF 的复用,咱们 LinkedIn 在外部开发了一个 transport 我的项目,曾经开源至 github 上, 能够看下 LinkedIn 发表的博客:Transport: Towards Logical Independence Using Translatable Portable UDFs。

transport 给所有引擎框架提供一个面向用户的 User API,提供通用的函数开发接口,底下主动生成基于不同引擎框架的 UDF,比方 Presto、Hive、Spark、Flink 等。

用一个共通的 UDF API 买通所有的引擎框架,能让用户复用本人的业务逻辑。用户能够很容易的上手应用,比方如下用户开发一个 MapFromTwoArraysFunction :

public class MapFromTwoArraysFunction extends StdUDF2<StdArray,StdArray,StdMap>{

    private StdType _mapType;
    
    @Override
    public List<String> getInputParameterSignatures(){
        return ImmutableList.of("array[K]",
            "array[V]"
        );
    }
    
    @Override
    public String getOutputParameterSignature(){return "map(K,V)";
    }
}
@Override
public void init(StdFactory stdFactory){super.init(stdFactory);
}
@Override
public StdMap eval(StdArray a1, StdArray a2){if(a1.size() != a2.size()) {return null;}
    StdMap map = getStdFactory().createMap(_mapType);
    for(int i = 0; i < a1.size; i++) {map.put(a1.get(i), a2.get(i));
    }
    return map;
}

解决用户的 SQL 迁徙问题,用户之前是用 Spark SQL 开发的作业,之后想应用流批一体,改成 Flink SQL。目前的引擎框架还是比拟多的,LinkedIn 开发出一个 coral 的解决方案,已在 github 上开源,在 facebook 上也做了一些 talk,包含和 transport UDF 一起给用户提供一个隔离层使用户能够更好的做到跨引擎的迁徙,复用本人的业务逻辑。

看下 coral 的执行流程,首先作业脚本中定义 相熟的 ASCII SQL 和 table 的属性等,之后会生成一个 Coral IR 树状构造,最初翻译成各个引擎的 physical plan。

在案例 B 剖析中,流批对立,在集群业务量特地大的状况下,用户对批处理的性能、稳定性、成功率等是非常重视的。其中 Shuffle Service,对批处理性能影响比拟大。

4.Shuffle Service 在 Spark 和 Flink 上的比照

In-memory Shuffle,Spark 和 Flink 都反对,比拟快,但不反对可扩大。

Hash-based Shuffle,Spark 和 Flink 都反对,相比 In-memory Shuffle,容错性反对的更好一些,但同样不反对可扩大。

Sort-based Shuffle,对大的 Shuffle 反对可扩大,从磁盘读上来一点一点 Sort match 好再读回去,在 FLIP-148: Introduce Sort-Based Blocking Shuffle to Flink 中也曾经反对。

External Shuffle Service,在集群十分忙碌,比方在做动静资源调度时,外挂服务就会十分重要,对 Shuffle 的性能和资源依赖有更好的隔离,隔离之后就能够更好的去调度资源。FLINK-11805 A Common External Shuffle Service Framework 目前处于 reopen 状态。

Disaggregate Shuffle,大数据畛域都提倡 Cloud Native 云原生,计算存储拆散在 Shuffle Service 的设计上也是要思考的。FLINK-10653 Introduce Pluggable Shuffle Service Architecture 引入了可插拔的 Shuffle Service 架构。

Spark 对 Shuffle Service 做了一个比拟大的晋升,这个工作也是由 LinkedIn 主导的 magnet 我的项目,造成了一篇名称为 introducing-magnet 的论文 (Magnet: A scalable and performant shuffle architecture for Apache Spark),收录到了 LinkedIn blog 2020 里。magnet 很显著的晋升了磁盘读写的效率,从比拟小的 random range,到比拟大的程序读,也会做一些 merging,而不是随便的随机读取 shuffle data,防止 random IO 的一些问题。

通过 Magent Shuffle Service 缓解了 Shuffle 稳定性和可扩展性方面的问题。在此之前,咱们发现了很多 Shuffle 方面的问题,比方 Job failure 等等十分高。如果想用 Flink 做批处理,帮忙到以前用 Spark 做批处理的用户,在 Shuffle 上的确要花更大功夫。

  • 在 Shuffle 可用性上,会采纳 best-effort 形式去推 shuffle blocks,疏忽一些大的 block,保障最终的一致性和准确性。
  • 为 shuffle 长期数据生成一个正本,确保准确性。

如果 push 过程特地慢,会有提前终止技术。

Magent Shuffle 相比 Vanilla Shuffle,读取 Shuffle data 的等待时间缩较少了简直 100%,task 执行工夫缩短了简直 50%,端到端的工作时长也缩短了简直 30%。

5. 总结

  • LinkedIn 十分认可和开心看到 Flink 在流解决和批处理上的显著劣势,做的更加对立,也在继续优化中。
  • Flink 批处理能力有待晋升,如 history server,metrics,调试。用户在开发的时候,须要从用户社区看一些解决方案,整个生态要搭建起来,用户能力不便的用起来。
  • Flink 须要对 shuffle service 和大集群离线工作流投入更多的精力,确保 workflow 的成功率,如果规模大起来之后,如何提供更好的用户反对和对集群进行衰弱监控等。
  • 随着越来越多的框架引擎呈现,最好能给到用户一个更加对立的 interface,这一块的挑战是比拟大的,包含开发和运维方面,依据 LinkedIn 的教训,还是看到了很多问题的,并不是通过一个繁多的解决方案,就能囊括所有的用户应用场景,哪怕是一些 function 或者 expression,也很难齐全笼罩到。像 coral、transport UDF。

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群;

第一工夫获取最新技术文章和社区动静,请关注公众号~

正文完
 0