我有一個帶有 python 的 Spark 程式。程式的結構是這樣的:
cst_utils.py
bn_utils.py
ep_utils.py
main.py
每個cst_utils.py,bn_utils.py,ep_utils.py都有一個名為Spark_Func(sc)的函式。在 main 中,我創建了一個 Spark 背景關系sc,并將其發送到每個Spark_Func,如下所示:
import cst_utils as cu
import bn_utils as bu
import ep_utils as eu
spark_conf = SparkConf().setAppName('app_name') \
.setMaster("spark://x.x.x.x:7077") \
.set('spark.executor.memory', "8g") \
.set('spark.executor.cores', 4) \
.set('spark.task.cpus', 2)
sc = SparkContext(conf=spark_conf)
cu.spark_func(sc)
bu.spark_func(sc)
eu.spark_func(sc)
我用兩個 Slave 和一個 Master 配置 Spark 集群,它們都有 Ubuntu 20.04 OS。我在spark-env.sh中設定了主 IP,并使 SSH 無密碼,主節點無需身份驗證即可訪問每個從節點。我在每個節點中運行這些命令:
主節點:
./start-master.sh
奴隸:
./start-worker.sh spark://x.x.x.x:7077
集群已創建,因為我可以在瀏覽器中使用此命令查看 SPARK UI:
http://x.x.x.x:8080
但是當我想用這個命令運行程式時:
/opt/spark/bin/spark-submit --master spark://x.x.x.x:7077 main.py
我收到此錯誤:
22/02/16 16:39:20 INFO SparkContext: Starting job: count at /home/hs/Desktop/etl/cst_utils.py:442
22/02/16 16:39:20 INFO DAGScheduler: Registering RDD 2 (reduceByKey at /home/hs/Desktop/etl/cst_utils.py:434) as input to shuffle 0
22/02/16 16:39:20 INFO DAGScheduler: Got job 0 (count at /home/hs/Desktop/etl/cst_utils.py:442) with 1 output partitions
22/02/16 16:39:20 INFO DAGScheduler: Final stage: ResultStage 1 (count at /home/hs/Desktop/etl/cst_utils.py:442)
22/02/16 16:39:20 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
22/02/16 16:39:20 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
22/02/16 16:39:20 INFO DAGScheduler: Submitting ShuffleMapStage 0 (PairwiseRDD[2] at reduceByKey at /home/hs/Desktop/etl/cst_utils.py:434), which has no missing parents
22/02/16 16:39:20 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 9.4 KiB, free 366.3 MiB)
22/02/16 16:39:20 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 5.9 KiB, free 366.3 MiB)
22/02/16 16:39:20 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on x.x.x.x:43875 (size: 5.9 KiB, free: 366.3 MiB)
22/02/16 16:39:20 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1388
22/02/16 16:39:20 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (PairwiseRDD[2] at reduceByKey at /home/hs/Desktop/etl/cst_utils.py:434) (first 15 tasks are for partitions Vector(0))
22/02/16 16:39:20 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
22/02/16 16:39:21 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (z.z.z.z:39668) with ID 1, ResourceProfileId 0
22/02/16 16:39:21 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (y.y.y.y:46330) with ID 0, ResourceProfileId 0
22/02/16 16:39:21 INFO BlockManagerMasterEndpoint: Registering block manager y.y.y.y:34159 with 4.1 GiB RAM, BlockManagerId(0, y.y.y.y, 34159, None)
22/02/16 16:39:21 INFO BlockManagerMasterEndpoint: Registering block manager z.z.z.z:42231 with 4.1 GiB RAM, BlockManagerId(1, z.z.z.z, 42231, None)
22/02/16 16:39:21 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (y.y.y.y, executor 0, partition 0, PROCESS_LOCAL, 4481 bytes) taskResourceAssignments Map()
22/02/16 16:39:21 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on y.y.y.y:34159 (size: 5.9 KiB, free: 4.1 GiB)
22/02/16 16:39:22 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (y.y.y.y executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
command = serializer._read_with_length(file)
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
return self.loads(obj)
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'cst_utils'
程式路徑是所有節點的相同路徑,以及 SPARK 路徑。
事實上,當我在本地模式下運行程式時,它運行沒有任何問題。但是,要在本地運行,我在 SPARK CONTEXT 中使用此配置:
spark_conf = SparkConf().setAppName('app_name') \
.setMaster("local[4]") \
.set('spark.executor.memory', "8g") \
.set('spark.executor.cores', 4) \
.set('spark.task.cpus', 1)
sc = SparkContext(conf=spark_conf)
更新 1:
我還使用虛擬環境并在其中安裝所有包以在節點之間分發它們。詳細說明:
要在 python 中創建虛擬環境,請運行以下命令:
sudo apt install python3.8-venv創建虛擬環境:
python3 -m venv my_venv進入環境:
source my_vent/bin/activate我使用venv-pack打包您在專案中安裝的所有包。
pip install venv-pack打包:
venv-pack -o my_venv.tar.gz
此外,正如 Spark 網站所說,我將專案的所有.py檔案放在一個檔案夾中,并將其壓縮到.zip檔案夾中。
最后在創建集群之后,我運行這個命令:
/opt/spark/bin/spark-submit --master spark://x.x.x.x:7077 --archives my_venv.tar.gz#environment --py-files my_files.zip main.py
但是,它最終會出現這個錯誤:
Traceback (most recent call last):
File "/home/spark/Desktop/etl/main.py", line 3, in <module>
import cst_utils as cu
File "/home/spark/Desktop/etl/cst_utils.py", line 5, in <module>
import group_state as gs
File "/home/spark/Desktop/etl/group_state.py", line 1, in <module>
import numpy as np
ModuleNotFoundError: No module named 'numpy'
請您指導我在集群中運行代碼有什么問題?
任何幫助將非常感激。
uj5u.com熱心網友回復:
問題解決了。
首先,我使用以下命令在每個節點中安裝了所有包:
python3 -m pip install PACKAGE
然后,當我運行程式時,我必須將程式中使用的所有 PY 檔案寫在--py-files前面,如下所示:
/opt/spark/bin/spark-submit --master spark://x.x.x.x:7077 --files sparkConfig.json --py-files cst_utils.py,grouping.py,group_state.py,g_utils.py,csts.py,oracle_connection.py,config.py,brn_utils.py,emp_utils.py main.py
然后我對匯入檔案沒有任何錯誤。
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/429924.html
