給定以下輸入資料框
npos = 3
inp = spark.createDataFrame([
['1', 23, 0, 2],
['1', 45, 1, 2],
['1', 89, 1, 3],
['1', 95, 2, 2],
['1', 95, 0, 4],
['2', 20, 2, 2],
['2', 40, 1, 4],
], schema=["id","elap","pos","lbl"])
需要構建一個看起來像這樣的資料框
out = spark.createDataFrame([
['1', 23, [2,0,0]],
['1', 45, [2,2,0]],
['1', 89, [2,3,0]],
['1', 95, [4,3,2]],
['2', 20, [0,0,2]],
['2', 40, [0,4,2]],
], schema=["id","elap","vec"])
輸入資料框有數百萬條記錄。
在上面的例子中看到的一些細節(按設計)
npos是要在輸出中構造的向量的大小pos保證在[0,npos)- 在每個時間步 (
elap) 最多有 1label個pos - 如果
lbl在某個時間步未給出,則必須從上次指定的時間推斷出pos - 如果
lbl之前沒有指定,可以假設為0
uj5u.com熱心網友回復:
您可以在陣列上使用一些高階函式來實作:
- 使用函式添加
vec列并從array_repeatposlbl - 用于
collect_list獲取vec由劃分的視窗累積id aggregate通過選擇先前位置的結果陣列,如果它不同于0
from pyspark.sql import Window
import pyspark.sql.functions as F
npos = 3
out = inp.withColumn(
"vec",
F.expr(f"transform(array_repeat(0, {npos}), (x, i) -> IF(i=pos, lbl, x))")
).withColumn(
"vec",
F.collect_list("vec").over(Window.partitionBy("id").orderBy("elap"))
).withColumn(
"vec",
F.expr(f"""aggregate(
vec,
array_repeat(0, {npos}),
(acc, x) -> transform(acc, (y, i) -> int(IF(x[i]!=0, x[i], y)))
)""")
).drop("lbl", "pos")
out.show(truncate=False)
# --- ---- ---------
#|id |elap|vec |
# --- ---- ---------
#|1 |23 |[2, 0, 0]|
#|1 |45 |[2, 2, 0]|
#|1 |89 |[2, 3, 0]|
#|1 |95 |[4, 3, 2]|
#|1 |95 |[4, 3, 2]|
#|2 |20 |[0, 0, 2]|
#|2 |40 |[0, 4, 2]|
# --- ---- ---------
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/416367.html
標籤:
上一篇:使用spark和scala將資料框合并到Googlebigquery
下一篇:將字串型別列百分比轉換為小數
