我所擁有的: 2 個大型 spark 資料幀,但這里有一些示例
- 資料框A:
| ID | IG | 開放日期 |
|---|---|---|
| P111 | 100 | 2022 年 13 月 4 日 |
| P222 | 101 | 16/04/2022 |
| P333 | 102 | 20/04/2022 |
- 資料框 B:
| IG | 服務 | Dt_Service |
|---|---|---|
| 100 | 一個 | 2022 年 12 月 4 日 |
| 100 | 乙 | 2022 年 13 月 4 日 |
| 100 | 乙 | 14/04/2022 |
| 101 | 一個 | 15/04/2022 |
| 101 | 一個 | 16/04/2022 |
| 101 | 乙 | 17/04/2022 |
| 101 | 乙 | 2022 年 4 月 18 日 |
| 102 | 一個 | 19/04/2022 |
| 102 | 乙 | 20/04/2022 |
我想要什么:我想在資料框 A 上使用鍵 'IG' 加入'Service' 和 'Dt_Service' 兩列,但同時具有相應日期的'Service' 的最大值。因此,我需要最新的“服務”及其資料框 A 中每一行的相應日期。這是我期望的結果:
| ID | IG | 開放日期 | 服務 | Dt_Service |
|---|---|---|---|---|
| P111 | 100 | 2022 年 13 月 4 日 | 乙 | 14/04/2022 |
| P222 | 101 | 16/04/2022 | 乙 | 2022 年 4 月 18 日 |
| P333 | 102 | 20/04/2022 | 乙 | 20/04/2022 |
工具:使用 PySpark 的 Spark 2.2,因為我正在研究 hadoop
謝謝您的幫助
uj5u.com熱心網友回復:
正如 samkart 所說,我們可以先進行 rank/row_number 以獲得最后一次服務,然后加入以獲得您想要的結果
from pyspark.sql import functions as F
from pyspark.sql import Window
se="IG string,Service string,Dt_Service string"
de=[("100","A","2022-04-12"),("100","B","2022-04-13"),("100","B","2022-04-14"),("101","A","2022-04-15"),("101","A","2022-04-16"),("101","B","2022-04-17"),("101","B","2022-04-18"),("102","A","2022-04-19"),("102","B","2022-04-20")]
df1=spark.createDataFrame([("P111","100","13/04/2022"),("P222","101","16/04/2022"),("P333","102","20/04/2022")],"ID string,IG string, OpenDate string")
df2=fd.withColumn("rn",F.row_number().over(Window.partitionBy("ig").orderBy(F.to_date(F.col("Dt_service")).desc()))).filter("rn==1").drop("rn")
df1.join(df2,"IG","inner").show()
#output
--- ---- ---------- ------- ----------
| IG| ID| OpenDate|Service|Dt_Service|
--- ---- ---------- ------- ----------
|100|P111|13/04/2022| B|2022-04-14|
|101|P222|16/04/2022| B|2022-04-18|
|102|P333|20/04/2022| B|2022-04-20|
--- ---- ---------- ------- ----------
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/491790.html
標籤:Python sql 阿帕奇火花 pyspark 大数据
