我一直在嘗試通過 Pyspark、SparkSQL 和 Pandas 更新一系列 JSON blob,但沒有成功。以下是資料的樣子:
# --- --------- ------------------------------------------
#|ID |Timestamp|Properties |
# --- --------- ------------------------------------------
#|a |7 |{"a1": 5, "a2": 8} |
#|b |12 |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|
#|a |8 |{"a2": 4} |
#|a |10 |{"a3": "z", "a4": "t"} |
#|a |5 |{"a1": 3, "a2": 12, "a4": "r"} |
#|b |20 |{"b2": "k", "b3": 9} |
#|b |14 |{"b8": "y", "b3": 2} |
# --- --------- ------------------------------------------
我想要一個查詢,該查詢將按欄位對行進行磁區ID并按欄位對其進行排序Timestamp。在此之后,該Properties欄位將在每個磁區中累積合并以創建一個新列New Props。所以輸出將是這樣的:
# --- --------- ------------------------------------------ ------------------------------------------ ------
#|ID |Timestamp|Properties |New_Props |rownum|
# --- --------- ------------------------------------------ ------------------------------------------ ------
#|a |5 |{"a1": 3, "a2": 12, "a4": "r"} |{"a1": 3, "a2": 12, "a4": "r"} |1 |
#|a |7 |{"a1": 5, "a2": 8} |{"a1": 5, "a2": 8, "a4": "r"} |2 |
#|a |8 |{"a2": 4} |{"a1": 5, "a2": 4, "a4": "r"} |3 |
#|a |10 |{"a3": "z", "a4": "t"} |{"a1": 5, "a2": 4, "a3": "z", "a4": "t"} |4 |
#|b |12 |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|1 |
#|b |14 |{"b8": "y", "b3": 2} |{"b1": 36, "b2": "u", "b3": 2, "b8": "y"} |2 |
#|b |20 |{"b2": "k", "b3": 9} |{"b1": 36, "b2": "k", "b3": 9, "b8": "y"} |3 |
# --- --------- ------------------------------------------ ------ ------------------------------------------
公式:從rownum2開始,獲取New Props上一行(rownum1)的列值,并用Properties當前行(rownum2)的列值進行更新。
我嘗試使用該LAG函式,但我不能使用我當前在函式本身內計算的列。
要創建Next Props列,我嘗試了這個 CASE 陳述句,但它不起作用:
CASE
WHEN rownum != 1 THEN concat(properties, LAG(next_props, 1) OVER (PARTITION BY contentid ORDER BY updateddatetime))
ELSE next_props
END AS new_props
我一直在嘗試不同的事情,但我被困住了。我可能可以使用 for 回圈和 pythondict.update()函式來做到這一點,但我擔心效率。任何幫助表示贊賞。
uj5u.com熱心網友回復:
這是在陣列和映射列上使用高階函式的一種方法:
- 使用前一行獲取前
Properties一行lag并將前一行和當前行都Properties轉換為地圖型別 - 在視窗上使用
collect_list函式,獲取前一行的累積陣列Properties - 將當前行添加
Properties到結果陣列中,并使用.aggregate連接內部映射map_concat。從您的示例看來,更新操作似乎只是添加新鍵,因此在 concat 之前,我們使用map_filter函式過濾已經存在的鍵 - 用于
to_json從聚合映射中獲取 json 字串并洗掉中間列
from pyspark.sql import functions as F, Window
w = Window.partitionBy("ID").orderBy("Timestamp")
df1 = df.withColumn("rownum", F.row_number().over(w)) \
.withColumn("prev_prop_map", F.from_json(F.lag("Properties").over(w), "map<string,string>")) \
.withColumn("current_prop_map", F.from_json("Properties", "map<string,string>")) \
.withColumn("cumulative_prev_props", F.collect_list("prev_prop_map").over(w)) \
.withColumn(
"New_Props",
F.to_json(F.aggregate(
F.concat(F.array("current_prop_map"), F.reverse(F.col("cumulative_prev_props"))),
F.expr("cast(map() as map<string,string>)"),
lambda acc, x: F.map_concat(
acc,
F.map_filter(x, lambda k, _: ~F.array_contains(F.map_keys(acc), k))
)
))
).drop("prev_prop_map", "current_prop_map", "cumulative_prev_props")
df1.show(truncate=False)
# --- --------- ------------------------------------------ ------ ---------------------------------------
#|ID |Timestamp|Properties |rownum|New_Props |
# --- --------- ------------------------------------------ ------ ---------------------------------------
#|a |5 |{"a1": 3, "a2": 12, "a4": "r"} |1 |{"a1":"3","a2":"12","a4":"r"} |
#|a |7 |{"a1": 5, "a2": 8} |2 |{"a1":"5","a2":"8","a4":"r"} |
#|a |8 |{"a2": 4} |3 |{"a2":"4","a1":"5","a4":"r"} |
#|a |10 |{"a3": "z", "a4": "t"} |4 |{"a3":"z","a4":"t","a2":"4","a1":"5"} |
#|b |12 |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|1 |{"b1":"36","b2":"u","b3":"17","b8":"c"}|
#|b |14 |{"b8": "y", "b3": 2} |2 |{"b8":"y","b3":"2","b1":"36","b2":"u"} |
#|b |20 |{"b2": "k", "b3": 9} |3 |{"b2":"k","b3":"9","b8":"y","b1":"36"} |
# --- --------- ------------------------------------------ ------ ---------------------------------------
如果您更喜歡使用 SQL 查詢,這里是等效的 SparkSQL:
WITH props AS (
SELECT *,
row_number() over(partition by ID order by Timestamp) AS rownum,
from_json(lag(Properties) over(partition by ID order by Timestamp), 'map<string,string>') AS prev_prop_map,
from_json(Properties, 'map<string,string>') AS current_prop_map
FROM props_tb
), cumulative_props AS (
SELECT *,
collect_list(prev_prop_map) over(partition by ID order by Timestamp) AS cumulative_prev_props
FROM props
)
SELECT ID,
Timestamp,
Properties,
aggregate(
concat(array(current_prop_map), reverse(cumulative_prev_props)),
cast(map() as map<string,string>),
(acc, x) -> map_concat(acc, map_filter(x, (k,v) -> ! array_contains(map_keys(acc), k)))
) AS New_Props,
rownum
FROM cumulative_props
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/416373.html
標籤:
