简介:介绍 Flink 1.12 资源管理的一些个性,包含内存治理、资源调度、扩大资源框架。
本文由社区志愿者陈政羽整顿,Apache Flink Committer、阿里巴巴技术专家宋辛童,Apache Flink Contributor、阿里巴巴高级开发工程师郭旸泽分享,次要介绍 Flink 1.12 资源管理的一些个性。内容次要分为 4 局部:
内存治理
资源调度
扩大资源框架
将来布局
一、内存治理
首先回顾 Flink 的内存模型变迁。下图右边别离为 Flink 1.10、Flink 1.11 引入的新的内存模型。只管波及的模块较多,但 80% – 90% 的用户仅需关注真正用于工作执行的 Task Heap Memory、Task Off-Heap Memory、Network Memory、Managed Memory 四局部。
其它模块大部分是 Flink 的框架内存,失常不须要调整,即便遇到问题也能够通过社区文档来解决。除此之外,“一个作业到底须要多少内存能力满足理论生产需要”也是大家不得不面临的问题,比方其余指标的性能应用、作业是否因为内存不足影响了性能,是否存在资源节约等。
针对上述内容,社区在 Flink 1.12 版本提供了一个全新的,对于 Task manager 和 Jobmanager 的 Web UI。
在新的 Web UI 中,能够间接将每一项监控指标配置值、理论应用状况对应到内存模型中进行直观的展现。在此基础上,能够更分明的理解到作业的运行状况、该如何调整、用哪些配置参数调整等 (社区也有相应的文档提供反对)。通过新的 Web UI,大家能更好的理解作业的应用状况,内存治理也更不便。
1. 本地内存(Managed Memory)
Flink 托管内存实际上是 Flink 特有的一种本地内存,不受 JVM 和 GC 的治理,而是由 Flink 自行进行治理。
本地内存的特点次要体现在两方面:
- 一方面是 slot 级别的估算布局,它能够保障作业运行过程中不会因为内存不足,造成某些算子或者工作无奈运行;也不会因为预留了过多的内存没有应用造成资源节约。同时 Flink 能保障当工作运行完结时精确将内存开释,确保 Task Manager 执行新工作时有足够的内存可用。
- 另一方面,资源适应性也是托管内存很重要的个性之一,指算子对于内存的需要是动静可调整的。具备了适应性,算子就不会因为给予工作过多的内存造成资源应用上的节约,也不会因为提供的内存绝对较少导致整个作业无奈运行,使内存的使用放弃在肯定的正当范畴内。
当然,在内存调配绝对比拟少状况下,作业会受到肯定限度,例如须要通过频繁的落盘保障作业的运行,这样可能会影响性能。
以后,针对托管内存,Flink 的应用场景如下:
- RocksDB 状态后端:在流计算的场景中,每个 Slot 会应用 State 的 Operator,从而共享同一底层 的 RocksDB 缓存;
- Flink 内置算子:蕴含批处理、Table SQL、DataSet API 等算子,每个算子有独立的资源估算,不会互相共享;
- Python 过程:用户应用 PyFlink,应用 Python 语言定义 UDF 时须要启动 Python 的虚拟机过程。
2. Job Graph 编译阶段
Flink 对于 management memory 的治理次要分为两个阶段。
2.1 作业的 Job Graph 编译阶段
在这个阶段须要留神三个问题:
第一个问题是:slot 当中到底有哪些算子或者工作会同时执行。这个问题关系到在一个查问作业中如何对内存进行布局,是否还有其余的工作须要应用 management memory,从而把相应的内存留进去。在流式的作业中,这个问题是比较简单的,因为咱们须要所有的算子同时执行,能力保障上游产出的数据能被上游及时的生产掉,这个数据才可能在整个 job grep 当中流动起来。然而如果咱们是在批处理的一些场景当中,实际上咱们会存在两种数据 shuffle 的模式,
一种是 pipeline 的模式,这种模式跟流式是一样的,也就是咱们后面说到的 bounded stream 解决形式,同样须要上游和上游的算子同时运行,上游随时产出,上游随时生产。
另外一种是所谓的 batch 的 blocking 的形式,它要求上游把数据全副产出,并且落盘完结之后,上游能力开始读数据。
这两种模式会影响到哪些工作能够同时执行。目前在 Flink 当中,依据作业拓扑图中的一个边的类型 (如图上)。咱们划分出了定义的一个概念叫做 pipelined region,也就是全副都由 pipeline 的边锁连通起来的一个子图,咱们把这个子图辨认进去,用来判断哪些 task 会同时执行。
第二个问题是:slot 当中到底有哪些应用场景?咱们方才介绍了三种 manage memory 的应用场景。在这个阶段,对于流式作业,可能会呈现 Python UDF 以及 Stateful Operator。这个阶段当中咱们须要留神的是,这里并不能必定 State Operator 肯定会用到 management memory,因为这跟它的状态类型是相干的。
如果它应用了 RocksDB State Operator,是须要应用 manage memory 的;
然而如果它应用的是 Heap State Backend,则并不需要。
然而,作业在编译的阶段,其实并不知道状态的类型,这里是须要去留神的中央。
第三个问题:对于 batch 的作业,咱们除了须要分明有哪些应用场景,还须要分明一件事件,就是后面提到过 batch 的 operator。它应用 management memory 是以一种算子独享的形式,而不是以 slot 为单位去进行共享。咱们须要晓得不同的算子应该别离调配多少内存,这个事件目前是由 Flink 的打算作业来主动进行设置的。
2.2 执行阶段
第一个步骤是依据 State Backend 的类型去判断是否有 RocksDB。如上图所示,比方一个 slot,有 ABC 三个算子,B 跟 C 都用到了 Python,C 还用到了 Stateful 的 Operator。这种状况下,如果是在 heap 的状况下,咱们走下面的分支,整个 slot 当中只有一种在应用,就是 Python。之后会存在两种应用形式:
其中一个是 RocksDB State Backend,有了第一步的判断之后,第二步咱们会依据用户的配置,去决定不同应用形式之间怎么样去共享 slot 的 management memory。
在这个 Steaming 的例子当中,咱们定义的 Python 的权重是 30%,State Backend 的权重是 70%。在这样的状况下,如果只有 Python,Python 的局部天然是应用 100% 的内存(Streaming 的 Heap State Backend 分支);
而对于第二种状况(Streaming 的 RocksDB State Backend 分支),B、C 的这两个 Operator 共用 30% 的内存用于 Python 的 UDF,另外 C 再独享 70% 的内存用于 RocksDB State Backend。最初 Flink 会依据 Task manager 的资源配置,一个 slot 当中有多少 manager memory 来决定每个 operator 理论能够用的内存的数量。
批处理的状况跟流的状况有两个不同的中央,首先它不须要去判断 State Backend 的类型,这是一个简化;其次对于 batch 的算子,上文提到每一个算子有本人独享的资源的估算,这种状况下咱们会去依据使用率算出不同的应用场景须要多少的 Shared 之后,还要把比例进一步的细分到每个 Operator。
3. 参数配置
上方图表展现了咱们须要的是 manager,memory 大小有两种配置形式:
- 一种是绝对值的配置形式,
- 还有一种是作为 Task Manager 总内存的一个相对值的配置形式。
taskmanager.memory.managed.consumer-weight 是一个新加的配置项,它的数据类型是 map 的类型,也就是说咱们在这外面实际上是给了一个 key 冒号 value,而后逗号再加上下一组 key 冒号 value 的这样的一个数据的构造。这外面咱们目前反对两种 consumer 的 key:
- 一个是 DATAPROC,DATAPROC 既蕴含了流解决当中的状态后端 State Backend 的内存,也蕴含了批处理当中的 Batch Operator;
- 另外一种是 Python。
二、资源调度
局部资源调度相干的 Feature 是其余版本或者邮件列表外面大家询问较多的,这里咱们也做对应的介绍。
1. 最大 Slot 数
Flink 在 1.12 反对了最大 slot 数的一个限度(slotmanager.number-of-slots.max),在之前咱们也有提到过对于流式作业咱们要求所有的 operator 同时执行起来,才可能保证数据的顺畅的运行。在这种状况下,作业的并发度决定了咱们的工作须要多少个 slot 和资源去执行作业。
然而对于批处理其实并不是这样的,批处理作业往往能够有一个很大的并发度,但理论并不需要这么多的资源,批处理用很少的资源,跑完后面的工作腾出 Slot 给后续的工作应用。通过这种串行的形式去执行工作能防止 YARN/K8s 集群的资源过多的占用。目前这个参数反对在 yarn/mesos/native k8 应用。
2. TaskManager 容错
在咱们理论生产中有可能会有程序的谬误、网络的抖动、硬件的故障等问题造成 TaskManager 无奈连贯,甚至间接挂掉。咱们在日志中常见的就是 TaskManagerLost 这样的报错。对于这种状况须要进行作业重启,在重启的过程中须要从新申请资源和重启 TaskManager 过程,这种性能耗费代价是十分昂扬的。
对于稳定性要求绝对比拟高的作业,Flink1.12 提供了一个新的 feature,可能反对在 Flink 集群当中始终持有大量的冗余的 TaskManager,这些冗余的 TaskManager 能够用于在单点故障的时候疾速的去复原,而不须要期待一个从新的资源申请的过程。
通过配置 slotmanager.redundant-taskmanager-num 能够实现冗余 TaskManager。这里所谓的冗余 TaskManager 并不是完完全全有两个 TaskManager 是空负载运行的,而是说相比于我所须要的总共的资源数量,会多出两个 TaskManager。
工作可能是绝对比拟平均的散布在下面,在可能在利用闲暇 TaskManager 的同时,也可能达到一个绝对比拟好的负载。一旦产生故障的时候,能够去先把工作疾速的调度到现有的还存活的 TaskManager 当中,而后再去进行新一轮的资源申请。目前这个参数反对在 yarn/mesos/native k8 应用。
3. 工作平铺散布
工作平铺问题次要呈现在 Flink Standalone 模式下或者是比拟旧版本的 k8s 模式部署下的。在这种模式下因为当时定义好了有多少个 TaskManager,每个 TaskManager 上有多少 slot,这样会导致经常出现调度不均的问题,可能局部 manager 放的工作很满,有的则放的比拟涣散。
在 1.11 的版本当中引入了参数 cluster.evenly-spread-out-slots,这样的参数可能管制它,去进行一个绝对比拟平衡的调度。
留神:
第一,这个参数咱们只针对 Standalone 模式,因为在 yarn 跟 k8s 的模式下,实际上是依据你作业的需要来决定起多少 task manager 的,所以是先有了需要再有 TaskManager,而不是先有 task manager,再有 slot 的调度需要。
在每次调度工作的时候,实际上只能看到以后注册上来的那一个 TaskManager,Flink 没方法全局的晓得前面还有多少 TaskManager 会注册上来,这也是很多人在问的一个问题,就是为什么个性关上了之后如同并没有起到一个很好的成果,这是第一件事件。
第二个须要留神的点是,这外面咱们只能决定每一个 TaskManager 上有多少闲暇 slot,然而并不可能决定每个 operator 有不同的并发数,Flink 并不能决定说每个 operator 是否在 TaskManager 上是一个平均的散布,因为在 flink 的资源调度逻辑当中,在整个 slot 的 allocation 这一层是齐全看不到 task 的。
三、扩大资源框架
1. 背景
近年来,随着人工智能畛域的一直倒退,深度学习模型曾经被利用到了各种各样的生产需要中,比拟典型的场景如举荐零碎,广告推送,智能危险管制。这些也是 Flink 始终以来被宽泛应用的场景,因而,反对人工智能始终以来都是 Flink 社区的长远目标之一。针对这个指标,目前曾经有了很多第三方的开源扩大工作。由阿里巴巴开源的工作次要有两个:
- 一个是 Flink AI Extended 的我的项目,是基于 Flink 的深度学习扩大框架,目前反对 TensorFlow、PyTorch 等框架的集成,它使用户能够将 TensorFlow 当做一个算子,放在 Flink 工作中。
- 另一个是 Alink,它是一个基于 Flink 的通用算法平台,外面也内置了很多罕用的机器学习算法。
以上的两个工作都是从功能性上对 Flink 进行扩大,然而从算力的角度上讲,深度学习模型亦或机器学习算法,通常都是整个工作的计算瓶颈所在。GPU 则是这个畛域被宽泛应用用来减速训练或者预测的资源。因而,反对 GPU 资源来减速计算是 Flink 在 AI 畛域的倒退过程中必不可少的性能。
2. 应用扩大资源
目前 Flink 反对用户配置的资源维度只有 CPU 与内存,而在理论应用中,不仅是 GPU,咱们还会遇到其余资源需要,如 SSD 或 RDMA 等网络减速设施。因而,咱们心愿提供一个通用的扩大资源框架,任何扩大资源都能够以插件的模式来退出这个框架,GPU 只是其中的一种扩大资源。
对于扩大资源的应用,能够形象出两个通用需要:
- 须要反对该类扩大资源的配置与调度。用户能够在配置中指明对这类扩大资源的需要,如每个 TaskManager 上须要有一块 GPU 卡,并且当 Flink 被部署在 Kubernetes/Yarn 这类资源底座上时,须要将用户对扩大资源的需要进行转发,以保障申请到的 Container/Pod 中存在对应的扩大资源。
- 须要向算子提供运行时的扩大资源信息。用户在自定义算子中可能须要一些运行时的信息能力应用扩大资源,以 GPU 为例,算子须要晓得它外部的模型能够部署在那一块 GPU 卡上,因而,须要向算子提供这些信息。
3. 扩大资源框架应用办法
应用资源框架咱们能够分为以下这 3 个步骤:
- 首先为该扩大资源设置相干配置;
- 而后为所需的扩大资源筹备扩大资源框架中的插件;
- 最初在算子中,从 RuntimeContext 来获取扩大资源的信息并应用这些资源
3.1 配置参数
# 定义扩大资源名称,“gpu”external-resources: gpu
# 定义每个 TaskManager 所需的 GPU 数量
external-resource.gpu.amount: 1
# 定义 Yarn 或 Kubernetes 中扩大资源的配置键
external-resource.gpu.yarn.config-key: yarn.io/gpu
external-resource.gpu.kubernetes.config-key: nvidia.com/gpu
# 定义插件 GPUDriver 的工厂类。external-resource.gpu.driver-factory.class:
org.apache.flink.externalresource.gpu.GPUDriverFactory
以上是应用 GPU 资源的配置示例:
- 对于任何扩大资源,用户首先须要将它的名称退出 “external-resources” 中,这个名称也会被用作该扩大资源其余相干配置的前缀来应用。示例中,咱们定义了一种名为 “gpu” 的资源。
- 在调度层,目前反对用户在 TaskManager 的粒度来配置扩大资源需要。示例中,咱们定义每个 TaskManager 上的 GPU 设施数为 1。
- 将 Flink 部署在 Kubernetes 或是 Yarn 上时,咱们须要配置扩大资源在对应的资源底座上的配置键,以便 Flink 对资源需要进行转发。示例中展现了 GPU 对应的配置。
- 如果提供了插件,则须要将插件的工厂类名放入配置中。
3.2 前置筹备
在理论应用扩大资源前,还须要做一些前置筹备工作,以 GPU 为例:
- 在 Standalone 模式下,集群管理员须要保障 GPU 资源对 TaskManager 过程可见。
- 在 Kubernetes 模式下,须要集群反对 Device Plugin[6],对应的 Kubernetes 版本为 1.10,并且在集群中装置了 GPU 对应的插件。
- 在 Yarn 模式下,GPU 调度须要集群 Hadoop 版本在 2.10 或 3.1 以上,并正确配置了 resource-types.xml 等文件。
3.3 扩大资源框架插件
实现了对扩大资源的调度后,用户自定义算子可能还须要运行时扩大资源的信息能力应用它。扩大资源框架中的插件负责实现该信息的获取,它的接口如下:
public interface ExternalResourceDriverFactory {
/**
* 依据提供的设置创立扩大资源的 Driver
*/
ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
}
public interface ExternalResourceDriver {
/**
* 获取所需数量的扩大资源信息
*/
Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;
}
ExternalResourceDriver 会在各个 TaskManager 上启动,扩大资源框架会调用各个 Driver 的 retrieveResourceInfo 接口来取得 TaskManager 上的扩大资源信息,并将失去的信息传到算子的 RuntimeContext。ExternalResourceDriverFactory 则为插件的工厂类。
4. GPU 插件
Flink 目前内置了针对 GPU 资源的插件,其外部通过执行名为 Discovery Script 的脚本来获取以后环境可用的 GPU 信息,目前信息中蕴含了 GPU 设施的 Index。
Flink 提供了一个默认脚本,位于我的项目的 “plugins/external-resource-gpu/” 目录,用户也能够实现自定义的 Discovery Script 并通过配置来指定应用自定义脚本。该脚本与 GPU 插件的协定为:
- 当调用脚本时,所须要的 GPU 数量将作为第一个参数输出,之后为用户自定义参数列表。
- 若脚本执行失常,则输入 GPU Index 列表,以逗号分隔。
- 若脚本出错或执行后果不合乎预期,则脚本以非零值退出,这会导致 TaskManager 初始化失败,并在日志中打印脚本的错误信息。
Flink 提供的默认脚本是通过 “nvidia-smi” 工具来获取以后的机器中可用的 GPU 数量以及 index,并依据所须要的 GPU 数量返回对应数量的 GPU Index 列表。当无奈获取到所需数量的 GPU 时,脚本将以非零值退出。
GPU 设施的资源分为两个维度,流处理器与显存,其显存资源只反对独占应用。因而,当多个 TaskManager 运行在同一台机器上时,若一块 GPU 被多个过程应用,可能导致其显存 OOM。因而,Standalone 模式下,须要 TaskManager 级别的资源隔离机制。
默认脚本提供了 Coordination Mode 来反对单机中多个 TaskManager 过程之间的 GPU 资源隔离。该模式通过应用文件锁来实现多过程间 GPU 应用信息同步,协调同一台机器上多个 TaskManager 过程对 GPU 资源的应用。
5. 在算子中获取扩大资源信息
在用户自定义算子中,可应用在 “external-resources” 中定义的资源名称来调用 RuntimeContext 的 getExternalResourceInfos 接口获取对应扩大资源的信息。以 GPU 为例,失去的每个 ExternalResourceInfo 代表一块 GPU 卡,而其中蕴含名为 “index” 的字段代表该 GPU 卡的设施 Index。
public class ExternalResourceMapFunction extends RichMapFunction<String, String> {
private static finalRESOURCE_NAME="gpu";
@Override
public String map(String value) {Set<ExternalResourceInfo> gpuInfos = getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME);
List<String> indexes = gpuInfos.stream()
.map(gpuInfo -> gpuInfo.getProperty("index").get()).collect(Collectors.toList());
// Map function with GPU// ...
}
}
6. MNIST Demo
下图以 MNIST 数据集的辨认工作来演示应用 GPU 减速 Flink 作业。
MNIST 如上图所示,为手写数字图片数据集,每个图片可示意为为 28*28 的矩阵。在该工作中,咱们应用预训练好的 DNN 模型,图片输出通过一层全连贯网络失去一个 10 维向量,该向量最大元素的下标即为辨认后果。
咱们在一台领有两块 GPU 卡的 ECS 上启动一个有两个 TaskManager 过程的 Standalone 集群。借助默认脚本提供的 Coordination Mode 性能,咱们能够保障每个 TaskManager 各应用其中一块 GPU 卡。
该工作的外围算子为图像识别函数 MNISTClassifier,外围实现如下所示
class MNISTClassifier extends RichMapFunction<List<Float>, Integer> {
@Override
public void open(Configuration parameters) {
// 获取 GPU 信息并且抉择第一块 GPU
Set<ExternalResourceInfo> externalResourceInfos = getRuntimeContext().getExternalResourceInfos(resourceName);
final Optional<String> firstIndexOptional = externalResourceInfos.iterator().next().getProperty("index");
// 应用第一块 GPU 的 index 初始化 JCUDA 组件
JCuda.cudaSetDevice(Integer.parseInt(firstIndexOptional.get()));
JCublas.cublasInit();}
}
在 Open 办法中,从 RuntimeContext 获取以后 TaskManager 可用的 GPU,并抉择第一块来初始化 JCuda 以及 JCublas 库。
class MNISTClassifier extends RichMapFunction<List<Float>, Integer> {
@Override
public Integer map(List<Float> value) {
// 应用 Jucblas 做矩阵算法
JCublas.cublasSgemv('n', DIMENSIONS.f1, DIMENSIONS.f0, 1.0f,
matrixPointer, DIMENSIONS.f1, inputPointer, 1, 0.0f, outputPointer, 1);
// 取得乘法后果并得出该图所示意的数字
JCublas.cublasGetVector(DIMENSIONS.f1, Sizeof.FLOAT, outputPointer, 1, Pointer.to(output), 1);
JCublas.cublasFree(inputPointer);
JCublas.cublasFree(outputPointer);
int result = 0;
for (int i = 0; i < DIMENSIONS.f1; ++i) {result = output[i] > output[result] ? i : result;
}
return result;
}
}
在 Map 办法中,将事后训练好的模型参数与输出矩阵放入 GPU 显存,应用 JCublas 进行 GPU 中的矩阵乘法运算,最初将后果向量从 GPU 显存中取出并失去辨认后果数字。
具体案例演示流程能够返回观看视频或者参考 Github 下面的链接入手尝试。
四、将来打算
除了上文介绍的这些曾经公布的个性外,Apache Flink 社区也正在踊跃筹备更多资源管理方面的优化个性,在将来的版本中将陆续和大家见面。
- 被动资源调度模式:托管内存使得 Flink 工作能够灵便地适配不同的 TaskManager/Slot 资源,充分利用可用资源,为计算工作提供给定资源限度下的最佳算力。但用户仍需指定计算工作的并行度,Flink 须要申请到满足该并行度数量的 TaskManager/Slot 能力顺利执行。被动资源调度将使 Flink 可能依据可用资源动静扭转并行度,在资源有余时可能 best effort 进行数据处理,同时在资源短缺时复原到指定的并行度保障解决性能。
- 细粒度资源管理:Flink 目前基于 Slot 的资源管理与调度机制,认为所有的 Slot 都具备雷同的规格。对于一些简单的规模化生产工作,往往须要将计算工作拆分成多个子图,每个子图独自应用一个 Slot 执行。当子图间的资源需要差别较大时,应用雷同规格的 Slot 往往难以满足资源效率方面的需要,特地是对于 GPU 这类老本较高的扩大资源。细粒度资源管理容许用户为作业的子图指定资源需要,Flink 会依据资源需要应用不同规格的 TaskManager/Slot 执行计算工作,从而优化资源效率。
五、总结
通过文章的介绍,置信大家对 Flink 内存治理有了更加清晰的认知。
- 首先从本地内存、Job Graph 编译阶段、执行阶段来解答每个流程的内存治理以及内存调配细节,通过新的参数配置管制 TaskManager 的内存调配;
- 而后从大家平时遇到资源调度相干问题,包含最大 Slot 数应用,如何进行 TaskManager 进行容错,工作如何通过工作平铺均摊工作资源;
- 最初在机器学习和深度学习畛域经常用到 GPU 进行减速计算,通过解释 Flink 在 1.12 版本如何应用扩大资源框架和演示 Demo,给咱们展现了资源扩大的应用。再针对资源利用率方面提出 2 个社区将来正在做的打算,包含被动资源模式和细粒度的资源管理。
原文链接
本文为阿里云原创内容,未经容许不得转载。