我有一個類似于下面的資料框。
from datetime import date
rdd = sc.parallelize([
[123,date(2007,1,31),1],
[123,date(2007,2,28),1],
[123,date(2007,3,31),1],
[123,date(2007,4,30),1],
[123,date(2007,5,31),1],
[123,date(2007,6,30),1],
[123,date(2007,7,31),1],
[123,date(2007,8,31),1],
[123,date(2007,8,31),2],
[123,date(2007,9,30),1],
[123,date(2007,9,30),2],
[123,date(2007,10,31),1],
[123,date(2007,10,31),2],
[123,date(2007,11,30),1],
[123,date(2007,11,30),2],
[123,date(2007,12,31),1],
[123,date(2007,12,31),2],
[123,date(2007,12,31),3],
[123,date(2008,1,31),1],
[123,date(2008,1,31),2],
[123,date(2008,1,31),3]
])
df = rdd.toDF(['id','sale_date','sale'])
df.show()
從上面的資料框中,我想將所有行保留為相對于日期的最新銷售。所以本質上,我將只有每一行的唯一日期。在上面的例子中,輸出看起來像:
rdd_out = sc.parallelize([
[123,date(2007,1,31),1],
[123,date(2007,2,28),1],
[123,date(2007,3,31),1],
[123,date(2007,4,30),1],
[123,date(2007,5,31),1],
[123,date(2007,6,30),1],
[123,date(2007,7,31),1],
[123,date(2007,8,31),2],
[123,date(2007,9,30),2],
[123,date(2007,10,31),2],
[123,date(2007,11,30),2],
[123,date(2007,12,31),2],
[123,date(2008,1,31),3]
])
df_out = rdd_out.toDF(['id','sale_date','sale'])
df_out.show()
您能否指導我如何獲得這個結果?
作為一個僅供參考 - 使用 SAS,我將獲得如下結果:
proc sort data = df;
by id date sale;
run;
data want;
set df;
by id date sale;
if last.date;
run;
uj5u.com熱心網友回復:
可能有很多方法可以實作這一點,但一種方法是使用Window。Window您可以將資料磁區在一個或多個列上(在您的情況下)sale_date,最重要的是,您可以按特定列對每個磁區內的資料進行排序(在您的情況下,降序排列sale,這樣最新的銷售是第一的)。所以:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
my_window = Window.partitionBy("sale_date").orderBy(desc("sale"))
然后你可以做的是Window在你的 DataFrame 上應用它并應用許多視窗函式中的一個。您可以應用的功能之一是row_number,它為每個磁區根據您的orderBy. 像這樣:
from pyspark.sql.functions import row_number
df_out = df.withColumn("row_number",row_number().over(my_window))
這將導致每個日期的最后一次銷售將具有row_number = 1. 如果您隨后進行過濾,row_number=1您將獲得每個組的最后一次銷售。
所以,完整的代碼:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc, col
my_window = Window.partitionBy("sale_date").orderBy(desc("sale"))
df_out = (
df
.withColumn("row_number",row_number().over(my_window))
.filter(col("row_number") == 1)
.drop("row_number")
)
uj5u.com熱心網友回復:
在這里,您想將“部門”替換為sale_date,將“工資”替換為sale。
這是同一件事的無視窗示例...@Cleared 的答案非常好。與使用視窗相比,這個答案在非常大的資料集上可能會表現得更好。根據我的經驗,Windows 比使用 groupBy 的邏輯等效項要慢。(隨意測驗什么更適合您。)Windows 的撰寫非常簡單且易于理解,因此如果資料很小,可能是更好的選擇。
from pyspark.sql import SparkSession,Row
spark = SparkSession.builder.appName('SparkExample').getOrCreate()
data = [("James","Sales",3000),("Michael","Sales",4600),
("Robert","Sales",4100),("Maria","Finance",3000),
("Raman","Finance",3000),("Scott","Finance",3300),
("Jen","Finance",3900),("Jeff","Marketing",3000),
("Kumar","Marketing",2000)]
df = spark.createDataFrame(data,["Name","Department","Salary"])
unGroupedDf = df.select( \
df["Department"], \
f.struct(*[\ # Make a struct with all the record elements.
df["Department"].alias("Dept"),\
df["Salary"].alias("Salary"),\
df["Name"].alias("Name")] )\
.alias("record") )
unGroupedDf.groupBy("Department")\ #group
.agg(f.collect_list("record")\ #Gather all the element in a group
.alias("record"))\
.select(\
f.reverse(\ #Make the sort Descending
f.array_sort(\ #Sort the array ascending
f.col("record")\ #the struct
)\
)[0].alias("record"))\ #grab the "Max element in the array
).select( f.col("record.*") ).show() # use struct as Columns
.show()
注意:如果您沒有指定帶有視窗的 partitionBy,它會將所有資料發送到一個節點以進行處理。這將是一個性能問題。
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/477861.html
