Suppose we have a pyspark.sql.dataframe.DataFrame object:
df = sc.parallelize([['John', 'male', 26],
                     ['Teresa', 'female', 25],
                     ['Jacob', 'male', 6]]).toDF(['name', 'gender', 'age'])
I have a function that runs a SQL query for each row of the DataFrame:
def getInfo(data):
    param_name = data['name']
    param_gender = data['gender']
    param_age = data['age']
    sql_query = "SELECT * FROM people_info WHERE name = '{0}' AND gender = '{1}' AND age = {2}".format(param_name, param_gender, param_age)
    info = info.append(spark.sql(sql_query))
    return info
I am trying to run the function on each row via map:
df_info = df.rdd.map(lambda x: getInfo(x))
I get this error:
PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
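A uj5u.com user replied:
The error message is telling you exactly what went wrong. Your function tries to access the SparkContext (through spark.sql(sql_query)) from inside a transformation (df.rdd.map(lambda x: getInfo(x))). Code inside a transformation runs on the workers, and the SparkContext is only available on the driver.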
Here is what I think you are trying to do:
df = sc.parallelize([['John', 'male', 26],
                     ['Teresa', 'female', 25],
                     ['Jacob', 'male', 6]]).toDF(['name', 'gender', 'age'])
# Load the table once on the driver, then let Spark match the rows with a join.
people = spark.table("people_info")
df_info = people.join(df, on=[people.name == df.name, people.gender == df.gender, people.age == df.age], how="inner")
There are also several other ways to perform the join.
