僅列出 2 個解決方案,我曾嘗試實作將 spark udf 應用于某些列的用例,但我不確定為什么即使我嘗試實作相同的功能,我的兩個函式的行為也完全不同. 有人可以解釋內部作業,在這兩種情況下究竟發生了什么?
功能一:
def transformColumns(df: DataFrame, transformationType: String, sanitizationList: List[Sanitization]): DataFrame = {
try {
sanitizationList.foldLeft(df) {
(outerAccumulator: DataFrame, sanitization: Sanitization) =>
val aes: TAlgorithm = new AES256(key, iv)
@transient lazy val udfFunction = udf(aes.decrypt(_)
sanitization.column.foldLeft(outerAccumulator: DataFrame) {
(innerAccumulator: DataFrame, elem: String) =>
innerAccumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null)))
}
}
}
功能二:
def transformColumns(df: DataFrame, columns: Map[Seq[String], TAlgorithm]): DataFrame = {
try {
columns.foldLeft(df) {
(accumulator: DataFrame, sanitization: (Seq[String], TAlgorithm)) =>
import org.apache.spark.sql.functions.udf
val aes: TAlgorithm = new AES256(key, iv)
@transient lazy val udfFunction = udf(aes.decrypt(_))
sanitization._1.foreach{
elem => accumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null)))
}
accumulator
}
}
在第二種情況下,沒有任何列被轉換,不知道為什么。
uj5u.com熱心網友回復:
在你的第一個例子中
sanitization.column.foldLeft(outerAccumulator: DataFrame) { (innerAccumulator: DataFrame, elem: String) => innerAccumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null))) }foldLeft計算innerAccumulator.withColumn...將innerAccumulator:DataFrame在下一次迭代中出現的最后一行 ( ) 。在你的第二個例子中
sanitization._1.foreach{ elem => accumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null))) }DataFrame是不可變的,因此withColumn回傳一個新的DataFrame. 但是由于for_each回傳Unit,由創建的新 DFaccumulator.withColumn丟失了
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/311447.html
