探究-flink111-Application-模式

34次阅读

共计 4593 个字符,预计需要花费 12 分钟才能阅读完成。

随着流式计算的衰亡,实时剖析成为古代商业的利器。越来越多的平台和公司基于 Apache Flink 构建他们的实时计算平台,并 saas 化。

这些平台旨在通过简化利用的提交来升高最终用户的应用累赘。通常的做法是,会提供一个诸如治理平台的 web 程序,不便使用者提交利用,并且该平台集成了一些权限,监控等内容。这个治理平台我能够叫作部署服务。

然而当初这些平台遇到一个大问题是部署服务是一个耗费资源比拟大的服务,并且很难计算出理论资源限度。比方,如果咱们取负载的平均值,则可能导致部署服务的资源实在所需的值远远大于限度值,最坏的状况是在肯定工夫影响所有的线上利用。然而如果咱们将取负载的最大值,又会造成很多不必要的节约。基于此,Flink 1.11 引入了另外一种部署选项 Application Mode,该模式容许更加轻量级,可扩大的利用提交过程,将之前客户端的利用部署能力平均扩散到集群的每个节点上。

为了了解这个问题以及 Application Mode 是如何解决这个问题,咱们将在下文介绍以后 flink 中利用执行的模式。

Flink 中的利用执行

Flink 中利用的执行会波及到三局部:_Client,JobManager 和 TaskManagers。_Client 负责提交利用到集群,JobManager 负责利用执行期间一些必要的记录工作,TaskManager 负责具体的利用执行。具体的架构图如下:

以后部署模式

在引入 Application Mode(Flink1.11) 之前,Flink 反对 Session 和 Per-Job 两种 mode,这两种有不同的集群生命周期和资源隔离。

Session 模式

Session 模式假设曾经存在一个集群,并任何的提交的利用都在该集群里执行。因而会导致资源的竞争。该模式的劣势是你无需为每一个提交的工作破费精力去合成集群。然而,如果 Job 异样或是 TaskManager 宕掉,那么该 TaskManager 运行的其余 Job 都会失败。除了影响到工作,也意味着潜在须要更多的复原操作,重启所有的 Job,会并发拜访文件系统,会导致该文件系统对其余服务不可用。此外,单集群运行多个 Job,意味着 JobManager 更大的负载。这种模式适宜启动提早十分重要的短期作业。

Per-Job 模式

在 Per-Job 模式下,集群管理器框架(例如 YARN 或 Kubernetes)用于为每个提交的 Job 启动一个 Flink 集群。Job 实现后,集群将敞开,所有残留的资源(例如文件)也将被革除。此模式能够更好地隔离资源,因为行为异样的 Job 不会影响任何其余 Job。另外,因为每个应用程序都有其本人的 JobManager,因而它将记录的负载扩散到多个实体中。思考到后面提到的 Session 模式的资源隔离问题,Per-Job 模式适宜长期运行的 Job,这些 Job 能够承受启动提早的减少以反对弹性。

总而言之,在 Session 模式下,集群生命周期独立于集群上运行的任何 Job,并且集群上运行的所有 Job 共享其资源。Per-Job 模式抉择为每个提交的 Job 承当拆分集群的费用,以提供更好的资源隔离保障,因为资源不会在 Job 之间共享。在这种状况下,集群的生命周期将与 job 的生命周期绑定在一起。

利用提交

Flink 利用的执行蕴含两个阶段:

  • pre-flight: 在 main() 办法调用之后开始。
  • runtime: 一旦用户代码调用 execute() 就会触发该阶段。

main()办法应用 Flink 的 API(DataStream API,Table API,DataSet API)之一结构用户程序。当 main() 办法调用 env.execute() 时,用户定义的 pipeline 将转换为 Flink 运行时能够了解的模式,称为job graph,并将其传送到集群中。

只管有一些不同,然而 对于 Session 模式 和 Per-Job 模式,pre-flight 阶段都是在客户端实现的。

对于那些在本人本地计算机上提交工作的场景(本地计算机蕴含了所有运行 Job 所需的依赖),这通常不是问题。然而,对于通过诸如部署服务之类的近程进行提交的场景,此过程包含:

  • 下载利用所需的依赖
  • 执行 main() 办法提取 job graph
  • 将依赖和 job graph 传输到集群
  • 有可能须要期待后果

这样客户端大量耗费资源,因为它可能须要大量的网络带宽来下载依赖项并将二进制文件运送到集群,并且须要 CPU 周期来执行 main() 办法。随着更多用户共享同一客户端,此问题会更加显著。

红色,蓝色和绿色代表 3 个应用程序,每个应用程序三个并发。彩色矩形代表不同的过程:TaskManagers,JobManagers 和 Deployer(集中式部署服务)。并且咱们假如在所有状况下都只有一个 Deployer 过程。黑白三角形示意提交过程的负载,而黑白矩形示意 TaskManager 和 JobManager 过程的负载。如图所示,不论是 per-job 还是 session 模式,部署程序承当雷同的负载。它们的区别在于 Job 的调配和 JobManager 的负载。在 session 模式下,集群中的所有作业只有一个 JobManager,而在 per-job 模式下,每个 Job 都有一个 JobManager。另外,在 session 模式下的 Job 被随机调配给 TaskManager,而在 per-job 模式下,每个 TaskManager 只有单个 Job。

Application 模式

Application 模式 尝试去将 per-job 模式的资源隔离性和轻量级,可扩大的利用提交过程相结合。为了实现这个目标,它会每个 Job 创立一个集群,然而 利用的 main() 将被在 JobManager 执行。

每个应用程序创立一个集群,能够看作创立仅在特定应用程序的 Job 之间共享的 session 集群,并在应用程序实现时销毁。通过这种架构,Application 模式能够提供与 per-job 模式雷同的资源隔离和负载平衡保障,但前提是保障一个残缺应用程序的粒度。显然,属于同一应用程序的 Job 应该被关联起来,并视为一个单元。

在 JobManager 中执行 main()办法,更大大加重客户端的资源耗费。更进一步讲,因为每个应用程序有一个 JobManager,因而能够更均匀地扩散网络负载。上图对此进行了阐明,在该图中,这次客户端负载已转移到每个应用程序的 JobManager。

在 Application 模式下,与其余模式不一样的是,main() 办法在集群上而不是在客户端执行。这可能会对您的代码产生影响,例如,您必须应用应用程序的 JobManager 能够拜访应用 registerCachedFile()在环境中注册的任何门路。

与 per-job 模式相比,Application 模式容许提交由多个 Job 组成的应用程序。Job 执行的程序不受部署模式的影响,但受启动 Job 的调用的影响。应用阻塞的 execute()办法,将是一个程序执行的成果,后果就是 ” 下一个 ”Job 的执行被推延到“该”Job 实现为止。相同,一旦提交以后作业,非阻塞 executeAsync() 办法将立刻持续提交“下一个”Job。

缩小网络需要

如上所述,通过在 JobManager 上执行应用程序的 main() 办法,Application 模式能够节俭很多提交利用所需的资源。然而仍有改良的空间。

专一于 YARN,因为社区对于 yarn 的优化反对更全面。即便应用 Application 模式,依然须要客户端将用户 jar 发送到 JobManager。此外,对于每个应用程序,客户端都必须将“flink-dist”门路输送到集群,该目录蕴含框架自身的二进制文件,包含 flink-dist.jarlib/plugin/ 目录。这两个能够占用客户端大量的带宽。此外,在每个提交中传送雷同的flink-dist 二进制文件不仅节约带宽,而且节约存储空间,只需容许应用程序共享雷同的二进制文件就能够缓解。

对于 Flink1.11 , 引入了上面的两个选项可供大家应用:

  1. 指定目录的近程门路,YARN 能够在该目录中找到 Flink 散发二进制文件
  2. 指定 YARN 能够在其中找到用户 jar 的近程门路。

对于 1.,咱们利用 YARN 的分布式缓存,并容许应用程序共享这些二进制文件。因而,如果因为先前在同一 TaskManager 上执行的应用程序而导致某个应用程序凑巧在其 TaskManager 的本地存储上找到 Flink 的正本,则它甚至不用在外部下载它。

留神两种优化都可用于 YARN 上的所有部署模式,而不仅仅是 Application 模式。

示例: Application 模式 on Yarn

无关残缺阐明,请参阅正式的 Flink 文档,尤其是波及集群治理框架,例如 YARN 或 Kubernetes。在这里,咱们将提供无关 YARN 的一些示例:

Application 模式下,应用以下语句提交一个利用:

./bin/flink run-application -t yarn-application ./MyApplication.jar

应用此命令,所有配置参数都能够通过其配置选项(以 -D 为前缀)来指定。无关可用配置选项的目录,请参阅 Flink 的配置页面。

例如,用于指定 JobManager 和 TaskManager 的内存大小的命令如下所示:

./bin/flink run-application -t yarn-application \
    -Djobmanager.memory.process.size=2048m \
    -Dtaskmanager.memory.process.size=4096m \
    ./MyApplication.jar

为了进一步节俭将 Flink 发行版传送到集群的带宽,请思考将 Flink 发行版预上传到 YARN 能够拜访的地位,并应用 yarn.provided.lib.dirs 配置选项,如下所示:

./bin/flink run-application -t yarn-application \
    -Djobmanager.memory.process.size=2048m \
    -Dtaskmanager.memory.process.size=4096m \
    -Dyarn.provided.lib.dirs="hdfs://myhdfs/remote-flink-dist-dir" \
    ./MyApplication.jar

最初,为了进一步节俭提交应用程序 jar 所需的带宽,您能够将其预上传到 HDFS,并指定指向 ./MyApplication.jar 的近程门路,如下所示:

./bin/flink run-application -t yarn-application \
    -Djobmanager.memory.process.size=2048m \
    -Dtaskmanager.memory.process.size=4096m \
    -Dyarn.provided.lib.dirs="hdfs://myhdfs/remote-flink-dist-dir" \
    hdfs://myhdfs/jars/MyApplication.jar

这将使 Job 提交特地笨重,因为所需的 Flink jar 和应用程序 jar 将从指定的近程地位获取,而不是由客户端传送到集群。客户端将惟一传送到集群的是你的应用程序配置,其中包含上述所有门路。

PS: 本文属于翻译,原文

正文完
 0