I'm converting a DataFrame to a Dataset using a case class that contains a sequence of another case class:
case class IdMonitor(id: String, ipLocation: Seq[IpLocation])
case class IpLocation(
ip: String,
ipVersion: Byte,
ipType: String,
city: String,
state: String,
country: String)
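Now I have another Dataset of strings that contains only IPs. My requirement is to get every record from ipLocation where ipType == "home" or the IP dataset contains that record's IP. I tried building a Bloom filter over the IP dataset and searching with it, but it is inefficient and generally doesn't work well. I'd like to join the IP dataset with IpLocation, but I'm having trouble because IpLocation sits inside a Seq. I'm very new to Spark and Scala, so I may be missing something. Right now my code looks like this: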
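import org.apache.spark.sql.Dataset
// Assuming BloomFilter is Breeze's breeze.util.BloomFilter (optimallySized, += and |)
import breeze.util.BloomFilter

def buildBloomFilter(ips: Dataset[String]): BloomFilter[String] = {
  val count = ips.count
  val bloomFilter = ips.rdd
    .mapPartitions { iter =>
      // One filter per partition, all sized for the whole dataset so they can be merged
      // (FP_PROBABILITY is the target false-positive rate, defined elsewhere)
      val b = BloomFilter.optimallySized[String](count.toDouble, FP_PROBABILITY)
      iter.foreach(i => b += i) // add each IP to this partition's filter
      Iterator(b)
    }
    .treeReduce(_ | _) // merge the per-partition filters with bitwise OR
  bloomFilter
}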
val ipBf = buildBloomFilter(Ips)
val ipBfBroadcast = spark.sparkContext.broadcast(ipBf)
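// Keep a location if it is a "home" IP or its IP is (probably) in the Ips dataset
idMonitor.map { monitor =>
  monitor.ipLocation.filter(loc =>
    loc.ipType == "home" || ipBfBroadcast.value.contains(loc.ip)
  )
}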
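For readers unfamiliar with the per-partition pattern in buildBloomFilter: each partition fills its own filter of identical size, and the filters are then merged with a bitwise OR, which is what treeReduce(_ | _) relies on. Below is a minimal plain-Scala sketch of that pattern. ToyBloom and BloomDemo are hypothetical names, the hashing (MurmurHash3 with different seeds) is an assumption, this is not Breeze's implementation, and the "partitions" are local Seqs standing in for RDD partitions.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

// Toy Bloom filter for illustration only (hypothetical, not Breeze's BloomFilter).
final class ToyBloom(val numBits: Int, val numHashes: Int, val bits: BitSet) {
  // Derive numHashes bit positions by hashing with different seeds.
  private def positions(s: String): Seq[Int] =
    (0 until numHashes).map(seed => Math.floorMod(MurmurHash3.stringHash(s, seed), numBits))

  def add(s: String): Unit = positions(s).foreach(p => bits.set(p))

  // Can return true for items never added (false positive), never false for added ones.
  def mightContain(s: String): Boolean = positions(s).forall(p => bits.get(p))

  // Union: a bit is set if it is set in either filter, so the merge covers both item sets.
  def |(that: ToyBloom): ToyBloom = {
    require(numBits == that.numBits && numHashes == that.numHashes, "filters must have the same shape")
    val merged = bits.clone().asInstanceOf[BitSet]
    merged.or(that.bits)
    new ToyBloom(numBits, numHashes, merged)
  }
}

object ToyBloom {
  def empty(numBits: Int, numHashes: Int): ToyBloom =
    new ToyBloom(numBits, numHashes, new BitSet(numBits))
}

object BloomDemo {
  // Mimics mapPartitions + treeReduce: one filter per "partition", merged with `|`.
  // Assumes at least one partition (reduce fails on an empty Seq).
  def build(partitions: Seq[Seq[String]], numBits: Int = 1024, numHashes: Int = 3): ToyBloom =
    partitions
      .map { part =>
        val b = ToyBloom.empty(numBits, numHashes)
        part.foreach(b.add)
        b
      }
      .reduce(_ | _)
}
```

Because each added item sets all of its bits in its own partition's filter, the OR-merged filter never reports a false negative for any item from any partition.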
I just want to figure out how to join IpLocation with Ips.
Answer:
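You can flatten the ipLocation sequence in each IdMonitor object with the explode function, then inner join with the Ips dataset so that only matching IPs remain, and finally rebuild the IpLocation sequences by grouping by id and applying collect_list.
The complete code: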
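import org.apache.spark.sql.functions.{col, collect_list, explode}
import spark.implicits._ // provides the encoder for Seq[IpLocation]

val result = idMonitor
  .select(col("id"), explode(col("ipLocation"))) // exploded struct column is named "col"
  .filter(col("col.ipType") === "home")
  .join(Ips, col("col.ip") === col("value"))      // Ips is a Dataset[String]; its column is "value"
  .groupBy("id")
  .agg(collect_list("col").as("value"))
  .drop("id")
  .as[Seq[IpLocation]]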
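To make the pipeline concrete, here is a local (non-Spark) sketch that mirrors each step on plain Scala collections: flatMap plays the role of explode, a Set lookup stands in for the inner join, and groupBy plus map corresponds to groupBy("id") with collect_list. The object name ExplodeJoinCollect is hypothetical. Like the answer's code, it keeps locations that are both "home" and present in Ips; unlike a real join, the Set ignores duplicate IPs in Ips.

```scala
case class IpLocation(ip: String, ipVersion: Byte, ipType: String, city: String, state: String, country: String)
case class IdMonitor(id: String, ipLocation: Seq[IpLocation])

// Hypothetical local analogue of: explode -> filter home -> inner join on ip -> groupBy id + collect_list
object ExplodeJoinCollect {
  def run(monitors: Seq[IdMonitor], ips: Seq[String]): Map[String, Seq[IpLocation]] = {
    val ipSet = ips.toSet // stands in for the Ips dataset
    monitors
      .flatMap(m => m.ipLocation.map(loc => (m.id, loc))) // explode: one row per (id, location)
      .filter { case (_, loc) => loc.ipType == "home" }   // filter(col("col.ipType") === "home")
      .filter { case (_, loc) => ipSet.contains(loc.ip) } // inner join on the IP
      .groupBy(_._1)                                        // groupBy("id")
      .map { case (id, rows) => id -> rows.map(_._2) }      // collect_list("col")
  }
}
```

As in the Spark version, an id whose locations are all filtered out disappears from the result instead of mapping to an empty list.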
Answer:
Example:
Starting from your case classes:
case class IpLocation(
ip: String,
ipVersion: Byte,
ipType: String,
city: String,
state: String,
country: String
)
case class IdMonitor(id: String, ipLocation: Seq[IpLocation])
I defined the sample data as follows:
import spark.implicits._ // needed for toDF
val ip_locations1 = Seq(
  IpLocation("123.123.123.123", 12.toByte, "home", "test", "test", "test"),
  IpLocation("123.123.123.124", 12.toByte, "otherwise", "test", "test", "test"))
val ip_locations2 = Seq(
  IpLocation("123.123.123.125", 13.toByte, "company", "test", "test", "test"),
  IpLocation("123.123.123.124", 13.toByte, "otherwise", "test", "test", "test"))
val id_monitor = Seq(IdMonitor("1", ip_locations1), IdMonitor("2", ip_locations2))
val df = id_monitor.toDF()
df.show(false)
+---+------------------------------------------------------------------------------------------------------+
|id |ipLocation                                                                                            |
+---+------------------------------------------------------------------------------------------------------+
|1  |[{123.123.123.123, 12, home, test, test, test}, {123.123.123.124, 12, otherwise, test, test, test}]   |
|2  |[{123.123.123.125, 13, company, test, test, test}, {123.123.123.124, 13, otherwise, test, test, test}]|
+---+------------------------------------------------------------------------------------------------------+
And the IPs:
val ips = Seq("123.123.123.125")
val df_ips = ips.toDF("ips")
df_ips.show()
+---------------+
|            ips|
+---------------+
|123.123.123.125|
+---------------+
Join:
Starting from the sample data above, explode the ipLocation array of IdMonitor and join it with the IPs.
import org.apache.spark.sql.functions.{col, explode, lit}

df.withColumn("ipLocation", explode(col("ipLocation"))).alias("a")
  .join(df_ips.alias("b"),
    col("a.ipLocation.ipType") === lit("home") || col("a.ipLocation.ip") === col("b.ips"),
    "inner")
  .select("ipLocation.*")
  .as[IpLocation].collect()
Finally, the collected result is:
res32: Array[IpLocation] = Array(IpLocation(123.123.123.123,12,home,test,test,test), IpLocation(123.123.123.125,13,company,test,test,test))
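One caveat with this join: because the condition is an OR, every "home" row matches every row of df_ips. With the single IP above that yields exactly one copy, but with n IPs each home location appears n times, and if df_ips is empty the home rows disappear entirely. Spark also cannot extract equi-join keys from an OR condition, so it typically falls back to a nested-loop join. The plain-Scala sketch below (hypothetical object OrJoinDemo) reproduces the duplication and contrasts it with a simple membership test. In Spark, a comparable approach would be a left join on the IP column followed by a filter such as ipType === "home" or the joined IP being not null, after deduplicating df_ips.

```scala
case class IpLocation(ip: String, ipVersion: Byte, ipType: String, city: String, state: String, country: String)

object OrJoinDemo {
  // Mimics an inner join on (ipType == "home" || ip == ips):
  // every (location, ip row) pair satisfying the condition produces one output row.
  def orInnerJoin(locs: Seq[IpLocation], ips: Seq[String]): Seq[IpLocation] =
    for {
      loc <- locs
      ip  <- ips
      if loc.ipType == "home" || loc.ip == ip
    } yield loc

  // Membership version: each location is kept at most once,
  // and "home" rows survive even when ips is empty.
  def keepOnce(locs: Seq[IpLocation], ips: Seq[String]): Seq[IpLocation] = {
    val ipSet = ips.toSet
    locs.filter(loc => loc.ipType == "home" || ipSet.contains(loc.ip))
  }
}
```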
