我有一個帶有這樣字串的 RDD(以特定方式排序):
["A","B","C","D"]
另一個帶有如下串列的 RDD:
["C","B","F","K"],
["B","A","Z","M"],
["X","T","D","C"]
我想根據它們在第一個 RDD 中出現的順序對第二個 RDD 中每個串列中的元素進行排序。沒有出現在第一個串列中的元素的順序無關緊要。
從上面的例子中,我想得到一個這樣的 RDD:
["B","C","F","K"],
["A","B","Z","M"],
["C","D","X","T"]
我知道我應該使用廣播變數來廣播第一個 RDD,因為我處理第二個 RDD 中的每個串列。但是我對 Spark/Scala(以及一般的函式式編程)非常陌生,所以我不確定如何做到這一點。
uj5u.com熱心網友回復:
我假設第一個 RDD 很小,因為您談論的是廣播它。在這種情況下,您是對的,廣播排序是解決問題的好方法。
// generating data
val ordering_rdd = sc.parallelize(Seq("A","B","C","D"))
val other_rdd = sc.parallelize(Seq(
Seq("C","B","F","K"),
Seq("B","A","Z","M"),
Seq("X","T","D","C")
))
// let's start by collecting the ordering onto the driver
val ordering = ordering_rdd.collect()
// Let's broadcast the list:
val ordering_br = sc.broadcast(ordering)
// Finally, let's use the ordering to sort your records:
val result = other_rdd
.map( _.sortBy(x => {
val index = ordering_br.value.indexOf(x)
if(index == -1) Int.MaxValue else index
}))
請注意,如果在串列中找不到該元素,則indexOf回傳-1。如果我們保持原樣,所有未找到的元素都會在開頭結束。我知道你最后想要它們,所以我換-1了一些大數字。
列印結果:
scala> result.collect().foreach(println)
List(B, C, F, K)
List(A, B, Z, M)
List(C, D, X, T)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/364794.html
