我正在嘗試獲取列的鍵和值,其中一些行為 json,其他行為字串/無。我想將每個 json 鍵及其值分別堆疊到新的不同列中。Json 可以是嵌套型別和 I don't know the schema of json rows beforehand (like how many keys are/ how nested it is)。
Data set has 100s millions of rows.
樣本資料:
----------- ---------- -------------------------------------------------------
|col1 |col2 | col3 | col4 |
----- ----- ---------- --------------------------------------------------------
|1 |A | hello1 | time1 |
|2 |B | hello2 | None
|3 |C | hello3 | {'world1': 'how are you?','world2':{'help me!':'please'}}
|4 |D | hello4 | {'world3':'ola!'} |
----- ----- ---------- -------------------------------------------------------
預期資料框:
-------- ------------------- -------------------------------
|col1 |col2 | col3 | new_col_keys | new_col_values |
-------- ------------------- ---------- --------------------
|1 |A | hello1 | time1 Null |
|2 |B | hello2 | Null Null |
|3 |C | hello3 | world1 how are you? |
|3 |C | hello3 | world2 {'help me!':'please'}|
|4 |D | hello4 | world3 ola! |
----- ----- ---------- -------------------------------------
在這里,我以不同的方式為鍵和值添加新列并洗掉原始列。
注意:示例資料中的所有列都是 StringType
uj5u.com熱心網友回復:
您可以嘗試以下方法:
- 使用型別和函式匯入
pyspark.sql - 使用模式匹配(例如
LIKE)過濾掉非 json 型別的記錄,而不是(~)就像 json 字典/地圖型別值具有模式{key:value}并存盤在資料幀中一樣df_simple - 使用模式匹配過濾出 json 型別字典記錄
LIKE,并將結果存盤在資料幀中df_map。 MapType使用字串鍵和字串值將記錄轉換為from_json- 使用
select檢索所需的列,并explode獲得來自轉化的每個鍵/值對MapType列col4,成為一個新的行 - 使用重命名分解的列名稱(
key->new_col_keys和value->new_col_values)withColumnRenamed - 使用聯合來組合
df_simple并df_map進入最終資料集output_df(雖然select在聯合中使用是可選的,但它確保即使將來資料框發生變化也使用正確的列)
注意。與代碼中的步驟相關的部分包含在注釋中
# Step 1
from pyspark.sql import functions as F
from pyspark.sql import types as T
df_simple = df.where(~ F.col("col4").like("{%}")) #Step 2
df_simple.show() # only for debugging purposes
---- ---- ------ -----
|col1|col2| col3| col4|
---- ---- ------ -----
| 1| A|hello1|time1|
| 2| B|hello2| null|
---- ---- ------ -----
df_maps = (
# Step 3
df.where(F.col("col4").like("{%}"))
# Step 4
.withColumn("col4",F.from_json(
F.col("col4"),
T.MapType(T.StringType(),T.StringType())
))
# Step 5
.select(
F.col("col1"),
F.col("col2"),
F.col("col3"),
F.explode("col4")
)
# Step 6
.withColumnRenamed("key","new_col_keys")
.withColumnRenamed("value","new_col_values")
)
df_maps.show(truncate=False) # only for debugging purposes
---- ---- ------ ------------ ---------------------
|col1|col2|col3 |new_col_keys|new_col_values |
---- ---- ------ ------------ ---------------------
|3 |C |hello3|world1 |how are you? |
|3 |C |hello3|world2 |{"help me!":"please"}|
|4 |D |hello4|world3 |ola! |
---- ---- ------ ------------ ---------------------
# Step 7
output_df = (
df_simple.selectExpr("col1","col2","col3","col4 as new_col_keys","NULL as new_col_values")
.union(
df_maps.select("col1","col2","col3","new_col_keys","new_col_values")
)
)
output_df.printSchema() # only for debugging
output_df.show(truncate=False) # only for debugging
root
|-- col1: string (nullable = true)
|-- col2: string (nullable = true)
|-- col3: string (nullable = true)
|-- new_col_keys: string (nullable = true)
|-- new_col_values: string (nullable = true)
---- ---- ------ ------------ ---------------------
|col1|col2|col3 |new_col_keys|new_col_values |
---- ---- ------ ------------ ---------------------
|1 |A |hello1|time1 |null |
|2 |B |hello2|null |null |
|3 |C |hello3|world1 |how are you? |
|3 |C |hello3|world2 |{"help me!":"please"}|
|4 |D |hello4|world3 |ola! |
---- ---- ------ ------------ ---------------------
注意。 如果您想對嵌套列重復此操作。您可以重復步驟 3 - 7 中的操作。但是,這次不是使用col4,而是使用new_col_keys因為col4將不再存在于轉換后的資料集中。
讓我知道這是否適合您。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/322623.html
