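I have 2 DataFrames, df1 (old) and df2 (new). I'm trying to compare df2 with df1 and find the newly added rows, the deleted rows, the updated rows, and the names of the updated columns.
Here is the code I wrote: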
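from pyspark.sql import SparkSession
from pyspark.sql.functions import col, array, when, array_remove, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.getOrCreate()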
data1 = [("James","rob","Smith","36636","M",3000),
("Michael","Rose","jim","40288","M",4000),
("Robert","dunkin","Williams","42114","M",4000),
("Maria","Anne","Jones","39192","F",4000),
("Jen","Mary","Brown","60563","F",-1)
]
data2 = [("James","rob","Smith","36636","M",3000),
("Robert","dunkin","Williams","42114","M",2000),
("Maria","Anne","Jones","72712","F",3000),
("Yesh","Reddy","Brown","75234","M",3000),
("Jen","Mary","Brown","60563","F",-1)
]
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])
df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)
conditions_ = [when(df1[c]!=df2[c], lit(c)).otherwise("") for c in df1.columns if c not in ['firstname','middlename','lastname']]
select_expr =[
col("firstname"),col("middlename"),col("lastname"),
*[df2[c] for c in df2.columns if c not in ['firstname','middlename','lastname']],
array_remove(array(*conditions_), "").alias("updated_columns")
]
df1.join(df2, ["firstname","middlename","lastname"],"inner").select(*select_expr).show()
Here is the output I get:
+---------+----------+--------+-----+------+------+---------------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|
+---------+----------+--------+-----+------+------+---------------+
|    James|       rob|   Smith|36636|     M|  3000|             []|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|
+---------+----------+--------+-----+------+------+---------------+
This is the output I expect:
+---------+----------+--------+-----+------+------+---------------+-----------------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|           status|
+---------+----------+--------+-----+------+------+---------------+-----------------+
|    James|       rob|   Smith|36636|     M|  3000|             []|        unchanged|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|          updated|
|  Michael|      Rose|     jim|40288|     M|  4000|             []|          deleted|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|          updated|
|     Yesh|     Reddy|   Brown|75234|     M|  3000|             []|            added|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|        unchanged|
+---------+----------+--------+-----+------+------+---------------+-----------------+
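I know I can find the added and deleted rows separately using left anti joins. However, I'm looking for a way to modify my existing join to get the output above.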
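A left anti join keeps only the left-side rows whose key has no match on the right, so finding both added and deleted rows takes two of them, one in each direction. The following is a plain-Python model of that idea, not Spark code; it assumes the key is the (firstname, middlename, lastname) tuple used in the join above, and the helper name left_anti is hypothetical.

```python
# Plain-Python model of two left anti joins on the name key (not PySpark).
old_keys = [
    ("James", "rob", "Smith"),
    ("Michael", "Rose", "jim"),
    ("Robert", "dunkin", "Williams"),
    ("Maria", "Anne", "Jones"),
    ("Jen", "Mary", "Brown"),
]
new_keys = [
    ("James", "rob", "Smith"),
    ("Robert", "dunkin", "Williams"),
    ("Maria", "Anne", "Jones"),
    ("Yesh", "Reddy", "Brown"),
    ("Jen", "Mary", "Brown"),
]

def left_anti(left, right):
    """Keep the rows of `left` whose key does not appear in `right`."""
    right_set = set(right)
    return [row for row in left if row not in right_set]

added = left_anti(new_keys, old_keys)    # models df2 left_anti df1
deleted = left_anti(old_keys, new_keys)  # models df1 left_anti df2
print("added:", added)
print("deleted:", deleted)
```

In PySpark the two steps would be df2.join(df1, ["firstname", "middlename", "lastname"], "left_anti") and the same call with df1 and df2 swapped.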
Reply:
An outer join will help in your case. I've modified the code you provided accordingly and also included the status column.
Minimal working example
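from pyspark.sql import SparkSession
from pyspark.sql.functions import col, array, when, array_remove, lit, size, coalesce
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.getOrCreate()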
data1 = [("James","rob","Smith","36636","M",3000),
("Michael","Rose","jim","40288","M",4000),
("Robert","dunkin","Williams","42114","M",4000),
("Maria","Anne","Jones","39192","F",4000),
("Jen","Mary","Brown","60563","F",-1)
]
data2 = [("James","rob","Smith","36636","M",3000),
("Robert","dunkin","Williams","42114","M",2000),
("Maria","Anne","Jones","72712","F",3000),
("Yesh","Reddy","Brown","75234","M",3000),
("Jen","Mary","Brown","60563","F",-1)
]
schema = StructType([
StructField("firstname",StringType(),True),
StructField("middlename",StringType(),True),
StructField("lastname",StringType(),True),
StructField("id", StringType(), True),
StructField("gender", StringType(), True),
StructField("salary", IntegerType(), True)
])
df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)
conditions_ = [when(df1[c]!=df2[c], lit(c)).otherwise("") for c in df1.columns if c not in ['firstname','middlename','lastname']]
Logic for the status column; select_expr is also modified to coalesce the values from df2 and df1, preferring df2, so that you get the up-to-date data.
status = (
    when(df1["id"].isNull(), lit("added"))      # no matching row in df1 (assumes id itself is never null)
    .when(df2["id"].isNull(), lit("deleted"))   # no matching row in df2
    .when(size(array_remove(array(*conditions_), "")) > 0, lit("updated"))
    .otherwise("unchanged")
)
select_expr =[
col("firstname"),col("middlename"),col("lastname"),
*[coalesce(df2[c], df1[c]).alias(c) for c in df2.columns if c not in ['firstname','middlename','lastname']],
array_remove(array(*conditions_), "").alias("updated_columns"),
status.alias("status"),
]
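Why do the added and deleted rows end up with an empty updated_columns? In Spark SQL, df1[c] != df2[c] evaluates to null when either side is null, which is exactly what the outer join produces for the missing side, and when() treats a null condition like false, so otherwise("") fires and array_remove drops the empty string. The plain-Python sketch below models that three-valued logic with None standing in for SQL null; sql_ne and updated_columns are hypothetical helper names, not PySpark functions.

```python
from typing import Optional

COMPARE_COLS = ["id", "gender", "salary"]  # df1.columns minus the name key

def sql_ne(a, b) -> Optional[bool]:
    """Model of Spark's != : the result is null (None) if either operand is null."""
    if a is None or b is None:
        return None
    return a != b

def updated_columns(old: dict, new: dict) -> list:
    """Model of array_remove(array(*conditions_), "")."""
    # when() only yields the column name when the condition is True;
    # False and None both fall through to otherwise(""), which is then removed.
    return [c for c in COMPARE_COLS if sql_ne(old[c], new[c]) is True]

maria_old = {"id": "39192", "gender": "F", "salary": 4000}
maria_new = {"id": "72712", "gender": "F", "salary": 3000}
missing = {c: None for c in COMPARE_COLS}  # the side the outer join fills with nulls
yesh_new = {"id": "75234", "gender": "M", "salary": 3000}

print(updated_columns(maria_old, maria_new))
print(updated_columns(missing, yesh_new))
```

This is also why the status expression tests isNull() before checking the size of the array: otherwise an added or deleted row would look "unchanged". The same logic means a column that changes from null to a value is not flagged either; PySpark's Column.eqNullSafe (for example ~df1[c].eqNullSafe(df2[c])) treats null vs. a value as different, but with an outer join the added and deleted rows would then need to be excluded separately.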
Finally, apply an outer join:
df1.join(df2, ["firstname","middlename","lastname"],"outer").select(*select_expr).show()
Output
+---------+----------+--------+-----+------+------+---------------+---------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|   status|
+---------+----------+--------+-----+------+------+---------------+---------+
|    James|       rob|   Smith|36636|     M|  3000|             []|unchanged|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|unchanged|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|  updated|
|  Michael|      Rose|     jim|40288|     M|  4000|             []|  deleted|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|  updated|
|     Yesh|     Reddy|   Brown|75234|     M|  3000|             []|    added|
+---------+----------+--------+-----+------+------+---------------+---------+
Reply:
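I'd recommend using Ranger so that you capture what actually changed and when. But if all you have are these DataFrames... you want to do an "outer" join (which pulls all the data from both tables into one join). You already have the updated-columns logic.
For the status: "deleted" (in df1 but not in df2), "added" (in df2 but not in df1), "updated" (if there are updated columns), otherwise "unchanged".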
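Those rules, applied in that order, can be checked end to end with a small plain-Python model of the full outer join (not Spark code). Rows are keyed by the (firstname, middlename, lastname) tuple, a missing side is None, values are coalesced with preference for the new row, and the data is the sample from the question; the function name diff_status is hypothetical.

```python
COLS = ("id", "gender", "salary")

old = {
    ("James", "rob", "Smith"): ("36636", "M", 3000),
    ("Michael", "Rose", "jim"): ("40288", "M", 4000),
    ("Robert", "dunkin", "Williams"): ("42114", "M", 4000),
    ("Maria", "Anne", "Jones"): ("39192", "F", 4000),
    ("Jen", "Mary", "Brown"): ("60563", "F", -1),
}
new = {
    ("James", "rob", "Smith"): ("36636", "M", 3000),
    ("Robert", "dunkin", "Williams"): ("42114", "M", 2000),
    ("Maria", "Anne", "Jones"): ("72712", "F", 3000),
    ("Yesh", "Reddy", "Brown"): ("75234", "M", 3000),
    ("Jen", "Mary", "Brown"): ("60563", "F", -1),
}

def diff_status(old, new):
    """Return {key: (values, updated_columns, status)} for every key on either side."""
    result = {}
    # A full outer join keeps every key that appears in old or new.
    for key in old.keys() | new.keys():
        o, n = old.get(key), new.get(key)
        if o is None:
            status, changed = "added", []
        elif n is None:
            status, changed = "deleted", []
        else:
            changed = [c for c, a, b in zip(COLS, o, n) if a != b]
            status = "updated" if changed else "unchanged"
        values = n if n is not None else o  # like coalesce(df2[c], df1[c])
        result[key] = (values, changed, status)
    return result

for key, (values, changed, status) in sorted(diff_status(old, new).items()):
    print(key, values, changed, status)
```

The model compares matched rows with plain !=, which agrees with the Spark expression only as long as neither side holds a null value.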
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qiye/353006.html
Tags: Python, Apache Spark, Spark, apache-spark-sql
