1. 存在原因:丟失消費/重復消費
1 自動提交offset:
1.1 當自動提交時間為1s時,間隔時間達到1s,offset(100)已經提交,但是資料處理尚未完成(只處理了80)出錯了(掛了),此時從新啟動后會從已經提交的offset(100)開始消費處理,那么81-100這些資料就未處理,導致丟失消費
1.2 當自動提交時間為3s時,資料1s已經處理完了一批,突然掛了,由于提交時間未到,offset未提交,重新啟動時,會重復處理已經處理過的資料,導致重復消費
2 官方手動提交(與上雷同問題)
2.1 同步手動提交
//同步提交,當前執行緒會阻塞 直到提交成功才會 繼續消費后面的資料 效率低下 一般不用
//consumer.commitSync();
2.2 異步手動提交
//異步手動提交
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if( null != exception){
exception.printStackTrace();
}
}
});
2. 自定義存盤offset
實作方案:
1 實作offset重新分配的機制
2 保證資料處理與offset提交能事務性
實作程序:
1 借助ConsumerRebalanceListener類,重寫重新分配offset的方法以及提交offset的方法
2 結果mysql,實作事務性
3. 代碼實作
package com.dream.bigdata.bi.es.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
/**
* 手動提交offset
*/
public class MyManualCommitOffsetKafkaConsumer {
private static String group = "qdd";
private static String topic = "qdd100";
/**
* 1. 自動提交offset:
* 1.1 當間隔時間到達自動提交時間時,offset已提交,但是
*
* 自定義mysql資料列:group | topic | partition | offset (前三個作為聯合主鍵)
* @param args
*/
public static void main(String[] args) {
// 1. 創建消費者配置資訊
Properties properties = new Properties();
// 2. Kafka集群資訊
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "113.143.100.155:9092,113.143.100.140:9092,113.143.100.148:9092");
// 3. ***** 關閉自動提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 4. 自動提交時間
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); // 默認提交時間1000ms
// 5. key、value的反序列化方式
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 6. 消費者組
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
// ** offset重置配置生效條件:1 當Kafka中沒有初始化的offset(換組消費) 2 當Kafka保存訊息已經沒有當前offset的資料
// 面試題:如何重新消費一個主題的資料?答:設定消費者offset重置為earliest,換組消費
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 7. 創建消費者
KafkaConsumer consumer = new KafkaConsumer(properties);
// 8. 消費者訂閱主題 此處借助于ConsumerRebalanceListener 監聽磁區offset發生變化后的重新分配機制
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
// rebalance之前將記錄進行保存
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
// 獲取磁區
int sub_topic_partition_id = partition.partition();
// 對應磁區的偏移量
long sub_topic_partition_offset = consumer.position(partition);
String date = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss")
.format(new Date(new Long(System.currentTimeMillis())));
DBUtils.update("replace into offset values(?,?,?,?,?)",
new Offset(
group,
topic,
sub_topic_partition_id,
sub_topic_partition_offset,
date
)
);
}
}
// rebalance之后讀取之前的消費記錄,繼續消費
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
int sub_topic_partition_id = partition.partition();
long offset = DBUtils.queryOffset(
"select sub_topic_partition_offset from offset where consumer_group=? and sub_topic=? and sub_topic_partition_id=?",
group,
topic,
sub_topic_partition_id
);
System.out.println("partition = " + partition + "offset = " + offset);
// 定位到最近提交的offset位置繼續消費
consumer.seek(partition, offset);
}
}
});
while (true) {
// 9. 獲取資料
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
List<Offset> offsets = new ArrayList<>();
// 10. 決議并列印結果
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + " ===> " + consumerRecord.value());
String date = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss").format(
new Date(
new Long(
System.currentTimeMillis()
)
)
);
offsets.add(new Offset(group, topic, consumerRecord.partition(), consumerRecord.offset(), date));
System.out.println("|---------------------------------------------------------------\n" +
"|group\ttopic\tpartition\toffset\ttimestamp\n" +
"|" + group + "\t" + topic + "\t" + consumerRecord.partition() + "\t" + consumerRecord.offset() + "\t" + consumerRecord.timestamp() + "\n" +
"|---------------------------------------------------------------"
);
}
// 異步批量插入offset
for (Offset offset : offsets) {
DBUtils.update("replace into offset values(?,?,?,?,?)", offset);
}
//同步手動提交,當前執行緒會阻塞 直到提交成功才會 繼續消費后面的資料 效率低下 一般不用
//consumer.commitSync();
//異步手動提交
// consumer.commitAsync(new OffsetCommitCallback() {
// @Override
// public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
// if( null != exception){
// exception.printStackTrace();
// }
// }
// });
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/265640.html
標籤:其他
上一篇:嵌入式學習DAY6 --- 一維陣列字串處理相關的函式、二維陣列講解、指標初步講解!
下一篇:小結
