使用 Spark 和 Scala 我有 df1 和 df2 如下:
df1
-------------------- -------- ---------------- ----------
| ID|colA. |colB. |colC |
-------------------- -------- ---------------- ----------
| 1| 0| 10| APPLES|
| 2| 0| 20| APPLES|
|. 3| 0| 30| PEARS|
-------------------- -------- ---------------- ----------
df2
-------------------- -------- ---------------- ----------
| ID|colA. |colB |colC |
-------------------- -------- ---------------- ----------
| 1| 0| 10| APPLES|
| 2| 0| 20| PEARS|
| 3| 0| 10| APPLES|
-------------------- -------- ---------------- ----------
我需要比較這兩個資料框并在包含 4 列的 df3 中提取差異:包含差異的列名、來自 df1 的值、來自 df2 的值、ID 如何在不使用列名的情況下實作這一點,我只能使用ID 硬編碼。
-------------------- -------- ---------------- ------------- -----
| Column Name |Value from df1. |Value from df2| ID |
-------------------- -------- ---------------- -------------- -----
| col B | 30| 10| 3. |
| col C | APPLES| PEARS| 2. |
| col C | PEARS| APPLES| 3. |
-------------------- -------- ---------------- --------------- ----
到目前為止,我所做的是提取包含差異的列的名稱,但我一直堅持如何獲取這些值。val columns = df1.columns val df_join = df1.alias("d1").join(df2.alias("d2"), col("d1.id") === col("d2.id"), "剩下”)
val test = columns.foldLeft(df_join) {(df_join, name) => df_join.withColumn(name
"_temp", when(col("d1." name) =!= col("d2." name), lit(name))))}
.withColumn("Col Name", concat_ws(",", columns.map(name => col(name "_temp")): _*))
uj5u.com熱心網友回復:
你可以這樣試試:
// Consider the below dataframes
df1.show()
--- ---- ---- ------
| ID|colA|colB| colC|
--- ---- ---- ------
| 1| 0| 10|APPLES|
| 2| 0| 20|APPLES|
| 3| 0| 30| PEARS|
--- ---- ---- ------
df2.show()
--- ---- ---- ------
| ID|colA|colB| colC|
--- ---- ---- ------
| 1| 0| 10|APPLES|
| 2| 0| 20| PEARS|
| 3| 0| 10|APPLES|
--- ---- ---- ------
// As ID column can be hardcoded, we can use it to exclude from the list of all the columns of the dataframe so that we will be left with the remaining columns
val df1_columns = df1.columns.to[ListBuffer].-=("ID")
val df2_columns = df2.columns.to[ListBuffer].-=("ID")
// obtain the number of columns to use it in the stack function later
val df1_columns_count = df1_columns.length
val df2_columns_count = df2_columns.length
// obtain the columns in dynamic way to use in the stack function
var df1_stack_str = ""
var df2_stack_str = ""
// Typecasting columns to string type to avoid conflicts
df1_columns.foreach { column =>
df1_stack_str = s"'$column',cast($column as string),"
}
df1_stack_str = df1_stack_str.substring(0,df1_stack_str.lastIndexOf(","))
// Typecasting columns to string type to avoid conflicts
df2_columns.foreach { column =>
df2_stack_str = s"'$column',cast($column as string),"
}
df2_stack_str = df2_stack_str.substring(0,df2_stack_str.lastIndexOf(","))
/*
In this case the stack function implementation would look like this
val df11 = df1.selectExpr("id","stack(3,'colA',cast(colA as string),'colB',cast(colB as string),'colC',cast(colC as string)) as (column_name,value_from_df1)")
val df21 = df2.selectExpr("id id_","stack(3,'colA',cast(colA as string),'colB',cast(colB as string),'colC',cast(colC as string)) as (column_name_,value_from_df2)")
*/
val df11 = df1.selectExpr("id",s"stack($df1_columns_count,$df1_stack_str) as (column_name,value_from_df1)")
val df21 = df2.selectExpr("id id_",s"stack($df2_columns_count,$df2_stack_str) as (column_name_,value_from_df2)")
// use inner join to get value_from_df1 and value_from_df2 in one dataframe and apply the filter
df11.as("df11").join(df21.as("df21"),expr("df11.id=df21.id_ and df11.column_name=df21.column_name_"))
.drop("id_","column_name_")
.filter("value_from_df1!=value_from_df2")
.show
// Final output
--- ----------- -------------- --------------
| id|column_name|value_from_df1|value_from_df2|
--- ----------- -------------- --------------
| 2| colC| APPLES| PEARS|
| 3| colB| 30| 10|
| 3| colC| PEARS| APPLES|
--- ----------- -------------- --------------
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/432901.html
