我需要根據資料框下方的每個索引對基于行的值進行分組
----- ------ ------ ------ ------ ----- ---- -------
|index|amount| dept | date | amount |dept |date |
----- ----------- ----- -- --------- --------- ----
| 1|1000 | acnt |2-4-21| 2000 | acnt2 |2-4-21 |
| 2|1500 | sales|2-3-21| 1600 | sales2|2-3-21 |
由于每行唯一的索引站和日期相同,我需要將行值分組如下
----- ------ ------------ -------
|index|amount | dept | date |
----- --------- ------------ -------
| 1|1000,2000|acnt,acnt2 |2-4-21 |
| 2|1500,1600|sales,sales2|2-3-21 |
我看到很多選項可以對列進行分組,但專門針對 pyspark 中基于行的值是否有任何解決方案來填充上述結果?
uj5u.com熱心網友回復:
理想情況下,這需要在上游修復(檢查您的上游代碼中是否有連接,并嘗試僅選擇適當的別名以僅保留唯一列)。
話雖如此,您可以在創建輔助字典和列名后創建輔助 spark 函式:
from pyspark.sql import functions as F
from itertools import groupby
使用計數器創建一個新串列:
l = []
s = {}
for i in df.columns:
l.append(f"{i}_{s.get(i)}" if i in s else i)
s[i] = s.get(i,0) 1
#['index', 'amount', 'dept', 'date', 'amount_1', 'dept_1', 'date_1']
然后使用這個新串列使用現有資料框創建一個資料框,并使用輔助函式根據重復檢查進行連接:
def mysparkfunc(cols):
cols = [list(v) for k,v in groupby(sorted(cols),lambda x: x.split("_")[0])]
return [F.concat_ws(",",*col).alias(col[0])
if len(col)>1 and col[0]!= 'date'
else F.col(col[0]) for col in cols]
df.toDF(*l).select(*mysparkfunc(l)).show()
--------- ------ ------------ -----
| amount| date| dept|index|
--------- ------ ------------ -----
|1000,2000|2-4-21| acnt,acnt2| 1|
|1500,1600|2-3-21|sales,sales2| 2|
--------- ------ ------------ -----
完整代碼:
from pyspark.sql import functions as F
from itertools import groupby
l = []
s = {}
for i in df.columns:
l.append(f"{i}_{s.get(i)}" if i in s else i)
s[i] = s.get(i,0) 1
def mysparkfunc(cols):
cols = [list(v) for k,v in groupby(sorted(cols),lambda x: x.split("_")[0])]
return [F.concat_ws(",",*col).alias(col[0])
if len(col)>1 and col[0]!= 'date'
else F.col(col[0]) for col in cols]
df.toDF(*l).select(*mysparkfunc(l)).show()
uj5u.com熱心網友回復:
假設您有一個初始資料框,如下所示
INPUT: ------ ------ ------ ------
| dept| dept|amount|amount|
------ ------ ------ ------
|sales1|sales2| 1| 1|
|sales1|sales2| 2| 2|
|sales1|sales2| 3| 3|
|sales1|sales2| 4| 4|
|sales1|sales2| 5| 5|
------ ------ ------ ------
- 重命名列:
newColumns = ["dept1","dept2","amount1","amount2"]
new_clms_df = df.toDF(*newColumns)
new_clms_df.show()
------ ------ ------- -------
| dept1| dept2|amount1|amount2|
------ ------ ------- -------
|sales1|sales2| 1| 1|
|sales1|sales2| 2| 2|
|sales1|sales2| 3| 3|
|sales1|sales2| 4| 4|
|sales1|sales2| 5| 5|
------ ------ ------- -------
- 匯出最終輸出列:
final_df = None
final_df = new_clms_df.\
withColumn('dept', concat_ws(',',new_clms_df['dept1'],new_clms_df['dept2'])).\
withColumn('amount', concat_ws(',',new_clms_df['amount1'],new_clms_df['amount2']))
final_df.show()
------ ------ ------- ------- ------------- ------
| dept1| dept2|amount1|amount2| dept|amount|
------ ------ ------- ------- ------------- ------
|sales1|sales2| 1| 1|sales1,sales2| 1,1|
|sales1|sales2| 2| 2|sales1,sales2| 2,2|
|sales1|sales2| 3| 3|sales1,sales2| 3,3|
|sales1|sales2| 4| 4|sales1,sales2| 4,4|
|sales1|sales2| 5| 5|sales1,sales2| 5,5|
------ ------ ------- ------- ------------- ------
uj5u.com熱心網友回復:
有兩種方法..取決于你想要什么
from pyspark.sql.functions import struct, array, col
df = df.withColumn('amount', struct(col('amount1'),col('amount2')) # Map
df = df.withColumn('amount', array(col('amount1'),col('amount2')) # Array
如果有兩個具有相同名稱的列(如您的示例中),只需重新創建您的 df
(如果是連接,則不需要......只需使用別名)
cols = ['index','amount1','dept', 'amount2', 'dept2', 'date']
df = df.toDF(*cols)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/447323.html
標籤:阿帕奇火花 pyspark apache-spark-sql
上一篇:棘手的pyspark值排序
