共计 4770 个字符,预计需要花费 12 分钟才能阅读完成。
作者: 张光辉
本文将为大家展示字节跳动公司怎么把 Storm 从 Jstorm 迁移到 Flink 的整个过程以及后续的计划。你可以借此了解字节跳动公司引入 Flink 的背景以及 Flink 集群的构建过程。字节跳动公司是如何兼容以前的 Jstorm 作业以及基于 Flink 做一个任务管理平台的呢?本文将一一为你揭开这些神秘的面纱。
本文内容如下:
- 引入 Flink 的背景
- Flink 集群的构建过程
- 构建流式管理平台
引入 Flink 的背景
下面这幅图展示的是字节跳动公司的业务场景
首先,应用层有广告,也有 AB 测,也有推送和数据仓库的一些业务。然后在使用 J storm 的过程中,增加了一层模板主要应用于 storm 的计算模型,使用的语言是 python。所以说中间相对抽象了一个 schema,跑在最下面一层 J storm 计算引擎的上面。
字节跳动公司有很多 J -storm 集群,在当时 17 年 7 月份的时候,也就是在计划迁移到 Flink 之前,J storm 集群的规模大概是下图所示的规模级别,当时已经有 5000 台机器左右了。
接下来,介绍下迁移 Flink 的整个过程。先详细地介绍一下当时 J -Storm 是怎么用的。
上面是一个 word count 的例子:左边是一个目录结构,这个目录结构在 resources 下面,里面的 Spout/Bolt 的逻辑都是一些 python 脚本写的。然后在最外层还有一个 topology_online.yaml 配置文件。
这个配置文件是用来干什么的?就是把所有的 Spout 和 Bolt 串联起来构成一个有向无关图,也就是 DAG 图。这就是使用 J storm 时的整个目录结构,大部分用户都是这样用的。右边是 Spout 和 Bolt 的逻辑,其实是抽象出来了一个函数,就在这里面写业务方面的函数,然后将 tuple_batch 也就是上游流下来的数据去做一些计算逻辑。
下面详细介绍一下配置文件的信息,其实我们有整个拓扑结构拓扑的信息,比如说作业名叫什么,作业需要多少资源,需要多少 work 数。这里面会有单个的 spout 和 Bolt 的配置信息,比如是消费的 topic 还是一些并发度?
除了这些信息还有整个这个数据流的流转,比如说 spout 的输出,输出 messsage 的消息等等。最后还有整个的 Spout 到 Bolt 之间的 shuffle 逻辑。这就是我们之前 Jstorm 的整个使用方式。最后会把整个目录结构里面的内容去解析出来,根据配置文件把整个 storm 的拓扑结构构建出来,然后提交到集群上面去跑。
使用 Jstorm 集群遇到了什么问题呢?第一个问题,因为我们当时是用使用 python 写的代码,整个集群是没有内存隔离的,job 和 work 之间是没有内存限制的。比如说在实际过程中会经常遇到一个用户,他可能代码写的有问题导致一个 work 可能占了 70G 内存,把机器的内存占了 1 /3。第二个问题就是说业务团队之间没有扩大管理,预算和审核是无头绪的。我们当时都是都是跑在一个大集群上面,然后个别业务是单独跑在一些小集群,但是我们每次都是资源不足,也没办法梳理这个预算。
第三个问题就是集群过多,运维平台化做得不太好,都是靠人来运维的。这个时候集群多了基本上是管不过来的。
第四个问题就是说我们用 python 写的代码,有些性能比较差。但是我们在 Storm 的基础上面去推广这个 Java 也比较难,因为我们部分同事实际上是不认可 Java 的,因为他觉得 java 开发速度太慢了。
我们当时想解决上面的问题,一个思路是把 Jstorm 放在 yarn 上面,直接把 Jstorm 在 yarn 上面兼容做这一套。后来因为知道阿里在用 Flink 所以去调研 Flink,发现了 Flink 的一些优势,所以想尝试用 Flink 解决存在的问题。
使用 Flink 首先第一个问题可以成功解决,因为 Flink 作业是跑在 yarn 上面的,这就解决了内存隔离的问题。然后 Yarn 也是支持队列的,我们可以根据业务去划分队列,这样我们的扩大预算审核的问题得到解决了。我们也不需要自己运维一个集群了,因为有 yarn 去管理我们的资源,这样也节省了运维成员。在此基础上还可以做一些物理隔离队列,其实物理隔离队列现在也遇到了问题。因为物理隔离队列只是说这个机器隔离了,但是相当于是机柜也没有隔离网络带宽也没有隔离,所以说即使是物理隔离队列,现在也遇到比如说和离线作业共用机柜的时候,这个机柜的出口带宽被打满的问题。针对这些问题,我们后续可能想在这个离线离线集群上面做 QOS 这种流量级别的方式来解决这个问题。
Flink 实际上是可以兼容 Storm 的,比如说之前的历史作业是可以迁移过来的,不需要维护两套计算引擎。Flink 支持一些高优先级的 API 比如说支持 SQL 以及窗口等特性包括说 checkpoint。我们头条的业务对 exactly-once 的需求不是特别的强烈。
以上就是 Flink 的优势,于是我们就决定从 J storm 往 Flink 去迁移。
Flink 集群的构建过程
在迁移的过程中,第一件事情是先把 Flink 集群建立起来。一开始肯定都是追求稳定性,比如说把离线的 yarn 集群隔离开,然后不依赖于 HDFS 也可以把 Hdfs 线上的 name node,name space 隔离出来。然后我们梳理了原来 storm 上面的作业,哪些作业属于不同的业务,然后映射到不同的队列里面去,最后把一些特殊的队列也隔离开来。这是我们准备这个 Fink 集群的时候考虑的几个点。
下面就考虑 Flink 怎么兼容 J storm,然后把它迁移过来。
我们当时 Flink 用的是 1.32 版本,因为 Flink 有 Flink-storm 这个工程,它能把 Storm 作业转化成 Flink 作业,我们就借鉴这些技术上实现了一个 Flink –jstorm。相当于把一个 J storm 的拓扑结构转化成了一个 Flink job。只做完这件事情是不够的,因为我们有一系列的外围工具需要去对齐。比如说之前提交作业的时候是通过一个脚本提交的让用户去屏蔽一些其他的参数。使用 flink 的话我们同样也是需要构建这么一个脚本,然后去提交 Flink Job,最后停止 flink Job。第三点是构建 flink job 外围工具,自动注册报警,比如说消费延迟报警,自动注册这个 Dashboard 以及一些 log service,所有的这些为外围工具都要和原来的服务去对齐。
对齐完之后,我们需要构建一个迁移脚本,迁移的过程中最困难的是资源配置这一块。因为原来 Storm 用了多少资源,Storm 怎么配,这对于迁移的用户来说,如果是第一次做肯定是不了解这些东西。因此我们写这么一个脚本,帮用户生成它 Flink 集群里面对应的资源使用情况。这些工作做完了之后,我们就开始去迁移。到现在为止,整体迁移完了,还剩下十个左右的作业没有迁移完。现在集群规模达到了大概是 6000 多台。
在迁移的过程中我们有一些其他优化,比如说 J storm 是能够支持 task 和 work 维度的重启的,Flink 这一块做得不是特别好。我们在这方面做了一些优化实现了一个 single task 和 single tm 粒度的重启,这样就解决部分作业因为 task 重启导致整个作业全部重启。
构建流式管理平台
迁移完之后,我们又构建了一个流式管理平台。这个平台是为了解决实际过程中遇到了一些问题,比如说整个机群挂了无法确定哪些作业在上面跑着,也通知不到具体的用户,有些用户作业都不知道自己提交了哪些作业。我们构建流式作业的时候目标实际上就是和其他的管理平台是一样的,比如说我们提供一些界面操作,然后提供一个版本管理,就是为了方便方便用户升级和回滚的操作,我们还提供了一站式的查问题的工具:把一些用户需要的信息都聚合在一个页面上面,防止用户不断跳来跳去以及避免不同系统之间的切换。有一些历史记录之前不管是跑在 yarn 上面还是跑到 storm 上面,我一个作业被别人 kill 到了,其实我都是不知道的。针对这个问题我们提供了一些历史操作记录的一些目标。
设计这个管理平台的时候,我们考虑到提供这么一个前端管理平台可能只是针对公司内部的一部分产品,其他的产品也做了自己的一套前端。他们可以用一个模板,根据自己的逻辑去生成一个 storm 任务。基于此,我们把整个管理平台抽象了两层:最上一层实际上相当于一个面向用户或者说是类似于前端的一个产品。中间这一层实际上是一个类似于提交作业调度任务,这一层只负责提任务,然后停任务,管理生命周期以及因为故障导致作业失败了,将作业重新拉起来。这是中间层 TSS 层做的事情。
这样,我们就可以对接到所有的前端平台。通过一个 RPC 进行 TSS 通信,就把所有的底层的服务和 Filnk 和 Yarn 还有 HDFS 这些交互的底层的逻辑完全屏蔽开来了。
接下来,用户写一个作业就比较简单了,流程如下:
第一步用户先要生成自己的一个作业模板,我们这边通过 maven 提供的脚本架去生成一些作业的 schema,这个作业执行完之后,它会把帮你把一些 porm 文件,还有一些类似于 kafkasource 这种常规的组件都帮你准备好,然后你直接在这个模板里面填自己的主要逻辑就可以了。因为我们写 Java 程序遇到最多的一个问题就是包冲突问题。所以 porm 文件帮助用户把一些可能冲突的一些 jar 包都给以 exclude 掉,这样包冲突的概率会越来越小。
我们测试作业基本上是用 IDEA 或者 local 模式去测试,也提供了一个脚本去提交作业,通过这个脚本提交到 stage 环境上面。在提交注册在平台上面去注册这个作业,然后添加一些配置信息。
下面是一个代码版本管理的界面:
把整个作业提交之后如下图所示:
提交完一个作业之后,用户可能想看作业运行的状态怎么样,我们通过四种方式去给用户展示他的作业运行状态的。
第一个是 Flink UI,也就是官方自带的 UI 用户可以去看。第二个是 Dashboard,我们展示了作业里面的 task 维度,QPS 以及 task 之间的网络 buffer,这些重要的信息汇聚到一起创建了一个 Dashboard,这样可能查问题的时候方便一些。第三个是错误日志,其实和大家的思路一样,把一个分布式的日志然后聚合起来,然后写到 ES 上面去。第四是做了一个 Jobtrace 的工具,就是我们把 Flink 里面常见的一些异常匹配出来,然后直接给用户一个 wiki 的使用指南,告诉用户比如说你的作业 OM 了需要扩大内存。只要用户的作业出现了某些问题,我们把已知的所有的异常都会匹配给用户。
下面是 ES 的 kibana:
这是我们 Jobtrace 的功能,我们把 Flink 的这些常见的异常都匹配出来,每一个异常其实对应了一个 wiki 然后去让用户去解决自己的问题。
最后分享下我们的近期规划,前面的基本做完并且趋于稳定了,但是现在又遇到了一些新的问题。比如资源使用率这个问题,因为用户提交作业的时候,用户对资源不是特别敏感就随意把一个资源提上去了,可能他明明需要两个 CPU,但是他提了四个 CPU。我们想通过一个工具能够监控到他需要多少资源,然后通知 yarn 去把这个资源给重置了。就是动态调整 job 资源,自动把资源重置。
第二个问题是优化作业重启速度。我们这边好多业务是根据流式计算的指标来监控它业务的稳定性,如果最上游重启一个作业,底下一群人收到报警说线上出现一些问题了。原因是最上游某一个作业再重启。我们想把重启时间间隔去做到最短或者是无缝重启,这是下一阶段需要去探索探索的一个问题。
第四点:Flink SQL 也刚上线,可能需要一些精力投入去推广。
最后一点,我们希望在此抽象出更多的模式作业模型来,因为我们本身是有一些比如说 kafka2ES,kafka2hdfs 这些需求,能不能把他们抽象成一个 schema,然后去对外提供一些服务。
以上就是我本次分享的主要内容,感谢 Flink 的举办者和参与者,感谢我们的同事,因为以上的分享内容是我和我们同事一起做的。