現有1個大表A(ip,value)
ip是ip地址轉換的10進制Double數值
1個小表B(beginip,lastip,label)
beginip至lastip標識了一段ip地址
label表示這段ip地址歸屬的運營商(如:移動,聯通,電信)
需要在大表的map階段做join,結果將ip地址轉換成ip地址歸屬的運營商,請教spark scala的程式實作代碼,謝謝!
uj5u.com熱心網友回復:
val broadCastMap = sc.broadcast(iptable1)val sighttp1 = sighttp.map(x => (x.iremoteipv4,x)).mapPartitions({ iter =>
val m=broadCastMap.value
var f1="NULL"
for {
(k,v) <- iter
val f=m.where(k+">=fstip and lastip>="+k)
if (f.count()>0)
f1 = m.where(k + ">=fstip and lastip>=" + k).first.get(2).toString
} yield (f1,v)
})
演算法實作了,不知道是不是最優的,輸出的結果就是能匹配到的行,匹配不到的行不輸出。
uj5u.com熱心網友回復:
為什么不直接用SQLContext。。。uj5u.com熱心網友回復:
val sqlCtx = new SQLContext(sc)
sqlCtx.read().jdbc(xxx).registerTempTable("t_a")
sqlCtx.read().jdbc(xxx).registerTempTable("t_b")
val res = sqlCtx.sql(" SELECT a.ip , b.label FROM t_a a JOIN t_b b ON a.ip BETWEEN b.beginip AND b.endip ")
res.show(100)
uj5u.com熱心網友回復:
感謝 link0007的建議!我用你的方法實作了。但是我的大表很大,有幾十億行,小表就幾萬行,這種情況在reduce-side join運行非常慢,所以我想map-side join效率會高一些。我的原始碼如下,在spark-shell --master yarn-client集群運行,報NullPointerException,不知道是什么原因?出例外的行是val f = m.where(k + ">=fstip and lastip>=" + k) ,如果這行改成f=“test”這樣的常量就能正常跑通。
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
object test1 {
case class httpxdr(localipv4:String,remoteipv4:Double,host:String,wholeurl:String)
case class iptable(fstip:Double,lastip:Double,label:String)
def main(args: Array[String]) {
if (args.length != 1 ){
println("usage is ebda <master> <input> <output>")
return
}
val sc = new SparkContext(args(0), "ebda-spark", System.getenv("SPARK_HOME"))
val hiveCtx = new HiveContext(sc)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val sighttp = hiveCtx.sql("SELECT * FROM httpnew where reportdate='20160510'").map(h => httpxdr(h(1).toString(),h(2).toString().split("\\.")(0).toDouble*1000000000+h(2).toString().split("\\.")(1).toDouble*1000000+h(2).toString().split("\\.")(2).toDouble*1000+h(2).toString().split("\\.")(3).toDouble,h(3).toString(),h(4).toString()))
val iptable1 = hiveCtx.sql("select * from configuration.wwipdsttable").map(ip => iptable(ip(0).toString().toDouble,ip(1).toString().toDouble,ip(3).toString())).toDF()
val broadCastMap = sc.broadcast(iptable1)
val sighttp1 = sighttp.map(line => (line.iremoteipv4,line)).mapPartitions({ iter =>
var f1="NULL"
val m=broadCastMap.value
for {
(k,v) <- iter
val f = m.where(k + ">=fstip and lastip>=" + k)
if (f.count>0)
f1=f.first.get(2).toString
} yield (f1,v)
}).toDF()
sighttp1.show(10)
}
}
uj5u.com熱心網友回復:
broadcast(RDD.collect())就不報空指標錯誤了。但是data frame執行了collect后,data frame的where 等算子就無法使用了,broadcast的物件難道不支持data frame么?uj5u.com熱心網友回復:
我覺得還有進一步從算法上優化的余地。現在你小表的ip都是范圍資料,如果轉換成C類或者B類ip網段的窮舉會有多大?估計不會是天文數字吧。
這樣就把范圍輪詢轉化成了map/reduce最擅長的等值映射了,估計速度會快幾個數量級。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/78661.html
標籤:Spark
