文章目錄
- Kafka消費者
- 1、消費方式
- 2、基礎消費者
- 3、消費者組案例
- 4、磁區分配策略
- 5、offset的維護
- 消費offset案例
- 6、自動提交offset
- 7、重置offset
- 8、手動提交offset
- 同步提交
- 異步提交
- 9、Consumer事務
Kafka消費者
1、消費方式
- pull(拉)模式從broker中讀取資料,可以根據consumer的消費能力以適當的速率消費訊息,pull模式不足之處是,如果kafka沒有資料,消費者可能會陷入回圈中,一直回傳空資料,針對這一點,Kafka的消費者在消費資料時會傳入一個時長引數timeout,如果當前沒有資料可供消費,consumer會等待一段時間之后再回傳,這段時長即為timeout,
- push(推)模式很難適應消費速率不同的消費者,因為訊息發送速率是由broker決定的,它的目標是盡可能以最快速度傳遞訊息,但是這樣很容易造成consumer來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞,
2、基礎消費者
步驟:
-
創建消費者的配置物件
-
給消費者配置物件添加引數
配置kafka的地址埠
配置反序列化
配置消費者組
-
注冊主題并讓消費者訂閱主題集合
-
拉取資料列印
package com.hpu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
* @author zyn
* @version 1.0
* @date 2021/12/29 14:30
*/
public class MyConsumer {
public static void main(String[] args) {
//1.創建消費者的配置物件
Properties properties = new Properties();
//2.給消費者配置物件添加引數
//配置kafka的地址埠
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
//配置反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//配置消費者組
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "zyn");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
//3.注冊主題
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
//4.拉取資料列印
while (true) {
ConsumerRecords<String, String> poll = kafkaConsumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : poll) {
System.out.println(record);
}
}
}
}
3、消費者組案例
復制兩份基礎消費者的代碼,在idea中同時啟動,即可啟動同一個消費者組中的三個消費者,
啟動一個生產者往topic中發送資料,可以看到發往不同的磁區,一個磁區只會對應一個消費者,
4、磁區分配策略
多個磁區如何分配給各個消費者?
Kafka有三種分配策略:
-
RoundRobin:輪循
//消費者 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor"); //生產者 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"org.apache.kafka.clients.producer.RoundRobinPartitioner"); -
Range(默認):范圍劃分
//消費者 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RangeAssignor"); //生產者 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"org.apache.kafka.clients.producer.internals.DefaultPartitioner"); -
Sticky:隨機
//消費者 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor"); //生產者 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"org.apache.kafka.clients.producer.UniformStickyPartitioner");
當某個消費者掛掉時,前兩種需要將全部磁區重新進行輪循或者范圍劃分出磁區與對應剩余消費者的對應關系,而Sticky保持原有正常消費者的磁區不變的基礎上,將掛掉broker的磁區隨機分配給其余正常的broker,效率更高,
5、offset的維護
consumer在消費程序中可能會出現斷電宕機等故障,consumer恢復后,需要從故障前的位置的繼續消費,所以consumer需要實時記錄自己消費到了哪個offset,以便故障恢復后繼續消費,
解決方案:
Kafka 0.9版本之前,consumer默認將offset保存在Zookeeper中,從0.9版本開始,consumer默認將offset保存在Kafka一個內置的topic中,該topic為__consumer_offsets,
消費offset案例
properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,“false”);
package com.hpu.kafka.offset;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
* @author zyn
* @version 1.0
* @date 2021/12/29 20:25
*/
public class MyOffset {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"offset");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,"false");
KafkaConsumer<Object, Object> kafkaConsumer = new KafkaConsumer<>(properties);
ArrayList<String> topics = new ArrayList<>();
topics.add("__consumer_offsets");
kafkaConsumer.subscribe(topics);
while (true){
ConsumerRecords<Object, Object> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
#指定消費者組
kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --group testTopic --describe
#全部消費者組和全部主題
kafka-consumer-groups.sh --all-groups --all-topics --describe --bootstrap-server hadoop102:9092
6、自動提交offset
KafkaConsumer:需要創建一個消費者物件,用來消費資料
ConsumerConfig:獲取所需的一系列配置引數
ConsuemrRecord:每條資料都要封裝成一個ConsumerRecord物件
為了使我們能夠專注于自己的業務邏輯,Kafka提供了自動提交offset的功能,
自動提交offset的相關引數:
enable.auto.commit:是否開啟自動提交offset功能
auto.commit.interval.ms:自動提交offset的時間間隔
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
* 1. 創建消費者配置類
* 2. 添加配置
* 3. 創建消費者物件
* 4. 設定消費的主題
* 5. 掛起消費資料
*/
public class CustomConsumer {
public static void main(String[] args) {
// 1. 創建kafka消費者配置類
Properties properties = new Properties();
// 2. 添加配置引數
// 添加連接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// 配置序列化 必須
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 配置消費者組
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 是否自動提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 提交offset的時間周期
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//3. 創建kafka消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//4. 設定消費主題 形參是串列
consumer.subscribe(Arrays.asList("first"));
//5. 消費資料
while (true){
// 讀取訊息
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 輸出訊息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.value());
}
}
}
}
7、重置offset
auto.offset.reset = earliest | latest | none |
(1)earliest:自動將偏移量重置為最早的偏移量
(2)latest(默認值):自動將偏移量重置為最新偏移量
(3)none:如果未找到消費者組的先前偏移量,則向消費者拋出例外
8、手動提交offset
由于其是基于時間提交的,開發人員難以把握offset提交的時機,手動提交offset的方法有兩種:分別是commitSync(同步提交)和commitAsync(異步提交),兩者的相同點是,都會將本次poll的一批資料最高的偏移量提交;不同點是,commitSync阻塞當前執行緒,一直到提交成功,并且會自動失敗重試(由不可控因素導致,也會出現提交失敗);而commitAsync則沒有失敗重試機制,故有可能提交失敗,
同步提交
將自動提交關閉:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “false”);
開啟同步提交:
consumer.commitSync();
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
* 1. 修改自動提交offset為手動
* 2. 在業務代碼完成之后手動同步提交offset
*/
public class CustomConsumerByHand {
public static void main(String[] args) {
// 1. 創建kafka消費者配置類
Properties properties = new Properties();
// 2. 添加配置引數
// 添加連接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// 配置序列化 必須
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 配置消費者組
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 是否自動提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 提交offset的時間周期
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//3. 創建kafka消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//4. 設定消費主題 形參是串列
consumer.subscribe(Arrays.asList("first"));
//5. 消費資料
while (true){
// 讀取訊息
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 輸出訊息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.value());
}
// 同步提交offset
consumer.commitSync();
}
}
}
異步提交
同步提交offset更可靠一些,但是由于其會阻塞當前執行緒,直到提交成功,因此吞吐量會受到很大的影響,因此更多的情況下,會選用異步提交offset的方式,
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
/**
* 1. 修改自動提交offset為手動
* 2. 在業務代碼完成之后手動異步提交offset
*/
public class CustomConsumerByHand {
public static void main(String[] args) {
// 1. 創建kafka消費者配置類
Properties properties = new Properties();
// 2. 添加配置引數
// 添加連接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// 配置序列化 必須
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 配置消費者組
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 是否自動提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 提交offset的時間周期
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//3. 創建kafka消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//4. 設定消費主題 形參是串列
consumer.subscribe(Arrays.asList("first"));
//5. 消費資料
while (true){
// 讀取訊息
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 輸出訊息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.value());
}
// 同步提交offset
//consumer.commitSync();
// 異步提交offset
consumer.commitAsync(new OffsetCommitCallback() {
/**
* 回呼函式輸出
* @param offsets offset資訊
* @param exception 例外
*/
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
// 如果出現例外列印
if (exception != null){
System.err.println("Commit failed for " + offsets);
}
else{
Set<TopicPartition> topicPartitions = offsets.keySet();//該方法可以得到我們消費的的訊息 所處的topic partition 有哪些
for (TopicPartition topicPartition : topicPartitions) {//遍歷我們消費的topic 以及parition 元資料
OffsetAndMetadata offsetAndMetadata = offsets.get(topicPartition);//每個topic 的每個parition 的消費到的offset
long offset = offsetAndMetadata.offset();//獲取提交的offset值
int partition = topicPartition.partition();//獲取該parttion的值
String topic = topicPartition.topic();//獲取該toipic的值
System.out.printf("----提交的offset = %s, 該 partition = %s ,以及topic = %s---------\n",
offset,partition,topic);
}
}
}
});
}
}
}
9、Consumer事務
無論是同步提交還是異步提交offset,都有可能會造成資料的漏消費或者重復消費,先提交offset后消費,有可能造成資料的漏消費;而先消費后提交offset,有可能會造成資料的重復消費,
如果想完成Consumer端的精準一次性消費,那么需要kafka消費端將消費程序和提交offset程序做原子系結,要么都成功,要么都失敗,此時我們需要將kafka的offset保存到支持事務的自定義介質(比如mysql),
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/398639.html
標籤:其他
