关于Flink:PyFlink-开发环境利器Zeppelin-Notebook

48次阅读

共计 4547 个字符,预计需要花费 12 分钟才能阅读完成。

PyFlink 作为 Flink 的 Python 语言入口,其 Python 语言确实很简略易学,然而 PyFlink 的开发环境却不容易搭建,稍有不慎,PyFlink 环境就会乱掉,而且很难排查起因。明天给大家介绍一款可能帮你解决这些问题的 PyFlink 开发环境利器:Zeppelin Notebook。次要内容为:

  1. 筹备工作
  2. 搭建 PyFlink 环境
  3. 总结与将来

兴许你早就据说过 Zeppelin,然而之前的文章都并重讲述如何在 Zeppelin 里开发 Flink SQL,明天则来介绍下如何在 Zeppelin 里高效的开发 PyFlink Job,特地是解决 PyFlink 的环境问题。

一句来总结这篇文章的主题,就是在 Zeppelin notebook 里利用 Conda 来创立 Python env 主动部署到 Yarn 集群中,你无需手动在集群下来装置任何 PyFlink 的包,并且你能够在一个 Yarn 集群里同时应用相互隔离的多个版本的 PyFlink。最初你能看到的成果就是这样:

1. 可能在 PyFlink 客户端应用第三方 Python 库,比方 matplotlib:

2. 能够在 PyFlink UDF 里应用第三方 Python 库,如:

接下来看看如何来实现。

一、筹备工作

Step 1.

筹备好最新版本的 Zeppelin 的搭建,这个就不在这边开展了,如果有问题能够退出 Flink on Zeppelin 钉钉群 (34517043) 征询。另外须要留神的是,Zeppelin 部署集群须要是 Linux,如果是 Mac 的话,会导致在 Mac 机器上打的 Conda 环境无奈在 Yarn 集群里应用 (因为 Conda 包在不同零碎间是不兼容的)。

Step 2.

下载 Flink 1.13,须要留神的是,本文的性能只能用在 Flink 1.13 以上版本,而后:

  • flink-Python-*.jar 这个 jar 包 copy 到 Flink 的 lib 文件夹下;
  • opt/Python 这个文件夹 copy 到 Flink 的 lib 文件夹下。

Step 3.

装置以下软件 (这些软件是用于创立 Conda env 的):

  • miniconda:https://docs.conda.io/en/late…
  • conda pack:https://conda.github.io/conda…
  • mamba:https://github.com/mamba-org/…

二、搭建 PyFlink 环境

接下来就能够在 Zeppelin 里搭建并且应用 PyFlink 了。

Step 1. 制作 JobManager 上的 PyFlink Conda 环境

因为 Zeppelin 天生反对 Shell,所以能够在 Zeppelin 里用 Shell 来制作 PyFlink 环境。留神这里的 Python 第三方包是在 PyFlink 客户端 (JobManager) 须要的包,比方 Matplotlib 这些,并且确保至多装置了上面这些包:

  • 某个版本的 Python (这里用的是 3.7)
  • apache-flink (这里用的是 1.13.1)
  • jupyter,grpcio,protobuf (这三个包是 Zeppelin 须要的)

剩下的包能够依据须要来指定:

%sh

# make sure you have conda and momba installed.
# install miniconda: https://docs.conda.io/en/latest/miniconda.html
# install mamba: https://github.com/mamba-org/mamba

echo "name: pyflink_env
channels:
  - conda-forge
  - defaults
dependencies:
  - Python=3.7
  - pip
  - pip:
    - apache-flink==1.13.1
  - jupyter
  - grpcio
  - protobuf
  - matplotlib
  - pandasql
  - pandas
  - scipy
  - seaborn
  - plotnine
 " > pyflink_env.yml
    
mamba env remove -n pyflink_env
mamba env create -f pyflink_env.yml

运行上面的代码打包 PyFlink 的 Conda 环境并且上传到 HDFS (留神这里打包进去的文件格式是 tar.gz):

%sh

rm -rf pyflink_env.tar.gz
conda pack --ignore-missing-files -n pyflink_env -o pyflink_env.tar.gz

hadoop fs -rmr /tmp/pyflink_env.tar.gz
hadoop fs -put pyflink_env.tar.gz /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_env.tar.gz

Step 2. 制作 TaskManager 上的 PyFlink Conda 环境

运行上面的代码来创立 TaskManager 上的 PyFlink Conda 环境,TaskManager 上的 PyFlink 环境至多蕴含以下 2 个包:

  • 某个版本的 Python (这里用的是 3.7)
  • apache-flink (这里用的是 1.13.1)

剩下的包是 Python UDF 须要依赖的包,比方这里指定了 pandas:

echo "name: pyflink_tm_env
channels:
  - conda-forge
  - defaults
dependencies:
  - Python=3.7
  - pip
  - pip:
    - apache-flink==1.13.1
  - pandas
 " > pyflink_tm_env.yml
    
mamba env remove -n pyflink_tm_env
mamba env create -f pyflink_tm_env.yml

运行上面的代码打包 PyFlink 的 conda 环境并且上传到 HDFS (留神这里应用的是 zip 格局)

%sh

rm -rf pyflink_tm_env.zip
conda pack --ignore-missing-files --zip-symlinks -n pyflink_tm_env -o pyflink_tm_env.zip

hadoop fs -rmr /tmp/pyflink_tm_env.zip
hadoop fs -put pyflink_tm_env.zip /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_tm_env.zip

Step 3. 在 PyFlink 中应用 Conda 环境

接下来就能够在 Zeppelin 中应用下面创立的 Conda 环境了,首先须要在 Zeppelin 里配置 Flink,次要配置的选项有:

  • flink.execution.mode 为 yarn-application, 本文所讲的办法只实用于 yarn-application 模式;
  • 指定 yarn.ship-archives,zeppelin.pyflink.Python 以及 zeppelin.interpreter.conda.env.name 来配置 JobManager 侧的 PyFlink Conda 环境;
  • 指定 Python.archives 以及 Python.executable 来指定 TaskManager 侧的 PyFlink Conda 环境;
  • 指定其余可选的 Flink 配置,比方这里的 flink.jm.memory 和 flink.tm.memory。
%flink.conf


flink.execution.mode yarn-application

yarn.ship-archives /mnt/disk1/jzhang/zeppelin/pyflink_env.tar.gz
zeppelin.pyflink.Python pyflink_env.tar.gz/bin/Python
zeppelin.interpreter.conda.env.name pyflink_env.tar.gz

Python.archives hdfs:///tmp/pyflink_tm_env.zip
Python.executable  pyflink_tm_env.zip/bin/Python3.7

flink.jm.memory 2048
flink.tm.memory 2048

接下来就能够如一开始所说的那样在 Zeppelin 里应用 PyFlink 以及指定的 Conda 环境了。有 2 种场景:

  • 上面的例子里,能够在 PyFlink 客户端 (JobManager 侧) 应用下面创立的 JobManager 侧的 Conda 环境,比方下边应用了 Matplotlib。

  • 上面的例子是在 PyFlink UDF 里应用下面创立的 TaskManager 侧 Conda 环境里的库,比方上面在 UDF 里应用 Pandas。

三、总结与将来

本文内容就是在 Zeppelin notebook 里利用 Conda 来创立 Python env 主动部署到 Yarn 集群中,无需手动在集群下来装置任何 Pyflink 的包,并且能够在一个 Yarn 集群里同时应用多个版本的 PyFlink。

每个 PyFlink 的环境都是隔离的,而且能够随时定制更改 Conda 环境。能够下载上面这个 note 并导入到 Zeppelin,就能够复现明天讲的内容:http://23.254.161.240/#/noteb…

此外还有很多能够改良的中央:

  • 目前咱们须要创立 2 个 conda env,起因是 Zeppelin 反对 tar.gz 格局,而 Flink 只反对 zip 格局。等前期两边对立之后,只有创立一个 conda env 就能够;
  • apache-flink 当初蕴含了 Flink 的 jar 包,这就导致打进去的 conda env 特地大,yarn container 在初始化的时候耗时会比拟长,这个须要 Flink 社区提供一个轻量级的 Python 包 (不蕴含 Flink jar 包),就能够大大减小 conda env 的大小。

心愿理解更多 Flink on Zeppelin 应用的同学能够退出上面的钉钉群来探讨。

<img src=”https://img.alicdn.com/imgextra/i3/O1CN01v6un4b1t1gWCzTWMA_!!6000000005842-0-tps-828-1068.jpg” alt=”img” style=”zoom:33%;” />


第三届 Apache Flink 极客挑战赛报名开始!30 万奖金等你来!

随同着海量数据的冲击,数据处理剖析能力在业务中的价值一劳永逸,各行各业对于数据处理时效性的摸索也在不断深入,作为主打实时计算的计算引擎 – Apache Flink 应运而生。

为给行业带来更多实时计算赋能实际的思路,激励宽广酷爱技术的开发者加深对 Flink 的把握,Apache Flink 社区联手阿里云、英特尔、阿里巴巴人工智能治理与可继续倒退实验室 (AAIG)、Occlum 联结举办 “ 第三届 Apache Flink 极客挑战赛暨 AAIG CUP” 流动,即日起正式启动。

👉 点击理解更多赛事信息 👈

正文完
 0