我有一個包含兩列的資料框。第一個是唯一 ID 列,第二個是冒號分隔的學生分數串列(這是從沒有標題的 CSV 加載后)。
是否有任何機制可以將第二列轉換為結構串列以供進一步處理?還是動態數量的附加列?我只需要一種方法來對每個 id 的分數進行額外處理,即計算 id 的平均值,0000000003這在當前輸出資料格式中無法完成。
IE
---------- -----------------------------
|id |scores |
---------- -----------------------------
|0000000003|brian,1.0:steve,2.3:allie,8.0|
|0783563078|chris,1.0 |
|0783801254|michelle,1.0:vixon,2.3 |
---------- -----------------------------
進入
---------- --------------------------------------------------------------------------
|id |scores |
---------- --------------------------------------------------------------------------
|0000000003|[{student -> brian, score -> 1.0 } , {student -> steve, score -> 2.3 .... |
---------- --------------------------------------------------------------------------
或者可能是這樣的:
---------- -------- ------ -------- ------ ------
|id |student1|score1|student2|score3|etc...|
---------- -------- ------ -------- ------ ------
|0000000003| | | | | |
---------- -------- ------ -------- ------ ------
我只是不確定如何將這種資料格式轉換為可處理的格式。
uj5u.com熱心網友回復:
方法 4 可能是獲得平均值的最短方法,但其他方法允許您將資料提取為映射/結構。
方法一
一種易于訪問的方法可能是使用str_to_map它將您的字串值轉換為地圖。然后你可以map_values用來提取分數,例如
(
df.withColumn(
"score_map",
expr("str_to_map(scores,':',',')")
).withColumn(
"score_values",
map_values(F.expr("str_to_map(scores,':',',')"))
)
).show(false)
---------- ----------------------------- ------------------------------------------ ---------------
|id |scores |score_map |score_values |
---------- ----------------------------- ------------------------------------------ ---------------
|0000000003|brian,1.0:steve,2.3:allie,8.0|{brian -> 1.0, steve -> 2.3, allie -> 8.0}|[1.0, 2.3, 8.0]|
|0783563078|chris,1.0 |{chris -> 1.0} |[1.0] |
|0783801254|michelle,1.0:vixon,2.3 |{michelle -> 1.0, vixon -> 2.3} |[1.0, 2.3] |
---------- ----------------------------- ------------------------------------------ ---------------
由于您只對平均分數感興趣,因此您還可以使用explode將回傳的陣列拆分map_values為多行,然后再使用mean. 在下面的示例中,我將原始列包含score在 group by 中,但是您可以將其洗掉并在您的應用程式中獲得相同的結果。
(
df.withColumn(
"score_values",
explode(map_values(F.expr("str_to_map(scores,':',',')")))
)
.groupBy("id","scores") // you may remove "scores" from here to only have the id
.agg(
mean("score_values").alias("score_avg")
)
).show(false)
---------- ----------------------------- -----------------
|id |scores |score_avg |
---------- ----------------------------- -----------------
|0000000003|brian,1.0:steve,2.3:allie,8.0|3.766666666666667|
|0783801254|michelle,1.0:vixon,2.3 |1.65 |
|0783563078|chris,1.0 |1.0 |
---------- ----------------------------- -----------------
方法二
如果您更喜歡使用 a struct,您可以使用split,transform并named_struct在 spark-sql 中將您的資料轉換為所需的struct例如
val df2=(
df.withColumn(
"score_struct",
expr("transform(split(scores,':'), x-> named_struct('student',split(x,',')[0],'score',split(x,',')[1]) )")
)
)
df2.printSchema()
df2.show(false)
root
|-- id: string (nullable = true)
|-- scores: string (nullable = true)
|-- score_struct: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- student: string (nullable = true)
| | |-- score: string (nullable = true)
---------- ----------------------------- ------------------------------------------
|id |scores |score_struct |
---------- ----------------------------- ------------------------------------------
|0000000003|brian,1.0:steve,2.3:allie,8.0|[{brian, 1.0}, {steve, 2.3}, {allie, 8.0}]|
|0783563078|chris,1.0 |[{chris, 1.0}] |
|0783801254|michelle,1.0:vixon,2.3 |[{michelle, 1.0}, {vixon, 2.3}] |
---------- ----------------------------- ------------------------------------------
我們可以再次explode將每行中的值串列拆分為多行,然后再mean用于確定平均值,例如。
df2=(
df.withColumn(
"score_struct",
expr("explode(transform(split(scores,':'), x-> named_struct('student',split(x,',')[0],'score',split(x,',')[1]) ))")
)
.groupBy("id")
.agg(
mean("score_struct.score").alias("score_avg")
)
)
df2.printSchema()
df2.show(truncate=False)
root
|-- id: string (nullable = true)
|-- score_avg: double (nullable = true)
---------- -----------------
|id |score_avg |
---------- -----------------
|0000000003|3.766666666666667|
|0783563078|1.0 |
|0783801254|1.65 |
---------- -----------------
方法三
在計算平均值之前,您可以簡單地使用方法 2 僅提取您想要的值,即分數,例如:
val df2=(
df.withColumn(
"score",
expr("explode(transform(split(scores,':'), x-> split(x,',')[1] ))")
)
.groupBy("id")
.agg(
mean("score").alias("score_avg")
)
)
df2.printSchema()
df2.show(false)
root
|-- id: string (nullable = true)
|-- score_avg: double (nullable = true)
---------- -----------------
|id |score_avg |
---------- -----------------
|0000000003|3.766666666666667|
|0783563078|1.0 |
|0783801254|1.65 |
---------- -----------------
方法四
此方法使用split和aggregate提取每行的總和,然后再除以條目數以找到平均值
df2=(
df.withColumn(
"scores",
split("scores",':')
)
.withColumn(
"scores",
expr("aggregate(scores,cast(0 as double), (acc,x) -> acc split(x,',')[1])") / size("scores")
)
)
df2.printSchema()
df2.show(false)
root
|-- id: string (nullable = true)
|-- scores: double (nullable = true)
---------- -----------------
|id |scores |
---------- -----------------
|0000000003|3.766666666666667|
|0783563078|1.0 |
|0783801254|1.65 |
---------- -----------------
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/374915.html
