我有兩個資料框 df1 和 df2。我必須從 df2 的 df1 中添加一個新列:
df1
X Y Z
1 2 3
4 5 6
7 8 9
3 6 9
df2
col1 col2
XX aa
YY bb
XX cc
ZZ vv
df2 中 col1 的值應該作為新列(如果它不存在)添加到 df1 和 col2 作為新列的值。例如:
df1
X Y Z XX YY ZZ
1 2 3 aa bb vv
4 5 6 cc
7 8 9
3 6 9
df2
col1 col2
XX aa
YY bb
XX cc
ZZ vv
uj5u.com熱心網友回復:
首先,spark資料集是分布式的。但是列名是模式的一部分,因此它們在主服務器的記憶體中。因此,要為 的每個不同值添加列df2.col1,您首先需要在 master 中獲取這些值(即收集)
// inputs
val df1 = List((1,2,3), (4,5,6), (7,8,9), (3,6,9)).toDF("X", "Y", "Z")
val df2 = List(("XX", "aa"), ("YY", "bb"), ("XX", "cc"), ("ZZ", "vv")).toDF("col1", "col2")
val newColumns = df2.select("col1").as[String].distinct.collect
val newDF = newColumns.foldLeft(df1)( (df, col) => df.withColumn(col, lit("?")))
newDF.show
--- --- --- --- --- ---
| X| Y| Z| ZZ| YY| XX|
--- --- --- --- --- ---
| 1| 2| 3| ?| ?| ?|
| 4| 5| 6| ?| ?| ?|
| 7| 8| 9| ?| ?| ?|
| 3| 6| 9| ?| ?| ?|
--- --- --- --- --- ---
但
- 我不知道你想在那些列中放入什么值(上面,我到處都放了“?”)
- 如果df2中有很多行,比如千分之十,它可以殺死master收集并將它們全部添加到df1
現在,再多說一點,這里是如何從 df2.col1 添加列并將 df2.col2 的連接值作為值放置
val toAdd = df2.groupBy("col1").agg(concat_ws(",", collect_set("col2")).as("col2All"))
toAdd.show
---- -------
|col1|col2All|
---- -------
| ZZ| vv|
| YY| bb|
| XX| cc,aa|
---- -------
val newColumns = toAdd.rdd.map(r => (r.getAs[String]("col1"), r.getAs[String]("col2All"))).collectAsMap()
val newDF = newColumns.foldLeft(df1){ case (df, (name, value)) => df.withColumn(name, lit(value))}
newDF.show
--- --- --- ----- --- ---
| X| Y| Z| XX| YY| ZZ|
--- --- --- ----- --- ---
| 1| 2| 3|cc,aa| bb| vv|
| 4| 5| 6|cc,aa| bb| vv|
| 7| 8| 9|cc,aa| bb| vv|
| 3| 6| 9|cc,aa| bb| vv|
--- --- --- ----- --- ---
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/312775.html
