我有一個 DataFrame,它由 ArrayType 的 Column 組成,并且該陣列在資料的每一行中可能具有不同的長度。我在下面提供了一些示例代碼,可以創建一些具有類似結構的模擬資料。
您會看到,對于一筆交易,我有一個交易 ID 以及一些附加資料,每個資料都存盤在一個“段”中。在這里,我們看到一個存盤客戶資訊的段(總是一個長度為 2 的陣列),并且我們為每件商品購買了一個附加段。購買商品本身的資訊是一個長度不等的陣列;陣列的前兩個元素將始終是購買商品的 ID 和名稱;顏色等可能存在額外的陣列元素,但在這個用例中我們可以忽略它們。
val dfschema = new StructType()
.add("transaction",
new StructType()
.add(
"transaction_id",
StringType
)
.add(
"segments",
ArrayType(
new StructType()
.add("segment_id",StringType)
.add("segment_fields",ArrayType(
StringType,
false
)
), false
)
)
)
val mockdata = Seq(
Row(
Row(
"2e6d57769e49ae8cb0c4105548c4389d",
List(
Row(
"CustomerInformation",
List(
"SomeCustomerName",
"SomeCustomerEmail"
)
),
Row(
"ItemPurchased",
List(
"SomeItemID",
"SomeItemName"
)
),
Row(
"ItemPurchased",
List(
"AnotherItemID",
"AnotherItemName",
"ItemColor"
)
),
Row(
"ItemPurchased",
List(
"YetAnotherItemID",
"YetAnotherItemName",
"ItemColor"
)
)
)
)
)
)
val df = spark.createDataFrame(
spark.sparkContext.parallelize(mockdata),
dfschema)
我想要完成的是將上述內容轉換為另一個具有兩列的資料框,一列用于客戶名稱,另一列用于專案名稱。對于上面的示例,它希望:
| 顧客姓名 | 專案名稱 |
|---|---|
| 一些客戶名稱 | 一些物品名稱 |
| 一些客戶名稱 | 另一個專案名稱 |
| 一些客戶名稱 | YetAnotherItemName |
但是,我不想對要檢索的資料欄位進行硬編碼;相反,我想撰寫幾個可以作為選擇命令的一部分運行的函式,如下所示:
df(
select(
get_single_subsegment("CustomerInformation", 0),
get_repeated_subsements("ItemPurchased", 1)
)
)
這樣,如果我選擇檢索客戶電子郵件而不是姓名,我只需將上面的 0 更改為 1 即可。我什至可以將索引號作為變數傳遞。
這可以做到嗎?
uj5u.com熱心網友回復:
從 Spark 3.0 開始,您可以使用 spark 的內置函式來定義您的兩個函式get_single_subsegment和get_repeated_subsegments
對于get_single_subsegment,您可以首先通過 segment_id 過濾您的segments陣列filter,然后使用 獲取此過濾陣列的第一個元素,然后使用andgetItem檢索此段物件中所需索引處的元素:getFieldgetItem
import org.apache.spark.sql.functions.{col, filter}
def get_single_subsegment(segmentId: String, index: Int): Column = {
filter(col("transaction.segments"), c => c.getField("segment_id") === segmentId)
.getItem(0)
.getField("segment_fields")
.getItem(index)
}
對于get_repeated_subsegments,您首先按 in 進行過濾get_single_subsegment,然后使用transform為過濾陣列的每個元素提取右段欄位索引,然后使用explode此陣列以逐行過濾陣列的元素:
import org.apache.spark.sql.functions.{col, explode, filter, transform}
def get_repeated_subsegments(segmentId: String, index: Int): Column = {
explode(
transform(
filter(col("transaction.segments"), c => c.getField("segment_id") === segmentId)
.getField("segment_fields"),
c => c.getItem(index)
)
)
}
如果我們在您的示例中應用上面定義的兩個函式,我們會得到以下結果:
df.select(
get_single_subsegment("CustomerInformation", 0).as("customer_name"),
get_repeated_subsegments("ItemPurchased", 1).as("item_name")
).show(false)
// ---------------- ------------------
//|customer_name |item_name |
// ---------------- ------------------
//|SomeCustomerName|SomeItemName |
//|SomeCustomerName|AnotherItemName |
//|SomeCustomerName|YetAnotherItemName|
// ---------------- ------------------
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/474817.html
