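import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// `sc` is assumed to be an existing SparkContext (for example, in spark-shell).
val con = "10.20.30.91:2181" // ZooKeeper address
val topics = "topic1"
val group = "group1"
val numThreads = 6
val ssc = new StreamingContext(sc, Seconds(2))
val sqc = new SQLContext(sc)
// Map each topic to the number of consumer threads used by the receiver.
val topicMap = topics.split(",").map((_, numThreads)).toMap
// Receiver-based stream; keep only the message value.
val lines = KafkaUtils.createStream(ssc, con, group, topicMap).map(_._2)
val showLines = lines.window(Minutes(60))
showLines.foreachRDD { rdd =>
  // Re-register the last 60 minutes of JSON messages as a temp table.
  val t = sqc.jsonRDD(rdd)
  t.registerTempTable("kafka_test")
}
ssc.start()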
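This is the program I wrote to read Kafka data with Spark Streaming. When the data volume is large, it gets blocked. I want to consume in parallel so that the data stays real-time. How should I go about it? Thanks, everyone.
A helpful uj5u.com user replied:
Use KafkaUtils.createDirectStream. The ordinary createStream method designates a single receiver on one executor, so all consumption from Kafka funnels through that one receiver. With DirectStream, the tasks running on every executor fetch data from Kafka themselves. The trade-off is that with the former Spark maintains the consumer offset for you, while the latter requires you to manage it yourself. There is plenty of DirectStream documentation online; have a look.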
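To make the offset point concrete, here is a minimal sketch of reading the offset ranges from a direct stream, assuming the Spark 1.x Kafka 0.8 integration (the `org.apache.spark.streaming.kafka` package). The broker address `10.20.30.91:9092`, the object name `DirectStreamOffsets`, and the `describe` helper are assumptions for illustration, not taken from the original post. It only prints the ranges; where to persist them is left open.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object DirectStreamOffsets {
  // Hypothetical helper: renders one offset range as readable text.
  def describe(o: OffsetRange): String =
    s"${o.topic}[${o.partition}]: ${o.fromOffset} until ${o.untilOffset}"

  def main(args: Array[String]): Unit = {
    // The master URL is expected to come from spark-submit.
    val conf = new SparkConf().setAppName("DirectStreamOffsets")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Assumption: a Kafka broker listens on 9092 on this host.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "10.20.30.91:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("topic1"))

    stream.foreachRDD { rdd =>
      // The cast only works on the RDD produced directly by the direct stream,
      // before any transformation such as map or window.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(o => println(describe(o)))
      // Process rdd here, then store offsetRanges wherever offsets should be kept.
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Each RDD partition of the direct stream corresponds to one Kafka partition, so the number of Kafka partitions in the topic bounds the read parallelism.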
A helpful uj5u.com user replied:
I tried that method and got this error: "Received -1 when reading from channel, socket has likely been closed". Also, this method has no group.
val ssc = new StreamingContext(sc, Seconds(1))
val topicsSet = Set("topic1")
val brokers = "10.20.30.91:2181"
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
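One possible cause, offered as a guess rather than a diagnosis: `metadata.broker.list` expects Kafka broker addresses, while 2181 is the default ZooKeeper client port, and `serializer.class` is a producer setting that a consumer does not need. The sketch below builds the parameter map under the assumption that a broker listens on Kafka's default port 9092 on the same host. The `DirectKafkaParams` object and its `build` function are hypothetical names. A `group.id` entry can be placed in the map, but the direct approach tracks offsets itself rather than through a ZooKeeper consumer group, so it may have little effect.

```scala
object DirectKafkaParams {
  // Hypothetical helper: builds consumer parameters for createDirectStream.
  // `brokers` must be Kafka broker host:port pairs, not the ZooKeeper address.
  def build(brokers: Seq[String], groupId: String): Map[String, String] = {
    require(brokers.nonEmpty, "at least one broker is required")
    Map(
      "metadata.broker.list" -> brokers.mkString(","),
      "group.id" -> groupId
    )
  }

  def main(args: Array[String]): Unit = {
    // 9092 is Kafka's default broker port; an assumption about this cluster.
    val kafkaParams = build(Seq("10.20.30.91:9092"), "group1")
    kafkaParams.foreach { case (k, v) => println(s"$k=$v") }
    // kafkaParams would then be passed as the second argument of
    // KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](...).
  }
}
```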
A helpful uj5u.com user replied:
Sorry, my own knowledge is limited, so I can't help you with this.
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qita/69114.html
Tags: Spark
