我有一個pyspark如下所示的資料框
df = spark.createDataFrame([
(124,10,8),
(124,20,7),
(125,30,6),
(125,40,5),
(126,50,4),
(126,60,3),
(126,70,2),
(127,80,1)],("ACC_KEY", "AMT", "value"))
df.show()
------- --- -----
|ACC_KEY|AMT|value|
------- --- -----
| 126| 70| 2|
| 126| 60| 3|
| 126| 50| 4|
| 124| 20| 7|
| 124| 10| 8|
| 127| 80| 1|
| 125| 40| 5|
| 125| 30| 6|
------- --- -----
Expected result
------- --- ----- ------- ----- -------
|ACC_KEY|AMT|value|row_now|amt_c|lkp_rev|
------- --- ----- ------- ----- -------
| 126| 70| 2| 1| 70| 72|
| 126| 60| 3| 2| 72| 75|
| 126| 50| 4| 3| 75| 79|
| 124| 20| 7| 1| 20| 27|
| 124| 10| 8| 2| 27| 35|
| 127| 80| 1| 1| 80| 81|
| 125| 40| 5| 1| 40| 45|
| 125| 30| 6| 2| 45| 51|
------- --- ----- ------- ----- -------
Conditions
1) When row_number = 1 then amt_c column = column AMT
2) when row_number != 1 then It should be the lag of column lkp_rev column value
3) lkp_rev column = amt_c column value column
我試過如下
import pyspark.sql.functions as f
from pyspark.sql import Window
# create row_number column
df1 = df.withColumn("row_now", f.row_number().over(Window.partitionBy("ACC_KEY").orderBy(f.col('AMT').desc())))
# amt_c column creation
df2 = df1.withColumn("amt_c", f.when(f.col("row_now") == 1, f.col("AMT")).otherwise(f.col("value") f.col("AMT")))
我怎樣才能達到我想要的
uj5u.com熱心網友回復:
我認為,如果您將具有 的所有行分開row_now = 1,并將其作為“參考”資料框或每個acc_key.
首先,添加行號以便我們以后可以重用
df = df.withColumn('row_now', F.row_number().over(W.partitionBy('acc_key').orderBy(F.col('amt').desc())))
# ------- --- ----- -------
# |acc_key|amt|value|row_now|
# ------- --- ----- -------
# | 126| 70| 2| 1|
# | 126| 60| 3| 2|
# | 126| 50| 4| 3|
# | 124| 20| 7| 1|
# | 124| 10| 8| 2|
# | 127| 80| 1| 1|
# | 125| 40| 5| 1|
# | 125| 30| 6| 2|
# ------- --- ----- -------
我們現在需要制作一個僅包含初始數量的“參考”資料框(即row_now = 1)
ref = (df
.where(F.col('row_now') == 1)
.drop('row_now', 'value')
.withColumnRenamed('amt', 'init_amt')
)
# ------- --------
# |acc_key|init_amt|
# ------- --------
# | 126| 70|
# | 124| 20|
# | 127| 80|
# | 125| 40|
# ------- --------
最后,加入原件,這樣我們就有了應用lag功能的起點
(df
.join(ref, ['acc_key'])
.withColumn('temp', F
.when(F.col('row_now') == 1, F.col('init_amt'))
.otherwise(F.lag('value').over(W.partitionBy('acc_key').orderBy('row_now')))
)
.withColumn('amt_c', F.sum('temp').over(W.partitionBy('acc_key').orderBy('row_now')))
.withColumn('lkp_rev', F.col('amt_c') F.col('value'))
.drop('init_amt', 'temp')
.show()
)
# ------- --- ----- ------- ----- -------
# |acc_key|amt|value|row_now|amt_c|lkp_rev|
# ------- --- ----- ------- ----- -------
# | 126| 70| 2| 1| 70| 72|
# | 126| 60| 3| 2| 72| 75|
# | 126| 50| 4| 3| 75| 79|
# | 124| 20| 7| 1| 20| 27|
# | 124| 10| 8| 2| 27| 35|
# | 127| 80| 1| 1| 80| 81|
# | 125| 40| 5| 1| 40| 45|
# | 125| 30| 6| 2| 45| 51|
# ------- --- ----- ------- ----- -------
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/333874.html
