我正在使用 Scala 撰寫 Spark 程式,我有 2 個 DataFrame
DF 1 的架構 -
root
|-- employee: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- id: string (nullable = true)
| |-- salary: long (nullable = true)
| |-- dept: string (nullable = true)
|--....
DF 2 的架構 -
root
|-- employee: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- id: string (nullable = true)
| |-- salary: long (nullable = true)
| |-- dept: string (nullable = true)
| |-- phone: string (nullable = false)
如何在 DF1 的 employee 欄位中添加 phone 欄位?
注意:並非 DF1 的所有員工都在 DF2 中,因此如果員工不在 DF2 中,則該員工的 phone 欄位應設定為 000
uj5u.com熱心網友回復:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct
/** Employee record without a phone number (the struct shape used by DF1). */
case class C1(
  name: String,
  id: String,
  salary: Long,
  dept: String
)

/** Employee record that additionally carries a phone number (the struct shape used by DF2). */
case class C2(name: String, id: String, salary: Long, dept: String, phone: String)

/** Row wrapper so that the resulting DataFrame has a single struct column named `employee`. */
case class E1(
  employee: C1
)

/** Row wrapper for the phone-carrying variant; the column is also named `employee`. */
case class E2(
  employee: C2
)
// The script referenced `spark` without ever defining it, so it only compiled
// in environments that pre-define a session (spark-shell, notebooks).
// `getOrCreate()` reuses such a session when one is already running and
// creates a new one otherwise.
// NOTE(review): `local[*]` is meant for running this example standalone; drop
// the `.master(...)` call when the master is supplied by spark-submit.
val spark: SparkSession = SparkSession
  .builder()
  .appName("add-phone-to-employee-struct")
  .master("local[*]")
  .getOrCreate()

// Brings `toDF()` and the case-class encoders into scope.
import spark.implicits._
// Sample "DF1": employees whose struct has no phone field.
// Employee "5" has no counterpart in empl2DF.
val empl1DF = List(
  C1("n1", "1", 1, "1"),
  C1("n2", "2", 2, "2"),
  C1("n5", "5", 5, "5")
).map(E1(_)).toDF()
// Sample "DF2": employees whose struct carries a phone field.
// Employee "3" does not appear in empl1DF.
val empl2DF = List(
  C2("n1", "1", 1, "1", "1111"),
  C2("n2", "2", 2, "2", "22222"),
  C2("n3", "3", 3, "3", "3333")
).map(E2(_)).toDF()
// Inspect the phone-less frame: schema first, then the untruncated rows.
empl1DF.printSchema()
// root
//  |-- employee: struct (nullable = true)
//  |    |-- name: string (nullable = true)
//  |    |-- id: string (nullable = true)
//  |    |-- salary: long (nullable = false)
//  |    |-- dept: string (nullable = true)
empl1DF.show(truncate = false)
// +-------------+
// |employee     |
// +-------------+
// |[n1, 1, 1, 1]|
// |[n2, 2, 2, 2]|
// |[n5, 5, 5, 5]|
// +-------------+
// Inspect the frame that carries the phone field: schema first, then the untruncated rows.
empl2DF.printSchema()
// root
//  |-- employee: struct (nullable = true)
//  |    |-- name: string (nullable = true)
//  |    |-- id: string (nullable = true)
//  |    |-- salary: long (nullable = false)
//  |    |-- dept: string (nullable = true)
//  |    |-- phone: string (nullable = true)
empl2DF.show(truncate = false)
// +--------------------+
// |employee            |
// +--------------------+
// |[n1, 1, 1, 1, 1111] |
// |[n2, 2, 2, 2, 22222]|
// |[n3, 3, 3, 3, 3333] |
// +--------------------+
// Flatten both structs into top-level columns. The left join keeps every
// row of empl1DF; `phone` comes from empl2DF and is null when no row with
// the same employee id exists there.
val df1 = {
  val withoutPhone = empl1DF
  val withPhone = empl2DF
  val sameEmployeeId =
    withoutPhone.col("employee.id") === withPhone.col("employee.id")

  withoutPhone
    .join(withPhone, sameEmployeeId, "left")
    .select(
      withoutPhone.col("employee.name"),
      withoutPhone.col("employee.id"),
      withoutPhone.col("employee.salary"),
      withoutPhone.col("employee.dept"),
      withPhone.col("employee.phone")
    )
}
// Replace the null phone of employees that had no match in empl2DF with the
// default "000", then pack the flat columns back into one `employee` struct.
//
// The original called `col(...)`, which is not in scope here: only `struct`
// is imported from `org.apache.spark.sql.functions`. The name-based `struct`
// overload builds the same struct without needing any extra import.
val resDF = df1.na
  .fill("000", Seq("phone"))
  .select(
    struct("name", "id", "salary", "dept", "phone").as("employee")
  )
// Inspect the result: every employee of empl1DF, now with a phone field.
resDF.printSchema()
// root
//  |-- employee: struct (nullable = false)
//  |    |-- name: string (nullable = true)
//  |    |-- id: string (nullable = true)
//  |    |-- salary: long (nullable = true)
//  |    |-- dept: string (nullable = true)
//  |    |-- phone: string (nullable = true)
resDF.show(truncate = false)
// +--------------------+
// |employee            |
// +--------------------+
// |[n1, 1, 1, 1, 1111] |
// |[n2, 2, 2, 2, 22222]|
// |[n5, 5, 5, 5, 000]  |
// +--------------------+
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/486219.html
標籤:dataframe scala apache-spark apache-spark-sql
上一篇:Scala:求和型別的函式映射
