我所擁有的是兩個DataFrames,每個代表一個概率分布,但每個都每行存盤一個條目。例如一個是df1:
item_id | probability
--------|---------------
item1 | 0.1
item2 | 0.2
item3 | 0.7
另一個,我們稱之為df2:
item_id | probability
--------|---------------
item2 | 0.3
item3 | 0.5
item4 | 0.2
請注意這兩個的專案空間是不同的。但這沒關系,因為它的意思是對于df1的概率為零item4,df2對于 的概率為零item1。我想要的是沒有大量使用自定義 UDF 的代碼,它基本上產生一個DataFrame,給定一些alpha雙值,混合這兩個分布。我可以使用自定義 UDF 撰寫此代碼,但我想知道是否有一些純基于 Spark SQL 的代碼僅使用內置函式執行此操作。
item_id | probability
--------|---------------
item1 | 0.1 * alpha 0.0 * (1 - alpha)
item2 | 0.2 * alpha 0.3 * (1 - alpha)
item3 | 0.7 * alpha 0.5 * (1 - alpha)
item4 | 0.0 * alpha 0.2 * (1 - alpha)
uj5u.com熱心網友回復:
我認為這完全可以通過 SQL 實作。秘訣是一個“外部”連接和一些合并魔法來處理空值。
import org.apache.spark.sql.types._
val someData = Seq( ("item1", 0.1 ), ("item2", 0.2),("item3", 0.7) )
val alpha = 1.1
val someMoreData = Seq( ("item2", 0.3 ), ("item3", 0.5),("item4", 0.2) )
val df1 = spark.sparkContext.parallelize(someMoreData).toDF( "item_id","probability" )
val df2 = spark.sparkContext.parallelize(someData).toDF( "item_id","probability" )
val prob = df2
.join(df1, df1("item_id") === df2("item_id"), "outer" )
.select(
coalesce( df1("item_id"), df2("item_id") ).alias("item_id"),
coalesce( df1("probability"),lit(0.0)).alias("probability1"),
coalesce( df2("probability"),lit(0.0)).alias("probability2"),
lit(alhpa).alias("alpha") )
prob.show()
------- ------------ ------------ -----
|item_id|probability1|probability2|alpha|
------- ------------ ------------ -----
| item3| 0.5| 0.7| 1.1|
| item2| 0.3| 0.2| 1.1|
| item1| 0.0| 0.1| 1.1|
| item4| 0.2| 0.0| 1.1|
------- ------------ ------------ -----
prob.select( prob("probability1") * prob("alpha") prob("probability2").multiply( lit(1.0) - prob("alpha")), prob("item_id") ).show();
--------------------------------------------------------- -------
|((probability1 * alpha) (probability2 * (1.0 - alpha)))|item_id|
--------------------------------------------------------- -------
| 0.48| item3|
| 0.31| item2|
| -0.01000000000000...| item1|
| 0.22000000000000003| item4|
--------------------------------------------------------- -------
uj5u.com熱心網友回復:
可以通過全連接和簡單的操作來解決。這里有一個片段。請記住,為了澄清起見,我將prob兩個資料框中的名稱分別更改為prob1和prob2。然后就是這樣:
alpha = 0.5
df = df1 \
.join(df2, how='full', on='item_id') \
.fillna(0) \
.withColumn('prob', alpha * F.col('prob1') (1-alpha) * F.col('prob2'))
您的資料樣本:
df1 = spark.createDataFrame(data=[
('item1', 0.1),
('item2', 0.2),
('item3', 0.7)
], schema=['item_id', 'prob1'])
df2 = spark.createDataFrame(data=[
('item2', 0.3),
('item3', 0.5),
('item4', 0.2)
], schema=['item_id', 'prob2'])
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/472159.html
