1 Duplicate Consumption
1.1 Causes
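- The consumer thread is forcibly killed (the consuming system crashes, is restarted, etc.). Records that were already consumed never get their offset committed.
- Network fluctuations prevent the offset commit from going through.
- The consumer processes messages too slowly. If it cannot finish a batch within one polling cycle, the liveness check marks it as failed and a rebalance starts before its offsets are committed. The polling cycle is max.poll.interval.ms, or session.timeout.ms in very old clients.
- The connection to the partition is lost after records have been consumed but before their offset is committed.
- The root cause in every case is the same: offsets of already-consumed messages were never committed.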
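Every cause above comes down to one rule: a restarted or reassigned consumer resumes from the last committed offset. The stdlib-only sketch below is a toy model of that rule, not the Kafka client API. The class `OffsetReplayDemo` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model of one partition read by one consumer (not the Kafka API).
 * The only rule it encodes: a (re)started consumer resumes from the last COMMITTED offset.
 */
class OffsetReplayDemo {
    private final List<String> partition;                      // list index == offset
    private final List<String> processed = new ArrayList<>();  // every record the handler saw
    private long committedOffset = 0;                           // "next offset to read", as the broker stores it

    OffsetReplayDemo(List<String> partition) {
        this.partition = new ArrayList<>(partition);
    }

    /** Reads from the committed offset to the end; commits only if commit is true. */
    void poll(boolean commit) {
        long position = committedOffset;                    // restart point after a crash or rebalance
        while (position < partition.size()) {
            processed.add(partition.get((int) position));  // business logic would run here
            position++;
        }
        if (commit) {
            committedOffset = position;                     // Kafka commits the NEXT offset to read
        }
        // commit == false models a kill, network failure or rebalance before the commit
    }

    List<String> processed() { return processed; }

    long committedOffset() { return committedOffset; }
}
```

On a three-record partition, calling `poll(false)` and then `poll(true)` hands each record to the handler twice. Only the commit at the end of the second call moves the restart point.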
1.2 Solutions
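- The first approach is to raise consumption throughput by speeding up the handling of each message. For example, move time-consuming steps to asynchronous processing, or process with multiple threads. Besides shortening per-message processing time, you can adjust two settings to fit your scenario:
  - Raise max.poll.interval.ms (default 300000 ms, i.e. 5 minutes) to avoid unnecessary rebalances.
  - Lower max.poll.records (default 500) to match the actual message rate.
  This approach fixes duplicates caused by overly long processing and needs few code changes. It cannot rule out duplicate consumption entirely.
- The second approach is to add a separate deduplication mechanism. When producing a message, embed a unique identifier such as a message ID. On the consumer side, store the IDs of the most recent 1000 messages in Redis or a MySQL table. Set max.poll.records below 1000 so that a whole redelivered batch still falls inside the stored window. Before processing a message, check its ID against this table and skip the message if the ID is already there.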
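Here is a minimal sketch of the second approach. It assumes an in-memory `LinkedHashMap` stands in for the Redis or MySQL table, so unlike the real store it does not survive restarts or span instances. `RecentIdDeduplicator` is a hypothetical name. Its constructor also enforces the rule that the ID window must be larger than max.poll.records.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical in-memory stand-in for the "recent message IDs" table.
 * A real deployment would keep these IDs in Redis or MySQL so they survive
 * restarts and are shared by all consumer instances.
 */
class RecentIdDeduplicator {
    private final Set<String> recentIds;

    RecentIdDeduplicator(int capacity, int maxPollRecords) {
        // The window must be larger than one poll() batch, otherwise IDs from a
        // redelivered batch may already have been evicted.
        if (capacity <= maxPollRecords) {
            throw new IllegalArgumentException("capacity must exceed max.poll.records");
        }
        // Insertion-ordered map that drops its oldest entry once it exceeds capacity.
        Map<String, Boolean> window = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
        this.recentIds = Collections.newSetFromMap(window);
    }

    /** Runs handler unless messageId is among the recent IDs; returns true if it ran. */
    synchronized boolean processOnce(String messageId, Runnable handler) {
        if (recentIds.contains(messageId)) {
            return false;            // duplicate delivery: skip it
        }
        handler.run();               // process first...
        recentIds.add(messageId);    // ...then record the ID, so a failed handler can be retried
        return true;
    }
}
```

The ID is recorded only after the handler succeeds. A crash between those two steps can therefore still cause one duplicate. Closing that gap would require writing the ID and the business result in the same transaction.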
1.3 Reproducing duplicate consumption
- Simulate a network problem: the listener never commits the offset
package com.demo.demo.kafka;
import com.demo.demo.pojo.MsgInfo;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
 * @Author shu
 * @Date: 2021/10/28/ 19:55
 * @Description Simulates a lost offset commit: records are buffered but never acknowledged
 **/
@Component
public class KafkaTest {
    //topic
    private final static String TOPIC_NAME = "my-replicated-topic";
    //Time the program started; a single shared copy
    private static final AtomicLong lastRecieveMessage = new AtomicLong(System.currentTimeMillis());
    //Buffer written by the Kafka listener thread and drained by the scheduler thread, so it must be synchronized
    private final List<ConsumerRecord<String, String>> DataList = Collections.synchronizedList(new ArrayList<>(500));
    //json
    private final Gson gson = new GsonBuilder().create();
    //kafka
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;
    /**
     * Message receiver (runs once a minute)
     */
    @Scheduled(cron = "0 */1 * * * ?")
    public void Consumer() {
        long last = lastRecieveMessage.get();
        long current = System.currentTimeMillis();
        if ((current - last) > (60 * 1000)) {
            //Copy and clear under the list's lock so the listener cannot add records while we iterate
            List<ConsumerRecord<String, String>> batch;
            synchronized (DataList) {
                batch = new ArrayList<>(DataList);
                DataList.clear();
            }
            System.out.println(batch);
            for (ConsumerRecord<String, String> consumerRecord : batch) {
                MsgInfo info = gson.fromJson(consumerRecord.value(), MsgInfo.class);
                System.out.println("Message: " + info);
            }
        }
    }
    /**
     * Message sender (runs every 30 s)
     */
    @Scheduled(cron = "0/30 * * * * ?")
    public void Provide() {
        long last = lastRecieveMessage.get();
        long current = System.currentTimeMillis();
        if ((current - last) > (30 * 1000)) {
            MsgInfo msgInfo = new MsgInfo(current - last, "message service", last, new Date());
            kafkaTemplate.send(TOPIC_NAME, "power", gson.toJson(msgInfo));
        }
    }
    /**
     * Consume records one at a time.
     * Injecting Acknowledgment requires a manual ack mode (e.g. spring.kafka.listener.ack-mode=manual_immediate).
     * @param record
     * @param ack
     */
    @KafkaListener(topics = TOPIC_NAME, groupId = "MyGroup1")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        DataList.add(record);
        //Manually commit the offset; deliberately commented out to simulate a commit lost to a network failure
        //ack.acknowledge();
    }
}
- Observed result


- Restore the network so offsets are committed normally again
    /**
     * Consume records one at a time
     * @param record
     * @param ack
     */
    @KafkaListener(topics = TOPIC_NAME, groupId = "MyGroup1")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        DataList.add(record);
        //Manually commit the offset
        ack.acknowledge();
    }
- Observed result


- Comparison


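Conclusion: the earlier offsets were never committed, so those records were consumed again. This is duplicate consumption.
1.4 Solution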
package com.demo.demo.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Author shu
 * @Date: 2021/10/29/ 19:34
 * @Description Redisson configuration
 **/
@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        //Replace "ip" and the password with your own Redis server's address and credentials
        config.useSingleServer()
                .setAddress("redis://ip:6379")
                .setPassword("admin123");
        return Redisson.create(config);
    }
}
package com.demo.demo.kafka;
import com.demo.demo.pojo.MsgInfo;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
 * @Author shu
 * @Date: 2021/10/29/ 19:17
 * @Description Uses a distributed lock to solve duplicate Kafka consumption
 **/
@Component
public class RedissonKafka {
    private final static String TOPIC_NAME = "my-replicated-topic";
    //Time the program started; a single shared copy
    private static final AtomicLong lastRecieveMessage = new AtomicLong(System.currentTimeMillis());
    //Prefix for the Redis keys that store the next offset to process per partition
    private static final String KEY_PREFIX = "key";
    //Buffer shared by the listener thread and the scheduler thread, so it must be synchronized
    private final List<ConsumerRecord<String, String>> DataList = Collections.synchronizedList(new ArrayList<>(500));
    //json
    private final Gson gson = new GsonBuilder().create();
    //kafka
    @Resource
    private KafkaTemplate<Object, Object> kafkaTemplate;
    //redisson
    @Resource
    private RedissonClient redissonClient;
    /**
     * Message receiver (fires at second 0 and second 35 of every minute)
     */
    @Scheduled(cron = "0/35 * * * * ?")
    public void Consumer() {
        long last = lastRecieveMessage.get();
        long current = System.currentTimeMillis();
        if ((current - last) > (60 * 1000)) {
            List<ConsumerRecord<String, String>> batch;
            synchronized (DataList) {
                batch = new ArrayList<>(DataList);
                DataList.clear();
            }
            for (ConsumerRecord<String, String> consumerRecord : batch) {
                //Offsets are only unique within a topic-partition, so the stored state is keyed on that
                String partitionKey = KEY_PREFIX + ":" + consumerRecord.topic() + "-" + consumerRecord.partition();
                //Lock the partition so that two instances cannot process the same record at once
                RLock lock = redissonClient.getLock(partitionKey + ":lock");
                lock.lock();
                try {
                    //Next offset still to be processed for this partition (null until the first record)
                    RBucket<Long> nextOffset = redissonClient.getBucket(partitionKey);
                    Long stored = nextOffset.get();
                    if (stored != null && consumerRecord.offset() < stored) {
                        continue; //Already processed: this is a redelivery, skip it
                    }
                    MsgInfo info = gson.fromJson(consumerRecord.value(), MsgInfo.class);
                    System.out.println("Message: " + info);
                    nextOffset.set(consumerRecord.offset() + 1);
                } finally {
                    //Always release the lock, even if processing throws
                    lock.unlock();
                }
            }
        }
    }
    /**
     * Message sender (runs every 30 s)
     */
    @Scheduled(cron = "0/30 * * * * ?")
    public void Provide() {
        long last = lastRecieveMessage.get();
        long current = System.currentTimeMillis();
        if ((current - last) > (30 * 1000)) {
            MsgInfo msgInfo = new MsgInfo(current - last, "message service", last, new Date());
            kafkaTemplate.send(TOPIC_NAME, "key", gson.toJson(msgInfo));
        }
    }
    /**
     * Consume records one at a time
     * @param record
     * @param ack
     */
    @KafkaListener(topics = TOPIC_NAME, groupId = "MyGroup1")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        DataList.add(record);
        //Commits as soon as the record is buffered; duplicates are filtered by the stored offset,
        //but a crash before the scheduled job runs would lose the buffered record
        ack.acknowledge();
    }
}
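The idea behind this solution can be exercised without Redis. Under a lock, keep the next offset to process for each partition, and skip any redelivered record whose offset is below it. The sketch below is a stand-in under stated assumptions, not the Redisson API. A `ConcurrentHashMap` plays the role of the Redis buckets, and one `ReentrantLock` per partition plays the role of the `RLock`. `OffsetWatermarkGuard` and `handle` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical stdlib-only model of "store the next offset per partition and skip
 * anything below it". The maps stand in for Redis buckets and Redisson locks.
 */
class OffsetWatermarkGuard {
    private final Map<String, Long> nextOffset = new ConcurrentHashMap<>();
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Runs businessLogic unless the offset is below the stored watermark; returns true if it ran. */
    boolean handle(String topic, int partition, long offset, Runnable businessLogic) {
        String key = topic + "-" + partition;   // offsets are only unique per topic-partition
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
            long watermark = nextOffset.getOrDefault(key, 0L);
            if (offset < watermark) {
                return false;                    // already processed: duplicate delivery
            }
            businessLogic.run();
            nextOffset.put(key, offset + 1);     // same "next offset" convention as Kafka commits
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

Processing the record and updating the watermark are still two separate steps. A crash between them can therefore produce one duplicate. The watermark removes redeliveries of records whose stored offset was updated, which covers the lost-commit case from 1.3.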
