我正在嘗試創建一個類似于 Facade 的功能CaseWhen我正在嘗試創建一個類似于spark 函式
CaseWhen(
branches: Seq[(Expression, Expression)],
elseValue: Option[Expression] = None): ...
所以,CaseWhen需要一個 Seq(Expression, Expression),第一個Expression是條件,第二個是值。它還需要一個Option[Expression]else 值。
基本上,我想給出一個嵌套的Map[Column, Column]物件,第一個Column是條件,第二個是滿足此條件時要放置的值。
我設法為最簡單的用例做到了:只有一個級別的(Condition, Value). 這是我的代碼:
def withExpr(expr: Expression): Column = new Column(expr)
def caseWhenFacade(
outputColName: String,
conditionsValues: Map[Column, Column],
defaultValue: Option[Column] = None
)(df: DataFrame): DataFrame = {
require(conditionsValues.nonEmpty, "Cannot call caseWhenFacadewith an Empty Map")
val conditionalMap = conditionsValues.map(x => (x._1.expr, x._2.expr)).toSeq
defaultValue match {
case Some(value) => df.withColumn(
colName,
withExpr {CaseWhen(
conditionalMap,
value.expr
)}
)
case None => df.withColumn(
colName,
withExpr {CaseWhen(
conditionalMap
)}
)
}
}
以下是我如何使用該功能:
val spec = Map(
($"column_one" === 1) -> lit(2),
($"column_one" === 2 && $"column_two" === 1) -> lit(1),
($"column_one" === 3) -> lit(4),
)
df.transform(caseWhenFacade("column_three", spec))
這很好用,但是現在,我想給出這個規范 Map :
val spec: Map[Column, Any] = Map(
($"column_one" === 1) -> Map(
($"column_two" === 2) -> lit(54),
($"column_two" === 5) -> lit(524)
),
($"column_one" === 2) -> Map(
($"column_two" === 7) -> Map(
($"whatever_column" === "whatever") -> lit(12),
($"whatever_column" === "whatever_two") -> lit(13)
),
($"column_two" === 8) -> lit(524)
),
($"column_one" === 3) -> lit(4)
)
這顯然不適用于我的caseWhenFacade功能。而且我真的想不出辦法做到這一點。
誰能幫我嗎 ?
uj5u.com熱心網友回復:
那是因為(如您所知),的實體Map[Column, Any]不是 spark 的實體Column。我可以想到兩種方法,其中一種是不安全的方法,我不會深入解釋,但為了讓您對此有所了解,您可以conditionsValues: Map[Column, Any]在您的中使用caseWhenFacade,并嘗試將條件行內到單次Map[Column, Column]使用&&按鍵。我推薦的另一種方法與第一種方法非常相似,但它是安全的。您可以創建一個 ADT 來代表您的條件和值:
sealed trait ConditionValue {
def enumerate(conditionReduce: (Column, Column) => Column): List[(Column, Column)]
}
case class SingleConditionValue(condition: Column, value: Column) extends ConditionValue {
override def enumerate(conditionReduce: (Column, Column) => Column): List[(Column, Column)] =
(condition -> value) :: Nil
}
case class NestedConditionValue(conditions: Map[Column, ConditionValue]) extends ConditionValue {
def enumerate(conditionReduce: (Column, Column) => Column): List[(Column, Column)] =
conditions.mapValues(_.enumerate(conditionReduce)).map {
case (outerCondition, innerExpressions) =>
innerExpressions.map { case (innerCondition, innerValue) =>
conditionReduce((outerCondition, innerCondition)) -> innerValue
}
}
}
只是為了快速了解這種建模的含義,在您的嵌套 Map 物件中,它看起來像這樣:
val spec: Map[Column, Any] = Map(
($"column_one" === 1) -> Map(
($"column_two" === 2) -> lit(54),
($"column_two" === 5) -> lit(524)
)
)
表達這張地圖的另一種方式是這樣的:
val spec: Map[Column, Column] = Map(
(($"column_one" === 1) && ($"column_two" === 2)) -> lit(54),
(($"column_one" === 1) && ($"column_two" === 2)) -> lit(524)
)
正確的?如您所見,我們正在使用&&. 所以這正是方法conditionReduce中的目的enumerate。我所說的縮減是指將外部嵌套映射縮減為單個映射,其中嵌套條件被縮減為一個。最后,您的代碼將如下所示:
def caseWhenFacade(
outputColName: String,
conditionsValues: ConditionValue,
defaultValue: Option[Column] = None
)(df: DataFrame): DataFrame = {
require(conditionsValues.nonEmpty, "Cannot call caseWhenFacadewith an Empty Map")
// this can be defined globally
val and: (Column, Column) => Column = _ && _
val conditionalMap: List[(Column, Column)] = conditionsValues.enumerate(and) // this meand do the 'and' operation on conditions
defaultValue match {
case Some(value) =>
df.withColumn(
colName,
withExpr {CaseWhen(
conditionalMap,
value.expr
)}
)
case None =>
df.withColumn(
colName,
withExpr {CaseWhen(
conditionalMap
)}
)
}
}
還有一件事很明顯,就是您需要將您的條件和值包裝到此 ADT 的實體中,或者您可以使用隱式轉換來簡化使用。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/535446.html
標籤:斯卡拉阿帕奇火花
