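I want to create a new column in my PySpark DataFrame that equals 1 if the row is the last row of its group under a given ordering. My solution works, but it looks hacky:
My strategy is to generate random numbers, use a window function to identify the last random number in each sorted group, and then compare inside a when expression: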
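import pyspark.sql.functions as F
from pyspark.sql import Window

# Frame runs from the current row to the end of each sorted group
w = (Window.partitionBy('bvdidnumber', 'dt_year')
     .orderBy('dt_rfrnc')
     .rowsBetween(Window.currentRow, Window.unboundedFollowing))

df = (df.withColumn('rand', F.rand())
      # a row is the last one if its random number equals the last random number of its group
      .withColumn('marker', F.when(F.last('rand').over(w) == F.col('rand'), 1).otherwise(0))
      .drop('rand'))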
The first three columns are the input data. The last column, marker, is created by the code, and goal is the result I want to achieve.
+-----------+-------+--------+----+------+
|bvdidnumber|dt_year|dt_rfrnc|goal|marker|
+-----------+-------+--------+----+------+
|          1|   2020|  202006|   0|     0|
|          1|   2020|  202012|   1|     1|
|          1|   2020|  202012|   0|     0|
|          1|   2021|  202103|   0|     0|
|          1|   2021|  202106|   0|     0|
|          1|   2021|  202112|   1|     1|
|          2|   2020|  202006|   0|     0|
|          2|   2020|  202012|   0|     0|
|          2|   2020|  202012|   1|     1|
|          2|   2021|  202103|   0|     0|
|          2|   2021|  202106|   0|     0|
|          2|   2021|  202112|   1|     1|
+-----------+-------+--------+----+------+
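To pin down the requirement, the expected result can be written as a small plain-Python reference. This is only a sketch for checking results on small samples, not a Spark solution: mark_last_per_group is a hypothetical helper, and breaking ties by taking the latest row in input order is an assumption (the question only requires that exactly one of the tied rows is marked).

```python
def mark_last_per_group(rows):
    """rows: list of (bvdidnumber, dt_year, dt_rfrnc) tuples; returns a list of 0/1 markers."""
    best = {}  # (bvdidnumber, dt_year) -> index of the row chosen as "last"
    for i, (bvd, year, rfrnc) in enumerate(rows):
        key = (bvd, year)
        # ">=" means that among tied dt_rfrnc values the latest input row wins (assumed tie-break)
        if key not in best or rfrnc >= rows[best[key]][2]:
            best[key] = i
    chosen = set(best.values())
    return [1 if i in chosen else 0 for i in range(len(rows))]


rows = [
    (1, 2020, 202006), (1, 2020, 202012), (1, 2020, 202012),
    (1, 2021, 202103), (1, 2021, 202106), (1, 2021, 202112),
    (2, 2020, 202006), (2, 2020, 202012), (2, 2020, 202012),
    (2, 2021, 202103), (2, 2021, 202106), (2, 2021, 202112),
]
markers = mark_last_per_group(rows)
print(markers)  # [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1]
```

Each of the four groups gets exactly one marked row, and that row always carries the largest dt_rfrnc of its group.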
Answer 1:
Here is an approach that uses row_number().
data_ls = [
(1,2020,202006,0),
(1,2020,202012,1),
(1,2020,202012,0),
(1,2021,202103,0),
(1,2021,202106,0),
(1,2021,202112,1),
(2,2020,202006,0),
(2,2020,202012,0),
(2,2020,202012,1),
(2,2021,202103,0),
(2,2021,202106,0),
(2,2021,202112,1)
]
sdf = spark.sparkContext.parallelize(data_ls).toDF(['bvdidnumber', 'dt_year', 'dt_rfrnc', 'goal'])
# +-----------+-------+--------+----+
# |bvdidnumber|dt_year|dt_rfrnc|goal|
# +-----------+-------+--------+----+
# |          1|   2020|  202006|   0|
# |          1|   2020|  202012|   1|
# |          1|   2020|  202012|   0|
# |          1|   2021|  202103|   0|
# |          1|   2021|  202106|   0|
# |          1|   2021|  202112|   1|
# |          2|   2020|  202006|   0|
# |          2|   2020|  202012|   0|
# |          2|   2020|  202012|   1|
# |          2|   2021|  202103|   0|
# |          2|   2021|  202106|   0|
# |          2|   2021|  202112|   1|
# +-----------+-------+--------+----+
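from pyspark.sql import functions as func
from pyspark.sql.window import Window as wd

# Sort each group descending so the last reference date gets row number 1
w = wd.partitionBy(['bvdidnumber', 'dt_year']).orderBy(func.desc('dt_rfrnc'))

(sdf
 .withColumn('marker', func.when(func.row_number().over(w) == 1, 1).otherwise(0))
 .show())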
# +-----------+-------+--------+----+------+
# |bvdidnumber|dt_year|dt_rfrnc|goal|marker|
# +-----------+-------+--------+----+------+
# |          1|   2021|  202112|   1|     1|
# |          1|   2021|  202106|   0|     0|
# |          1|   2021|  202103|   0|     0|
# |          2|   2021|  202112|   1|     1|
# |          2|   2021|  202106|   0|     0|
# |          2|   2021|  202103|   0|     0|
# |          1|   2020|  202012|   1|     1|
# |          1|   2020|  202012|   0|     0|
# |          1|   2020|  202006|   0|     0|
# |          2|   2020|  202012|   0|     1|
# |          2|   2020|  202012|   1|     0|
# |          2|   2020|  202006|   0|     0|
# +-----------+-------+--------+----+------+
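As you mentioned in the comments, when there are duplicates an arbitrary row gets marked, but only one of them. That is why goal and marker disagree on some of the duplicated rows.
"There may be duplicates (added now); in that case a random row can be marked (but only one)."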
Answer 2:
I can't see anything wrong with what you did. Maybe just make it a bit tidier:
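from pyspark.sql import Window, functions as F
w = Window.partitionBy('bvdidnumber', 'dt_year').orderBy('bvdidnumber', 'dt_year')
is_last = (F.last('dt_rfrnc').over(w) == F.col('dt_rfrnc')).cast('int')  # 1 where dt_rfrnc equals the group's last value
df.withColumn('rand', F.when(F.sum(is_last).over(w.rowsBetween(-1, 0)) == 1, 1).otherwise(0)).show()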
+-----------+-------+--------+----+
|bvdidnumber|dt_year|dt_rfrnc|rand|
+-----------+-------+--------+----+
|          1|   2020|  202006|   0|
|          1|   2020|   20201|   1|
|          1|   2020|   20201|   0|
|          1|   2021|  202103|   0|
|          1|   2021|  202106|   0|
|          1|   2021|  202112|   1|
|          2|   2020|  202006|   0|
|          2|   2020|   20201|   1|
|          2|   2020|   20201|   0|
|          2|   2021|  202103|   0|
|          2|   2021|  202106|   0|
|          2|   2021|  202112|   1|
+-----------+-------+--------+----+
Answer 3:
from pyspark.sql.functions import (arrays_zip, col, collect_list, explode,
                                   expr, reverse, sort_array, struct)

# create data
schema = ["bvdidnumber", "dt_year", "dt_rfrnc", "goal", "marker"]
data = [(1, 2020, 202006, 0, 0),
        (1, 2020, 202012, 1, 1),
        (1, 2020, 202012, 0, 0),
        (1, 2021, 202103, 0, 0),
        (1, 2021, 202106, 0, 0),
        (1, 2021, 202112, 1, 1),
        (2, 2020, 202006, 0, 0),
        (2, 2020, 202012, 0, 0),
        (2, 2020, 202012, 1, 1),
        (2, 2021, 202103, 0, 0),
        (2, 2021, 202106, 0, 0),
        (2, 2021, 202112, 1, 1),
        ]
df = spark.createDataFrame(data, schema)

(df
 .select(
     # create a struct to carry all columns forward (think of a column of columns);
     # structs sort by their first field
     struct(
         col("dt_rfrnc"),  # must be first so that sorting uses this column
         col("dt_year"),
         col("bvdidnumber"),
     ).alias("columns"))
 .groupby(
     col("columns.bvdidnumber"),
     col("columns.dt_year"))
 .agg(
     arrays_zip(  # merge arrays index by index
         # build an array holding a single 1 followed by zeros
         expr("concat(array_repeat(1, 1), array_repeat(0, size(collect_list(columns)) - 1))"),
         # collect the group elements, sort them, then reverse for descending order
         reverse(sort_array(collect_list("columns")))
     ).alias("zip"))
 .select(
     col("bvdidnumber"),
     col("dt_year"),
     explode("zip"))  # use array values as rows
 .select(
     col("bvdidnumber"),
     col("dt_year"),
     col("col.`0`").alias("marked"),  # funny column reference required for arrays_zip output
     col("col.`1`.dt_rfrnc").alias("dt_rfrnc"))
 .show())
+-----------+-------+------+--------+
|bvdidnumber|dt_year|marked|dt_rfrnc|
+-----------+-------+------+--------+
|          1|   2021|     1|  202112|
|          1|   2021|     0|  202106|
|          1|   2021|     0|  202103|
|          2|   2021|     1|  202112|
|          2|   2021|     0|  202106|
|          2|   2021|     0|  202103|
|          1|   2020|     1|  202012|
|          1|   2020|     0|  202012|
|          1|   2020|     0|  202006|
|          2|   2020|     1|  202012|
|          2|   2020|     0|  202012|
|          2|   2020|     0|  202006|
+-----------+-------+------+--------+
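The idea behind the arrays_zip approach may be easier to see in plain Python. This is only an illustrative sketch, not Spark code: zip_markers is a hypothetical helper that sorts the dt_rfrnc values of one group in descending order and pairs them with a flag list made of a single 1 followed by zeros, mirroring the concat(array_repeat(...)) array zipped with the reversed sort_array.

```python
def zip_markers(values):
    """Pair each value of one group with a 0/1 flag; only the first (largest) value gets 1."""
    ordered = sorted(values, reverse=True)   # like reverse(sort_array(...))
    flags = [1] + [0] * (len(ordered) - 1)   # like concat(array_repeat(1, 1), array_repeat(0, n - 1))
    return list(zip(flags, ordered))         # like arrays_zip(flags, ordered)


# One group from the sample data: bvdidnumber = 1, dt_year = 2020
pairs = zip_markers([202006, 202012, 202012])
print(pairs)  # [(1, 202012), (0, 202012), (0, 202006)]
```

Because the flag list contains exactly one 1, duplicates of the largest value never yield more than one marked row, which matches the marked column in the output above.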
Tags: apache-spark, pyspark, apache-spark-sql
