我嘗試加入兩個 pyspark 資料框。一個包含我的測量資料,另一個包含我的測量設備的發布資訊。我想將發布資訊添加到測量資料中,如下所示:
輸入:
測量資料:
logger_id 測量日期 資料 394 2018-07-09T09:25:40 一些資料 394 2018-08-23T09:51:18 其他資料 394 2019-04-23T09:51:18 其他資料 398 2018-01-10T12:15:53 更多資料 398 2019-10-24T08:10:25 其他資料 發布資料
logger_id 發布日期 發布資訊 394 2018-07-01T00:00:00 發布資訊 394 2019-04-01T00:00:00 發布資訊 398 2018-01-01T00:00:00 發布資訊 398 2019-07-01T00:00:00 發布資訊
我想要這樣的輸出:
| logger_id | 測量日期 | 資料 | 發布日期 | 發布資訊 |
|---|---|---|---|---|
| 394 | 2018-07-09T09:25:40 | 一些資料 | 2018-07-01T00:00:00 | 發布資訊 |
| 394 | 2018-08-23T09:51:18 | 其他資料 | 2018-07-01T00:00:00 | 發布資訊 |
| 394 | 2019-04-23T09:51:18 | 其他資料 | 2019-04-01T00:00:00 | 發布資訊 |
| 398 | 2018-01-10T12:15:53 | 更多資料 | 2018-01-01T00:00:00 | 發布資訊 |
| 398 | 2019-10-24T08:10:25 | 其他資料 | 2019-07-01T00:00:00 | 發布資訊 |
我已經試過了
cond = [release_data.release_date < measure_data.measure_date, release_data.logger_id == measure_data.logger_id]
measure_data.join(release_data, cond, how='fullouter')
但是在生成的資料框中,我使用度量資料框的“空”列獲得了發布資料
我還考慮遍歷我的 measuredata 資料框并為每一行添加發布資訊,但因為它真的很大,我不想這樣做
uj5u.com熱心網友回復:
您可以轉換release_df為包含一個列,該列可以找到直到發布有效的列,因為lead可以使用此列。
一旦release_valid_end被列入,那么連接狀態將改變之間找到日期比較檢查measure_date和
release_date和release_valid_end。
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import Window as W
measure_data = [(394, datetime.strptime("2018-07-09T09:25:40", "%Y-%m-%dT%H:%M:%S"), "some data",),
(394, datetime.strptime("2018-08-23T09:51:18", "%Y-%m-%dT%H:%M:%S"), "other data",),
(394, datetime.strptime("2019-04-23T09:51:18", "%Y-%m-%dT%H:%M:%S"), "other data",),
(398, datetime.strptime("2018-01-10T12:15:53", "%Y-%m-%dT%H:%M:%S"), "more data",),
(398, datetime.strptime("2019-10-24T08:10:25", "%Y-%m-%dT%H:%M:%S"), "other data",), ]
release_data = [(394, datetime.strptime("2018-07-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",),
(394, datetime.strptime("2019-04-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",),
(398, datetime.strptime("2018-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",),
(398, datetime.strptime("2019-07-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",), ]
measure_df = spark.createDataFrame(measure_data, ("logger_id", "measure_date", "data",))
release_df = spark.createDataFrame(release_data, ("logger_id", "release_date", "release_information",))
world_end_date = datetime.strptime("2999-12-31T00:00:00", "%Y-%m-%dT%H:%M:%S")
window_spec = W.partitionBy("logger_id").orderBy(F.asc("release_date"))
release_validity_df = release_df.withColumn("release_valid_end",
F.lead("release_date", offset=1, default=world_end_date).over(window_spec))
(measure_df.join(release_validity_df,
((measure_df["logger_id"] == release_validity_df["logger_id"]) &
((measure_df["measure_date"] >= release_validity_df["release_date"]) &
(measure_df["measure_date"] < release_validity_df["release_valid_end"]))
))
).select(measure_df["logger_id"], "measure_date", "data", "release_date", "release_information").show()
輸出
--------- ------------------- ---------- ------------------- -------------------
|logger_id| measure_date| data| release_date|release_information|
--------- ------------------- ---------- ------------------- -------------------
| 398|2018-01-10 12:15:53| more data|2018-01-01 00:00:00|release information|
| 398|2019-10-24 08:10:25|other data|2019-07-01 00:00:00|release information|
| 394|2018-07-09 09:25:40| some data|2018-07-01 00:00:00|release information|
| 394|2018-08-23 09:51:18|other data|2018-07-01 00:00:00|release information|
| 394|2019-04-23 09:51:18|other data|2019-04-01 00:00:00|release information|
--------- ------------------- ---------- ------------------- -------------------
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/382364.html
下一篇:xarray中的查找表?
