我有 2 個 Pyspark 資料框
資料框 1 -df列所在的位置customer_id, address_id, order_id, date the order was placed, order_share
---- ---- -------- ---------- -----------
|c_id|a_id|order_id|order_date|order_share|
---- ---- -------- ---------- -----------
| c1| a1| 1|2021-01-23| 0.5|
| c1| a2| 1|2021-01-23| 0.2|
| c1| a3| 1|2021-01-23| 0.3|
| c2| a5| 2|2021-03-20| 0.4|
| c2| a6| 2|2021-03-20| 0.6|
| c1| a1| 3|2021-02-20| 0.3|
| c1| a2| 3|2021-02-20| 0.3|
| c1| a3| 3|2021-02-20| 0.4|
---- ---- -------- ---------- -----------
資料框 2 -df_address列所在的位置customer_id, address_id, the date of address creation
---- ---- ------------
|c_id|a_id|created_date|
---- ---- ------------
| c1| a1| 2020-12-31|
| c1| a2| 2020-04-23|
| c1| a3| 2020-03-23|
| c1| a4| 2020-01-16|
| c2| a5| 2020-12-28|
| c2| a6| 2020-05-16|
| c2| a7| 2020-03-04|
---- ---- ------------
現在,我希望加入這兩個表,這樣對于每個 order_id,我都會從中獲取地址,df_address并且相應的條目應該0.0在order_share列中
我的輸出應該是這樣的
---- ---- ------------ -------- ---------- -----------
|c_id|a_id|created_date|order_id|order_date|order_share|
---- ---- ------------ -------- ---------- -----------
| c1| a1| 2020-12-31| 1|2021-01-23| 0.5|
| c1| a2| 2020-04-23| 1|2021-01-23| 0.2|
| c1| a3| 2020-03-23| 1|2021-01-23| 0.3|
| c1| a4| 2020-01-16| 1|2021-01-23| 0.0|
| c2| a5| 2020-12-28| 2|2021-03-20| 0.4|
| c2| a6| 2020-05-16| 2|2021-03-20| 0.6|
| c2| a7| 2020-03-04| 2|2021-03-20| 0.0|
| c1| a1| 2020-12-31| 3|2021-02-20| 0.3|
| c1| a2| 2020-04-23| 3|2021-02-20| 0.3|
| c1| a3| 2020-03-23| 3|2021-02-20| 0.4|
| c1| a4| 2020-01-16| 3|2021-02-20| 0.0|
---- ---- ------------ -------- ---------- -----------
這看起來不像正常的左/右連接,我應該為每個 order_id 執行此操作。
我嘗試加入 using['c_id','a_id']但輸出與預期不符。考慮df_address到左和df右,Usingleft join為我提供了空值,order_id并且right join沒有給我所有的地址df_address
看起來我必須為每個 order_id 應用某種 groupby,然后為每個組應用 join,但我不知道如何實作這一點,甚至不確定這是否是正確的方法
任何幫助,將不勝感激。謝謝!
uj5u.com熱心網友回復:
您可以使用一個中間orders資料幀,從創建的df資料幀,并且包含有關訂單,這是列僅作參考customer_id,order_id并order_date。然后,您首先將df_address資料幀與此orders資料幀進行內部連接,將每對資料幀鏈接(customer_id, address_id)到特定于訂單的資訊,然后將結果資料幀與df資料幀進行左連接以獲得order_share每個地址,然后將列中的null值替換order_share為0.0.
這是完整的代碼:
from pyspark.sql import functions as F
# Orders dataframe that contains only orders-specific information
orders = df.select('customer_id', 'order_id', 'order_date').distinct()
df_address.join(orders, ['customer_id']) \ # link addresses with orders
.join(df.drop('order_date'), ['customer_id', 'address_id', 'order_id'], 'left_outer') \ # link orders/addresses with order shares
.withColumn('order_share', F.when(F.col('order_share').isNotNull(), F.col('order_share')).otherwise(F.lit(0.0))) \ # replace null in order_share column with 0.0
.orderBy('customer_id', 'order_id', 'address_id') \ # optional, to reorder dataframe
細節
注:我在這里重新排序的所有dataframes通過order_id,并address_id于可讀性目的
從df您問題中的資料框開始,我們得到以下orders資料框:
----------- -------- ----------
|customer_id|order_id|order_date|
----------- -------- ----------
|c1 |1 |2021-01-23|
|c2 |2 |2021-03-20|
|c1 |3 |2021-02-20|
----------- -------- ----------
然后我們將這個orders資料框與df_address資料框連接起來:
----------- ---------- ------------ -------- ----------
|customer_id|address_id|created_date|order_id|order_date|
----------- ---------- ------------ -------- ----------
|c1 |a1 |2020-12-31 |1 |2021-01-23|
|c1 |a2 |2020-04-23 |1 |2021-01-23|
|c1 |a3 |2020-03-23 |1 |2021-01-23|
|c1 |a4 |2020-01-16 |1 |2021-01-23|
|c2 |a5 |2020-12-28 |2 |2021-03-20|
|c2 |a6 |2020-05-16 |2 |2021-03-20|
|c2 |a7 |2020-03-04 |2 |2021-03-20|
|c1 |a1 |2020-12-31 |3 |2021-02-20|
|c1 |a2 |2020-04-23 |3 |2021-02-20|
|c1 |a3 |2020-03-23 |3 |2021-02-20|
|c1 |a4 |2020-01-16 |3 |2021-02-20|
----------- ---------- ------------ -------- ----------
最后與df沒有 column 的資料框連接order_date,我們得到:
----------- ---------- -------- ------------ ---------- -----------
|customer_id|address_id|order_id|created_date|order_date|order_share|
----------- ---------- -------- ------------ ---------- -----------
|c1 |a1 |1 |2020-12-31 |2021-01-23|0.5 |
|c1 |a2 |1 |2020-04-23 |2021-01-23|0.2 |
|c1 |a3 |1 |2020-03-23 |2021-01-23|0.3 |
|c1 |a4 |1 |2020-01-16 |2021-01-23|null |
|c2 |a5 |2 |2020-12-28 |2021-03-20|0.4 |
|c2 |a6 |2 |2020-05-16 |2021-03-20|0.6 |
|c2 |a7 |2 |2020-03-04 |2021-03-20|null |
|c1 |a1 |3 |2020-12-31 |2021-02-20|0.3 |
|c1 |a2 |3 |2020-04-23 |2021-02-20|0.3 |
|c1 |a3 |3 |2020-03-23 |2021-02-20|0.4 |
|c1 |a4 |3 |2020-01-16 |2021-02-20|null |
----------- ---------- -------- ------------ ---------- -----------
然后我們只需要替換null為0.0,我們就會得到我們預期的資料幀:
----------- ---------- -------- ------------ ---------- -----------
|customer_id|address_id|order_id|created_date|order_date|order_share|
----------- ---------- -------- ------------ ---------- -----------
| c1| a1| 1| 2020-12-31|2021-01-23| 0.5|
| c1| a2| 1| 2020-04-23|2021-01-23| 0.2|
| c1| a3| 1| 2020-03-23|2021-01-23| 0.3|
| c1| a4| 1| 2020-01-16|2021-01-23| 0.0|
| c2| a5| 2| 2020-12-28|2021-03-20| 0.4|
| c2| a6| 2| 2020-05-16|2021-03-20| 0.6|
| c2| a7| 2| 2020-03-04|2021-03-20| 0.0|
| c1| a1| 3| 2020-12-31|2021-02-20| 0.3|
| c1| a2| 3| 2020-04-23|2021-02-20| 0.3|
| c1| a3| 3| 2020-03-23|2021-02-20| 0.4|
| c1| a4| 3| 2020-01-16|2021-02-20| 0.0|
----------- ---------- -------- ------------ ---------- -----------
uj5u.com熱心網友回復:
我嘗試full outer與 the 連接DataFrames以獲取缺失c_id和a_id組合并進一步利用when with isNull用于來自下面的Null列值df并替換它們的值df_address是結果時 -
資料準備
input_str1 = """
c1| a1| 1|2021-01-23| 0.5|
c1| a2| 1|2021-01-23| 0.2|
c1| a3| 1|2021-01-23| 0.3|
c2| a5| 2|2021-03-20| 0.4|
c2| a6| 2|2021-03-20| 0.6|
c1| a1| 3|2021-02-20| 0.3|
c1| a2| 3|2021-02-20| 0.3|
c1| a3| 3|2021-02-20| 0.4
""".split("|")
input_values1 = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str1))
cols1 = list(map(lambda x: x.strip() if x.strip() != '' else None, "c_id|a_id|order_id|order_date|order_share".split("|")))
n = len(input_values1)
n_col1 = 5
input_list1 = [tuple(input_values1[i:i n_col1]) for i in range(0,n,n_col1)]
sparkDF1 = sql.createDataFrame(input_list1, cols1)
sparkDF1.show()
---- ---- -------- ---------- -----------
|c_id|a_id|order_id|order_date|order_share|
---- ---- -------- ---------- -----------
| c1| a1| 1|2021-01-23| 0.5|
| c1| a2| 1|2021-01-23| 0.2|
| c1| a3| 1|2021-01-23| 0.3|
| c2| a5| 2|2021-03-20| 0.4|
| c2| a6| 2|2021-03-20| 0.6|
| c1| a1| 3|2021-02-20| 0.3|
| c1| a2| 3|2021-02-20| 0.3|
| c1| a3| 3|2021-02-20| 0.4|
---- ---- -------- ---------- -----------
input_str2 = """
c1| a1| 2020-12-31|
c1| a2| 2020-04-23|
c1| a3| 2020-03-23|
c1| a4| 2020-01-16|
c2| a5| 2020-12-28|
c2| a6| 2020-05-16|
c2| a7| 2020-03-04
""".split("|")
input_values2 = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str2))
cols2 = list(map(lambda x: x.strip() if x.strip() != '' else None, "c_id|a_id|created_date".split("|")))
n = len(input_values2)
n_col2 = 3
input_list2 = [tuple(input_values2[i:i n_col2]) for i in range(0,n,n_col2)]
sparkDF2 = sql.createDataFrame(input_list2, cols2)
sparkDF2.show()
---- ---- ------------
|c_id|a_id|created_date|
---- ---- ------------
| c1| a1| 2020-12-31|
| c1| a2| 2020-04-23|
| c1| a3| 2020-03-23|
| c1| a4| 2020-01-16|
| c2| a5| 2020-12-28|
| c2| a6| 2020-05-16|
| c2| a7| 2020-03-04|
---- ---- ------------
完全加入
重命名來自 SparkDF2 的列值,這將進一步用于填充空值以避免列名不明確
finalDF = sparkDF1.join(sparkDF2
, (sparkDF1['c_id'] == sparkDF2['c_id'])
& (sparkDF1['a_id'] == sparkDF2['a_id'])
,'full'
).select(sparkDF1['*']
,sparkDF2['c_id'].alias('c_id_address')
,sparkDF2['a_id'].alias('a_id_address')
,sparkDF2['created_date']
)
finalDF.show()
---- ---- -------- ---------- ----------- ------------ ------------ ------------
|c_id|a_id|order_id|order_date|order_share|c_id_address|a_id_address|created_date|
---- ---- -------- ---------- ----------- ------------ ------------ ------------
| c1| a3| 1|2021-01-23| 0.3| c1| a3| 2020-03-23|
| c1| a3| 3|2021-02-20| 0.4| c1| a3| 2020-03-23|
| c2| a5| 2|2021-03-20| 0.4| c2| a5| 2020-12-28|
|null|null| null| null| null| c2| a7| 2020-03-04|
| c1| a2| 1|2021-01-23| 0.2| c1| a2| 2020-04-23|
| c1| a2| 3|2021-02-20| 0.3| c1| a2| 2020-04-23|
| c1| a1| 1|2021-01-23| 0.5| c1| a1| 2020-12-31|
| c1| a1| 3|2021-02-20| 0.3| c1| a1| 2020-12-31|
|null|null| null| null| null| c1| a4| 2020-01-16|
| c2| a6| 2|2021-03-20| 0.6| c2| a6| 2020-05-16|
---- ---- -------- ---------- ----------- ------------ ------------ ------------
什么時候為空
finalDF = finalDF.withColumn('c_id',F.when(F.col('c_id').isNull()
,F.col('c_id_address')).otherwise(F.col('c_id'))
)\
.withColumn('a_id',F.when(F.col('a_id').isNull()
,F.col('a_id_address')).otherwise(F.col('a_id'))
)\
.withColumn('order_share',F.when(F.col('order_share').isNull()
,0.0).otherwise(F.col('order_share'))
)
finalDF.show()
---- ---- -------- ---------- ----------- ------------ ------------ ------------
|c_id|a_id|order_id|order_date|order_share|c_id_address|a_id_address|created_date|
---- ---- -------- ---------- ----------- ------------ ------------ ------------
| c1| a3| 1|2021-01-23| 0.3| c1| a3| 2020-03-23|
| c1| a3| 3|2021-02-20| 0.4| c1| a3| 2020-03-23|
| c2| a5| 2|2021-03-20| 0.4| c2| a5| 2020-12-28|
| c2| a7| null| null| 0.0| c2| a7| 2020-03-04|
| c1| a2| 1|2021-01-23| 0.2| c1| a2| 2020-04-23|
| c1| a2| 3|2021-02-20| 0.3| c1| a2| 2020-04-23|
| c1| a1| 1|2021-01-23| 0.5| c1| a1| 2020-12-31|
| c1| a1| 3|2021-02-20| 0.3| c1| a1| 2020-12-31|
| c1| a4| null| null| 0.0| c1| a4| 2020-01-16|
| c2| a6| 2|2021-03-20| 0.6| c2| a6| 2020-05-16|
---- ---- -------- ---------- ----------- ------------ ------------ ------------
注意 - order_idandorder_date為空,因為c_id和a_id組合中不存在值sparkDF2
此示例提供了一種方法,以獲取所需的解決方案,如果需要填充訂單缺失值,您可以進一步即興發揮。
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/315247.html
