This article describes how to call a Python script from a Spark Scala program; the process for a Spark Java program is largely the same.
1. PythonRunner
For programs that run on the JVM (i.e., Scala and Java programs), Spark provides the PythonRunner class. You only need to call PythonRunner's main method to invoke a Python script from Scala or Java code. In its implementation, PythonRunner is based on py4j: it constructs a GatewayServer instance so that the Python process can communicate with the JVM over a local network socket.
// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
val localhost = InetAddress.getLoopbackAddress()
val gatewayServer = new py4j.GatewayServer.GatewayServerBuilder()
  .authToken(secret)
  .javaPort(0)
  .javaAddress(localhost)
  .callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
  .build()
val thread = new Thread(new Runnable() {
  override def run(): Unit = Utils.logUncaughtExceptions {
    gatewayServer.start()
  }
})
thread.setName("py4j-gateway-init")
thread.setDaemon(true)
thread.start()

// Wait until the gateway server has started, so that we know which port is it bound to.
// `gatewayServer.start()` will start a new thread and run the server code there, after
// initializing the socket, so the thread started above will end as soon as the server is
// ready to serve connections.
thread.join()
After starting the GatewayServer, PythonRunner uses a ProcessBuilder to spawn a child process that runs the Python script. It waits for the script to finish, uses the exitCode to decide whether the run succeeded, throws an exception if it failed, and finally shuts down the gatewayServer.
// Launch Python process
val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
try {
  val process = builder.start()

  new RedirectThread(process.getInputStream, System.out, "redirect output").start()

  val exitCode = process.waitFor()
  if (exitCode != 0) {
    throw new SparkUserAppException(exitCode)
  }
} finally {
  gatewayServer.shutdown()
}
2. How to call it
2.1 Calling code
PythonRunner's main method takes three arguments:
- pythonFile: the Python script to execute
- pyFiles: additional Python scripts to add to PYTHONPATH
- otherArgs: the array of arguments passed on to the Python script
val pythonFile = args(0)
val pyFiles = args(1)
val otherArgs = args.slice(2, args.length)
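For illustration, a direct call could look like the sketch below. This is not taken from the original article: the paths are placeholders, and pyFiles is, to the best of my understanding, a comma-separated list of paths.

// Hypothetical call; paths are placeholders.
PythonRunner.main(Array(
  "/opt/scripts/test.py",                        // pythonFile: the script to execute
  "/opt/scripts/test.py,/opt/scripts/utils.py",  // pyFiles: comma-separated, added to PYTHONPATH
  "-i", "/input",                                // otherArgs: forwarded to the script's argv
  "-o", "/output"
))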
A complete example follows. Scala example code:
package com.huawei.bigdata.spark.examples

import org.apache.spark.deploy.PythonRunner
import org.apache.spark.sql.SparkSession

object RunPythonExample {
  def main(args: Array[String]): Unit = {
    val pyFilePath = args(0)
    val pyFiles = args(1)
    val spark = SparkSession
      .builder()
      .appName("RunPythonExample")
      .getOrCreate()

    runPython(pyFilePath, pyFiles)

    spark.stop()
  }

  def runPython(pyFilePath: String, pyFiles: String): Unit = {
    // Pass each flag and its value as separate array elements; a single "-i /input"
    // string would reach argparse as one token and the parsed value would keep a leading space.
    val inputArgs = Array("-i", "/input")
    val outputArgs = Array("-o", "/output")
    PythonRunner.main(Array(pyFilePath, pyFiles) ++ inputArgs ++ outputArgs)
  }
}
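If the input and output paths should not be hard-coded, one option (a sketch, not part of the original example) is to forward the application's remaining command-line arguments straight to the script:

// Hypothetical variant of runPython: everything after the first two application
// arguments is passed through to the Python script unchanged.
def runPython(pyFilePath: String, pyFiles: String, scriptArgs: Array[String]): Unit = {
  PythonRunner.main(Array(pyFilePath, pyFiles) ++ scriptArgs)
}

// called from main as: runPython(args(0), args(1), args.slice(2, args.length))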
Python example code:
#!/usr/bin/env python
# coding: utf-8
import sys
import argparse

argparser = argparse.ArgumentParser(description="ParserMainEntrance")
argparser.add_argument('--input', '-i', help="input path", required=True)
argparser.add_argument('--output', '-o', help="output path", required=True)
arglist = argparser.parse_args()


def getTargetPath(input_path, output_path):
    try:
        print("input path: {}".format(input_path))
        print("output path: {}".format(output_path))
        return True
    except Exception as ex:
        print("error with: {}".format(ex))
        return False


if __name__ == "__main__":
    ret = getTargetPath(arglist.input, arglist.output)
    if ret:
        sys.exit(0)
    else:
        sys.exit(1)
2.2 Run command
Running the Python script requires setting pythonExec, i.e., the interpreter used to execute the Python script. By default, the executable is python (Spark 2.4 and earlier) or python3 (Spark 3.0 and later).
// Spark 2.4.5
val sparkConf = new SparkConf()
val secret = Utils.createSecret(sparkConf)
val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
  .orElse(sparkConf.get(PYSPARK_PYTHON))
  .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
  .orElse(sys.env.get("PYSPARK_PYTHON"))
  .getOrElse("python")

// Spark 3.1.1
val sparkConf = new SparkConf()
val secret = Utils.createSecret(sparkConf)
val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
  .orElse(sparkConf.get(PYSPARK_PYTHON))
  .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
  .orElse(sys.env.get("PYSPARK_PYTHON"))
  .getOrElse("python3")
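To check which interpreter will actually be picked on the driver, the same lookup order can be mirrored in user code before calling PythonRunner. The following is a minimal sketch, assuming the PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON config entries in the snippet above correspond to the keys spark.pyspark.driver.python and spark.pyspark.python:

import org.apache.spark.SparkConf

// Minimal sketch: reproduce the lookup order from the Spark 3.x snippet above.
// new SparkConf() picks up any --conf values passed to spark-submit.
val conf = new SparkConf()
val resolvedPythonExec = conf.getOption("spark.pyspark.driver.python")
  .orElse(conf.getOption("spark.pyspark.python"))
  .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
  .orElse(sys.env.get("PYSPARK_PYTHON"))
  .getOrElse("python3")  // Spark 3.x default; Spark 2.4 and earlier default to "python"
println(s"pythonExec that PythonRunner would use: $resolvedPythonExec")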
To specify pythonExec manually, set the environment variables before the job runs (they cannot be passed in via spark-defaults). In cluster mode, this can be done with --conf "spark.executorEnv.PYSPARK_PYTHON=python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3". On the driver side, you can also set the environment variable with export PYSPARK_PYTHON=python3.
If you need to ship a Python package, you can upload it with --archives python.tar.gz.
To make the script file available to the application, you also need to add --files pythonFile.py to the submit command so that the Python script is uploaded to YARN.
A reference submit command:
spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --conf "spark.executorEnv.PYSPARK_PYTHON=python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3" /usr/local/test.jar test.py test.py
If you need a Python environment other than the one installed on the cluster nodes, you can upload a Python archive with --archives and then point pythonExec at it via the environment variables, for example:
spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --archives /usr/local/python.tar.gz#myPython --conf "spark.executorEnv.PYSPARK_PYTHON=myPython/bin/python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=myPython/bin/python3" /usr/local/test.jar test.py test.py
Published by Huawei Cloud.