This article describes how to call a Python script from a Spark Scala program. The procedure for a Spark Java program is largely the same.
1.PythonRunner
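For programs that run on the JVM (that is, Scala and Java programs), Spark provides the PythonRunner class. Calling PythonRunner's main method is enough to run a Python script from a Scala or Java program. Internally, PythonRunner is built on py4j: it constructs a GatewayServer instance so that the Python program can communicate with the JVM over a local network socket.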
// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
val localhost = InetAddress.getLoopbackAddress()
val gatewayServer = new py4j.GatewayServer.GatewayServerBuilder()
  .authToken(secret)
  .javaPort(0)
  .javaAddress(localhost)
  .callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
  .build()
val thread = new Thread(new Runnable() {
  override def run(): Unit = Utils.logUncaughtExceptions {
    gatewayServer.start()
  }
})
thread.setName("py4j-gateway-init")
thread.setDaemon(true)
thread.start()
// Wait until the gateway server has started, so that we know which port is it bound to.
// `gatewayServer.start()` will start a new thread and run the server code there, after
// initializing the socket, so the thread started above will end as soon as the server is
// ready to serve connections.
thread.join()
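The startup pattern in this excerpt (bind to the loopback address, let the OS pick the port by asking for port 0, and require a shared secret from the client) can be sketched with the Python standard library alone. This is only an analogy and not Py4J itself; the names `start_token_server` and `connect_with_token` are hypothetical and exist only for illustration.

```python
import secrets
import socket
import threading
from typing import Tuple


def start_token_server(token: str) -> Tuple[socket.socket, int]:
    """Listen on loopback with an OS-chosen port and answer one client."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))  # port 0: the OS selects a free port
    server.listen(1)
    port = server.getsockname()[1]  # the port that was actually bound

    def serve_once() -> None:
        conn, _ = server.accept()
        with conn:
            received = conn.recv(1024)
            # Constant-time comparison of the presented secret.
            accepted = secrets.compare_digest(received, token.encode("utf-8"))
            conn.sendall(b"ok" if accepted else b"denied")

    # Daemon thread, so it never keeps the process alive on its own.
    threading.Thread(target=serve_once, name="gateway-sketch", daemon=True).start()
    return server, port


def connect_with_token(port: int, token: str) -> str:
    """Connect to the loopback server and present the shared secret."""
    with socket.create_connection(("127.0.0.1", port), timeout=5) as client:
        client.sendall(token.encode("utf-8"))
        return client.recv(1024).decode("utf-8")


if __name__ == "__main__":
    secret = secrets.token_hex(16)
    server, port = start_token_server(secret)
    print(connect_with_token(port, secret))
    server.close()
```

Because the port is only known after the bind, the caller has to read it back from the socket before it can tell a client where to connect, which is the same reason the excerpt waits for the startup thread to finish.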
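After the GatewayServer has started, PythonRunner uses ProcessBuilder to launch a child process that executes the Python script. It waits for the script to finish and checks the exitCode to decide whether the run succeeded, throwing an exception if it failed. Finally, it shuts down the gatewayServer.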
// Launch Python process
val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
try {
  val process = builder.start()
  new RedirectThread(process.getInputStream, System.out, "redirect output").start()
  val exitCode = process.waitFor()
  if (exitCode != 0) {
    throw new SparkUserAppException(exitCode)
  }
} finally {
  gatewayServer.shutdown()
}
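The same control flow (start the child, wait, raise on a non-zero exit code, always clean up) can be sketched in Python with `subprocess`. This is a minimal illustration under assumptions: `run_script`, `ScriptFailedError`, and the `on_finish` callback are hypothetical names that stand in for the Spark classes and for the gateway shutdown.

```python
import subprocess
import sys
from typing import Callable, Sequence


class ScriptFailedError(RuntimeError):
    """Hypothetical stand-in for an exception that carries the exit code."""

    def __init__(self, exit_code: int) -> None:
        super().__init__(f"script exited with code {exit_code}")
        self.exit_code = exit_code


def run_script(command: Sequence[str], on_finish: Callable[[], None]) -> None:
    """Run a child process and fail loudly on a non-zero exit code."""
    try:
        completed = subprocess.run(list(command))  # blocks until the child exits
        if completed.returncode != 0:
            raise ScriptFailedError(completed.returncode)
    finally:
        # Runs on success and on failure, like the finally block above.
        on_finish()


if __name__ == "__main__":
    run_script([sys.executable, "-c", "print('hello from child')"],
               on_finish=lambda: print("cleanup done"))
```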
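2. Calling PythonRunner
2.1 Calling code
The args array passed to PythonRunner's main method is read as three parameters:
- pythonFile: the Python script to execute
- pyFiles: additional Python files to add to PYTHONPATH
- otherArgs: the array of arguments passed on to the Python script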
val pythonFile = args(0)
val pyFiles = args(1)
val otherArgs = args.slice(2, args.length)
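As a rough Python sketch of the same positional split: the first element is the script, the second names the extra files, and everything else is forwarded. The helper `split_runner_args` is hypothetical, and treating pyFiles as a comma-separated list is an assumption that the excerpt above does not show.

```python
from typing import List, NamedTuple, Sequence


class RunnerArgs(NamedTuple):
    python_file: str
    py_files: List[str]
    other_args: List[str]


def split_runner_args(args: Sequence[str]) -> RunnerArgs:
    """Split a flat argument list into (pythonFile, pyFiles, otherArgs)."""
    if len(args) < 2:
        raise ValueError("expected at least pythonFile and pyFiles")
    # Assumption: pyFiles is comma-separated; empty entries are dropped.
    py_files = [path for path in args[1].split(",") if path]
    return RunnerArgs(args[0], py_files, list(args[2:]))


if __name__ == "__main__":
    print(split_runner_args(["test.py", "a.py,b.py", "-i", "/input"]))
```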
A complete example follows. Scala sample code:
package com.huawei.bigdata.spark.examples

import org.apache.spark.deploy.PythonRunner
import org.apache.spark.sql.SparkSession

object RunPythonExample {
  def main(args: Array[String]): Unit = {
    val pyFilePath = args(0)
    val pyFiles = args(1)
    val spark = SparkSession
      .builder()
      .appName("RunPythonExample")
      .getOrCreate()
    runPython(pyFilePath, pyFiles)
    spark.stop()
  }

  def runPython(pyFilePath: String, pyFiles: String): Unit = {
    // Pass each flag and its value as separate elements. A single element
    // such as "-i /input" reaches argparse as one token, and the parsed
    // value then keeps the embedded leading space.
    PythonRunner.main(Array(pyFilePath, pyFiles, "-i", "/input", "-o", "/output"))
  }
}
Python sample code:
#!/usr/bin/env python
# coding: utf-8
import sys
import argparse

# Both options are required, so no default value is needed.
argparser = argparse.ArgumentParser(description="ParserMainEntrance")
argparser.add_argument('--input', '-i', help="input path", required=True)
argparser.add_argument('--output', '-o', help="output path", required=True)
arglist = argparser.parse_args()


def getTargetPath(input_path, output_path):
    """Print both paths; return True on success and False on any error."""
    try:
        print("input path: {}".format(input_path))
        print("output path: {}".format(output_path))
        return True
    except Exception as ex:
        print("error with: {}".format(ex))
        return False


if __name__ == "__main__":
    ret = getTargetPath(arglist.input, arglist.output)
    # The exit code is what the calling JVM process checks.
    if ret:
        sys.exit(0)
    else:
        sys.exit(1)
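How the caller tokenizes the arguments matters for this script. The sketch below, which reuses the two options from the sample with a hypothetical `build_parser` helper, compares passing a flag and its value as separate elements with passing them joined in one element. On CPython's argparse the joined form is still accepted for a short option, but the value is expected to keep the space that followed the flag.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Parser with the same two options as the sample script."""
    parser = argparse.ArgumentParser(description="ParserMainEntrance")
    parser.add_argument("--input", "-i", help="input path", required=True)
    parser.add_argument("--output", "-o", help="output path", required=True)
    return parser


# Flag and value as separate argv elements: the value is taken as-is.
separate = build_parser().parse_args(["-i", "/input", "-o", "/output"])

# Flag and value joined in one argv element: everything after "-i",
# including the space, is treated as the attached value.
joined = build_parser().parse_args(["-i /input", "-o /output"])

print(repr(separate.input))
print(repr(joined.input))
```

Passing the flag and the value separately avoids having to strip whitespace inside the script.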
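2.2 Run command
Running the Python script requires pythonExec, the interpreter used to execute it. By default the interpreter is python in older releases (Spark 2.4.5 is shown below) and python3 in newer ones (Spark 3.1.1 is shown below).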
// Spark 2.4.5
val sparkConf = new SparkConf()
val secret = Utils.createSecret(sparkConf)
val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
  .orElse(sparkConf.get(PYSPARK_PYTHON))
  .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
  .orElse(sys.env.get("PYSPARK_PYTHON"))
  .getOrElse("python")

// Spark 3.1.1
val sparkConf = new SparkConf()
val secret = Utils.createSecret(sparkConf)
val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
  .orElse(sparkConf.get(PYSPARK_PYTHON))
  .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
  .orElse(sys.env.get("PYSPARK_PYTHON"))
  .getOrElse("python3")
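The lookup order in these excerpts is a first-match fallback chain. A minimal Python sketch of that chain follows. The function `resolve_python_exec` is hypothetical, and the key names `spark.pyspark.driver.python` and `spark.pyspark.python` are given here as the configuration keys behind the two constants, which the excerpts themselves do not spell out.

```python
from typing import Mapping


def resolve_python_exec(conf: Mapping[str, str],
                        env: Mapping[str, str],
                        default: str) -> str:
    """Return the first configured interpreter, else the default."""
    candidates = (
        conf.get("spark.pyspark.driver.python"),  # assumed key for PYSPARK_DRIVER_PYTHON
        conf.get("spark.pyspark.python"),         # assumed key for PYSPARK_PYTHON
        env.get("PYSPARK_DRIVER_PYTHON"),
        env.get("PYSPARK_PYTHON"),
    )
    for candidate in candidates:
        if candidate is not None:
            return candidate
    return default


if __name__ == "__main__":
    import os
    print(resolve_python_exec({}, os.environ, default="python3"))
```

With nothing set, only the final default differs between the two versions shown: "python" versus "python3".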
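To specify pythonExec manually, set the environment variable before the application runs (it cannot be passed in through spark-defaults). In cluster mode, set it with --conf "spark.executorEnv.PYSPARK_PYTHON=python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3". On the driver side, you can also set it with export PYSPARK_PYTHON=python3.
To upload a Python package, use --archives python.tar.gz.
So that the application can find the .py script file, also add --files pythonFile.py to the launch command to upload the Python script to YARN.
A reference run command: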
spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --conf "spark.executorEnv.PYSPARK_PYTHON=python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3" /usr/local/test.jar test.py test.py
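To use a Python environment other than the one installed on the nodes, upload a Python archive with --archives and point pythonExec at it through the environment variables, for example: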
spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --archives /usr/local/python.tar.gz#myPython --conf "spark.executorEnv.PYSPARK_PYTHON=myPython/bin/python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=myPython/bin/python3" /usr/local/test.jar test.py test.py
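When the command is assembled by a script, building it as an argument list avoids shell quoting mistakes. The sketch below uses only the flags that appear in the two commands above; `build_submit_command` and its parameters are hypothetical names, and the assumption that the script is referenced by its bare file name after upload follows the example commands.

```python
from typing import List, Optional


def build_submit_command(main_class: str,
                         app_jar: str,
                         script_path: str,
                         python_exec: str,
                         archive: Optional[str] = None) -> List[str]:
    """Assemble a spark-submit argument list for YARN cluster mode."""
    command = [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        "--class", main_class,
        "--files", script_path,
    ]
    if archive is not None:
        # For example "/usr/local/python.tar.gz#myPython".
        command += ["--archives", archive]
    for key in ("spark.executorEnv.PYSPARK_PYTHON",
                "spark.yarn.appMasterEnv.PYSPARK_PYTHON"):
        command += ["--conf", f"{key}={python_exec}"]
    # The application arguments use the bare file name, as in the commands above.
    script_name = script_path.rsplit("/", 1)[-1]
    command += [app_jar, script_name, script_name]
    return command


if __name__ == "__main__":
    print(" ".join(build_submit_command(
        "com.huawei.bigdata.spark.examples.RunPythonExample",
        "/usr/local/test.jar",
        "/usr/local/test.py",
        "python3",
    )))
```

The resulting list can be handed to `subprocess.run` directly, without going through a shell.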
This article was published by Huawei Cloud.
