Suppose I have these case classes:
case class Employee(id: Long, proj_id: Long, office_id: Long, salary: Long)
case class Renumeration(id: Long, amount: Long)
I want to use Spark to update a collection of Employee records based on Renumeration:
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.broadcast
import spark.implicits._ // Encoder[Employee] for map; assumes a SparkSession named spark
val right: Dataset[Renumeration] = ???
val left: Dataset[Employee] = ???
left.joinWith(broadcast(right), left("proj_id") === right("id"), "leftouter")
  .map { case (left, right) => updateProj(left, right) }
  .joinWith(broadcast(right), left("office_id") === right("id"), "leftouter")
  .map { case (left, right) => updateOffice(left, right) }
def updateProj(emp: Employee, ren: Renumeration): Employee = ??? // business logic
def updateOffice(emp: Employee, ren: Renumeration): Employee = ??? // business logic
The first join and map work; however, when I introduce the second join, Spark fails to resolve the id columns and shows this instead:
org.apache.spark.sql.AnalysisException: Resolved attribute(s) office_id#42L missing from id#114L,salary#117L,id#34L,amount#35L,proj_id#115L,office_id#116L in operator !Join LeftOuter, (office_id#42L = id#34L). Attribute(s) with the same name appear in the operation: office_id. Please check if the right attribute(s) are used.;;
!Join LeftOuter, (office_id#42L = id#34L)
:- SerializeFromObject [assertnotnull(assertnotnull(input[0, Employee, true])).id AS id#114L, assertnotnull(assertnotnull(input[0, Employee, true])).proj_id AS proj_id#115L, assertnotnull(assertnotnull(input[0, Employee, true])).office_id AS office_id#116L, assertnotnull(assertnotnull(input[0, Employee, true])).salary AS salary#117L]
:  +- MapElements <function1>, class scala.Tuple2, [StructField(_1,StructType(StructField(id,LongType,false), StructField(proj_id,LongType,false), StructField(office_id,LongType,false), StructField(salary,LongType,false)),true), StructField(_2,StructType(StructField(id,LongType,false), StructField(amount,LongType,false)),true)], obj#113: Employee
:     +- DeserializeToObject newInstance(class scala.Tuple2), obj#112: scala.Tuple2
:        +- Join LeftOuter, (_1#103.proj_id = _2#104.id)
:           :- Project [named_struct(id, id#40L, proj_id, proj_id#41L, office_id, office_id#42L, salary, salary#43L) AS _1#103]
:           :  +- LocalRelation <empty>, [id#40L, proj_id#41L, office_id#42L, salary#43L]
:           +- Project [named_struct(id, id#34L, amount, amount#35L) AS _2#104]
:              +- ResolvedHint (broadcast)
:                 +- LocalRelation <empty>, [id#34L, amount#35L]
+- ResolvedHint (broadcast)
   +- LocalRelation <empty>, [id#34L, amount#35L]
Any idea why Spark can't resolve this column even though I'm using typed Datasets? And, if possible, what should I do to make it work?
Answer:
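The error occurs because the reference returned by left("office_id") no longer exists in the newly projected dataset (i.e. the dataset produced by the first join and map).
If you look closely at the nested relation in the execution plan,
+- LocalRelation <empty>, [id#40L, proj_id#41L, office_id#42L, salary#43L]
you can see that the reference to office_id in the left dataset is office_id#42L. However, if you look further along the plan, you will notice that this reference no longer exists in the projection of the serialized dataset: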
SerializeFromObject [assertnotnull(assertnotnull(input[0, Employee, true])).id AS id#114L, assertnotnull(assertnotnull(input[0, Employee, true])).proj_id AS proj_id#115L, assertnotnull(assertnotnull(input[0, Employee, true])).office_id AS office_id#116L, assertnotnull(assertnotnull(input[0, Employee, true])).salary AS salary#117L]
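The only office_id reference available there is office_id#116L, so a join condition built from left("office_id") (office_id#42L) cannot be resolved against it.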
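To see the stale reference for yourself, you can compare the column resolved from left with the one resolved from a mapped copy of it. This is a minimal sketch, assuming Spark 2.x/3.x (where Column.expr is public) and a local SparkSession; the object name StaleAttributeDemo and the identity map standing in for updateProj are illustrative only. Since the printed IDs (such as office_id#42L) change from run to run, the sketch only checks that the two attributes differ:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object StaleAttributeDemo {
  case class Employee(id: Long, proj_id: Long, office_id: Long, salary: Long)

  // Resolves office_id on the original Dataset and on a mapped copy, and returns
  // both resolved expressions plus whether they are the same attribute.
  def resolvedOfficeIds(): (String, String, Boolean) = {
    val spark = SparkSession.builder().master("local[1]").appName("stale-attribute").getOrCreate()
    import spark.implicits._
    try {
      val left: Dataset[Employee] = Seq(Employee(1L, 10L, 20L, 100L)).toDS()
      // An identity map stands in for updateProj: it still deserializes and
      // re-serializes the rows, so every output column gets a new attribute ID.
      val mapped: Dataset[Employee] = left.map(emp => emp)

      val before = left("office_id").expr  // printed as name#id, e.g. office_id#42L
      val after = mapped("office_id").expr // a different id, e.g. office_id#116L
      (before.toString, after.toString, before == after)
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = {
    val (before, after, same) = resolvedOfficeIds()
    println(s"left: $before, mapped: $after, same attribute: $same")
  }
}
```

Even though both columns are called office_id, they are different attributes, which is exactly why the second join cannot use left("office_id").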
To fix this, you can use an intermediate/temporary variable, for example:
val right: Dataset[Renumeration] = ???
val left: Dataset[Employee] = ???
// Keep the result of the first join + map in its own Dataset...
val leftTemp = left.joinWith(broadcast(right), left("proj_id") === right("id"), "leftouter")
  .map { case (left, right) => updateProj(left, right) }
// ...so the second condition uses leftTemp's office_id (office_id#116L), not left's
val leftFinal = leftTemp.joinWith(broadcast(right), leftTemp("office_id") === right("id"), "leftouter")
  .map { case (left, right) => updateOffice(left, right) }
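Alternatively, you can use the shorthand $"office_id" === right("id") in the second join (the $ interpolator comes from import spark.implicits._). Because $"office_id" is an unresolved column, Spark resolves it by name against the inputs of that join instead of pointing at office_id#42L. For example: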
left.joinWith(broadcast(right), left("proj_id") === right("id"), "leftouter")
  .map { case (left, right) => updateProj(left, right) }
  .joinWith(broadcast(right), $"office_id" === right("id"), "leftouter")
  .map { case (left, right) => updateOffice(left, right) }
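Putting both fixes together, here is a self-contained sketch you could run locally to check that they agree. It assumes Spark 3.x with a local SparkSession; the sample rows, the object name ChainedJoinWithDemo, and the bodies of updateProj and updateOffice (which simply add the matched amount to salary) are hypothetical placeholders for the real business logic. Note that in a left outer joinWith the Renumeration side is null when nothing matches, so the business logic has to handle that:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.broadcast

object ChainedJoinWithDemo {
  case class Employee(id: Long, proj_id: Long, office_id: Long, salary: Long)
  case class Renumeration(id: Long, amount: Long)

  // Hypothetical business logic: add the matched amount to the salary.
  // In a left outer joinWith, ren is null when no Renumeration row matched.
  def updateProj(emp: Employee, ren: Renumeration): Employee =
    if (ren == null) emp else emp.copy(salary = emp.salary + ren.amount)

  def updateOffice(emp: Employee, ren: Renumeration): Employee =
    if (ren == null) emp else emp.copy(salary = emp.salary + ren.amount)

  // Runs both fixes on the same sample data and returns their results sorted by id.
  def run(): (Seq[Employee], Seq[Employee]) = {
    val spark = SparkSession.builder().master("local[1]").appName("chained-joinWith").getOrCreate()
    import spark.implicits._
    try {
      val right: Dataset[Renumeration] = Seq(Renumeration(10L, 5L), Renumeration(20L, 7L)).toDS()
      val left: Dataset[Employee] =
        Seq(Employee(1L, 10L, 20L, 100L), Employee(2L, 11L, 20L, 200L)).toDS()

      // Fix 1: name the intermediate Dataset so the second condition uses
      // leftTemp's own office_id attribute instead of the stale one from left.
      val leftTemp = left
        .joinWith(broadcast(right), left("proj_id") === right("id"), "leftouter")
        .map { case (emp, ren) => updateProj(emp, ren) }
      val viaTemp = leftTemp
        .joinWith(broadcast(right), leftTemp("office_id") === right("id"), "leftouter")
        .map { case (emp, ren) => updateOffice(emp, ren) }

      // Fix 2: $"office_id" is unresolved, so Spark resolves it by name against
      // the inputs of the second join (only the left input has office_id).
      val viaName = left
        .joinWith(broadcast(right), left("proj_id") === right("id"), "leftouter")
        .map { case (emp, ren) => updateProj(emp, ren) }
        .joinWith(broadcast(right), $"office_id" === right("id"), "leftouter")
        .map { case (emp, ren) => updateOffice(emp, ren) }

      (viaTemp.collect().toSeq.sortBy(_.id), viaName.collect().toSeq.sortBy(_.id))
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = {
    val (viaTemp, viaName) = run()
    println(viaTemp)
    println(viaName)
  }
}
```

Both versions build the same plan shape; the only difference is how the second join condition refers to office_id, so with these sample rows they should return the same employees.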
Let me know if this works for you.
