I am using PySpark and I have a DataFrame with the following schema:
root
|-- BOOK_ID: integer (nullable = false)
|-- Chapters: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- NUMBER_PAGES: integer (nullable = true)
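How can we add a new column named short_chapters that computes, for each book, the sum of NUMBER_PAGES over the chapters whose NAME is shorter than 10 characters (NAME.length < 10)?
Note: Chapters is a list (array) of chapters. Is there a way to iterate over it without flattening (exploding) the DataFrame?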
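Answer 1:
You can compute short_chapters with higher-order functions, which work on the array directly without exploding it. Use filter to keep only the chapters whose name is shorter than 10 characters, then sum their NUMBER_PAGES with aggregate. (pyspark.sql.functions.filter and pyspark.sql.functions.aggregate exist in the Python API from Spark 3.1; on older versions the same functions are available in SQL through F.expr.)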
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("1", [Row(NAME="xs", NUMBER_PAGES=1),
            Row(NAME="s", NUMBER_PAGES=5),
            Row(NAME="Really Long Name", NUMBER_PAGES=100),
            Row(NAME="Really Long Name", NUMBER_PAGES=150)])],
    'struct<BOOK_ID:string,Chapters:array<struct<NAME:string,NUMBER_PAGES:int>>>')
df.printSchema()
"""
root
|-- BOOK_ID: string (nullable = true)
|-- Chapters: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- NUMBER_PAGES: integer (nullable = true)
"""
# Filter for short chapters
short_chapters = F.filter("Chapters", lambda x: F.length(x["NAME"]) < 10)
# Sum number of pages for short chapters
pages_in_short_chapter = F.aggregate(short_chapters, F.lit(0), lambda acc, x: acc + x["NUMBER_PAGES"])
df.withColumn("short_chapters", pages_in_short_chapter).show(truncate=False)
"""
+-------+-------------------------------------------------------------------+--------------+
|BOOK_ID|Chapters                                                           |short_chapters|
+-------+-------------------------------------------------------------------+--------------+
|1      |[{xs, 1}, {s, 5}, {Really Long Name, 100}, {Really Long Name, 150}]|6             |
+-------+-------------------------------------------------------------------+--------------+
"""
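To check what the filter-then-aggregate expression computes, here is a plain-Python model of a single row. It is only an illustration of the semantics, not Spark code: the list of dicts stands in for one row's Chapters array, and short_chapter_pages is a hypothetical helper name. The list comprehension plays the role of F.filter, and functools.reduce with a starting value of 0 plays the role of F.aggregate with F.lit(0).

```python
from functools import reduce

# Hypothetical stand-in for one row's Chapters array (dicts instead of Spark structs)
chapters = [
    {"NAME": "xs", "NUMBER_PAGES": 1},
    {"NAME": "s", "NUMBER_PAGES": 5},
    {"NAME": "Really Long Name", "NUMBER_PAGES": 100},
    {"NAME": "Really Long Name", "NUMBER_PAGES": 150},
]


def short_chapter_pages(chapters, max_len=10):
    # Like F.filter: keep chapters whose NAME has fewer than max_len characters
    short = [c for c in chapters if len(c["NAME"]) < max_len]
    # Like F.aggregate: fold the page counts into an accumulator that starts at 0
    return reduce(lambda acc, c: acc + c["NUMBER_PAGES"], short, 0)


print(short_chapter_pages(chapters))  # 6
```

Because the accumulator starts at 0, a book whose chapters are all long (so the filtered array is empty) gets 0 rather than an error, which matches what aggregate does on an empty array.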
Answer 2:
from pyspark.sql import Row
df = spark.createDataFrame(
    [("1", [Row(NAME="xs", NUMBER_PAGES=1),
            Row(NAME="s", NUMBER_PAGES=5),
            Row(NAME="Really Long Name", NUMBER_PAGES=100),
            Row(NAME="Really Long Name", NUMBER_PAGES=150)])],
    'struct<BOOK_ID:string,Chapters:array<struct<NAME:string,NUMBER_PAGES:int>>>')
df.printSchema()
root
|-- BOOK_ID: string (nullable = true)
|-- Chapters: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- NUMBER_PAGES: integer (nullable = true)
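Solution
Replace the Chapters column with a struct that holds the original array plus a new short_chapters field containing the result. The result is computed by chaining SQL higher-order functions (no UDF is needed): first filter the page counts with a boolean mask that marks the chapters whose name is shorter than 10 characters, then aggregate the filtered page counts into a sum. Code below: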
import pyspark.sql.functions as f
df = df.withColumn(
    "Chapters",
    f.struct(
        f.col("Chapters"),
        # Keep NUMBER_PAGES[i] only where NAME[i] is shorter than 10 characters, then sum the kept values
        f.expr("aggregate(filter(Chapters.NUMBER_PAGES, (x, i) -> transform(Chapters.NAME, n -> length(n) < 10)[i]), 0, (acc, x) -> acc + x)").alias("short_chapters")
    )
)
df.printSchema()
root
|-- BOOK_ID: string (nullable = true)
|-- Chapters: struct (nullable = false)
| |-- Chapters: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- NAME: string (nullable = true)
| | | |-- NUMBER_PAGES: integer (nullable = true)
| |-- short_chapters: integer (nullable = true)
df.show(truncate=False)
+-------+------------------------------------------------------------------------+
|BOOK_ID|Chapters                                                                |
+-------+------------------------------------------------------------------------+
|1      |{[{xs, 1}, {s, 5}, {Really Long Name, 100}, {Really Long Name, 150}], 6}|
+-------+------------------------------------------------------------------------+
df.select('Chapters.*').show(truncate=False)
+-------------------------------------------------------------------+--------------+
|Chapters                                                           |short_chapters|
+-------------------------------------------------------------------+--------------+
|[{xs, 1}, {s, 5}, {Really Long Name, 100}, {Really Long Name, 150}]|6             |
+-------------------------------------------------------------------+--------------+
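The index-based filter works because projecting a field out of an array of structs (Chapters.NAME, Chapters.NUMBER_PAGES) yields arrays that stay aligned by position. A plain-Python model of that masking step, for illustration only: names and pages are hypothetical stand-ins for those two projected arrays, and masked_page_sum is a hypothetical helper name, not part of Spark.

```python
from functools import reduce

# Hypothetical stand-ins for Chapters.NAME and Chapters.NUMBER_PAGES of one row
names = ["xs", "s", "Really Long Name", "Really Long Name"]
pages = [1, 5, 100, 150]


def masked_page_sum(names, pages, max_len=10):
    # Like transform(Chapters.NAME, n -> length(n) < 10): one boolean per chapter
    mask = [len(n) < max_len for n in names]
    # Like filter(Chapters.NUMBER_PAGES, (x, i) -> mask[i]): keep pages at flagged positions
    kept = [x for i, x in enumerate(pages) if mask[i]]
    # Like aggregate(..., 0, (acc, x) -> acc + x)
    return reduce(lambda acc, x: acc + x, kept, 0)


print(masked_page_sum(names, pages))  # 6
```

Filtering the structs directly (as in the first answer) avoids building the mask at all, so it is the simpler form when both fields come from the same array.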
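Answer 3:
You can also use a Python user-defined function (UDF) that walks the chapter list. Since it returns the page total, declare IntegerType as its return type (not the Chapters column's type from the schema), and pass the threshold as a column with lit(10), because a UDF only accepts column arguments. Expect a Python UDF to be slower than the built-in higher-order functions, because each row has to be serialized to a Python worker.
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import IntegerType
def sum_short_chapters(chapters, thresh):
    # Sum NUMBER_PAGES over chapters whose NAME is shorter than thresh, skipping null entries
    return sum(c.NUMBER_PAGES or 0 for c in (chapters or [])
               if c is not None and c.NAME is not None and len(c.NAME) < thresh)
udf_fn = udf(sum_short_chapters, IntegerType())
df = df.withColumn('short_chapters', udf_fn(df.Chapters, lit(10)))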
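Before wrapping a function with udf, it can help to test the plain Python function locally, since inside Spark each struct element arrives as a Row whose fields are read by attribute. A sketch assuming the UDF should sum NUMBER_PAGES of the chapters whose NAME is shorter than thresh (what the question asks for); the namedtuple Chapter is a hypothetical stand-in for pyspark.sql.Row so the test runs without Spark.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row: fields are read as attributes, like c.NAME
Chapter = namedtuple("Chapter", ["NAME", "NUMBER_PAGES"])


def sum_short_chapters(chapters, thresh):
    # Sum NUMBER_PAGES over chapters whose NAME is shorter than thresh, skipping nulls
    if chapters is None:
        return 0
    return sum(
        c.NUMBER_PAGES or 0
        for c in chapters
        if c is not None and c.NAME is not None and len(c.NAME) < thresh
    )


book = [
    Chapter("xs", 1),
    Chapter("s", 5),
    Chapter("Really Long Name", 100),
    Chapter("Really Long Name", 150),
]
print(sum_short_chapters(book, 10))  # 6
print(sum_short_chapters(None, 10))  # 0
```

Null handling matters here because the schema marks the array, its elements, and both fields as nullable; the built-in higher-order functions handle nulls for you, while a UDF has to do it explicitly.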
Please credit the source when reposting. Original link: https://www.uj5u.com/qukuanlian/447535.html
