最近寫了個kafka_producer測驗程式,可是每當程式運行到一定時間后就報錯,重新創建新topic也不行:
Fetching topic metadata with correlation id 15 for topics [Set(mykafka1)] from broker [id:1,host:192.168.231.12,port:9092] fail
ERROR DefaultEventHandler: Failed to send requests for topics mykafka1 with correlation ids in [0,20]
producer:
object MyProducer {
def getProducerConfig(brokerAddr: String): Properties = {
val props = new Properties()
props.put("metadata.broker.list", brokerAddr)
props.put("serializer.class", classOf[ItelogEncoder[Person]].getName)
props.put("key.serializer.class", classOf[StringEncoder].getName)
// props.put("replica.fetch.max.bytes", "" + 1024 * 1024 * 6)
// props.put("message.max.bytes", "" + 1024 * 1024 * 4)
props
}
def sendMessages(topic: String, messages: List[Person], brokerAddr: String) = {
val producer = new Producer[String, Person](new ProducerConfig(getProducerConfig(brokerAddr)))
producer.send(messages.map {
new KeyedMessage[String, Person](topic, "Iteblog", _)
}: _*)
producer.close()
}
def main(args: Array[String]) {
val Array(brokerAddr, topic) = args
val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
for (i <- 0.to(10000)) {
val data = List(Person("xiaoming" + i, 23), Person("xiaohua" + i, 24), Person("xiaoliu" + i, 26))
sendMessages(topic, data, brokerAddr)
}
}
case class Person(var name: String, var age: Int)
求大神們幫幫忙,小白在此謝過
uj5u.com熱心網友回復:
剛好也犯了同樣的錯誤,而且時間也最近幾天,希望對你有用;問題部分在于這段代碼:(每次發送訊息都創建新實體)
def sendMessages(topic: String, messages: List[Person], brokerAddr: String) = {
val producer = new Producer[String, Person](new ProducerConfig(getProducerConfig(brokerAddr)))
producer.send(messages.map {
new KeyedMessage[String, Person](topic, "Iteblog", _)
}
uj5u.com熱心網友回復:
val producer = new Producer[String, Person](new ProducerConfig(props))for (j <- 0.to(10000)) {
producer.send(data.map {
new KeyedMessage[String, Person](topic, "Iteblog", _)
}: _*)
producer.close()
}
如果將新建物件寫到for回圈外,就會報錯:
Exception in thread "main" kafka.producer.ProducerClosedException: producer already closed
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/53477.html
標籤:Spark
