
This article, based on Flink 1.9, briefly describes how to connect Flink to Kafka.
Streaming Connectors

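We can develop our own sources and sinks, but a number of basic sources and sinks are already built into Flink.
The predefined sources support reading data from files, directories and sockets, as well as from collections and iterators.
The predefined sinks support writing data to files, to standard output (stdout), to standard error (stderr) and to sockets.
Connectors interact with a variety of third-party systems. The following systems are currently supported: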
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
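Keep in mind that using a connector usually requires additional third-party components, such as a data storage server or a message queue.
Apache Bahir defines some additional connectors:
- Apache ActiveMQ (source/sink)
- Apache Flume (sink)
- Redis (sink)
- Akka (sink)
- Netty (source)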
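Connectors are not the only way to get data into or out of Flink. A common pattern is to query an external database or web service from a Map or FlatMap in order to enrich the primary data stream; Flink's Async I/O API exists to do this kind of enrichment efficiently.
Pushing large amounts of data to external storage can create an I/O bottleneck. If that data is read far less often than it is written, a better approach can be to let the external application pull the data it needs from Flink, using Flink's Queryable State interface.
This article focuses on the Apache Kafka connector.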
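Kafka Connector
This connector provides access to event streams served by Apache Kafka.
Flink provides dedicated Kafka connectors for reading data from and writing data to Kafka topics. The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to provide exactly-once processing semantics. To achieve this, Flink does not rely solely on Kafka's consumer-group offset tracking; it tracks and checkpoints these offsets internally.
The following table shows which Flink Kafka connector corresponds to which Kafka version.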
| Maven Dependency | Supported since (Flink version) | Consumer and Producer Class name | Kafka version |
|---|---|---|---|
| flink-connector-kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08, FlinkKafkaProducer08 | 0.8.x |
| flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09, FlinkKafkaProducer09 | 0.9.x |
| flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010, FlinkKafkaProducer010 | 0.10.x |
| flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011, FlinkKafkaProducer011 | 0.11.x |
| flink-connector-kafka_2.11 | 1.7.0 | FlinkKafkaConsumer, FlinkKafkaProducer | >= 1.0.0 |
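Starting with Flink 1.9.0, the universal connector uses the Kafka 2.2.0 client.
The steps for using it are outlined below.
Add the Maven dependency:
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.9.0</version>
</dependency>
```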
Installing Kafka:
See the earlier article "Kafka Getting-Started Guide (Detailed Screenshot Edition)".
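Compatibility:
Starting with Flink 1.7, the universal Kafka connector does not track a specific Kafka major version. Instead, it tracks the latest version of Kafka at the time of the Flink release. If your Kafka broker version is 1.0.0 or newer, use this universal connector. If you use an older Kafka version (0.11, 0.10, 0.9 or 0.8), use the connector that corresponds to your broker version.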
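The table and the compatibility rule above boil down to one decision per broker version. The following plain-Java sketch encodes that decision; KafkaConnectorChooser and artifactFor are hypothetical names (not part of Flink), and only the Scala 2.11 artifact names from the table are covered.

```java
import java.util.Arrays;

/** Hypothetical helper encoding the version table and compatibility rule (Scala 2.11 artifacts only). */
final class KafkaConnectorChooser {

    private KafkaConnectorChooser() {}

    /** Returns the Maven artifactId of the Flink Kafka connector for a broker version like "0.10.2" or "2.2.0". */
    static String artifactFor(String brokerVersion) {
        int[] v = Arrays.stream(brokerVersion.split("\\."))
                .mapToInt(Integer::parseInt)
                .toArray();
        int major = v[0];
        int minor = v.length > 1 ? v[1] : 0;
        if (major >= 1) {
            // Brokers 1.0.0 and newer: the universal connector (FlinkKafkaConsumer / FlinkKafkaProducer)
            return "flink-connector-kafka_2.11";
        }
        if (major == 0 && minor >= 8 && minor <= 11) {
            // Older brokers: the connector matching the broker's version (0.8, 0.9, 0.10 or 0.11)
            return "flink-connector-kafka-0." + minor + "_2.11";
        }
        throw new IllegalArgumentException("Unsupported Kafka version: " + brokerVersion);
    }
}
```

For example, artifactFor("0.10.2") yields flink-connector-kafka-0.10_2.11, while any 1.x or 2.x broker maps to the universal flink-connector-kafka_2.11.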
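When migrating to the universal connector (for example from the 0.11 connector), follow Flink's guide on upgrading jobs and Flink versions, and in addition:
- Use Flink 1.9 or newer for the whole process.
- Do not upgrade Flink and the operators at the same time.
- Make sure that the Kafka Consumer and/or Kafka Producer used in your job have been assigned unique identifiers (uid).
- Take the savepoint with the stop-with-savepoint feature (for example, using stop --withSavepoint).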
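Usage:
After adding the dependency, instantiate a new source (FlinkKafkaConsumer) and sink (FlinkKafkaProducer).
Kafka Consumer
The program is built up step by step below; the complete Flink 1.9 code for consuming Kafka is given at the end of the article.
Depending on the Kafka version, the Kafka consumer is called FlinkKafkaConsumer08, FlinkKafkaConsumer09, and so on;
for Kafka >= 1.0.0 it is simply called FlinkKafkaConsumer.
Constructing a FlinkKafkaConsumer
Java example: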
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8 (FlinkKafkaConsumer08); not needed by the universal consumer
// properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
```
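Scala:
```scala
import org.apache.flink.streaming.api.scala._ // Scala DataStream API and implicit TypeInformation
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8 (FlinkKafkaConsumer08); not needed by the universal consumer
// properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val stream = env
  .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
stream.print()
```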
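Required parameters:
1. The topic name (or a list of topic names)
2. A DeserializationSchema / KafkaDeserializationSchema for deserializing the data read from Kafka
3. Configuration properties: "bootstrap.servers" and "group.id" (Kafka 0.8 also requires "zookeeper.connect")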
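Item 3 is easy to get wrong at deployment time. As a small sketch, assuming you want to fail fast before submitting the job, the hypothetical helper below (plain Java, not a Flink API) reports which of the required properties are missing or blank.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check for the required consumer properties (not a Flink API). */
final class ConsumerConfigCheck {

    private ConsumerConfigCheck() {}

    /** Returns the required keys that are missing or blank; an empty list means the config looks complete. */
    static List<String> missingKeys(Properties props, boolean kafka08) {
        List<String> required = new ArrayList<>();
        required.add("bootstrap.servers");
        required.add("group.id");
        if (kafka08) {
            // Only the Kafka 0.8 consumer (FlinkKafkaConsumer08) also needs ZooKeeper
            required.add("zookeeper.connect");
        }
        List<String> missing = new ArrayList<>();
        for (String key : required) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```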
Configuring the start position
Java:
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
// Pick ONE of the following; each call replaces the previously configured start position.
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour
// Start from specific offsets (the offset of the next record to read, per partition):
//Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
DataStream<String> stream = env.addSource(myConsumer);
```
Scala:
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val myConsumer = new FlinkKafkaConsumer[String](...)
// Pick ONE of the following; each call replaces the previously configured start position.
myConsumer.setStartFromEarliest()     // start from the earliest record possible
myConsumer.setStartFromLatest()       // start from the latest record
myConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets() // the default behaviour
// Start from specific offsets (the offset of the next record to read, per partition):
//val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
val stream = env.addSource(myConsumer)
```
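To make the difference between the options concrete, here is a simplified plain-Java model of how a start offset could be chosen for one partition. It is an illustration, not Flink's implementation: the class, enum and method are hypothetical, and the partition's earliest offset is assumed to be 0. It follows two rules from the Flink documentation: a partition missing from the specific-offsets map falls back to the group-offset behaviour, and without a committed group offset the Kafka "auto.offset.reset" setting decides (modelled here as a boolean). These startup settings only apply when a job starts fresh; when it is restored from a checkpoint or savepoint, the offsets stored in Flink's state are used instead.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical model of the start-position options, for a single partition (not the Flink API). */
final class StartPositionModel {

    enum StartupMode { EARLIEST, LATEST, TIMESTAMP, GROUP_OFFSETS, SPECIFIC_OFFSETS }

    private StartPositionModel() {}

    /**
     * @param timestamps      record timestamps of the partition, indexed by offset (offset 0 = first record)
     * @param committedOffset offset committed for the consumer group, or null if there is none
     * @param specificOffsets per-partition "next offset to read", used by SPECIFIC_OFFSETS
     * @param resetToEarliest stands in for auto.offset.reset=earliest (false means latest)
     * @return the offset of the first record to read
     */
    static long startOffset(StartupMode mode, String partition, List<Long> timestamps,
                            Long committedOffset, Map<String, Long> specificOffsets,
                            long startTimestamp, boolean resetToEarliest) {
        long earliest = 0;
        long latest = timestamps.size(); // "latest" = the offset the next new record will get
        switch (mode) {
            case EARLIEST:
                return earliest;
            case LATEST:
                return latest;
            case TIMESTAMP:
                // First record whose timestamp is >= the requested timestamp; otherwise latest.
                for (int offset = 0; offset < timestamps.size(); offset++) {
                    if (timestamps.get(offset) >= startTimestamp) {
                        return offset;
                    }
                }
                return latest;
            case SPECIFIC_OFFSETS:
                Long specific = specificOffsets.get(partition);
                if (specific != null) {
                    return specific;
                }
                // Partitions without a specific offset fall back to the group-offset behaviour.
                return startOffset(StartupMode.GROUP_OFFSETS, partition, timestamps,
                        committedOffset, specificOffsets, startTimestamp, resetToEarliest);
            case GROUP_OFFSETS:
            default:
                if (committedOffset != null) {
                    return committedOffset;
                }
                return resetToEarliest ? earliest : latest;
        }
    }
}
```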
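Checkpointing
With Flink's checkpointing enabled, the Flink Kafka Consumer consumes records from a topic and periodically checkpoints all of its Kafka offsets, together with the state of the other operations, in a consistent manner. If the job fails, Flink restores the streaming program to the state of the latest checkpoint and re-consumes the records from Kafka, starting from the offsets stored in that checkpoint.
If checkpointing is disabled, the Flink Kafka Consumer relies on the automatic periodic offset committing of the Kafka client it uses internally.
If checkpointing is enabled, the Flink Kafka Consumer commits the offsets stored in the checkpointed state back to Kafka when a checkpoint completes.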
Java:
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
```
Scala:
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
```
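The reason the offsets are kept in Flink's checkpoint, rather than only in Kafka's group offsets, is that on recovery the source position and the operator state are rolled back together. The toy model below is plain Java, not Flink code, and every name in it is hypothetical: a "partition" is a list of numbers, the only operator state is a running sum, each checkpoint snapshots the next offset and the sum together, and a simulated crash throws away everything after the last checkpoint. With restoreState set to true the final sum equals the exactly-once result even though some records are read twice; with false (offset rewound but state kept) those records are counted twice.

```java
import java.util.List;

/** Toy model of checkpointing offsets together with state (hypothetical; not the Flink API). */
final class CheckpointToyModel {

    /** What a checkpoint stores: the source position and the operator state, taken at the same moment. */
    static final class Snapshot {
        final int nextOffset;
        final long sum;

        Snapshot(int nextOffset, long sum) {
            this.nextOffset = nextOffset;
            this.sum = sum;
        }
    }

    private CheckpointToyModel() {}

    /**
     * Sums the partition, checkpointing after every {@code checkpointEvery} records and crashing
     * once just before reading offset {@code crashAt}.
     */
    static long run(List<Integer> partition, int checkpointEvery, int crashAt, boolean restoreState) {
        Snapshot lastCheckpoint = new Snapshot(0, 0L); // initial empty checkpoint
        int offset = 0;
        long sum = 0L;
        boolean crashed = false;
        while (offset < partition.size()) {
            if (!crashed && offset == crashAt) {
                crashed = true;
                // Failure: rewind the source to the checkpointed offset ...
                offset = lastCheckpoint.nextOffset;
                // ... and, for exactly-once results, roll the state back to the same checkpoint.
                if (restoreState) {
                    sum = lastCheckpoint.sum;
                }
                continue;
            }
            sum += partition.get(offset);
            offset++;
            if (offset % checkpointEvery == 0) {
                lastCheckpoint = new Snapshot(offset, sum);
            }
        }
        return sum;
    }
}
```

For the numbers 1 to 10, a checkpoint every 3 records and a crash before offset 8, run(..., true) returns 55 (the plain sum), while run(..., false) returns 70 because records 7 and 8 are added twice.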
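Partition discovery
The Flink Kafka Consumer supports discovering dynamically created Kafka partitions and consumes them with exactly-once guarantees. Partition discovery is disabled by default; it is enabled by setting the property "flink.partition-discovery.interval-millis" to a non-negative value (the discovery interval in milliseconds). Partitions discovered after the initial retrieval of partition metadata are consumed from the earliest possible offset.
Topics can also be discovered by subscribing with a regular expression on topic names; with partition discovery enabled, topics created later that match the expression are picked up as well: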
Java:
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// look for new partitions/topics every 10 s (discovery is off by default)
properties.setProperty("flink.partition-discovery.interval-millis", "10000");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);
DataStream<String> stream = env.addSource(myConsumer);
...
```
Scala:
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
// look for new partitions/topics every 10 s (discovery is off by default)
properties.setProperty("flink.partition-discovery.interval-millis", "10000")
val myConsumer = new FlinkKafkaConsumer[String](
  java.util.regex.Pattern.compile("test-topic-[0-9]"),
  new SimpleStringSchema,
  properties)
val stream = env.addSource(myConsumer)
...
```
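To see which topics the regular expression above actually selects, here is a plain-Java model of one discovery round. It is a hypothetical sketch, not Flink's partition discoverer: partitions are written as "topic-partitionIndex" strings, the whole topic name must match the pattern (Matcher.matches()), and only partitions that are not assigned yet are returned as newly discovered.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

/** Hypothetical model of one partition-discovery round (not the Flink API). */
final class TopicDiscoveryModel {

    private TopicDiscoveryModel() {}

    /** Returns partitions whose topic fully matches the pattern and that are not assigned yet. */
    static List<String> discoverNew(Pattern topicPattern, Collection<String> allPartitions,
                                    Set<String> alreadyAssigned) {
        List<String> discovered = new ArrayList<>();
        for (String partition : allPartitions) {
            // "test-topic-1-0" -> topic "test-topic-1" (the text after the last '-' is the partition index)
            String topic = partition.substring(0, partition.lastIndexOf('-'));
            // matches() needs the WHOLE name to match, so "test-topic-10" is not selected by test-topic-[0-9]
            if (topicPattern.matcher(topic).matches() && !alreadyAssigned.contains(partition)) {
                discovered.add(partition);
            }
        }
        return discovered;
    }
}
```

Note that "test-topic-[0-9]" matches single-digit suffixes only; a pattern such as "test-topic-[0-9]+" would be needed to also pick up test-topic-10.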
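Timestamps and watermarks
In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself. In addition, users may want to emit watermarks either periodically or in an irregular fashion.
You can define Timestamp Extractors / Watermark Emitters and pass them to the consumer as shown below. In that case the watermarks are generated inside the Kafka consumer, per Kafka partition, and the per-partition watermarks are merged in the same way as watermarks are merged on stream shuffles.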
Java:
```java
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> myConsumer =
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
// CustomWatermarkEmitter: your own AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
DataStream<String> stream = env.addSource(myConsumer);
stream.print();
```
Scala:
```scala
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val myConsumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)
// CustomWatermarkEmitter: your own AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
val stream = env.addSource(myConsumer)
stream.print()
```
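The per-partition behaviour described above can be sketched in plain Java. This is a hypothetical model, not CustomWatermarkEmitter and not Flink code: each partition's watermark follows a bounded-out-of-orderness rule (highest timestamp seen minus the allowed lateness), and the source's watermark is the minimum over its assigned partitions. In this model a partition that has not delivered any record yet keeps the overall watermark at Long.MIN_VALUE.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of per-partition watermarks inside a Kafka source (not the Flink API). */
final class PerPartitionWatermarks {

    private final long maxOutOfOrdernessMillis;
    // Highest event timestamp seen per partition; null means "no record yet"
    private final Map<String, Long> maxTimestampPerPartition = new HashMap<>();

    PerPartitionWatermarks(long maxOutOfOrdernessMillis, String... assignedPartitions) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        for (String partition : assignedPartitions) {
            maxTimestampPerPartition.put(partition, null);
        }
    }

    /** Records an event timestamp read from the given partition. */
    void onRecord(String partition, long eventTimestamp) {
        // merge() stores the value if the key is absent or mapped to null, otherwise keeps the max
        maxTimestampPerPartition.merge(partition, eventTimestamp, Long::max);
    }

    /** Watermark of one partition: its highest timestamp minus the allowed out-of-orderness. */
    long partitionWatermark(String partition) {
        Long max = maxTimestampPerPartition.get(partition);
        return max == null ? Long.MIN_VALUE : max - maxOutOfOrdernessMillis;
    }

    /** The source's watermark is the minimum over all of its partitions. */
    long currentWatermark() {
        if (maxTimestampPerPartition.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (String partition : maxTimestampPerPartition.keySet()) {
            min = Math.min(min, partitionWatermark(partition));
        }
        return min;
    }
}
```

For example, with 1000 ms of allowed out-of-orderness, maxima of 10000 on topic-0 and 12000 on topic-1 give a source watermark of 9000: the slower partition decides.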
Kafka Producer
Depending on the Kafka version, the Kafka producer is called FlinkKafkaProducer011, FlinkKafkaProducer010, and so on;
for Kafka >= 1.0.0 it is simply called FlinkKafkaProducer.
Constructing a FlinkKafkaProducer
Java:
```java
DataStream<String> stream = ...;
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
    "localhost:9092",          // broker list
    "my-topic",                // target topic
    new SimpleStringSchema()); // serialization schema
// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true);
stream.addSink(myProducer);
```
Scala:
```scala
val stream: DataStream[String] = ...
val myProducer = new FlinkKafkaProducer[String](
  "localhost:9092",       // broker list
  "my-topic",             // target topic
  new SimpleStringSchema) // serialization schema
// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true)
stream.addSink(myProducer)
```
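You need to specify the broker list, the target topic and a serialization schema.
Custom partitioning: by default, the producer uses FlinkFixedPartitioner, which maps each parallel Flink Kafka Producer subtask to a single Kafka partition, i.e. all records received by one sink subtask end up in the same Kafka partition.
You can implement your own partitioning by extending the FlinkKafkaPartitioner class.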
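The following plain-Java sketch illustrates the fixed mapping and one possible custom alternative. SinkPartitioner, FixedPartitionerSketch and KeyHashPartitionerSketch are hypothetical stand-ins loosely modelled on the shape of FlinkKafkaPartitioner (an open call per subtask, then one partition choice per record); they are not the Flink classes. The fixed variant sends everything from subtask i to partitions[i % partitions.length]; the key-hash variant sends records with equal keys to the same partition.

```java
import java.util.Arrays;

/** Plain-Java stand-in for a Kafka sink partitioner (hypothetical; not the Flink class). */
abstract class SinkPartitioner<T> {
    /** Called once per parallel sink subtask before any record is partitioned. */
    void open(int parallelInstanceId, int parallelInstances) {}

    /** Picks one of the target topic's partition ids for the record. */
    abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

/** Fixed mapping: every record of subtask i goes to partitions[i % partitions.length]. */
final class FixedPartitionerSketch<T> extends SinkPartitioner<T> {
    private int parallelInstanceId;

    @Override
    void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }
}

/** A custom alternative: hash the record key so equal keys always land in the same partition. */
final class KeyHashPartitionerSketch<T> extends SinkPartitioner<T> {
    @Override
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Arrays.hashCode(null) is 0; floorMod keeps the index non-negative for negative hashes
        return partitions[Math.floorMod(Arrays.hashCode(key), partitions.length)];
    }
}
```

A consequence of the fixed mapping: if the sink parallelism is lower than the number of partitions, some partitions receive no data, and if it is higher, several subtasks write into the same partition. A key-based partitioner avoids both, at the cost of depending on the key distribution.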
Complete Flink 1.9 code for consuming Kafka:
```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint every 5000 ms so that the Kafka offsets become part of Flink's state
        env.enableCheckpointing(5000);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        // build the FlinkKafkaConsumer
        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
        // start from the earliest offset (not used when restoring from a checkpoint/savepoint)
        myConsumer.setStartFromEarliest();

        DataStream<String> stream = env.addSource(myConsumer);
        stream.print();

        env.execute("Flink Streaming Java API Skeleton");
    }
}
```
Project repository: https://github.com/tree1123/flink_demo_1.9
For more on Flink, follow 實時流式計算 (Real-time Stream Computing).

