I have a table like the following:

| transaction_id | transaction_date | partition_key | amount | record_id | record_in_date |
|---|---|---|---|---|---|
| 1 | 2021-09-21 | 1 |  |  |  |
How can I transform the table above into the following schema?
```
root
 |-- transaction_id: string (nullable = false)
 |-- transaction_date: string (nullable = false)
 |-- transaction_partition: struct (nullable = false)
 |    |-- partition_key: integer (nullable = false)
 |    |-- record_amount_sum: integer (nullable = false)
 |    |-- records: struct (nullable = false)
 |    |    |-- record_id: string (nullable = false)
 |    |    |-- record_amount: integer (nullable = false)
 |    |    |-- record_in_date: string (nullable = false)
```
Like this:

| transaction_id | transaction |
|---|---|
| 1 | { transaction_id: 1, transaction_date: 2021-09-21, transaction_partition: [ { partition_key: 1, record_amount_sum: 2, records: [ { record_id: 1, record_in_date: 2021-09-20 }, { record_id: 2, record_in_date: 2021-09-20 } ] }, { partition_key: 2, record_amount_sum: 1, records: [ { record_id: 3, record_in_date: 2021-09-20 } ] } ] } |
| 2 | { transaction_id: 2, transaction_date: 2021-09-21, transaction_partition: [ { partition_key: 1, record_amount_sum: 2, records: [ { record_id: 4, record_in_date: 2021-09-20 }, { record_id: 5, record_in_date: 2021-09-20 } ] } ] } |
| 3 | { transaction_id: 3, transaction_date: 2021-09-21, transaction_partition: [ { partition_key: 2, record_amount_sum: 1, records: [ { record_id: 5, record_in_date: 2021-09-20 } ] } ] } |
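The nested shape being asked for can be sketched in plain Python with dataclasses, which makes the three nesting levels explicit. This is an illustration only, not a Spark schema: the class names are hypothetical, `transaction_partition` and `records` are modeled as lists because the example output above shows arrays, and the amount of record 5 is assumed to be 1 since its partition's `record_amount_sum` is 1.

```python
from dataclasses import dataclass, field, asdict
from typing import List


@dataclass
class Record:
    # One input row, minus the columns that are grouped on (hypothetical class).
    record_id: str
    record_amount: int
    record_in_date: str


@dataclass
class Partition:
    # All records of one transaction that share a partition_key (hypothetical class).
    partition_key: int
    record_amount_sum: int
    records: List[Record] = field(default_factory=list)


@dataclass
class Transaction:
    transaction_id: str
    transaction_date: str
    transaction_partition: List[Partition] = field(default_factory=list)


# Transaction 3 from the example output (record amount assumed to be 1).
t3 = Transaction("3", "2021-09-21", [Partition(2, 1, [Record("5", 1, "2021-09-20")])])
print(asdict(t3))
```

`asdict` converts the nested dataclasses recursively, so the printed dictionary has the same nesting as one row of the desired output.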
Answer:
You can try the following Spark SQL query:
```sql
SELECT
    transaction_id,
    transaction_date,
    collect_list(
        STRUCT(
            partition_key,
            record_amount_sum,
            records
        )
    ) AS transaction_partition
FROM (
    SELECT
        transaction_id,
        transaction_date,
        partition_key,
        SUM(amount) AS record_amount_sum,
        collect_list(
            STRUCT(
                record_id,
                amount AS record_amount,
                record_in_date
            )
        ) AS records
    FROM
        my_temp_view
    GROUP BY
        transaction_id,
        transaction_date,
        partition_key
) t
GROUP BY
    transaction_id,
    transaction_date
```
Let me know if this works for you.
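To make the two GROUP BY levels of this query concrete, here is a small sketch of the same logic in plain Python (not Spark). The input rows are hypothetical, chosen so that the output matches transactions 1 and 2 of the example above under the assumption that every record has `amount` 1; `nest` is a hypothetical helper name.

```python
from collections import defaultdict

# Hypothetical input rows: (transaction_id, transaction_date, partition_key,
#                           amount, record_id, record_in_date)
rows = [
    ("1", "2021-09-21", 1, 1, "1", "2021-09-20"),
    ("1", "2021-09-21", 1, 1, "2", "2021-09-20"),
    ("1", "2021-09-21", 2, 1, "3", "2021-09-20"),
    ("2", "2021-09-21", 1, 1, "4", "2021-09-20"),
    ("2", "2021-09-21", 1, 1, "5", "2021-09-20"),
]


def nest(rows):
    # Inner query: GROUP BY transaction_id, transaction_date, partition_key,
    # collecting each record as a struct (here: a dict).
    inner = defaultdict(list)
    for tid, tdate, pkey, amount, rid, rdate in rows:
        inner[(tid, tdate, pkey)].append(
            {"record_id": rid, "record_amount": amount, "record_in_date": rdate}
        )

    # Outer query: GROUP BY transaction_id, transaction_date,
    # computing SUM(amount) per partition and collecting the partitions.
    outer = defaultdict(list)
    for (tid, tdate, pkey), records in inner.items():
        outer[(tid, tdate)].append({
            "partition_key": pkey,
            "record_amount_sum": sum(r["record_amount"] for r in records),
            "records": records,
        })

    return [
        {"transaction_id": tid, "transaction_date": tdate,
         "transaction_partition": parts}
        for (tid, tdate), parts in outer.items()
    ]


for row in nest(rows):
    print(row)
```

Unlike this sketch, which keeps rows in insertion order, Spark does not guarantee the order of the elements that `collect_list` produces.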
Answer:
You can first do the inner aggregation and then aggregate that result again:
```python
from pyspark.sql import functions as F

df = ...  # the input DataFrame

# Inner aggregation per (transaction, partition), then outer aggregation per transaction
df1 = (df.groupBy("transaction_id", "transaction_date", "partition_key")
       .agg(F.sum("amount").alias("record_amount_sum"),
            F.collect_list(F.struct("record_id", F.col("amount").alias("record_amount"),
                                    "record_in_date")).alias("records"))
       .groupBy("transaction_id", "transaction_date")
       .agg(F.collect_list(F.struct("partition_key", "record_amount_sum", "records"))
            .alias("transaction_partition")))

df1.orderBy("transaction_id").toJSON().collect()
```
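The DataFrame chain, including the final `orderBy("transaction_id").toJSON()`, can be imitated in plain Python with `sorted`, `itertools.groupby` and `json.dumps` to see roughly what the returned JSON strings contain. This is a sketch under assumptions: the rows are hypothetical (amounts assumed to be 1), and the exact text Spark emits (key order, spacing) may differ from what `json.dumps` produces.

```python
import json
from itertools import groupby
from operator import itemgetter

# Hypothetical input rows as dicts (column name -> value).
rows = [
    {"transaction_id": "3", "transaction_date": "2021-09-21", "partition_key": 2,
     "amount": 1, "record_id": "5", "record_in_date": "2021-09-20"},
    {"transaction_id": "1", "transaction_date": "2021-09-21", "partition_key": 2,
     "amount": 1, "record_id": "3", "record_in_date": "2021-09-20"},
    {"transaction_id": "1", "transaction_date": "2021-09-21", "partition_key": 1,
     "amount": 1, "record_id": "1", "record_in_date": "2021-09-20"},
]

inner_key = itemgetter("transaction_id", "transaction_date", "partition_key")
outer_key = itemgetter("transaction_id", "transaction_date")

# First groupBy/agg: itertools.groupby only merges adjacent rows, so sort first.
partitions = []
for (tid, tdate, pkey), grp in groupby(sorted(rows, key=inner_key), key=inner_key):
    members = list(grp)
    partitions.append({
        "transaction_id": tid, "transaction_date": tdate, "partition_key": pkey,
        "record_amount_sum": sum(r["amount"] for r in members),
        "records": [{"record_id": r["record_id"], "record_amount": r["amount"],
                     "record_in_date": r["record_in_date"]} for r in members],
    })

# Second groupBy/agg; sorting by transaction_id also plays the role of orderBy.
json_lines = []
for (tid, tdate), grp in groupby(sorted(partitions, key=outer_key), key=outer_key):
    json_lines.append(json.dumps({
        "transaction_id": tid, "transaction_date": tdate,
        "transaction_partition": [
            {k: p[k] for k in ("partition_key", "record_amount_sum", "records")}
            for p in grp
        ],
    }, separators=(",", ":")))

for line in json_lines:
    print(line)
```

Because Python's sort is stable, the partitions inside each transaction stay ordered by `partition_key` from the first sort; Spark makes no such promise for `collect_list`.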
Source: https://www.uj5u.com/net/328155.html
