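import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// sc is the existing SparkContext (for example, the one provided by spark-shell)
val con = "10.20.30.91:2181"   // ZooKeeper address passed to createStream
val topics = "topic1"
val group = "group1"
val numThreads = 6             // consumer threads per topic inside the single receiver
val ssc = new StreamingContext(sc, Seconds(2))
val sqc = new SQLContext(sc)
val topicMap = topics.split(",").map((_, numThreads)).toMap
// Receiver-based stream; keep only the message value
val lines = KafkaUtils.createStream(ssc, con, group, topicMap).map(_._2)
val showLines = lines.window(Minutes(60))
showLines.foreachRDD { rdd =>
  // Parse the JSON records of the window and expose them as a temp table
  val t = sqc.jsonRDD(rdd)
  t.registerTempTable("kafka_test")
}
ssc.start()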
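This is the program I wrote to read Kafka data with Spark Streaming, but when the data volume is large it stalls. I want to read concurrently so that the data stays real-time. How should I do that? Thanks, everyone.
The official site has KafkaUtils.createDirectStream,
but when I use it I get the error: Received -1 when reading from channel, socket has likely been closed
How do I use it?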
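For reference, a minimal sketch of how createDirectStream is usually called in the Spark 1.x / Kafka 0.8 integration (the same org.apache.spark.streaming.kafka package as createStream). It takes a Kafka broker list instead of a ZooKeeper address. The broker address below is an assumption: it reuses the host from the question with Kafka's default broker port 9092, and the object name DirectStreamSketch is purely illustrative.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  // ASSUMPTION: broker host:port. 9092 is Kafka's default broker port;
  // 2181 (used in the question) is ZooKeeper's default port.
  val brokers = "10.20.30.91:9092"

  // The direct API is configured with the broker list, not a ZooKeeper quorum.
  val kafkaParams: Map[String, String] = Map("metadata.broker.list" -> brokers)

  // Topic names as a Set; no per-topic thread count is involved.
  val topicsSet: Set[String] = "topic1".split(",").toSet

  // Returns only the message values, mirroring .map(_._2) in the question's code.
  def directLines(ssc: StreamingContext): DStream[String] =
    KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      .map(_._2)
}
```

With the direct API there are no receivers; each batch reads a range of offsets from every Kafka partition, so read parallelism follows the number of partitions in the topic.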
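uj5u.com helpful user replied:
Are you connecting to ZooKeeper? createDirectStream (direct stream mode) connects to the Kafka brokers, not to ZooKeeper.
uj5u.com helpful user replied: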
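val numInputDStreams = 4
val kafkaDStreams = (1 to numInputDStreams).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
}
kafkaDStreams.map { stream =>
  stream // your processing logic goes here
}
This reads from Kafka with multiple processes (one receiver per input stream).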
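If the processing logic is the same for every stream, the receiver streams can also be merged with ssc.union so that the rest of the job handles a single DStream. The following is only a sketch under that assumption; the object and method names (MultiReceiverSketch, unionedLines) are hypothetical, and the parameters stand for the values already used in this thread.

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

object MultiReceiverSketch {
  // Creates numReceivers receiver-based Kafka streams and unions them into one DStream.
  def unionedLines(ssc: StreamingContext,
                   zkQuorum: String,
                   group: String,
                   topicMap: Map[String, Int],
                   numReceivers: Int): DStream[String] = {
    val streams = (1 to numReceivers).map { _ =>
      // Each call starts its own receiver; keep only the message value.
      KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    }
    // Merge all receiver streams so windowing and SQL code is written once.
    ssc.union(streams)
  }
}
```

Each receiver occupies one core for the lifetime of the job, so the application needs more cores than receivers, otherwise nothing is left to process the received data.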
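Add the following when you submit the job. Choose the number according to your cluster's processing capacity: it is the maximum number of records per second that each process consumes from each partition. (Note that this setting is documented for the direct stream API; for receiver-based streams the corresponding setting is spark.streaming.receiver.maxRate.)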
--conf spark.streaming.kafka.maxRatePerPartition=10000
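To get a feel for what this limit means per batch, here is a small worked calculation. It assumes the direct stream API (where maxRatePerPartition applies), the 2-second batch interval from the question, and a hypothetical topic with 6 partitions; the object and method names are illustrative only.

```scala
object RateLimitSketch {
  // Upper bound on the records in one batch when the per-partition rate limit applies:
  // records/second/partition * partitions * seconds per batch.
  def maxRecordsPerBatch(maxRatePerPartition: Long, numPartitions: Int, batchSeconds: Long): Long =
    maxRatePerPartition * numPartitions * batchSeconds

  // 10000 records/s per partition, HYPOTHETICAL 6 partitions, 2-second batches.
  val exampleCap: Long = maxRecordsPerBatch(10000L, 6, 2L) // 120000
}
```

If a batch of that size cannot be processed within the batch interval, batches queue up, so the limit should be lowered until processing time stays below the interval.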
uj5u.com helpful user replied:
Configure the two parameters spark.streaming.backpressure.enabled and spark.streaming.backpressure.initialRate.
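As a rough sketch, these settings can also be placed on the SparkConf in code instead of on the command line. The app name and the initialRate value are placeholders chosen for illustration, not recommendations, and spark.streaming.backpressure.initialRate is only available in newer Spark releases (2.0 and later).

```scala
import org.apache.spark.SparkConf

object BackpressureConfSketch {
  val conf: SparkConf = new SparkConf()
    .setAppName("kafka-streaming-sketch")                       // hypothetical app name
    .set("spark.streaming.backpressure.enabled", "true")        // adapt ingestion rate to processing speed
    .set("spark.streaming.backpressure.initialRate", "1000")    // placeholder rate for the first batch
    .set("spark.streaming.kafka.maxRatePerPartition", "10000")  // hard upper bound from the earlier reply
}
```

With backpressure enabled, Spark adjusts the ingestion rate from observed batch processing times, and the max-rate setting acts as a ceiling on that rate.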
