我運行spark自帶的Wordcount例子,為什么只有在CTRL + C后,才會在命令列輸出結果?即使把print()改成saveastextfile,也是在中斷程式后才寫檔案。請教各位這是什么原因?
代碼如下:
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/83357.html
標籤:Spark
上一篇:VirtualShiled是一個基于內省技術的虛擬機監控和入侵檢測系統
下一篇:PC機和虛擬機輸入法切換問題
