The versions are shown in the image.

The code that reads from Kafka is as follows:
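import java.util.concurrent.atomic.AtomicReference;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

// jssc, kafkaParams_direct and fromOffsets are defined elsewhere in the job
JavaInputDStream<String> stream = KafkaUtils.createDirectStream(jssc,
        String.class, String.class, StringDecoder.class,
        StringDecoder.class, String.class, kafkaParams_direct,
        fromOffsets,
        new Function<MessageAndMetadata<String, String>, String>() {
            @Override
            public String call(MessageAndMetadata<String, String> v1) throws Exception {
                return v1.message(); // keep only the message payload
            }
        });

// Holds the offset ranges of the current batch so they can be read downstream
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

// The cast to HasOffsetRanges only works on RDDs produced directly by the direct stream,
// so transform is applied on stream itself before any other operation
JavaDStream<String> lines = stream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
    @Override
    public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
        OffsetRange[] offsets = ((HasOffsetRanges) v1.rdd()).offsetRanges();
        offsetRanges.set(offsets);
        return v1;
    }
});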
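One way to tell whether records are really being skipped is to compare consecutive batches: for each topic-partition, a batch's `fromOffset` should equal the previous batch's `untilOffset`. Below is a minimal sketch of that check that runs without Spark. `Range` is a hypothetical stand-in for `org.apache.spark.streaming.kafka.OffsetRange` (which exposes `topic()`, `partition()`, `fromOffset()` and `untilOffset()`), and `OffsetContinuityChecker` is likewise an illustrative name; in a real job one would presumably call `checkBatch` inside `foreachRDD`, on the driver, with the array held in `offsetRanges`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Spark's OffsetRange: one topic-partition, offsets [fromOffset, untilOffset)
final class Range {
    final String topic;
    final int partition;
    final long fromOffset;
    final long untilOffset;

    Range(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }
}

// Remembers the last untilOffset per topic-partition and reports gaps between batches
final class OffsetContinuityChecker {
    private final Map<String, Long> lastUntil = new HashMap<>();

    // Returns one message per partition whose fromOffset does not continue the previous batch
    List<String> checkBatch(Range[] batch) {
        List<String> gaps = new ArrayList<>();
        for (Range r : batch) {
            String key = r.topic + "-" + r.partition;
            Long prev = lastUntil.get(key);
            if (prev != null && r.fromOffset != prev) {
                gaps.add(key + ": expected fromOffset " + prev + " but got " + r.fromOffset);
            }
            lastUntil.put(key, r.untilOffset);
        }
        return gaps;
    }

    public static void main(String[] args) {
        OffsetContinuityChecker checker = new OffsetContinuityChecker();
        System.out.println(checker.checkBatch(new Range[] {new Range("t", 0, 0, 100)}));   // []
        System.out.println(checker.checkBatch(new Range[] {new Range("t", 0, 100, 250)})); // []
        System.out.println(checker.checkBatch(new Range[] {new Range("t", 0, 300, 400)})); // one gap
    }
}
```

The state lives in a plain driver-side map, which is the simplest option but is lost on restart, so this only detects gaps within a single run.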
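When I use rdd.foreachPartition only to print the offset information and the last message, the offsets are contiguous. But when I do the actual processing in rdd.foreachPartition and write the data to HBase, the offsets are no longer contiguous and many messages are missed. Has anyone run into this?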
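To check whether the HBase write path itself drops records, each partition can compare the offsets it actually wrote against its own `[fromOffset, untilOffset)` range. This sketch assumes the message handler is changed to carry `v1.offset()` along with `v1.message()` (the posted handler discards the offset), and that the partition's range is looked up as `ranges[TaskContext.get().partitionId()]`, where `ranges` is the array from `offsetRanges.get()` captured on the driver before calling `foreachPartition`; that lookup works for the direct stream because its RDD partitions map one-to-one to Kafka partitions before any shuffle. `PartitionAudit` and `missingOffsets` are hypothetical names, and the sketch runs without Spark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: audits one partition's writes against its offset range
final class PartitionAudit {
    // Returns every offset in [fromOffset, untilOffset) that is absent from `written`
    static List<Long> missingOffsets(long fromOffset, long untilOffset, Collection<Long> written) {
        Set<Long> seen = new HashSet<>(written);
        List<Long> missing = new ArrayList<>();
        for (long o = fromOffset; o < untilOffset; o++) {
            if (!seen.contains(o)) {
                missing.add(o);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Offsets that (hypothetically) reached HBase for a partition whose range is [10, 15)
        List<Long> written = Arrays.asList(10L, 11L, 13L, 14L);
        System.out.println(missingOffsets(10, 15, written)); // [12]
    }
}
```

An empty result means every offset in the range was written. On a log-compacted topic, gaps in the offset sequence are expected, so this check would report false positives there.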
Tags: Spark
