以下測驗皆在windows下進行,請根據自己情況酌情配置kafka zookeeper等環境
本人使用的是jdk11,代碼中可能存在jdk9的新特性,使用jdk9以前的jdk的朋友請自行轉換
kafka環境變數等暫時略過
1.java匯入依賴
<!--匯入kafka依賴-->
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>2.8.0</version>
</dependency>
kafka Producer
匯入相關依賴后,創建測驗類ProducerDemo;
-
創建生產者物件
使用KafkaProducer 創建kafka生產者物件,這時可以發現kafka不允許我們使用空構造來創建物件;
那么我們就選用傳入properties的方式創建kafka生產者
創建生產者的時候,跟控制臺命令一樣,我們需要指定集群名稱以及序列化器,而這些相關設定都會存盤在我們的組態檔中;
kafka給我們提供了ProducerConfig類,并在其中已經給我們提前準備好了我們所需要的key,在向properties中put鍵值時,可以直接使用producerConfig的靜態常量作為key;并傳入相應value
-
向kafka中發送資訊
使用kafkaProducer向kafka中發送資訊,可以使用其提供的send()方法 ;使用時可以看到其需要傳入ProducerRecord以及一個可選的Callback
ProducerRecord: 即為每條資料所封裝成的物件
CallBack:可選;獲取函式的回呼
-
close()
在真實生產環境中,我們可能不需要手動呼叫close方法關閉kafkaProducer,但是目前的測驗階段,如果不使用close關閉,可能會導致發送的資訊在設定等待的時間內,不會被真正的發送;
流在關閉的時候會對資料進行回收操作
/**
* 描述:kafkaProducer生產者
*
* <pre>
* HISTORY
* ****************************************************************************
* ID DATE PERSON REASON
* 1 2021/8/10 23:14 Bambi Create
* ****************************************************************************
* </pre>
*
* @author Bambi
* @since 1.0
*/
public class ProducerPartitionerDemo01 {
public static void main(String[] args) {
Properties properties = new Properties();
//自行修改為對應的集群地址 kafka默認為9092,此處我沒有更改
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
//需要傳入序列化器的全類名,kafka需要通過反射全類名去獲取序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
?
KafkaProducer kafkaProducer = new KafkaProducer(properties);
for (int i = 0; i < 10; i++) {
//使用callBack收集回呼資訊,使用了lamdba運算式
kafkaProducer.send(new ProducerRecord("此處使用自己存在的主題","value"),((metadata, exception) ->{
if(exception==null){
System.out.println("沒有錯誤,資料添加成功");
}
} ));
}
//關閉
kafkaProducer.close();
}
}
自定義磁區器
如果想要自己根據業務需求撰寫自定義的磁區規則,可以自定義磁區器;
說到自定義,就勢必需要去實作某個介面或者繼承某個類
這里, 我們需要實作的是kafka給我們的Partitioner介面,實作后重寫方法
/**
* 描述: 自定義磁區器
*
* <pre>
* HISTORY
* ****************************************************************************
* ID DATE PERSON REASON
* 1 2021/8/10 22:24 Bambi Create
* ****************************************************************************
* </pre>
*
* @author Bambi
* @since 1.0
*/
public class MyPartitioner implements Partitioner {
?
/**
* 撰寫磁區規則
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//根據業務需求撰寫磁區規則
return 0;
}
?
@Override
public void close() {
?
}
?
/**
* 讀取配置資訊
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {
?
}
}
在撰寫規則時可以參考Kafka對Partitioner的默認實作 DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
//如果key也不存在,則會對可用磁區進行輪詢
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
//如果沒有指定磁區,且存在key值,則會根據key的hash進行取模來選擇磁區
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
實作同步發送
正常情況下kafka生產者發送資訊采用的是異步發送的方式,主執行緒將資訊發送給共享變數 RecordAccumulator ,Sender執行緒不同的從共享變數中拉取資料發送到broker上;
實作邏輯
在兩個執行緒其中一個執行的時候去阻塞另一個執行緒,實作串行
我們可以發現kafkaProducer的send()方法是存在回傳值 Future 的;
而我們知道,當future物件呼叫get()方法時,不僅會獲得當前執行緒回呼的物件,還會阻塞當前執行緒
我們便使用這個方法來實作同步發送
同步發送的使用場景相對較少,我們可以使用同步發送來確保區內有序,即當上一條資訊發送后,未接收到ack之前,阻塞發送執行緒,不繼續發送,從而實作有序
消費者API
撰寫消費者api的邏輯與生產者十分的相像,使用kafka提供的 KafkaConsumer 來創建消費者物件
并在組態檔中傳遞對應資訊,可以使用ConsumerConfig中的靜態屬性充當key值
/**
* 描述:kafka消費者
*
*
* <pre>
* HISTORY
* ****************************************************************************
* ID DATE PERSON REASON
* 1 2021/8/11 0:21 Bambi Create
* ****************************************************************************
* </pre>
*
* @author Bambi
* @since 1.0
*/
public class ConsumerTest {
?
public static void main(String[] args) {
Properties properties = new Properties();
//連接的集群
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//生產者需要指定序列化器,那么消費者就需要指定對應的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
?
//自動提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自動提交的延遲,提交的是消費者的offset
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
?
//消費者組
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroup01");
?
//創建消費者
KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
//訂閱主題
//此處可以添加多個集群
//可以看到這里沒有回傳值,也就是說,這里只是單純的指定了主題,如果想獲取主題中的資訊,需要使用別的方法
kafkaConsumer.subscribe(List.of("你的主題"));
?
?
while (true){
//獲取的型別與Producer類似,不過為ConsumerRecords類,想要得到單個資料,需要遍歷輸出
//新版本建議使用傳入Duration的方式,直接傳入毫秒數的方式以過時
ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ZERO);
consumerRecords.forEach(stringStringConsumerRecord -> {
//可以看到,使用consumerRecord去呼叫方法的時候,可以獲取到Key,所以key并不只是用來劃分磁區之用,如果沒有指定key,會輸出null
System.out.println(stringStringConsumerRecord.key()+":"+stringStringConsumerRecord.value()+" :"+stringStringConsumerRecord.offset());
});
}
//consumer進行訂閱拉去資訊的時候不需要手動關閉,因為順序執行完畢后,jvm會關閉;所以可以使用一個while回圈來持續消費
}
}
啟動消費者,會發現我們可以連接到對應的主題,但是不會獲取到先前已經存在的資訊;
在我們使用控制臺呼叫消費者時,如果我們想獲取該磁區已經存在的資訊,我們可以使用 --from-beginning指令將offset放到最前端從頭獲取;
java api中也是一樣;
kafka中 命令列能做的事情,在組態檔中應該都有相關的配置
我們進入ConsumerConfig,可以查看到其已經給我們提供了 AUTO_OFFSET_RESET_CONFIG這個屬性;
根據下方的doc描述,該屬性默認值為lastest,這也是我們為什么在不設定的時候會無法獲取已存在資訊的原因,我們可以手動在組態檔中傳入
earierlast
此處注意這個指定的生效條件:
-
只有當當前消費者/消費者組第一次消費(即還沒有offset時),或當前的offset在這個server中不存在時,指令才會生效
這里解釋一下為什么會不存在,kafka的資料默認時7天清空一次,如果我們拿著已經清空的資料的offset去尋找資料,就會出現offset在server中不存在的現象,此時AUTO_OFFSET_RESET_CONFIG就會生效
關于offset的手動提交
我們為什么需要手動提交? 自動提交無法保證準確的提交時機
如果設定的提交延時過短,會丟是資料
如果設定的延時過長,會導致資料重復
1.在組態檔中關閉自動提交
既然我們需要手動提交,則必然需要在組態檔中將自動提交置為false
ENABLE_AUTO_COMMIT_CONFIG,<----將它改成false
-
在消費結束后進行手動提交
使用consumer的 commitSync() 同步提交,或commitAsync() 異步提交
-
commitSyn:
相比于異步提交,因為其提交offset時自帶失敗重試的機制,相對更加可靠
-
commitAsync:
同步提交相對可靠,但是會阻塞當先執行緒,影響吞吐量;
在大多數情況下,我們會選用異步提交的方式
-
自定義存盤offset
手動提交雖然可以解決丟是資料的問題,但是仍然會存在資料重復的現象;
kafka也早已考慮到這種情況,所以允許我們自定義存盤offset的規則,(比如我們可以和MySQL的寫入操作進行事務系結...)
但是相對于自定義磁區器,自定義存盤offset要相對麻煩一些;在0.9版本之后,kafka會將offset暫存在kafka內置的一個主題中,想要去維護一個offset,就需要考慮到消費者的Rebalance問題
即,如果當前消費者所消費的磁區掛掉了,消費者需要轉移到另一磁區去消費,此時的offset需要定位到這個磁區最近提交的offset
為此,我們需要實作kafka提供的ConsumerRebalanceListener
/**
* 描述: 自定義存盤Offset
*
* <pre>
* HISTORY
* ****************************************************************************
* ID DATE PERSON REASON
* 1 2021/8/11 23:26 Bambi Create
* ****************************************************************************
* </pre>
*
* @author Bambi
* @since 1.0
*/
public class ConsumerConfigOffset {
//創建一個Map在暫存當前offset
private static Map<TopicPartition,Long> currentOffset = new ConcurrentHashMap<>();
public static void main(String[] args) {
//組態檔較為冗長,我寫了個工具類進行配置,相關配置內容已經提到過,就不再贅述
PropertiesUtils propertiesUtils = new PropertiesUtils();
Properties properties = propertiesUtils.ConsumerProperties("localhost:9092", "bambiOffset", "false", "100", 1);
?
KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
?
//在此處創建ConsumerRebalanceListener類
kafkaConsumer.subscribe(List.of("solo1"), new ConsumerRebalanceListener() {
//在Rebalance之前呼叫
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffset(currentOffset);
}
?
//在Rebalance之后呼叫
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
currentOffset.clear();
partitions.forEach(partition -> {
//定位到磁區中最近的offset,繼續消費
kafkaConsumer.seek(partition,getOffset(partition));
});
}
});
?
while (true){
ConsumerRecords<String,String> poll = kafkaConsumer.poll(Duration.ofMillis(1000));
poll.forEach(consumerRecord ->{
System.out.printf("offset = %d %n",consumerRecord.offset());
System.out.printf("key = %s %n",consumerRecord.key());
System.out.printf("value = %s %n",consumerRecord.value());
?
//將下標快取到offset中
currentOffset.put(new TopicPartition(consumerRecord.topic(),consumerRecord.partition()),consumerRecord.offset());
});
commitOffset(currentOffset);
}
}
?
/**
* 提交當前offset
* @param currentOffset
*/
private static void commitOffset(Map<TopicPartition , Long> currentOffset){
//處理異步提交的業務邏輯
}
?
//獲取當前磁區的offset
private static long getOffset(TopicPartition partition){
return 0;
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/293899.html
標籤:其他
上一篇:Flink 內核原理與實作-入門
