Flink簡單使用教程
一、基礎
1.1 環境配置
在pom.xml引入flink的相關依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.0</version>
</dependency>
如果已經引入了Kafka的依賴,為了避免flink和Kafka使用的scala版本不同導致的錯誤,需要在Kafka的依賴中排除掉對scala的依賴:
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
1.2 基礎概念
Flink是一個流批一體的資料處理平臺,
自然環境中,資料的產生是流式的,無論是來自 Web 服務器的事件資料、證券交易所的交易資料,還是來自車間機器上的傳感資料,其資料都是流式的,
分析資料時,可以圍繞 有界流(bounded 或 無界流(unbounded) 兩種模型來處理資料,
批處理針對的是有界資料流,可以在計算結果輸出之前輸入整個資料集,因此,可以對整個資料集的資料進行排序、統計或匯總計算后再輸出結果,
流處理針對的是無界資料流,理論上來說輸入永遠不會結束,因此,程式必須持續不斷地對到達的資料進行處理,
1.3 什么能被轉換成流?
Flink 的DataStream API 可以將任何可序列化的物件轉化為流,Flink 自帶的序列化器有:
- 基本型別,即 String、Long、Integer、Boolean、Array
- 復合型別:Tuples、POJOs 和 Scala case classes
Tuple就是一個元組,例如Tuple2就是一個二元組,如下是一個有兩個屬性的二元組實體:

1.4 DataStream
簡單的來說就是需要處理的資料源,DataStream是flink程式中必不可少的一個類,該類用于表示資料集合,可以是有界(有限)的,也可以是無界(無限)的,但用于處理它們的API是相同的,
在用法上類似于常規的 Java集合,一旦它們被創建就不能添加或洗掉元素,也不能簡單地察看內部元素,只能使用 DataStream API 操作來處理它們,這種通過DataStream API的操作也叫作轉換(transformation),
1.5 Flink程式基本構成
Flink 程式看起來像一個轉換 DataStream 的常規程式,每個程式由相同的基本部分組成:
- 獲取一個執行環境(StreamExecutionEnvironment);
- 加載/創建初始資料;
- 指定資料相關的轉換;
- 指定計算結果的存盤位置;
- 觸發程式執行(Execute),
StreamExecutionEnvironment 是所有 Flink 程式的基礎,可以使用 StreamExecutionEnvironment 的靜態方法獲取 StreamExecutionEnvironment:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
通常,只需要使用 getExecutionEnvironment() ,該方法會根據背景關系做正確的處理:如果你在 IDE 中執行你的程式或將其作為一般的 Java 程式執行,那么它將創建一個本地環境,該環境將在你的本地機器上執行你的程式,
如果你基于程式創建了一個 JAR 檔案,并通過命令列運行它,Flink 集群管理器將執行程式的 main 方法,回傳一個執行環境以在集群上執行你的程式,
1.6 Data Source
Source 是程式讀取其輸入的地方,通過 StreamExecutionEnvironment`可以訪問多種預定義的 stream source:
基于檔案:
readTextFile(path)- 讀取文本檔案,逐行讀取并將它們作為字串回傳,readFile(fileInputFormat, path)- 按照指定的檔案輸入格式讀取(一次)檔案,
基于套接字:
socketTextStream- 從套接字讀取,
基于集合:
fromCollection(Collection)- 從 Java Java.util.Collection 創建資料流,集合中的所有元素必須屬于同一型別,fromCollection(Iterator, Class)- 從迭代器創建資料流,class 引數指定迭代器回傳元素的資料型別,fromElements(T ...)- 從給定的物件序列中創建資料流,所有的物件必須屬于同一型別,fromParallelCollection(SplittableIterator, Class)- 從迭代器并行創建資料流,class 引數指定迭代器回傳元素的資料型別,generateSequence(from, to)- 基于給定間隔內的數字序列并行生成資料流,
自定義:
addSource- 關聯一個新的 source function,例如,你可以使用addSource(new FlinkKafkaConsumer<>(...))來從 Apache Kafka 獲取資料,
1.7 Data Sick
Data sinks 使用 DataStream 并將它們轉發到檔案、套接字、外部系統或列印它們,
Flink 自帶了多種內置的輸出格式,這些格式相關的實作封裝在 DataStreams 的算子里:
writeAsText()- 將元素按行寫成字串,writeAsCsv(...)- 將元組寫成逗號分隔值檔案,print()- 在標準輸出流上列印每個元素的 toString() 值,writeUsingOutputFormat()- 自定義檔案輸出的方法和基類,writeToSocket- 根據SerializationSchema將元素寫入套接字,addSink- 呼叫自定義 sink function,Flink 捆綁了連接到其他系統(例如 Apache Kafka)的連接器,這些連接器被實作為 sink functions,
二、流處理實體
2.1 通過年齡過濾出成年人和未成年人
public class Example {
public static void main(String[] args) throws Exception {
//1.創建flink的執行環境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.創建資料源
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));
//3.對資料進行過濾
DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
//4.資料的處理方式(輸出控制臺)
adults.print();
//5.執行flink任務
env.execute();
}
public static class Person {
public String name;
public Integer age;
public Person() {}
public Person(String name, Integer age) {
this.name = name;
this.age = age;
}
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
}
}
}
2.2 統計單詞個數
基于流視窗的單詞統計應用程式,計算 5 秒視窗內來自 Web 套接字的單詞數
public class WindowWordCount {
public static void main(String[] args) throws Exception {
//1.創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.通過socket創建資料源,并進行過濾
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())//呼叫Splitter類按規則過濾輸入字串,回傳的是一個個二元組(單詞,單詞出現次數)
.keyBy(value -> value.f0) //按照單詞進行排序
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))//限定視窗時間
.sum(1);//1表示按照第2個位置的數字進行求和,即計算單詞出現的次數
//3.輸出處理結果
//需要注意的是,輸出結果是源源不斷的程序,首先env.execute執行,然后只要flink接收到資料,就會呼叫注釋2.xx和3.xx的代 碼,過濾資料,輸出結果,中間這一部分代碼會一直執行
dataStream.print();
//4.執行程式
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
//通過空格對輸入進行分組,每一個單詞的屬性值(出現次數)均為1
//例如輸入:a b c a b,輸出為:a 2,b 2,c 1(a出現2次,b出現2次,c出現1次)
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
要運行示例程式,首先從終端使用 netcat (要下載相關軟體)啟動輸入流:
nc -lk 9999
只需輸入一些單詞,然后按回車鍵即可傳入新單詞,
三、批處理實體
public class WordCount {
public static void main(String[] args) throws Exception {
//1.創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.通過文本檔案(有界資料)創建資料源
DataSource<String> dataSource = env.readTextFile("C:\\...")
//3.處理輸入
DataSet<Tuple2<String,Integer>> dataSet = dataSource
.flatMap(new Splitter())//呼叫Splitter類按規則過濾輸入字串,回傳的是一個個二元組(單詞,單詞出現次數)
.groupBy(0) //0表示按照第1個位置的單詞進行排序
.sum(1);//1表示按照第2個位置的數字進行求和,即計算單詞出現的次數
//4.將處理結果寫出到檔案
dataSet.writeAsCsv("C:\\...");
//5.執行程式
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
//通過空格對輸入進行分組,每一個單詞的屬性值(出現次數)均為1
//例如輸入:a b c a b,輸出為:a 2,b 2,c 1(a出現2次,b出現2次,c出現1次)
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
四、flink讀寫Kafka資料
測驗時,首先運行消費者程式,再運行生產者
4.1 創建消費者類FlinkConsumer
public class WindowWordCount {
public static void main(String[] args) throws Exception {
//1.創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.組態檔
Properties props = new Properties();
props.put("bootstrap.servers","Kafka集群地址");
//3.構造消費者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
//4.配置消費者
DataStreamSource stream = env.addSource(consumer);
//5.data sick
stream.print();
//6.執行程式
env.execute("消費者程式");
}
}
4.2 創建生產者類FlinkProducer
public class WindowWordCount {
public static void main(String[] args) throws Exception {
//1.創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.組態檔
Properties props = new Properties();
props.put("bootstrap.servers","Kafka集群地址");
//3.構造生產者
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("topic", new SimpleStringSchema(), props);
//4.配置資料源和生產者
env.fromElement("hello", "flink", "kafka").addSink(producer);
//5.執行程式
env.execute("生產者程式");
}
}
4.3 概念
Flink 的 Kafka consumer 稱為 FlinkKafkaConsumer,它提供對一個或多個 Kafka topics 的訪問,
建構式接受以下引數:
- Topic 名稱或者名稱串列
- 用于反序列化 Kafka 資料的 DeserializationSchema 或者 KafkaDeserializationSchema
- Kafka 消費者的屬性,需要以下屬性:
- “bootstrap.servers”(以逗號分隔的 Kafka broker 串列)
- “group.id” 消費組 ID
Flink Kafka Producer 被稱為 FlinkKafkaProducer,它允許將訊息流寫入一個或多個 Kafka topic,
構造器接收下列引數:
- 事件被寫入的默認輸出 topic
- 序列化資料寫入 Kafka 的 SerializationSchema / KafkaSerializationSchema
- Kafka client 的 Properties,下列 property 是必須的:
- “bootstrap.servers” (逗號分隔 Kafka broker 串列)
- 容錯語意
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/431486.html
標籤:其他
