This article describes how to call a Python script from a Spark Scala program; the process for a Spark Java program is largely the same.

1. PythonRunner

For programs that run on the JVM (i.e., Scala or Java programs), Spark provides the PythonRunner class. Simply invoking PythonRunner's main method is enough to call a Python script from Scala or Java. Under the hood, PythonRunner is built on py4j: it constructs a GatewayServer instance so that the Python process can talk to the JVM over a local network socket.

    // Launch a Py4J gateway server for the process to connect to; this will let it see our
    // Java system properties and such
    val localhost = InetAddress.getLoopbackAddress()
    val gatewayServer = new py4j.GatewayServer.GatewayServerBuilder()
      .authToken(secret)
      .javaPort(0)
      .javaAddress(localhost)
      .callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
      .build()
    val thread = new Thread(new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions {
        gatewayServer.start()
      }
    })
    thread.setName("py4j-gateway-init")
    thread.setDaemon(true)
    thread.start()

    // Wait until the gateway server has started, so that we know which port is it bound to.
    // `gatewayServer.start()` will start a new thread and run the server code there, after
    // initializing the socket, so the thread started above will end as soon as the server is
    // ready to serve connections.
    thread.join()
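To make the mechanism concrete, the following is a minimal standalone py4j sketch, independent of Spark: a plain GatewayServer exposing an entry-point object that a Python process can call into over the local socket. The EntryPoint class and its greet method are illustrative names, not Spark or py4j APIs.

    import py4j.GatewayServer

    // An ordinary JVM object; its public methods become callable from Python.
    class EntryPoint {
      def greet(name: String): String = s"Hello, $name"
    }

    object GatewayDemo {
      def main(args: Array[String]): Unit = {
        val server = new GatewayServer(new EntryPoint)  // py4j's default port, 25333
        server.start()                                  // non-blocking: serves on its own thread
        println(s"Gateway listening on port ${server.getListeningPort}")
      }
    }

A Python process would then connect with py4j's JavaGateway and call gateway.entry_point.greet(...). PythonRunner performs essentially this handshake, but on a random port (javaPort(0)) and with an auth token.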

After the GatewayServer has started, PythonRunner uses ProcessBuilder to spawn a child process that executes the Python script. It waits for the script to finish, checks the exit code to decide whether execution succeeded, throws an exception on failure, and finally shuts down the gatewayServer.

    // Launch Python process
    val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
    try {
      val process = builder.start()

      new RedirectThread(process.getInputStream, System.out, "redirect output").start()

      val exitCode = process.waitFor()
      if (exitCode != 0) {
        throw new SparkUserAppException(exitCode)
      }
    } finally {
      gatewayServer.shutdown()
    }
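The RedirectThread used above pipes the child process's stdout into the JVM's stdout, so the Python script's print output shows up in the driver log. Its behavior is roughly that of the following simplified re-implementation (an illustrative sketch, not Spark's actual source):

    import java.io.{InputStream, OutputStream}

    // Copy everything the child process writes to `in` into `out`, on a daemon
    // thread, so reading the subprocess output never blocks the main thread.
    class RedirectThread(in: InputStream, out: OutputStream, name: String)
        extends Thread(name) {
      setDaemon(true)
      override def run(): Unit = {
        val buf = new Array[Byte](1024)
        Iterator.continually(in.read(buf))
          .takeWhile(_ != -1)
          .foreach { n => out.write(buf, 0, n); out.flush() }
      }
    }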

2. Invocation

2.1 Caller Code

PythonRunner's main method takes three arguments:

  • pythonFile: the Python script to execute
  • pyFiles: additional Python scripts to add to PYTHONPATH
  • otherArgs: the array of arguments passed to the Python script
    val pythonFile = args(0)
    val pyFiles = args(1)
    val otherArgs = args.slice(2, args.length)

Complete samples follow. Scala sample code:

    package com.huawei.bigdata.spark.examples

    import org.apache.spark.deploy.PythonRunner
    import org.apache.spark.sql.SparkSession

    object RunPythonExample {
      def main(args: Array[String]): Unit = {
        val pyFilePath = args(0)
        val pyFiles = args(1)
        val spark = SparkSession
          .builder()
          .appName("RunPythonExample")
          .getOrCreate()

        runPython(pyFilePath, pyFiles)

        spark.stop()
      }

      def runPython(pyFilePath: String, pyFiles: String): Unit = {
        // Pass each flag and its value as separate arguments so that argparse
        // on the Python side parses them cleanly.
        PythonRunner.main(Array(pyFilePath, pyFiles, "-i", "/input", "-o", "/output"))
      }
    }

Python sample code:

    #!/usr/bin/env python
    # coding: utf-8
    import sys
    import argparse

    argparser = argparse.ArgumentParser(description="ParserMainEntrance")
    argparser.add_argument('--input', '-i', help="input path", required=True)
    argparser.add_argument('--output', '-o', help="output path", required=True)
    arglist = argparser.parse_args()

    def getTargetPath(input_path, output_path):
        try:
            print("input path: {}".format(input_path))
            print("output path: {}".format(output_path))
            return True
        except Exception as ex:
            print("error with: {}".format(ex))
            return False

    if __name__ == "__main__":
        ret = getTargetPath(arglist.input, arglist.output)
        if ret:
            sys.exit(0)
        else:
            sys.exit(1)

2.2 Run Command

Running a Python script requires setting pythonExec, the interpreter environment used to execute the script. By default, the interpreter is python (Spark 2.4 and earlier) or python3 (Spark 3.0 and later).

    // Spark 2.4.5
    val sparkConf = new SparkConf()
    val secret = Utils.createSecret(sparkConf)
    val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
      .orElse(sparkConf.get(PYSPARK_PYTHON))
      .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
      .orElse(sys.env.get("PYSPARK_PYTHON"))
      .getOrElse("python")

    // Spark 3.1.1
    val sparkConf = new SparkConf()
    val secret = Utils.createSecret(sparkConf)
    val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
      .orElse(sparkConf.get(PYSPARK_PYTHON))
      .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
      .orElse(sys.env.get("PYSPARK_PYTHON"))
      .getOrElse("python3")

To specify pythonExec manually, set the environment variables before launching the job (they cannot be passed in via spark-defaults). In cluster mode, set them with --conf "spark.executorEnv.PYSPARK_PYTHON=python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3". On the driver side you can also set the environment variable directly with export PYSPARK_PYTHON=python3.

If you need to upload a Python package, you can do so with --archives python.tar.gz.

So that the application can locate the .py script file, you also need to add --files pythonFile.py to the submit command to upload the Python script to YARN.

A reference submit command:

spark-submit --master yarn --deploy-mode cluster \
  --class com.huawei.bigdata.spark.examples.RunPythonExample \
  --files /usr/local/test.py \
  --conf "spark.executorEnv.PYSPARK_PYTHON=python3" \
  --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3" \
  /usr/local/test.jar test.py test.py

If you need a Python environment other than the one installed on the cluster nodes, upload a packaged Python environment with --archives and then point pythonExec at it via the environment variables, for example:

spark-submit --master yarn --deploy-mode cluster \
  --class com.huawei.bigdata.spark.examples.RunPythonExample \
  --files /usr/local/test.py \
  --archives /usr/local/python.tar.gz#myPython \
  --conf "spark.executorEnv.PYSPARK_PYTHON=myPython/bin/python3" \
  --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=myPython/bin/python3" \
  /usr/local/test.jar test.py test.py
Published by Huawei Cloud.