@Override
public void run() {
// TODO Auto-generated method stub
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Properties props = new Properties();
props.put("auto.offset.reset", "smallest"); //必須要加,如果要讀舊資料
props.put("zookeeper.connect", "Master:2181");
props.put("zk.connectiontimeout.ms", "10000");
props.put("group.id", "test-consumer-group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", 1); // 一次從主題中獲取一個資料
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStreams.get("test").get(0);// 獲取每次接收到的這個資料
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
System.out.println("consummer...");
while(iterator.hasNext()){
String message = new String(iterator.next().message());
System.out.println("接收到: " + message);
}
}
producer正常運行,但是consumer沒有取到資料,"group.id", "test-consumer-group"是從conf中的consumer來的,消費者還需要其他配置?程式卡在while(iterator.hasNext()){ 這一行了,怎么解決能讓它繼續運行?
uj5u.com熱心網友回復:
help help~轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/78679.html
標籤:Spark
上一篇:走進誤區,讓你更了解Docker
