概述
在kafka中,或者是說在任何訊息佇列中都有個消費順序的問題,為了保證一個佇列順序消費,當當中一個訊息消費例外時,必將影響后續佇列訊息的消費,這樣業務豈不是卡住了,比如筆者舉個最簡單的例子:我發送1-100的訊息,在我的處理邏輯當中 msg%5==0我就進行 int i=1/0操作,這必將拋例外,一直阻塞在msg=5上,后面6-100無法消費,下面筆者給出解決方案,
重試一定次數(訊息丟失)
@KafkaHandler
@KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2", concurrency = "1")
public void test6(String msg){
businessProcess(msg);
}
private void businessProcess(String msg){
System.out.println("接收到訊息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
if (Integer.valueOf(msg) % 5 == 0) {
int i = 1 / 0;
}
}
說明:如果讀者使用的是java客戶端,也就是spring進行實作,那么在不做任何處理的情況下,會自動重試10次,然后訊息會被直接處理掉,也就是說如果你的業務允許訊息丟失,那么你不需要額外的編碼處理
加入到死訊佇列(訊息不丟失)
消費端代碼:
//1.啟用手動提交offset
//2.配置errorHandler,用來加入到死訊佇列
//3.不管業務處理是否處理例外還是正常都提交offset
@KafkaHandler
@KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2",
errorHandler ="kafkaListenerErrorHandler", concurrency = "1")
public void test6(String msg,Acknowledgment ack){
try {
businessProcess(msg);
}finally {
//手動提交
ack.acknowledge();
}
}
//1.專門處理死訊佇列訊息,都是topicName+.DLT的主題
//2.死訊佇列里,只有消費成功的才提交offset,否則等待bug修復完上線,繼續處理
@KafkaHandler
@KafkaListener(topics = {"quickstart-events.DLT"},groupId = "test-consumer-group-2", concurrency = "1")
public void test7(String msg,Acknowledgment ack){
try {
businessProcess(msg);
ack.acknowledge();
}catch (Exception e){
e.printStackTrace();
}
}
//業務代碼
private void businessProcess(String msg){
System.out.println("接收到訊息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
if (Integer.valueOf(msg) % 5 == 0) {
int i = 1 / 0;
}
}
例外處理器
//1.向容器注冊一個KafkaListenerErrorHandler型別的bean
//2.該bean就是當處理訊息例外的時候,將訊息加入到.DLT主題中
@Component("kafkaListenerErrorHandler")
public class KafkaListenerErrorHandlerTest implements KafkaListenerErrorHandler {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
private static final String TOPIC_DLT=".DLT";
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
System.out.println("消費失敗訊息:"+message.toString());
//獲取訊息處理例外主題
MessageHeaders headers = message.getHeaders();
String topic=headers.get("kafka_receivedTopic")+TOPIC_DLT;
//放入死訊佇列
kafkaTemplate.send(topic,message.getPayload());
return message;
}
}
效果圖:
說明:以上基本上就是使用死訊佇列的方案,也許讀者會覺得這樣編碼復雜度很高,但其實不用擔心,其實上面這些代碼基本上是使用死訊佇列的模板代碼,在成熟一點的公司,一般會使用上述代碼進行簡單封裝,這里筆者給個思路,有興趣同學可以實作一下,我們其實可以使用aop思想,進行自定義一個@EnableDLT這樣的注解去實作,這樣上面這個方案使用起來是不是就簡單優雅了,之前筆者在開發程序中使用過亞馬遜的訊息佇列服務,也不過是這樣實作罷了,
最后:
最近我整理了整套《JAVA核心知識點總結》,說實話 ,作為一名Java程式員,不論你需不需要面試都應該好好看下這份資料,拿到手總是不虧的~我的不少粉絲也因此拿到騰訊位元組快手等公司的Offer
進【Java進階之路】,找管理員獲取哦-!

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/289106.html
標籤:其他
上一篇:OpenMVG 系列 (2):Image 和 Numeric
下一篇:JDK JRE JVM的區別
