我運行了一個火花作業,它可以正常作業。在我的 pyspark 代碼中,我依次運行 3 個機器學習作業。但是當我嘗試它們同時在一個執行緒中作業時,我得到了一個錯誤。它在這部分給出了錯誤:
def run(.....):
(
......
sc = SparkContext.getOrCreate(conf=conf)
sc.setCheckpointDir("/tmp/ersing/")
spark = SparkSession(sc)
temp_name = "my_test_table_thread_" str(thread_id)
my_table.createOrReplaceTempView(temp_name)
print(temp_name " count(*) --> " str(my_table.count()))
print("""spark.catalog.tableExists(""" temp_name """) = """ str(spark._jsparkSession.catalog().tableExists(temp_name)))
model_sql = """select id from {sample_table_name} where
id= {id} """.format(id=id, sample_table_name=temp_name)
my_df= spark.sql(model_sql).select("id",) #this part gives error --> no such table
my_df= broadcast(my_df)
......
)
我的主要代碼是:
....
from multiprocessing.pool import ThreadPool
import threading
def run_worker(job):
returned_sample_table= run('sampling',...) # i call run method twice. First run get df and I call second run for modeling
run('modeling',...,returned_sample_table)
def mp_handler():
p = ThreadPool(8)
p.map(run_worker, jobs)
p.join()
p.close()
mp_handler()
我同時運行 3 個作業,每次只有一個作業createOrReplaceTempView作業正常,因為我記錄了這個:print("""spark.catalog.tableExists(""" temp_name """) = """ str(spark._jsparkSession.catalog().tableExists(temp_name)))我看到一個作業存在而其他作業不存在。
那么我錯過了什么?
提前致謝。
uj5u.com熱心網友回復:
最后我得到了解決方案。
問題是火花背景關系。當其中一個執行緒作業完成并關閉背景關系時,其他執行緒在 spark 上找不到表。
我所做的是我將 spark 背景關系移動到 main 中,如下所示:
def run_worker(job):
sc = SparkContext.getOrCreate(conf=conf)
sc.setCheckpointDir("/tmp/ersing/")
spark = SparkSession(sc)
returned_sample_table= run(spark ,'sampling',...) # i call run method twice. First run get df and I call second run for modeling
run(spark ,'modeling',...,returned_sample_table)
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/491160.html
