1、到官網查詢所在版本的依賴,匯入pom.xml(在此用Flink1.13) 官網->教程->connectors->datastream->kafka
網址:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.6</version>
</dependency>
2.在此頁面找到Kafka source 示例代碼,將此代碼填充至類中并將其具體引數修改即可
//如果方法在回傳值的位置宣告了泛型,此時在呼叫這個方法時,需要在方法名前補充泛型KafkaSource.<String>builder()
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("hadoop102:9092") //集群地址,寫一個也行,多個也行
.setTopics("first")//消費的主題
.setGroupId("my-group")//消費者組id
/*設定起始偏移量有以下幾種情況:
1.從指定的位置消費:OffsetsInitializer.offsets(Map< TopicPartition, Long> offsets)
2.從最新位置消費(最后一條處):OffsetsInitializer.latest()
3.從最早位置消費(第一條處):OffsetsInitializer.earliest()
4.從上次提交的位置消費:OffsetsInitializer.committedOffsets()
5.新的組,從來沒有提交過,再指定一個消費方式: OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
*/
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))//設定起始偏移量,也就是從哪里消費
//由于大多數情況下key列沒有值,所以只設定value的反序列化器即可
.setValueOnlyDeserializer(new SimpleStringSchema()) //消費必須設定的Key-value的反序列化器
.build();
//用設定好的組件獲取source
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
需注意!
1、flink的kafkaSource默認是把消費的offsets提交到當前Task的狀態中,并不會主動提交到kafka的——consumer_offsets中
所以,上述代碼無論運行多少次消費的都是一樣的內容,想要達到這次消費起始位置是上次消費的最后一條的情況
需要手動設定,把offsets提交到kafka一份
//設定額外的消費者引數
.setProperty("enable.auto.commit","true")//允許consumer自動提交offsets
.setProperty("auto.commit.interval.ms","1000")//每次提交的時間間隔
2、Job重啟時,如果開啟了Checjpoint,默認從哪Checkpoint中獲取之前提交的offsets
獲取不到時,才會從kafka的_consumer_offsets中獲取
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/518954.html
標籤:其他
下一篇:Python第七章實驗報告
