文章目錄
- 概述
- Code
- POM依賴
- 組態檔
- 配置類
- SeekToCurrentErrorHandler
- 自定義邏輯處理消費例外
- 生產者
- 消費者
- 單元測驗
- 測速結果
- 原始碼地址
概述
Spring-Kafka 提供消費重試的機制,當訊息消費失敗的時候,Spring-Kafka 會通過消費重試機制,重新投遞該訊息給 Consumer ,讓 Consumer 重新消費訊息 ,
默認情況下,Spring-Kafka 達到配置的重試次數時,【每條訊息的失敗重試時間,由配置的時間隔決定】Consumer 如果依然消費失敗 ,那么該訊息就會進入到死信佇列,
Spring-Kafka 封裝了消費重試和死信佇列, 將正常情況下無法被消費的訊息稱為死信訊息(Dead-Letter Message),將存盤死信訊息的特殊佇列稱為死信佇列(Dead-Letter Queue),
我們在應用中可以對死信佇列中的訊息進行監控重發,來使得消費者實體再次進行消費,消費端需要做冪等性的處理,
Code
POM依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 引入 Spring-Kafka 依賴 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
組態檔
spring:
# Kafka 配置項,對應 KafkaProperties 配置類
kafka:
bootstrap-servers: 192.168.126.140:9092 # 指定 Kafka Broker 地址,可以設定多個,以逗號分隔
# Kafka Producer 配置項
producer:
acks: 1 # 0-不應答,1-leader 應答,all-所有 leader 和 follower 應答,
retries: 3 # 發送失敗時,重試發送的次數
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 訊息的 key 的序列化
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 訊息的 value 的序列化
# Kafka Consumer 配置項
consumer:
auto-offset-reset: earliest # 設定消費者分組最初的消費進度為 earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
trusted:
packages: com.artisan.springkafka.domain
# Kafka Consumer Listener 監聽器配置
listener:
missing-topics-fatal: false # 消費監聽介面監聽的主題不存在時,默認會報錯,所以通過設定為 false ,解決報錯
logging:
level:
org:
springframework:
kafka: ERROR # spring-kafka
apache:
kafka: ERROR # kafka
配置類
首先要寫一個配置類,用于處理消費例外 ErrorHandler
package com.artisan.springkafka.configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.*;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/2/18 14:32
* @mark: show me the code , change the world
*/
@Configuration
public class KafkaConfiguration {
private Logger logger = LoggerFactory.getLogger(getClass());
@Bean
@Primary
public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
logger.warn("kafkaErrorHandler begin to Handle");
// <1> 創建 DeadLetterPublishingRecoverer 物件
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
// <2> 創建 FixedBackOff 物件 設定重試間隔 10秒 次數為 3次
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
// <3> 創建 SeekToCurrentErrorHandler 物件
return new SeekToCurrentErrorHandler(recoverer, backOff);
}
// @Bean
// @Primary
// public BatchErrorHandler kafkaBatchErrorHandler() {
// // 創建 SeekToCurrentBatchErrorHandler 物件
// SeekToCurrentBatchErrorHandler batchErrorHandler = new SeekToCurrentBatchErrorHandler();
// // 創建 FixedBackOff 物件
// BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
// batchErrorHandler.setBackOff(backOff);
// // 回傳
// return batchErrorHandler;
// }
}
Spring-Kafka 通過實作自定義的 SeekToCurrentErrorHandler ,當 Consumer 消費訊息例外的時候,進行攔截處理:
- 重試小于最大次數時,重新投遞該訊息給 Consumer
- 重試到達最大次數時,如果Consumer 還是消費失敗時,該訊息就會發送到死信佇列, 死信佇列的 命名規則為: 原有 Topic + .DLT 后綴 = 其死信佇列的 Topic
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
創建 DeadLetterPublishingRecoverer 物件,它負責實作,在重試到達最大次數時,Consumer 還是消費失敗時,該訊息就會發送到死信佇列,
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);

也可以選擇 BackOff 的另一個子類 ExponentialBackOff 實作,提供指數遞增的間隔時間
new SeekToCurrentErrorHandler(recoverer, backOff);
創建 SeekToCurrentErrorHandler 物件,負責處理例外,串聯整個消費重試的整個程序,
SeekToCurrentErrorHandler
在訊息消費失敗時,SeekToCurrentErrorHandler 會將 呼叫 Kafka Consumer 的 seek(TopicPartition partition, long offset) 方法,將 Consumer 對于該訊息對應的 TopicPartition 磁區的本地進度設定成該訊息的位置,
這樣,Consumer 在下次從 Kafka Broker 拉取訊息的時候,又能重新拉取到這條消費失敗的訊息,并且是第一條,
同時,Spring-Kafka 使用 FailedRecordTracker 對每個 Topic 的每個 TopicPartition 消費失敗次數進行計數,這樣相當于對該 TopicPartition 的第一條消費失敗的訊息的消費失敗次數進行計數,
另外,在 FailedRecordTracker 中,會呼叫 BackOff 來進行計算,該訊息的下一次重新消費的時間,通過 Thread#sleep(...) 方法,實作重新消費的時間間隔,
注意:
FailedRecordTracker 提供的計數是客戶端級別的,重啟 JVM 應用后,計數是會丟失的,所以,如果想要計數進行持久化,需要自己重新實作下 FailedRecordTracker 類,通過 ZooKeeper 存盤計數,
SeekToCurrentErrorHandler 是只針對訊息的單條消費失敗的消費重試處理,如果想要有訊息的批量消費失敗的消費重試處理,可以使用 SeekToCurrentBatchErrorHandler ,配置方式如下
@Bean
@Primary
public BatchErrorHandler kafkaBatchErrorHandler() {
// 創建 SeekToCurrentBatchErrorHandler 物件
SeekToCurrentBatchErrorHandler batchErrorHandler = new SeekToCurrentBatchErrorHandler();
// 創建 FixedBackOff 物件
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
batchErrorHandler.setBackOff(backOff);
// 回傳
return batchErrorHandler;
}
SeekToCurrentBatchErrorHandler 暫時不支持死信佇列的機制,
自定義邏輯處理消費例外
支持自定義 ErrorHandler 或 BatchErrorHandler 實作類,實作對消費例外的自定義的邏輯
比如 https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingErrorHandler.java
public class LoggingErrorHandler implements ErrorHandler {
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(LoggingErrorHandler.class));
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
LOGGER.error(thrownException, () -> "Error while processing: " + ObjectUtils.nullSafeToString(record));
}
}
配置方式同 SeekToCurrentErrorHandler 或 SeekToCurrentBatchErrorHandler,
生產者
package com.artisan.springkafka.producer;
import com.artisan.springkafka.constants.TOPIC;
import com.artisan.springkafka.domain.MessageMock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.Random;
import java.util.concurrent.ExecutionException;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/2/17 22:25
* @mark: show me the code , change the world
*/
@Component
public class ArtisanProducerMock {
@Autowired
private KafkaTemplate<Object,Object> kafkaTemplate ;
public ListenableFuture<SendResult<Object, Object>> sendMsgASync() {
// 模擬發送的訊息
Integer id = new Random().nextInt(100);
MessageMock messageMock = new MessageMock(id,"messageSendByAsync-" + id);
// 異步發送訊息
ListenableFuture<SendResult<Object, Object>> result = kafkaTemplate.send(TOPIC.TOPIC, messageMock);
return result ;
}
}
消費者
package com.artisan.springkafka.consumer;
import com.artisan.springkafka.domain.MessageMock;
import com.artisan.springkafka.constants.TOPIC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/2/17 22:33
* @mark: show me the code , change the world
*/
@Component
public class ArtisanCosumerMock {
private Logger logger = LoggerFactory.getLogger(getClass());
private static final String CONSUMER_GROUP_PREFIX = "MOCK-A" ;
@KafkaListener(topics = TOPIC.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC)
public void onMessage(MessageMock messageMock){
logger.info("【接受到訊息][執行緒:{} 訊息內容:{}]", Thread.currentThread().getName(), messageMock);
// 模擬拋出一次一行
throw new RuntimeException("MOCK Handle Exception Happened");
}
}
在消費訊息時候,拋出一個 RuntimeException 例外,模擬消費失敗
單元測驗
package com.artisan.springkafka.produceTest;
import com.artisan.springkafka.SpringkafkaApplication;
import com.artisan.springkafka.producer.ArtisanProducerMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.support.SendResult;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* * @version 1.0
* @description: TODO
* @date 2021/2/17 22:40
* @mark: show me the code , change the world
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringkafkaApplication.class)
public class ProduceMockTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private ArtisanProducerMock artisanProducerMock;
@Test
public void testAsynSend() throws ExecutionException, InterruptedException {
logger.info("開始發送");
artisanProducerMock.sendMsgASync().addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onFailure(Throwable throwable) {
logger.info(" 發送例外{}]]", throwable);
}
@Override
public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
logger.info("回呼結果 Result = topic:[{}] , partition:[{}], offset:[{}]",
objectObjectSendResult.getRecordMetadata().topic(),
objectObjectSendResult.getRecordMetadata().partition(),
objectObjectSendResult.getRecordMetadata().offset());
}
});
// 阻塞等待,保證消費
new CountDownLatch(1).await();
}
}
測速結果
我們把這個日志來梳理一下
2021-02-18 16:18:08.032 INFO 25940 --- [ main] c.a.s.produceTest.ProduceMockTest : 開始發送
2021-02-18 16:18:08.332 INFO 25940 --- [ad | producer-1] c.a.s.produceTest.ProduceMockTest : 回呼結果 Result = topic:[C_RT_TOPIC] , partition:[0], offset:[0]
2021-02-18 16:18:08.371 INFO 25940 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到訊息][執行緒:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 訊息內容:MessageMock{id=15, name='messageSendByAsync-15'}]
2021-02-18 16:18:18.384 ERROR 25940 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception
......
......
......
2021-02-18 16:18:18.388 INFO 25940 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到訊息][執行緒:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 訊息內容:MessageMock{id=15, name='messageSendByAsync-15'}]
2021-02-18 16:18:28.390 ERROR 25940 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception
......
......
......
2021-02-18 16:18:28.394 INFO 25940 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到訊息][執行緒:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 訊息內容:MessageMock{id=15, name='messageSendByAsync-15'}]
2021-02-18 16:18:38.395 ERROR 25940 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception
......
......
......
2021-02-18 16:18:38.399 INFO 25940 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到訊息][執行緒:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 訊息內容:MessageMock{id=15, name='messageSendByAsync-15'}]
清晰了么 老兄?
是不是和我們設定的消費重試
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
10秒 重試3次
3次處理后依然失敗,轉入死信佇列
看看資料

原始碼地址
https://github.com/yangshangwei/boot2/tree/master/springkafkaRetries
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/261050.html
標籤:其他
