我有下表,如果 activity_date 是連續的,則連續增加。如果沒有,streak 將設定為 1。
現在我需要獲得每組條紋的最小值和最大值。使用 Spark 和 Scala 或 Spark SQL。
Input
floor activity_date streak
--------------------------------
floor1 2018-11-08 1
floor1 2019-01-24 1
floor1 2019-04-05 1
floor1 2019-04-08 1
floor1 2019-04-09 2
floor1 2019-04-14 1
floor1 2019-04-17 1
floor1 2019-04-20 1
floor2 2019-05-04 1
floor2 2019-05-05 2
floor2 2019-06-04 1
floor2 2019-07-28 1
floor2 2019-08-14 1
floor2 2019-08-22 1
Output
floor activity_date end_activity_date
----------------------------------------
floor1 2018-11-08 2018-11-08
floor1 2019-01-24 2019-01-24
floor1 2019-04-05 2019-04-05
floor1 2019-04-08 2019-04-09
floor1 2019-04-14 2019-04-14
floor1 2019-04-17 2019-04-17
floor1 2019-04-20 2019-04-20
floor2 2019-05-04 2019-05-05
floor2 2019-06-04 2019-06-04
floor2 2019-07-28 2019-07-28
floor2 2019-08-14 2019-08-14
floor2 2019-08-22 2019-08-22
uj5u.com熱心網友回復:
您可以使用以下方法
使用 Spark SQL
SELECT
floor,
activity_date,
MAX(activity_date) OVER (PARTITION BY gn,floor) as end_activity_date
FROM (
SELECT
*,
SUM(is_same_streak) OVER (
PARTITION BY floor ORDER BY activity_date
) as gn
FROM (
SELECT
*,
CASE
WHEN streak > LAG(streak,1,streak-1) OVER (
PARTITION BY floor
ORDER BY activity_date
) THEN 0
ELSE 1
END as is_same_streak
FROM
df
) t1
) t2
ORDER BY
"floor",
activity_date
查看作業演示資料庫小提琴
使用 Scala api
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val floorWindow = Window.partitionBy("floor").orderBy("activity_date")
val output = df.withColumn(
"is_same_streak",
when(
col("streak") > lag(col("streak"),1,col("streak")-1).over(floorWindow) , 0
).otherwise(1)
)
.withColumn(
"gn",
sum(col("is_same_streak")).over(floorWindow)
)
.select(
"floor",
"activity_date",
max(col("activity_date")).over(
Window.partitionBy("gn","floor")
).alias("end_activity_date")
)
使用 pyspark api
from pyspark.sql import functions as F
from pyspark.sql import Window
floorWindow = Window.partitionBy("floor").orderBy("activity_date")
output = (
df.withColumn(
"is_same_streak",
F.when(
F.col("streak") > F.lag(F.col("streak"),1,F.col("streak")-1).over(floorWindow) , 0
).otherwise(1)
)
.withColumn(
"gn",
F.sum(F.col("is_same_streak")).over(floorWindow)
)
.select(
"floor",
"activity_date",
F.max(F.col("activity_date")).over(
Window.partitionBy("gn","floor")
).alias("end_activity_date")
)
)
讓我知道這是否適合您。
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/312778.html
標籤:斯卡拉 阿帕奇火花 火花 apache-spark-sql
下一篇:[Scala][Spark]:轉換資料框中的一列,保留其他列,使用withColumn和map[錯誤:缺少引數型別]
