Flink在饿了么的应用与实践

32次阅读

共计 6993 个字符,预计需要花费 18 分钟才能阅读完成。

本文作者:易伟平(饿了么)

整理:姬平(阿里巴巴实时计算部)

本文将为大家展示饿了么大数据平台在实时计算方面所做的工作,以及计算引擎的演变之路,你可以借此了解 Storm、Spark、Flink 的优缺点。如何选择一个合适的实时计算引擎?Flink 凭借何种优势成为饿了么首选?本文将带你一一解开谜题。

平台现状

下面是目前饿了么平台现状架构图:


来源于多个数据源的数据写到 kafka 里,计算引擎主要是 Storm , Spark 和 Flink,计算引擎出来的结果数据再落地到各种存储上。

目前 Storm 任务大概有 100 多个,Spark 任务有 50 个左右,Flink 暂时还比较少。

目前我们集群规模每天数据量有 60TB,计算次数有 1000000000,节点有 400 个。这里要提一下,Spark 和 Flink 都是 on yarn 的,其中 Flink onyarn 主要是用作任务间 jobmanager 隔离,Storm 是 standalone 模式。

应用场景

1. 一致性语义

在讲述我们应用场景之前,先强调实时计算一个重要概念,一致性语义:

1) at-most-once: 即 fire and forget,我们通常写一个 java 的应用,不去考虑源头的 offset 管理,也不去考虑下游的幂等性的话,就是简单的 at-most-once,数据来了,不管中间状态怎样,写数据的状态怎样,也没有 ack 机制。

2) at-least-once: 重发机制,重发数据保证每条数据至少处理一次。

3) exactly-once: 使用粗 checkpoint 粒度控制来实现 exactly-once,我们讲的 exactly-once 大多数指计算引擎内的 exactly-once,即每一步的 operator 内部的状态是否可以重放;上一次的 job 如果挂了,能否从上一次的状态顺利恢复,没有涉及到输出到 sink 的幂等性概念。

4) at-least-one + idempotent = exactly-one: 如果我们能保证说下游有幂等性的操作,比如基于 mysql 实现 update on duplicate key;或者你用 es, cassandra 之类的话,可以通过主键 key 去实现 upset 的语义, 保证 at-least-once 的同时,再加上幂等性就是 exactly-once。

2. Storm

饿了么早期都是使用 Storm,16 年之前还是 Storm,17 年才开始有 Sparkstreaming,Structed-streaming。Storm 用的比较早,主要有下面几个概念:

1) 数据是 tuple-based

2) 毫秒级延迟

3) 主要支持 java,现在利用 apache beam 也支持 python 和 go。

4) Sql 的功能还不完备,我们自己内部封装了 typhon,用户只需要扩展我们的一些接口,就可以使用很多主要的功能;flux 是 Storm 的一个比较好的工具,只需要写一个 yaml 文件,就可以描述一个 Storm 任务,某种程度上说满足了一些需求,但还是要求用户是会写 java 的工程师,数据分析师就使用不了。

★ 2.1 总结

1) 易用性:因为使用门槛高,从而限制了它的推广。

2)StateBackend:更多的需要外部存储,比如 redis 之类的 kv 存储。

3) 资源分配方面:用 worker 和 slot 提前设定的方式,另外由于优化点做的较少,引擎吞吐量相对比较低一点。

3. Sparkstreaming

有一天有个业务方过来提需求说 我们能不能写个 sql,几分钟内就可以发布一个实时计算任务。于是我们开始做 Sparkstreaming。它的主要概念如下:

1) Micro-batch:需要提前设定一个窗口,然后在窗口内处理数据。

2) 延迟是秒级级别,比较好的情况是 500ms 左右。

3) 开发语言是 java 和 scala。

4) Streaming SQL,主要是我们的工作,我们希望提供 Streaming SQL 的平台。

特点:

1) Spark 生态和 SparkSQL: 这是 Spark 比较好的地方,技术栈是统一的,SQL,图计算,machine learning 的包都是可以互调的。因为它先做的是批处理,和 Flink 不一样,所以它天然的实时和离线的 api 是统一的。

2) Checkpointon hdfs。

3) On Yarn:Spark 是属于 hadoop 生态体系,和 yarn 集成度高。

4) 高吞吐:因为它是 micro-batch 的方式,吞吐也是比较高的。

下面给大家大致展示一下我们平台用户快速发布一个实时任务的操作页面,它需要哪些步骤。我们这里不是写 DDL 和 DML 语句,而是 UI 展示页面的方式。

页面里面会让用户选一些必要的参数,首先会选哪一个 kafka 集群,每个分区消费多少,反压也是默认开启的。消费位置需要让用户每次去指定,有可能用户下一次重写实时任务的时候,可以根据业务需求去选择 offset 消费点。

中间就是让用户描述 pipeline。SQL 就是 kafka 的多个 topic,输出选择一个输出表,SQL 把上面消费的 kafka DStream 注册成表,然后写一串 pipeline,最后我们帮用户封装了一些对外 sink (刚刚提到的各种存储都支持,如果存储能实现 upsert 语义的话,我们都是支持了的 )。

3.1 MultiStream-Join

虽然刚刚满足一般无状态批次内的计算要求,但就有用户想说,我想做流的 join 怎么办,早期的 Spark1.5 可以参考 Spark-streamingsql 这个开源项目把 DStream 注册为一个表,然后对这个表做 join 的操作,但这只支持 1.5 之前的版本,Spark2.0 推出 structured streaming 之后项目就废弃了。我们有一个 tricky 的方式:

让 Sparkstreaming 去消费多个 topic,但是我根据一些条件把消费的 DStream 里面的每个批次 RDD 转化为 DataFrame,这样就可以注册为一张表,根据特定的条件,切分为两张表,就可以简单的做个 join,这个 join 的问题完全依赖于本次消费的数据,它们 join 的条件是不可控的,是比较 tricky 的方式。比如说下面这个例子,消费两个 topic,然后简单通过 filer 条件,拆成两个表,然后就可以做个两张表的 join,但它本质是一个流。

3.2 Exactly-once

exactly-once 需要特别注意一个点:

我们必须要求数据 sink 到外部存储后,offset 才能 commit,不管是到 zookeeper,还是 mysql 里面,你最好保证它在一个 transaction 里面,而且必须在输出到外部存储(这里最好保证一个 upsert 语义,根据 unique key 来实现 upset 语义)之后,然后这边源头 driver 再根据存储的 offeset 去产生 kafka RDD,executor 再根据 kafka 每个分区的 offset 去消费数据。如果满足这些条件,就可以实现端到端的 exactly-once 这是一个大前提。

3.3 总结

1) Stateful Processing SQL (<2.x mapWithState、updateStateByKey): 我们要实现跨批次带状态的计算的话,在 1.X 版本,我们通过这两个接口去做,但还是需要把这个状态存到 hdfs 或者外部去,实现起来比较麻烦一点。

2) Real Multi-Stream Join: 没办法实现真正的多个流 join 的语义。

3) End-To-End Exactly-Once Semantics: 它的端到端的 exactly-once 语义实现起来比较麻烦,需要 sink 到外部存储后还需要手动的在事务里面提交 offset。

4. STRUCTURED STREAMING

我们调研然后并去使用了 Spark2.X 之后带状态的增量计算。下面这个图是官方网站的:


所有的流计算都参照了 Google 的 data flow,里面有个重要的概念:数据的 processing time 和 event time,即数据的处理时间和真正的发生时间有个 gap。于是流计算领域还有个 watermark,当前进来的事件水位需要 watermark 来维持,watermark 可以指定时间 delay 的范围,在延迟窗口之外的数据是可以丢弃的,在业务上晚到的数据也是没有意义的。

下面是 structured streaming 的架构图:


这里面就是把刚才 sparkstreaming 讲 exactly-once 的步骤 1,2,3 都实现了,它本质上还是分批的 batch 方式,offset 自己维护,状态存储用的 hdfs,对外的 sink 没有做类似的幂等操作,也没有写完之后再去 commit offset,它只是再保证容错的同时去实现内部引擎的 exactly-once。

4.1 特点

1) Stateful Processing SQL&DSL: 可以满足带状态的流计算

2) Real Multi-Stream Join: 可以通过 Spark2.3 实现多个流的 join,多个流的 join 做法和 Flink 类似,你需要先定义两个流的条件 (主要是时间作为一个条件),比如说有两个 topic 的流进来,然后你希望通过某一个具体的 schema 中某个字段(通常是 event time)来限定需要 buffer 的数据,这样可以实现真正意义上的流的 join。

3)比较容易实现端到端的 exactly-once 的语义,只需要扩展 sink 的接口支持幂等操作是可以实现 exactly-once 的。

特别说一下,structured streaming 和原生的 streaming 的 API 有一点区别,它创建表的 Dataframe 的时候,是需要指定表的 schema 的,意味着你需要提前指定 schema。另外它的 watermark 是不支持 SQL 的,于是我们加了一个扩展,实现完全写 SQL,可以从左边到右边的转换(下图),我们希望用户不止是程序员,也希望不会写程序的数据分析师等同学也能用到。

4.2 总结

1) Trigger(Processing Time、Continuous):2.3 之前主要基于 processing Time,每个批次的数据处理完了立马触发下一批次的计算。2.3 推出了 record by record 的持续处理的 trigger。

2) Continuous Processing (Only Map-Like Operations): 目前它只支持 map like 的操作,同时 sql 的支持度也有些限制。

3) LowEnd-To-End Latency With Exactly-Once Guarantees: 端到端的 exactly-once 的保证需要自己做一些额外的扩展,我们发现 kafka0.11 版本提供了事务的功能,是可以从基于这方面考虑从而去实现从 source 到引擎再到 sink,真正意义上的端到端的 exactly-once。

4) CEP(Drools): 我们发现有业务方需要提供 CEP 这样复杂事件处理的功能,目前我们的语法无法直接支持,我们让用户使用规则引擎 Drools,然后跑在每个 executor 上面,依靠规则引擎功能去实现 CEP。

于是基于以上几个 Spark Structured Streaming 的特点和缺点,我们考虑使用 Flink 来做这些事情。

5.Flink

Flink 目标是对标 Spark,流这块是领先比较多,它野心也比较大,图计算,机器学习等它都有,底层也是支持 yarn,tez 等。对于社区用的比较多的存储,Flink 社区官方都支持比较好,相对来说。

Flink 的框架图:

Flink 中的 JobManager,相当于 Spark 的 Driver 角色,TaskManger 相当于 Executor,里面的 Task 也有点类似 Spark 的那些 Task。不过 Flink 用的 RPC 是 akka,同时 Flink Core 自定义了内存序列化框架,另外 Task 无需像 Spark 每个 Stage 的 Task 必须相互等待而是处理完后即往下游发送数据。

Flink binary data 处理 operator:

Spark 的序列化用户一般会使用 kryo 或者 java 默认的序列化,同时也有 Tungsten 项目对 Spark 程序做一 JVM 层面以及代码生成方面的优化。相对于 Spark,Flink 自己实现了基于内存的序列化框架,里面维护着 key 和 pointer 的概念,它的 key 是连续存储,在 CPU 层面会做一些优化,cache miss 概率极低。比较和排序的时候不需要比较真正的数据,先通过这个 key 比较,只有当它相等的时候,才会从内存中把这个数据反序列化出来,再去对比具体的数据,这是个不错的性能优化点。

Flink Task Chain:

Task 中 operator chain,是比较好的概念。如果上下游数据分布不需要重新 shuffle 的话,比如图中 source 是 kafka source,后面跟的 map 只是一个简单的数据 filter,我们把它放在一个线程里面,就可以减少线程上下文切换的代价。

并行度概念

比如说这里面会有 5 个 Task,就会有几个并发线程去跑,chain 起来的话放在一个线程去跑就可以提升数据传输性能。Spark 是黑盒的,每个 operator 无法设并发度,而 Flink 可以对每个 operator 设并发度,这样可以更灵活一点,作业运行起来对资源利用率也更高一点。

Spark 一般通过 Spark.default.parallelism 来调整并行度,有 shuffle 操作的话,并行度一般是通 Spark.sql.shuffle.partitions 参数来调整,实时计算的话其实应该调小一点,比如我们生产中和 kafka 的 partition 数调的差不多,batch 在生产上会调得大一点,我们设为 1000,左边的图我们设并发度为 2,最大是 10,这样首先分 2 个并发去跑,另外根据 key 做一个分组的概念,最大分为 10 组,就可以做到把数据尽量的打散。

State & Checkpoint

因为 Flink 的数据是一条条过来处理,所以 Flink 中的每条数据处理完了立马发给下游,而不像 spark,需要等该 operator 所在的 stage 所有的 task 都完成了再往下发。

Flink 有粗粒度的 checkpoint 机制,以非常小的代价为每个元素赋予一个 snapshot 概念,只有当属于本次 snapshot 的所有数据都进来后才会触发计算,计算完后,才把 buffer 数据往下发,目前 Flink sql 没有提供控制 buffer timeout 的接口,即我的数据要 buffer 多久才往下发。可以在构建 Flink context 时,指定 buffer timeout 为 0,处理完的数据才会立马发下去,不需要等达到一定阈值后再往下发。

Backend 默认是维护在 jobmanager 内存,我们更多使用的的是写到 hdfs 上,每个 operator 的状态写到 rocksdb 上,然后异步周期增量同步到外部存储。

容错

图中左半部分的红色节点发生了 failover,如果是 at-least-once,则其最上游把数据重发一次就好;但如果是 exactly-once,则需要每个计算节点从上一次失败的时机重放。

Exactly Once Two-Phase Commit

Flink1.4 之后有两阶段提交来支持 exactly-once。它的概念是从上游 kafka 消费数据后,每一步都会发起一次投票,来记录状态,通过 checkpoint 的屏障来处理标记,只有最后再写到 kafka(0.11 之后的版本),只有最后完成之后,才会把每一步的状态让 jobmanager 中的 cordinator 去通知可以固化下来,这样实现 exactly-once。

Savepoints

还有一点 Flink 比较好的就是,基于它的 checkpoint 来实现 savepoint 功能。业务方需要每个应用恢复节点不一样,希望恢复到的版本也是可以指定的,这是比较好的。这个 savepoint 不只是数据的恢复,也有计算状态的恢复。

特点:

1) Trigger (Processing Time、Event Time、IngestionTime): 对比下,Flink 支持的流式语义更丰富,不仅支持 Processing Time,也支持 Event timeIngestion Time

2)Continuous Processing & Window: 支持纯意义上的持续处理,record by record 的,window 也比 Spark 处理的好。

3) Low End-To-End Latency With Exactly-Once Guarantees: 因为有两阶段提交,用户是可以选择在牺牲一定吞吐量的情况下,根据业务需求情况来调整来保证端到端的 exactly-once。

4) CEP: 支持得好。

5) Savepoints: 可以根据业务的需求做一些版本控制。

也有做的还不好的:

1)SQL (Syntax Function、Parallelism):SQL 功能还不是很完备,大部分用户是从 hive 迁移过来,Spark 支持 hive 覆盖率达到 99% 以上。SQL 函数不支持,目前还无法对单个 operator 做并行度的设置。

2) ML、Graph 等 : 机器学习,图计算等其他领域比 Spark 要弱一点,但社区也在着力持续改进这个问题。

后续规划

因为现在饿了么已经属于阿里的一员,后续会更多地使用 Flink,也期待用到 Blink。

正文完
 0