我正在嘗試將包含以下格式資料的列拆分并轉換為 udf 中的映射。
UDF:
def convertToMapUDF = udf((c: String) => {
val arr = c.split(",")
val l = arr.toList
val regexPattern = ".*(=).*".r
println(s"column value: $c")
s"$c" match {
case regexPattern(a) => Some(l.map(x => x.split("=")).map(a => a(0).toString -> a(1).toString).toMap)
case "null" => Some(Map[String, String]())
}
})
val splitColList = List("r_split")
val d = ft.select(splitColList.map(c => convertToMapUDF(col(c))): _*)
r_split 列包含類似的資料
null
null
As=true, eMsion_New:E=true, HR:E=true, Don:E=true, Hrs=true, PAD:E=true, mog:E=true, N4k:E=true, WY:E=true, AT:E=true, Dt_RFC:E=true, DASH_ALL:E=true, TE:E=true, C_14:E=true, We:E=true, PG:E=true, ZR:E=true, MP:E=true, m2M:E=true, HC:E=true, Nos:E=true,
null
例外:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3712.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3712.0 (TID 37567, 10.73.35.140, executor 48): org.apache.spark.SparkException: Failed to execute user defined function($read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$7a8ac347ba800d5b55403548fd65e2$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$10845/2054440395: (string) => map<string,string>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at line6ec706fd41324bde944dab3ff50b81c6224.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$7a8ac347ba800d5b55403548fd65e2$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$convertToMapUDF$1(command-1649645762990220:2)
... 14 more
我嘗試使用 Option 和 Some 但問題沒有得到解決。在上述字串上作為獨立函式執行時的 udf 作業正常。
uj5u.com熱心網友回復:
最后一行是不必要的混亂,與您的問題無關。你基本上是說
val d = ft.select(convertToMapUDF(col("r_split")))
對?
無論如何,對于缺失值 ( null),您期望什么行為?我猜你想跳過它們,那就是 keep null。此行為在 spark 中實作,用于處理不可為空型別(Float/Int/etc)的 udfs。對于可為空的型別,包括String,需要自己實作。只需檢查c == nulludf 中的第一件事,然后null在這種情況下回傳。抱歉,我看到您的函式回傳Option[Map[_,_]]. 在那種情況下回傳None。
我認為(目前不確定)您也可以將您的引數更改為,Option[String]但您仍然必須自己處理None案例,也許null也是如此。我對此有點模糊。
uj5u.com熱心網友回復:
空值來自您的資料集。您可以使您的 udfs null 安全cala.util.Try(如果您不想丟失記錄)。只需包裹convertToMapUDF到 Try 結構中即可。它將確保捕獲任何非致命例外并回傳一個 Failure 物件。
def convertToMapUDF: UserDefinedFunction = udf((c: String) => {
Try {
val arr = c.split(",")
val l = arr.toList
val regexPattern = ".*(=).*".r
println(s"column value: $c")
s"$c" match {
case regexPattern(a) => l.map(x => x.split("=")).map(a => a(0) -> a(1)).toMap
case _ => Map[String, String]()
}
}.toOption
})
uj5u.com熱心網友回復:
您根本不需要在這里使用 UDF:Spark 擁有您需要的所有功能。考慮以下:
ft
.withColumn("id", monotonically_increasing_id())
.select(col("id"), explode(split(col("r_split"), ",")) as "r")
.select(col("id"), split(col("r"), "=") as "s")
.select(col("id"), map_from_arrays(array(col("s")(0)) as "k", array(col("s")(1)) as "v") as "map")
.groupBy("id").agg(collect_list("map") as "maps")
.select(
aggregate(col("maps"), typedLit(Map[String, String]()), (acc, nxt) => map_concat(acc, nxt)) as "r_split"
)
首先,我們添加一個 ID,以便我們可以在最后將所有內容重新組合在一起,然后獲取您的輸入字串r_split,并在“,”處將其斷開。然后它“分解”結果陣列,以便每個術語都有它自己的Row. 然后我們拆分“=”符號上的行以創建另一個陣列。接下來,我們通過選擇陣列的第一個元素作為鍵和第二個元素作為值來創建映射。這為我們提供了每個條目的映射,因此我們通過 ID(因此是第一步)將它們聚集到一個串列中。最后,我們將該串列縮減為單個地圖。
我覺得這里的步驟太多了,各種拆分/爆炸階段可能我們可以用 更有效地完成regexp_extract,但這演示了不必使用 UDF 的想法。
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/333175.html
上一篇:Scala得到一個子串
下一篇:D3過時的鏈接仍然可見
