kafka消費者能正常消費,能從kafka集群中取到資料,消費者的邏輯是這樣的。
KafkaStream<byte[], byte[]> streamt = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = streamt.iterator();
while (it.hasNext())
System.out.println(new String(it.next().message()));
// now launch all the threads
executor = Executors.newFixedThreadPool(numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerMsgTask(stream, threadNumber));
threadNumber++;
}
}
但是全部資料消費完之后,再執行到 while (it.hasNext())時,就會出現這個問題:
2016-10-23 16:36:04 [main-SendThread(localhost:2181):1414357] - [DEBUG] Got ping response for sessionid: 0x157f072a16d000f after 0ms
2016-10-23 16:36:07 [main-SendThread(localhost:2181):1417025] - [DEBUG] Got ping response for sessionid: 0x157f072a16d000f after 1ms
2016-10-23 16:36:10 [main-SendThread(localhost:2181):1419692] - [DEBUG] Got ping response for sessionid: 0x157f072a16d000f after 1ms
2016-10-23 16:36:12 [main-SendThread(localhost:2181):1422359] - [DEBUG] Got ping response for sessionid: 0x157f072a16d000f after 1ms
而且每隔1秒列印一次這個,程式不往下進行了。 是不是kafka連不上zookeeper呢 ?
大神們幫忙看看啊,,。。這個不知道怎么解決了。。
uj5u.com熱心網友回復:
貌似你的log4j log 級別設的是DEBUG,調高點就好了。轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/71176.html
標籤:云存儲
上一篇:Hbase SingleColumnValueFilter條件查詢
下一篇:RD and pipeline
