問題
您好,在 pyspark/spark 中有沒有辦法在某些條件下在某個視窗上選擇第一個元素?
例子
讓我們有一個示例輸入資料框
--------- ---------- ---- ---- ----------------
| id| timestamp| f1| f2| computed|
--------- ---------- ---- ---- ----------------
| 1|2020-01-02|null|c1f2| [f2]|
| 1|2020-01-01|c1f1|null| [f1]|
| 2|2020-01-01|c2f1|null| [f1]|
--------- ---------- ---- ---- ----------------
我想為每個計算的 id 最新列(f1,f2...)選擇。
所以“代碼”看起來像這樣
cols = ["f1", "f2"]
w = Window().partitionBy("id").orderBy(f.desc("timestamp")).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
output_df = (
input_df.select(
"id",
*[f.first(col, condition=f.array_contains(f.col("computed"), col)).over(w).alias(col) for col in cols]
)
.groupBy("id")
.agg(*[f.first(col).alias(col) for col in cols])
)
輸出應該是
--------- ---- ----
| id| f1| f2|
--------- ---- ----
| 1|c1f1|c1f2|
| 2|c2f1|null|
--------- ---- ----
如果輸入看起來像這樣
--------- ---------- ---- ---- ----------------
| id| timestamp| f1| f2| computed|
--------- ---------- ---- ---- ----------------
| 1|2020-01-02|null|c1f2| [f1, f2]|
| 1|2020-01-01|c1f1|null| [f1]|
| 2|2020-01-01|c2f1|null| [f1]|
--------- ---------- ---- ---- ----------------
那么輸出應該是
--------- ---- ----
| id| f1| f2|
--------- ---- ----
| 1|null|c1f2|
| 2|c2f1|null|
--------- ---- ----
如您所見,僅使用它并不容易,f.first(ignore_nulls=True)因為在這種情況下,我們不想跳過 null,因為它被視為計算值。
當前解決方案
步驟1
保存原始資料型別
cols = ["f1", "f2"]
orig_dtypes = [field.dataType for field in input_df.schema if field.name in cols]
第2步
對于每個列,如果計算列,則使用其值創建新列,并將原始 null 替換為我們的“合成”<NULL>字串
output_df = input_df.select(
"id", "timestamp", "computed",
*[
f.when(f.array_contains(f.col("computed"), col) & f.col(col).isNotNull(), f.col(col))
.when(f.array_contains(f.col("computed"), col) & f.col(col).isNull(), "<NULL>")
.alias(col)
for col in cols
]
)
第 3 步
在視窗上選擇第一個非空值,因為現在我們知道它<NULL>不會被跳過
output_df = (
output_df.select(
"id",
*[f.first(col, ignorenulls=True).over(w).alias(col) for col in cols],
)
.groupBy("id")
.agg(*[f.first(col).alias(col) for col in cols])
)
步驟4
將我們的“合成”替換<NULL>為原始空值。
output_df = output_df.replace("<NULL>", None)
第 5 步
將列轉換回其原始型別,因為它們可能會在步驟 2 中重新輸入為字串
output_df = output_df.select("id", *[f.col(col).cast(type_) for col, type_ in zip(cols, orig_dtypes)])
該解決方案有效,但似乎不是正確的方法。此外,它非常重,而且計算時間太長。
還有其他更“閃亮”的方式嗎?
uj5u.com熱心網友回復:
這是使用結構排序技巧的一種方法。
Groupby并為串列中的每一列id收集結構串列,然后在結果陣列上使用函式,您可以獲得所需的持續值:struct<col_exists_in_computed, timestamp, col_value>colsarray_max
from pyspark.sql import functions as F
output_df = input_df.groupBy("id").agg(
*[F.array_max(
F.collect_list(
F.struct(F.array_contains("computed", c), F.col("timestamp"), F.col(c))
)
)[c].alias(c) for c in cols]
)
# applied to you second dataframe example, it gives
output_df.show()
# --- ---- ----
#| id| f1| f2|
# --- ---- ----
#| 1|null|c1f2|
#| 2|c2f1|null|
# --- ---- ----
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/420934.html
標籤:
上一篇:TypeError:“float”物件在邏輯操作中不可迭代
下一篇:過濾器組包含特定值
