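Flow: read data from Kafka and process it with Spark Streaming. I produce a single message into Kafka, and every 2 seconds Spark computes over the data produced in the last 10 seconds. However, the output of reduceByKeyAndWindow never becomes empty, and I can't understand why: a record should stay in the window for at most 10 seconds.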
The code is below. Any help from the experts would be appreciated.
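import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}
import scala.collection.mutable

object AdsClick1 {
  def main(args: Array[String]): Unit = {
    // Value layout: (timestamp, city, position, count)
    type AdInfo = (String, String, String, Int)

    val kafkaBrokerList = "192.168.1.114:9092,192.168.1.115:9092,192.168.1.116:9092"
    val groupId = "SparkStreaming-ads-test"
    val topicSet = Set[String]("ads_test")
    val blacklist = "blacklist-test" // not used in this snippet
    // Kafka 0.8 direct-stream parameters; "largest" starts from the newest offsets
    val kafkaParamMap = Map[String, String](
      "group.id" -> groupId,
      "metadata.broker.list" -> kafkaBrokerList,
      "auto.offset.reset" -> "largest")
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(s"${this.getClass.getSimpleName}")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.task.maxFailures", "16")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    // 2-second batch interval
    val streamingContext = new StreamingContext(sc, Seconds(2))
    // Checkpointing is required by the inverse-function form of reduceByKeyAndWindow
    streamingContext.checkpoint("checkpoint-ads")
    val directStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        streamingContext, kafkaParamMap, topicSet)
    // Commit each batch's until-offsets back to Kafka for this consumer group
    directStream.foreachRDD(rdd => {
      val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val kc = new KafkaCluster(kafkaParamMap)
      val offsetMap = mutable.Map[TopicAndPartition, Long]()
      for (offsets <- offsetsList) {
        val tp = TopicAndPartition(offsets.topic, offsets.partition)
        offsetMap += (tp -> offsets.untilOffset)
      }
      kc.setConsumerOffsets(groupId, offsetMap.toMap)
    })
    // Message "timestamp,key,city,position" -> (key, (timestamp, city, position, 1))
    val adsPairRDD: DStream[(String, AdInfo)] = directStream.map(item => {
      val fields: Array[String] = item._2.split(",")
      (fields(1), (fields(0), fields(2), fields(3), 1))
    })
    // Per-key count over the last 10 s, recomputed every 2 s, using an inverse function
    val reduceWindowRDD: DStream[(String, AdInfo)] = adsPairRDD.reduceByKeyAndWindow(
      (a: AdInfo, b: AdInfo) => (a._1, a._2, a._3, a._4 + b._4), // batch entering the window
      (a: AdInfo, b: AdInfo) => (a._1, a._2, a._3, a._4 - b._4), // batch leaving the window
      Seconds(10),
      Seconds(2)
    )
    reduceWindowRDD.foreachRDD(rdd => {
      rdd.collect().foreach(item => {
        println(item)
      })
    })
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}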
The printed output is as follows:
(cc,(1528037285,hangzhou,left,1))
(cc,(1528037285,hangzhou,left,1))
(cc,(1528037285,hangzhou,left,1))
(cc,(1528037285,hangzhou,left,1))
(cc,(1528037285,hangzhou,left,1))
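(cc,(1528037285,hangzhou,left,0)) <= from here on there is still output, with 0 as the last number. There should be no data left at this point, and I don't understand why.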
(cc,(1528037285,hangzhou,left,0))
(cc,(1528037285,hangzhou,left,0))
(cc,(1528037285,hangzhou,left,0))
.......
.......
.......
This record keeps showing up forever.
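Why the zeros appear: when reduceByKeyAndWindow is given an invReduceFunc, Spark updates the window incrementally instead of re-reducing all 10 seconds of data. It takes the previous window's result, reduces in the batch that just entered and applies the inverse function to the batch that just left. After the single `cc` record slides out, its count becomes 1 - 1 = 0, but `cc` is still a key in the windowed state, so it keeps being emitted. The plain-Scala sketch below (no Spark involved; `WindowSim`, `run` and `dropZeros` are hypothetical names) models that update under the simplifying assumption that the batch interval equals the slide interval, so the 10-second window spans 5 batches.

```scala
// Simplified model (not Spark code) of the incremental window update that
// reduceByKeyAndWindow performs when an inverse reduce function is supplied.
// Assumption: batch interval == slide interval, values are plain Int counts.
object WindowSim {
  type Counts = Map[String, Int]

  /** Returns the window state emitted after each batch.
    * @param batches       per-batch counts, in arrival order
    * @param windowBatches window length in batches (10 s / 2 s = 5)
    * @param dropZeros     if true, mimic a filter that removes 0-count keys
    */
  def run(batches: Seq[Counts], windowBatches: Int, dropZeros: Boolean): Seq[Counts] =
    batches.indices.scanLeft(Map.empty[String, Int]) { (state, i) =>
      // Batch entering the window: reduce it in (+).
      val entering = batches(i)
      // Batch that just slid out of the window: apply the inverse (-).
      val leaving =
        if (i >= windowBatches) batches(i - windowBatches) else Map.empty[String, Int]
      val keys = state.keySet ++ entering.keySet ++ leaving.keySet
      val updated = keys.map { k =>
        k -> (state.getOrElse(k, 0) + entering.getOrElse(k, 0) - leaving.getOrElse(k, 0))
      }.toMap
      // Without a filter, a key whose count fell to 0 stays in the state.
      if (dropZeros) updated.filter { case (_, v) => v != 0 } else updated
    }.tail // drop the empty initial state

  def main(args: Array[String]): Unit = {
    // One "cc" record in the first batch, followed by empty batches only.
    val batches: Seq[Counts] = Map("cc" -> 1) +: Seq.fill(7)(Map.empty[String, Int])
    run(batches, windowBatches = 5, dropZeros = false).foreach(println)
  }
}
```

Running `main` prints `Map(cc -> 1)` five times and then `Map(cc -> 0)` for every later batch, the same pattern as the output above; with `dropZeros = true` the later states are empty.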
Reply from a uj5u.com user:
Can any expert help answer this?
Reply from a uj5u.com user:
Impressive. Just reading it makes my head spin.
Reply from a uj5u.com user:
Just add a filter function that drops the entries whose value is 0.
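The filter can be applied inside the windowed operation itself: the inverse-function overload of reduceByKeyAndWindow accepts an optional `filterFunc` parameter, and only key-value pairs for which it returns true are retained. A minimal sketch, assuming the value tuple from the question with `Int` instead of `Integer`; `WindowFilterSketch`, `Click`, `keepNonZero` and `windowedCounts` are hypothetical names, not part of Spark.

```scala
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper (not part of Spark): windowed counts without 0-count keys.
object WindowFilterSketch {
  // Same value layout as in the question: (timestamp, city, position, count).
  type Click = (String, String, String, Int)

  // Keep a key only while its windowed count is non-zero.
  val keepNonZero: ((String, Click)) => Boolean = { case (_, click) => click._4 != 0 }

  def windowedCounts(adsPairRDD: DStream[(String, Click)]): DStream[(String, Click)] =
    adsPairRDD.reduceByKeyAndWindow(
      (a: Click, b: Click) => (a._1, a._2, a._3, a._4 + b._4), // batch entering the window
      (a: Click, b: Click) => (a._1, a._2, a._3, a._4 - b._4), // batch leaving the window
      Seconds(10),                                             // window length
      Seconds(2),                                              // slide interval
      filterFunc = keepNonZero // drop expired (0-count) keys from the window state
    )
}
```

Using `WindowFilterSketch.windowedCounts(adsPairRDD)` in place of the original call should stop the lines ending in 0. A plain `reduceWindowRDD.filter(_._2._4 != 0)` would also hide them from the printout, but the zero-count keys would then stay in the windowed state; `filterFunc` removes them from that state as well.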
