我有2個資料框,
我有2個資料框。
df1 - CUSTOMER_ID, USER_ADDRESS_ID,ORDERED_TIME(下訂單的時間)。
df2 - CUSTOMER_ID, USER_ADDRESS_ID,latest_order (每個用戶最后一次下訂單的時間 from特定地址)
我通過以下命令從df1獲得df2
df2 = df1.groupBy('CUSTOMER_ID', 'USER_ADDRESS_ID').agg(f.max("ORDERED_TIME").alias('latest_order'))
從這2個資料框中,我想得到一個結果資料框,它包含了從30天到最后一次下單的特定(CUSTOMER_ID,ADDRESS_ID)的訂單數量。
示例資料
df1
customer_id|user_address_id| ordered_time|
----------- --------------- -------------------
| 7894496| 167514241|2021-01-27 13: 37: 49|
| 28596279| 178674171|2021-01-27 13: 42:02|
| 12682115| 192834231|2021-01-27 22: 20: 23|
| 6981716| 13228441|2021-01-27 22: 22:32|。
df2
----------- --------------- -------------------
|CUSTOMER_ID|USER_ADDRESS_ID| latest_order|
----------- --------------- -------------------
|5145237|83276530|2021-07-28 16: 52:40|。
11634405| 21756839|202109-08 20: 43: 35|
| 43919672| 120835117|2020-10-03 21: 44: 21|
71206555| 170807531|2020- 10-30 14: 00:43|。
我試著使用一個UDF,我想把第二個資料框架作為一個引數來傳遞,但發現不可能這樣做。
所以,我試著用一個連接來代替它(不確定語法),方法如下
df1. join(df2, ["CUSTOMER_ID","USER_ADDRESS_ID"], how="inner").select(df1.filter((df1. USER_ADDRESS_ID == df2.USER_ADDRESS_ID) & (df1.ORDERED_TIME >= date_sub(df2.latest_order, 30) & (df1. ORDERED_TIME <= date_sub(df2.latest_order, 1)).count()).alias("order_counts"/span>).show()
我最終得到了以下錯誤
org.apache.spark.sql.AnalysisException: 解決了USER_ADDRESS_ID#433,latest_order#434在CUSTOMER_ID#416,USER_ADDRESS_ID#417,ORDERED_TIME#418中丟失的屬性。 過濾器(((USER_ADDRESS_ID#417 = USER_ADDRESS_ID#433) &amp;(ORDERED_TIME#418 >= cast(date_sub(cast(new_order#434 as date), 30) as timestamp)) & amp;&(ORDERED_TIME#418 <= cast(date_sub(cast(latest_order#434 as date), 1) as timestamp))。同名的屬性出現在操作中。user_address_id。請檢查是否使用了正確的屬性。
我正在學習PySpark,想知道解決這個問題的正確方法。如果你需要任何其他的資訊,請告訴我
任何幫助都會被告知。
如果有任何幫助,我們將不勝感激。謝謝!
uj5u.com熱心網友回復:
最新的訂單時間可以通過Window函式計算,每條記錄 "ORDERED_TIME "可以與最新的進行比較:
val df1 = Seq(
(7894496, 167514241, "2021-01-27 13:37:49") 。
(28596279, 178674171, "2021-01-27 13:42:02") 。
(12682115, 192834231, "2021-01-27 22:20:23") 。
(6981716, 13228441, "2021-01-27 22:22:32")
).toDF("CUSTOMER_ID"/span>, "USER_ADDRESS_ID"/span>, "ORDERED_TIME"/span>)
val customerAddressWindow = Window.partitionBy("CUSTOMER_ID"/span>, "USER_ADDRESS_ID"/span>)
val df2 = df1
.withColumn("ORDERED_TIMESTAMP", to_timestamp($"ORDERED_TIME", "yyy-MM-dd HH:mm:ss")
.withColumn("latest_order", max("ORDERED_TIMESTAMP").over(customerAddressWindow))
.groupBy("CUSTOMER_ID", "USER_ADDRESS_ID", "latest_order")
.agg(sum(
when(dateiff($"latest_order", $"ORDERED_TIMESTAMP") < 30, 1) 。 否則(0)
).別名("Orders"))
輸出是:
----------- --------------- ------------------- ------
|CUSTOMER_ID|USER_ADDRESS_ID|latest_order|Orders|
----------- --------------- ------------------- ------
|6981716 |13228441 |2021-01-27 22: 22:32|1|
|12682115 |192834231 |2021-01-27 22: 20:23|1|
|7894496 |167514241 |2021-01-27 13: 37:49|1|
|28596279 |178674171 |2021-01-27 13: 42:02|1|
----------- --------------- ------------------- ------
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/310740.html
標籤:
