我對 pyspark 很陌生,我有一個資料框,目前如下所示。
| col1 | col2 |
--------------------------------- -------------------
| [(a, 0)], [(b,0)].....[(z,1)] | [0, 0, ... 1] |
| [(b, 0)], [(b,1)].....[(z,0)] | [0, 1, ... 0] |
| [(a, 0)], [(c, 1)].....[(z,0)] | [0, 1, ... 0] |
我從 col2 中提取值col1.QueryNum,當我列印模式時,它是一個包含來自的數字串列的陣列col1.QueryNum。
最終我的目標是將串列值轉換col2為 pyspark 中的結構格式(請參閱所需的架構)。
當前架構
|-- col1: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- types: string (nullable = true)
| | |-- QueryNum: integer (nullable = true)
|-- col2: array (nullable = true)
| |-- element: integer (containsNull = true)
所需的架構
|-- col2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- val1: integer (nullable = true)
| | |-- val2: integer (nullable = true)
.
.
.
| | |-- val80: integer (nullable = true)
我嘗試使用from_json它并沒有真正起作用。
uj5u.com熱心網友回復:
如果你有固定的陣列大小,你可以使用 list-comprehension 創建結構:
from pyspark.sql import functions as F
df1 = df.withColumn(
"col2",
F.array(
F.struct(*[
F.col("col1")[i]["QueryNum"].alias(f"val{i 1}") for i in range(2)
])
)
)
df1.show()
# ---------------- --------
#|col1 |col2 |
# ---------------- --------
#|[[0, a], [0, b]]|[[0, 0]]|
#|[[0, b], [1, b]]|[[0, 1]]|
#|[[0, a], [1, c]]|[[0, 1]]|
# ---------------- --------
df1.printSchema()
#root
#|-- col1: array (nullable = true)
#| |-- element: struct (containsNull = true)
#| | |-- QueryNum: long (nullable = true)
#| | |-- types: string (nullable = true)
#|-- col2: array (nullable = false)
#| |-- element: struct (containsNull = false)
#| | |-- val1: long (nullable = true)
#| | |-- val2: long (nullable = true)
但是請注意,在這種情況下不需要使用陣列,因為您將始終在該陣列中擁有一個結構。只需使用簡單的結構:
df1 = df.withColumn(
"col2",
F.struct(*[
F.col("col1")[i]["QueryNum"].alias(f"val{i 1}") for i in range(2)
])
)
或者,如果您更喜歡地圖型別:
df1 = df.withColumn(
"col2",
F.map_from_entries(
F.expr("transform(col1, (x,i) -> struct('val' || (i 1) as name, x.QueryNum as value))")
)
)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/419843.html
標籤:
下一篇:我想累計計算先前重復值的數量
