我在陣列中有一個嵌套結構的模式。我想按字母順序對嵌套結構的列進行排序。
這個問題給出了一個復雜的函式,但它不適用于嵌套在陣列中的結構。任何幫助表示贊賞。
我正在使用 PySpark 3.2.1。
我的架構:
root
|-- id: integer (nullable = true)
|-- values: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Dep: string (nullable = true)
| | |-- ABC: string (nullable = true)
它應該看起來如何:
root
|-- id: integer (nullable = true)
|-- values: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- ABC: string (nullable = true)
| | |-- Dep: string (nullable = true)
可重現的例子:
data = [
(10, [{"Dep": 10, "ABC": 1}, {"Dep": 10, "ABC": 1}]),
(20, [{"Dep": 20, "ABC": 1}, {"Dep": 20, "ABC": 1}]),
(30, [{"Dep": 30, "ABC": 1}, {"Dep": 30, "ABC": 1}]),
(40, [{"Dep": 40, "ABC": 1}, {"Dep": 40, "ABC": 1}])
]
myschema = StructType(
[
StructField("id", IntegerType(), True),
StructField("values",
ArrayType(
StructType([
StructField("Dep", StringType(), True),
StructField("ABC", StringType(), True)
])
))
]
)
df = spark.createDataFrame(data=data, schema=myschema)
df.printSchema()
df.show(10, False)
uj5u.com熱心網友回復:
不涵蓋所有情況,但作為您當前 df 的開始,您可以從內部結構中獲取欄位串列,對它們進行排序,然后使用transform函式更新每個結構元素,如下所示:
from pyspark.sql import functions as F
fields = sorted(df.selectExpr("inline(values)").columns)
df1 = df.withColumn(
"values",
F.transform("values", lambda x: F.struct(*[x[f].alias(f) for f in fields]))
)
df1.printSchema()
#root
# |-- id: integer (nullable = true)
# |-- values: array (nullable = true)
# | |-- element: struct (containsNull = false)
# | | |-- ABC: string (nullable = true)
# | | |-- Dep: string (nullable = true)
uj5u.com熱心網友回復:
我找到了一個非常 hacky 的解決方案,所以如果有人知道更好的解決方案,請成為我的客人添加另一個答案。
- 檢索陣列 [struct] 元素作為它們自己的陣列列
- 以正確的順序將它們作為結構壓縮在一起
代碼:
selexpr = ["id", "values.ABC as ABC", "values.Dep as Dep"]
df = df.selectExpr(selexpr)
df = df.withColumn(
"zipped", arrays_zip("ABC", "Dep") # order of the column-names results in ordering!
)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/454999.html
