有一個data類似的 DataFrame
|timestamp |value|
|2021-01-01 12:00:00| 10.0|
|2021-01-01 12:00:01| 10.0|
|2021-01-01 12:00:02| 10.0|
|2021-01-01 12:00:03| 10.0|
|2021-01-01 12:00:04| 10.0|
|2021-01-01 12:00:05| 10.0|
|2021-01-01 12:00:06| 10.0|
|2021-01-01 12:00:07| 10.0|
和資料幀events像
|timestamp |event|
|2021-01-01 12:00:01| true|
|2021-01-01 12:00:05| true|
基于這一點,我想在初始 DataFrame 中再添加一列,該列是index自以下內容開始以來的資料event:
|timestamp |value|index|
|2021-01-01 12:00:00| 10.0| 1|
|2021-01-01 12:00:01| 10.0| 2|
|2021-01-01 12:00:02| 10.0| 3|
|2021-01-01 12:00:03| 10.0| 4|
|2021-01-01 12:00:04| 10.0| 5|
|2021-01-01 12:00:05| 10.0| 1|
|2021-01-01 12:00:06| 10.0| 2|
|2021-01-01 12:00:07| 10.0| 3|
我試過
.withColumn("index",monotonically_increasing_id())
但是在將它與其他一些 DataFrame 連接時,無法將其設定回 0。所以,歡迎任何想法。
uj5u.com熱心網友回復:
您可以將datadf 與eventdf on連接起來,timestamp然后在event列上使用條件累積總和來定義組。最后按group列磁區設定行號。
像這樣的東西:
import org.apache.spark.sql.expressions.Window
val result = data.join(
events,
Seq("timestamp"),
"left"
).withColumn(
"group",
sum(when(col("event"), 1).otherwise(0)).over(Window.orderBy("timestamp"))
).withColumn(
"index",
row_number().over(Window.partitionBy("group").orderBy("timestamp"))
).drop("group", "event")
result.show
// ------------------- ----- -----
//| timestamp|value|index|
// ------------------- ----- -----
//|2021-01-01 12:00:00| 10.0| 1|
//|2021-01-01 12:00:01| 10.0| 1|
//|2021-01-01 12:00:02| 10.0| 2|
//|2021-01-01 12:00:03| 10.0| 3|
//|2021-01-01 12:00:04| 10.0| 4|
//|2021-01-01 12:00:05| 10.0| 1|
//|2021-01-01 12:00:06| 10.0| 2|
//|2021-01-01 12:00:07| 10.0| 3|
// ------------------- ----- -----
uj5u.com熱心網友回復:
您可以使用 Window 函式來實作它:
from pyspark.sql import SparkSessionRow, Window
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
加入原始 DF 后的示例資料(timestamp為了簡單起見,我將列更改為整數型別):
df = spark.createDataFrame([
Row(timestamp=0, value='foo', event=True),
Row(timestamp=1, value='foo', event=None),
Row(timestamp=2, value='foo', event=None),
Row(timestamp=3, value='foo', event=None),
Row(timestamp=4, value='foo', event=None),
Row(timestamp=5, value='foo', event=True),
Row(timestamp=6, value='foo', event=None),
Row(timestamp=7, value='foo', event=None),
])
然后我group_id通過向前填充“組”的第一個時間戳來創建一個列。這group_id可以被用來創建使用索引F.row_number():
(
df
.withColumn('group_id', F.when(F.col('event'), F.col('timestamp')))
.withColumn('group_id', F.last('group_id', ignorenulls=True).over(Window.orderBy('timestamp')))
.withColumn('index', F.row_number().over(Window.partitionBy('group_id').orderBy('timestamp')))
.show()
)
# Output:
--------- ----- ----- -------- -----
|timestamp|value|event|group_id|index|
--------- ----- ----- -------- -----
| 0| foo| true| 0| 1|
| 1| foo| null| 0| 2|
| 2| foo| null| 0| 3|
| 3| foo| null| 0| 4|
| 4| foo| null| 0| 5|
| 5| foo| true| 5| 1|
| 6| foo| null| 5| 2|
| 7| foo| null| 5| 3|
--------- ----- ----- -------- -----
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/384756.html
標籤:斯卡拉 阿帕奇火花 apache-spark-sql 阿帕奇齐柏林飞艇
下一篇:Chisel中的條件模塊實體化
