According to the official Spark documentation, multiple receivers (one per input DStream) can run in parallel using the following code:
// Create one receiver-based Kafka input DStream per stream; the receivers run in parallel
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
    kafkaStreams.add(KafkaUtils.createStream(...));
}
// Merge all input DStreams into one: union(first, rest)
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
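Each receiver created by KafkaUtils.createStream runs as a long-lived task that holds one core, so the receivers alone occupy numStreams cores. The Spark Streaming programming guide therefore recommends, when running locally, a master of local[n] with n greater than the number of receivers (and, on a cluster, more cores than receivers); otherwise no core may be left to process the received batches. The arithmetic is sketched below. ReceiverCores and its methods are hypothetical helpers for illustration, not Spark APIs.

```java
/**
 * Hypothetical helper (not part of Spark) encoding the rule of thumb from the
 * Spark Streaming programming guide: each receiver-based input DStream holds
 * one core, so the application needs more cores than receivers.
 */
public class ReceiverCores {

    /** Minimum cores: one per receiver plus at least one for batch processing. */
    public static int minCores(int numReceivers) {
        if (numReceivers < 0) {
            throw new IllegalArgumentException("numReceivers must be >= 0");
        }
        return numReceivers + 1;
    }

    /** Smallest "local[n]" master URL satisfying n > numReceivers. */
    public static String localMaster(int numReceivers) {
        return "local[" + minCores(numReceivers) + "]";
    }

    public static void main(String[] args) {
        int numStreams = 5; // same value as in the documentation snippet
        System.out.println(localMaster(numStreams)); // prints local[6]
    }
}
```

With the two receivers in the question below, the same rule gives local[3] as the minimum for a local run.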
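My program uses Kafka as the source. A single input DStream works fine. With multiple DStreams, testing shows that data from both input DStreams is received, but the program only runs once; that is, it receives data a single time and then stops receiving. My code is as follows: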
String groupId = args[0];
String zookeepers = args[1];
String topics = "tpsN5a";
Integer numPartitions = Integer.parseInt(args[3]);
Map<String, Integer> topicsMap = new HashMap<String, Integer>();
for (String topic : topics.split(",")) {
    topicsMap.put(topic, numPartitions);
}
// How often to run a batch (the batch interval)
Duration batchInterval = Durations.seconds(2);
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaConsumerWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
// First receiver: topic tpsN5a
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
        .createStream(ssc, zookeepers, groupId, topicsMap, StorageLevel.MEMORY_AND_DISK_SER());
// Second receiver: topic tpsN5b, same groupId
String topics2 = "tpsN5b";
Map<String, Integer> topicsMap2 = new HashMap<String, Integer>();
topicsMap2.put(topics2, numPartitions);
JavaPairReceiverInputDStream<String, String> kafkaStream2 = KafkaUtils
        .createStream(ssc, zookeepers, groupId, topicsMap2, StorageLevel.MEMORY_AND_DISK_SER());
// Collect both input DStreams and union them
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(2);
kafkaStreams.add(kafkaStream);
kafkaStreams.add(kafkaStream2);
ssc.checkpoint("/spark/stream/checkpoint/d1");
JavaPairDStream<String, String> unifiedStream = ssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
JavaDStream<String> lines = unifiedStream // previously: kafkaStream
        .map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> arg0) throws Exception {
                // Log the key and value of each record, then keep only the value
                logger.warn(Thread.currentThread().getName() + " msg1:" + arg0._1 + "|msg2:" + arg0._2);
                return arg0._2();
            }
        });
How can I fix this so that, when multiple input DStreams receive data in parallel, the streaming program keeps receiving data continuously instead of only once?
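The loop that fills topicsMap can be factored into a small helper. Because createStream accepts a map of topics, the same helper could also build a single map holding both tpsN5a and tpsN5b for one receiver, as an alternative to the two-receiver setup. According to the Spark Kafka integration guide for the receiver-based approach, the Integer value is the number of consumer threads inside that one receiver, not the number of Spark partitions. TopicMaps and buildTopicMap are hypothetical names used only for this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper: turns a comma-separated topic list into the
 * Map<String, Integer> (topic name -> consumer threads) passed to
 * KafkaUtils.createStream. LinkedHashMap keeps the input order.
 */
public class TopicMaps {

    public static Map<String, Integer> buildTopicMap(String topics, int threadsPerTopic) {
        Map<String, Integer> topicsMap = new LinkedHashMap<String, Integer>();
        for (String topic : topics.split(",")) {
            String trimmed = topic.trim();
            if (!trimmed.isEmpty()) { // skip empty entries such as in "a,,b"
                topicsMap.put(trimmed, threadsPerTopic);
            }
        }
        return topicsMap;
    }

    public static void main(String[] args) {
        // One map covering both topics from the question
        System.out.println(buildTopicMap("tpsN5a,tpsN5b", 2)); // {tpsN5a=2, tpsN5b=2}
    }
}
```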
Reply from a uj5u.com user:
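I'm using Spark 1.3.1 with the write-ahead log enabled. The data received in my write-ahead log does contain the messages sent by Kafka, so why doesn't my program receive them?
Reply from a uj5u.com user: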
Has this been solved? I've run into a similar problem.
Reply from a uj5u.com user:
I ran it and it works fine. Just add the following code at the end of the program:
ssc.start();
ssc.awaitTermination();
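Why those two lines matter: defining DStreams only sets up the computation, and nothing is received or processed until ssc.start() is called. start() launches the receivers and the batch scheduler in the background and returns immediately, while ssc.awaitTermination() keeps the driver's main thread waiting as new batches keep arriving every batchInterval. The sketch below is a plain-Java analogy of that lifecycle, not Spark code: BatchLoop is a hypothetical class built on java.util.concurrent, and the short interval and batch limit exist only so the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical plain-Java analogy of a streaming driver (not Spark code):
 * start() schedules one "batch" per interval on a background thread and
 * returns at once; awaitTermination() blocks the caller until stop().
 */
public class BatchLoop {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final CountDownLatch stopped = new CountDownLatch(1);
    private final AtomicInteger batches = new AtomicInteger();

    /** Analogous to ssc.start(): non-blocking, work happens in the background. */
    public void start(long intervalMillis, final int maxBatches) {
        scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // One "batch"; stop after maxBatches so the demo terminates
                if (batches.incrementAndGet() >= maxBatches) {
                    stop();
                }
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Analogous to ssc.stop(): cancel future batches and wake any waiter. */
    public void stop() {
        scheduler.shutdown();
        stopped.countDown();
    }

    /** Analogous to ssc.awaitTermination(): block until stop() has been called. */
    public void awaitTermination() throws InterruptedException {
        stopped.await();
    }

    public int batchCount() {
        return batches.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BatchLoop loop = new BatchLoop();
        loop.start(50, 3);       // three 50 ms "batches" stand in for 2 s ones
        loop.awaitTermination(); // without this, main would print before all batches had run
        System.out.println("batches run: " + loop.batchCount()); // batches run: 3
    }
}
```

Removing the awaitTermination() call in main lets the print run as soon as start() returns, which mirrors a driver whose main method does not wait for further batches.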
Reply from a uj5u.com user:
Are your input DStreams all using the same groupId? In other words, do all of them belong to the same consumer group?
Original post: https://www.uj5u.com/qita/37632.html
