我在使用 Mongodb 和 Spark 時遇到聚合問題。我對此不是很專業,我什至不知道是否存在我需要的操作。
我有幾條記錄,已經按用戶名匯總了。然后是這個特殊的用戶名,叫做“-”。
username1 data1:100
username2 data1:100
username3 data1:100
username4 data1:100
- data1:55
現在,我需要將用戶名“-”中的data1與所有其他data1相加。
username1 data1:155
username2 data1:155
username3 data1:155
username4 data1:155
- data1:55
我怎樣才能使用 mongodb spark 做到這一點?
其實我有
rawDataRows.///some stuff//.groupBy("username")
這產生了我寫的輸出,有人可以幫助我將用戶名“-”中的資料“合并”到所有其他用戶嗎?
uj5u.com熱心網友回復:
將 MongoDB 放在一邊(JSON 檔案 - 我現在沒有這樣的資源可以使用,請參閱https://docs.mongodb.com/spark-connector/current/python/read-from-mongodb/),然后:
- 如果您已經匯總了資料并且只有 1 個“-”記錄,
- 您所要做的就是將這條記錄過濾到另一個資料框 DF1,
- 然后過濾原始資料 <> '-' 到,比如說,DF2,
- 然后加入并添加值,
- 選擇需要的列。
- 然后加入并添加值,
- 然后過濾原始資料 <> '-' 到,比如說,DF2,
- 您所要做的就是將這條記錄過濾到另一個資料框 DF1,
不需要 GROUP BY。
像這樣:
// Simple data gen: JSON - need to read your MongoDB doc in yourself
import scala.collection.mutable.ListBuffer
val json_content1 = "{'username': 'hello', 'data1': 32}"
val json_content2 = "{'username': 'hello2', 'data1': 64}"
val json_content3 = "{'username': '-', 'data1': 100}"
var json_seq = new ListBuffer[String]()
json_seq = json_content1
json_seq = json_content2
json_seq = json_content3
// JSON in
val json_ds = json_seq.toDS()
json_ds.show(false)
// Make a DF
val df= spark.read.json(json_ds).cache()
df.show(false)
// Get the 2 sets of data
val df1 = df.filter($"username" === "-" ).toDF("dataNull","usernameNull")
val df2 = df.filter($"username" =!= "-" )
df1.show(false)
df2.show(false)
// Add together and Bob's your uncle
val res = df1.join(df2).withColumn("data1", 'data1 'dataNull).drop('dataNull).drop('usernameNull)
res.show(false)
回傳:
----- --------
|data1|username|
----- --------
|132 |hello |
|164 |hello2 |
----- --------
對于我的示例資料。
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/311444.html
