我有這樣一個資料框架:
------ ---------- -----------
|品牌 |時間戳 |重量 |
------ ---------- -----------
|BR1 |1632899456|4.0 !
|BR1 |1632901256|4.0 |
|BR300 |1632901796|2.0|
|BR300 |1632899155|2.0|
|BR200 |1632899155|2.0 |
還有這張地圖 :
val map : Map[String, Double] = ("BR1"/span> -> 70. 0, "BR300" -> 90.0 )
我想根據地圖中的內容來更新 "重量 "列。
目的是將行中的值與地圖中的值相加。
輸出結果應該是這樣的。
輸出結果應該是這樣的:
------ ---------- -----------
|品牌 |時間戳 |重量 |
------ ---------- -----------
|BR1 |1632899456|74.0 !
|BR1 |1632901256|74.0|
|BR300 |1632901796|92.0|
|BR300 |1632899155|92.0|
|BR200 |1632899155|2.0 |
我使用Spark 3.0.2版本和SQLContext,使用scala語言。
uj5u.com熱心網友回復:
地圖可以被翻譯成SQL陳述句。這避免了使用UDF,因此可能會提高性能。
val df = ...
val map : Map[String, Double] = Map("BR1"/span> -> 70。 0, "BR300"/span> -> 90.0 )
val sql=map.foldLeft("Weight case brand")((a, b) => s"$a 當 '${b. _1}' 則 ${b._2}") " else 0.0 end"
df.withColumn("Weight", expr(sql)).show()
生成的sql字串是
Weight case 品牌 when 'BR1' then 70. 0 when 'BR300'/span> then 90. 0 else0.0 end
輸出:
----- ---------- ------
|品牌|時間戳|重量|
----- ---------- ------
|br1|1632899456|74.0|
|br1|1632901256|74.0|
|br300|1632901796|92.0|
|br300|1632899155|92.0|
|br200|1632899155|2.0|
----- ---------- ------
uj5u.com熱心網友回復:
你可以使用UDF從map中獲取值,然后用列值做和。
val spark = SparkSession.builder().master("local[*]").getorCreate()
spark.sparkContext.setLogLevel("ERROR"/span>)
import org.apache.spark.sql.function._
import spark.implicits._
val df = Seq(("BR1", 1632901256, 4.0) 。
("BR300"/span>, 1632901796, 2.0)。
("BR200", 1632899155, 2.0) )。 toDF("brand", "timestamp", "weight")
val map: Map[String, Double] = Map("BR1"/span> -> 70. 0, "BR300" -> 90.0)
val broadcastedMap = spark.sparkContext.broadcast(map)
val getvalueFromMap = udf((s: String) => broadcastedMap.value.getOrElse(s, 0.0)
df.withColumn("Weight", getvalueFromMap('brand) 'Weight).show()
/*
----- ---------- ------
|品牌|時間戳|重量|
----- ---------- ------
|br1|1632901256|74.0|
|br300|1632901796|92.0|
|br200|1632899155|2.0|
----- ---------- ------ */
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/309099.html
標籤:
