我有一個spark資料框架,其中一列是一個結構陣列。
df = spark.createDataFrame([(1, [{"col1"/span>: 10, "col2": 0.8, "col3": 0.1}, {"col1": 9, "col2": 1.8, "col3":0.0}, {"col1": 8, "col2": 1.9, "col3": None}]], ['rowNum', 'vals'] )
我試圖創建一個聚合函式,將所有的col2相加并除以所有的col1,你可以忽略col3。我知道如何創建一個聚合函式,如果我只是對其中一列求和的話(使用 pyspark 2.4 中的高階函式),但是否有可能同時對兩個專案求和,或者我必須分兩步來做。
如果我必須把它作為兩個獨立的步驟來做,那么我可以這樣做:
df = df.withColumn("sum", F. aggregate("vals", F.lit(0.0), lambda x, y: x y.col2)
.withColumn("denom", F.aggregation("vals", F.lit(0.0), lambda x, y: x y.col1)
.withColumn("output", F.col('sum')/F.col('denom')
我想知道是否有一個高階函式能以更干凈的方式/在一個步驟中完成這個任務?非常感謝。
uj5u.com熱心網友回復:
你可以創建一個有兩個元素的陣列來聚合sum和denom。另外,AGGREGATE或REDUCE有這樣的簽名。reduce(array<T>, B, function<B, T, B>, function<B, R>) 。R 其中function<B, R>你可以在你的聚合上應用另一個函式,這正是我在最后所做的,將sum除以denom。
import pyspark.sql.function as F
df = spark.createDataFrame([(1, [
{"col1": 10.0, "col2": 0.8, "col3": 0.1}。
{"col1": 9.0, "col2": 1.8, "col3":0.0}。
{"col1": 8.0, "col2": 1.9, "col3": None}]], schema='rowNum int, vals array<struct<`col1`:double, `col2`:double, `col3`:double>> ' )
expr = ('AGGREGATE(vals, ARRAY(CAST(0.0 AS DOUBLE), CAST(0.0 AS DOUBLE)), (acc, el) -> '
'ARRAY(acc[0] el.col2, acc[1] el.col1), acc -> acc[0] / acc[1]) ')
df.withColumn('output', F.expr(expr)).show()
------ -------------------- -------------------
|rowNum| vals| output|
------ -------------------- -------------------
|1|[{10.0, 0.8, 0.1}...|0.1666666666666|
------ -------------------- -------------------
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/310742.html
標籤:
上一篇:火花是否將整個蜂巢表帶入記憶體
