1.依賴配置
1.1 pom檔案
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<!--需要設定scala版本因為flink也參考了scala的一些東西-->
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!-- 引入 Flink 相關依賴-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 引入日志管理相關依賴-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
1.2 日志檔案
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
2.撰寫代碼
2.1 在根目錄下創建資料

2.2 書寫批處理執行代碼
public static void main(String[] args) throws Exception {
// 1. 創建執行環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2. 從檔案讀取資料 按行讀取(存盤的元素就是每行的文本)
DataSource<String> lineDs = env.readTextFile("input/word.txt");
// 3. 轉換資料格式
FlatMapOperator<String, Tuple2<String, Long>> wordAndOne =
lineDs.flatMap((String line, Collector<Tuple2<String, Long>> out) ->
{
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
});
// 4.防止泛型擦除
FlatMapOperator<String, Tuple2<String, Long>> returns =
wordAndOne.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 5. 按照 word 進行分組
UnsortedGrouping<Tuple2<String, Long>> wordAndOneUg = wordAndOne.groupBy(0);
// 6. 分組內聚合統計
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUg.sum(1);
// 7. 列印結果
sum.print();
// 結果
// (flink,1)
// (world,1)
// (hello,3)
// (java,1)
}
代碼說明和注意事項:
① Flink 在執行應用程式前應該獲取執行環境物件,也就是運行時背景關系環境,
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();② Flink 同時提供了 Java 和 Scala 兩種語言的 API,有些類在兩套 API 中名稱是一樣的,所以在引入包時,如果有 Java 和 Scala 兩種選擇,要注意選用 Java 的包,
③ 直接呼叫執行環境的 readTextFile 方法,可以從檔案中讀取資料,
④ 我們的目標是將每個單詞對應的個數統計出來,所以呼叫 flatmap 方法可以對一行文字進行分詞轉換,將檔案中每一行文字拆分成單詞后,要轉換成(word,count)形式的二元組,初始 count 都為 1,returns 方法指定的回傳資料型別 Tuple2,就是 Flink 自帶的二元組資料型別,
⑤ 在分組時呼叫了 groupBy 方法,它不能使用分組選擇器,只能采用位置索引或屬性名稱進行分組,
需要注意的是,這種代碼的實作方式,是基于 DataSet API 的,也就是我們對資料的處理轉換,是看作資料集來進行操作的,事實上 Flink 本身是流批統一的處理架構,批量的資料集本質上也是流,沒有必要用兩套不同的 API 來實作,所以從 Flink 1.12 開始,官方推薦的做法是直接使用 DataStream API,在提交任務時通過將執行模式設為 BATCH 來進行批處理:
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
這樣,DataSet API 就已經處于“軟棄用”(soft deprecated)的狀態,在實際應用中我們只要維護一套 DataStream API 就可以了,這里只是為了方便大家理解,我們依然用 DataSet API做了批處理的實作,
2.3 書寫流處理執行代碼(有界)
public static void main(String[] args) throws Exception {
// 1. 創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 讀取檔案
DataStreamSource<String> lineDss = env.readTextFile("input/word.txt");
// 3. 轉換資料格式
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne =
lineDss.flatMap((String line, Collector<String> out) ->
{
Arrays.stream(line.split(" ")).forEach(out::collect);
}).returns(Types.STRING).map(word -> Tuple2.of(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4. 分組
KeyedStream<Tuple2<String, Long>, String> wordAndOneKs = wordAndOne.keyBy(t -> t.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKs.sum(1);
// 6. 列印
result.print();
// 7. 執行
env.execute();
}
① 主要觀察與批處理程式 BatchWordCount 的不同:
② 創建執行環境的不同,流處理程式使用的是 StreamExecutionEnvironment,
③ 每一步處理轉換之后,得到的資料物件型別不同,
④ 分組操作呼叫的是 keyBy 方法,可以傳入一個匿名函式作為鍵選擇器(KeySelector),指定當前分組的 key 是什么,
⑤ 代碼末尾需要呼叫 env 的 execute 方法,開始執行任務,
- 輸出結果
3> (java,1)
9> (world,1)
5> (hello,1)
5> (hello,2)
13> (flink,1)
5> (hello,3)
我們可以看到,這與批處理的結果是完全不同的,批處理針對每個單詞,只會輸出一個最終的統計個數;而在流處理的列印結果中,“hello”這個單詞每出現一次,都會有一個頻次統計資料輸出,這就是流處理的特點,資料逐個處理,每來一條資料就會處理輸出一次,我們通過列印結果,可以清晰地看到單詞“hello”數量增長的程序,
看到這里大家可能又會有新的疑惑:我們讀取檔案,第一行應該是“hello flink”,怎么這里輸出的第一個單詞是“world”呢?每個輸出的結果二元組,前面都有一個數字,這又是什么呢?
我們可以先做個簡單的解釋,Flink 是一個分布式處理引擎,所以我們的程式應該也是分布式運行的,在開發環境里,會通過多執行緒來模擬 Flink 集群運行,所以這里結果前的數字,其實就指示了本地執行的不同執行緒,對應著 Flink 運行時不同的并行資源,這樣第一個亂序的問題也就解決了:既然是并行執行,不同執行緒的輸出結果,自然也就無法保持輸入的順序了,另外需要說明,這里顯示的編號為 1~13,是由于運行電腦的 CPU 的核心數來決定的,我自己的是16核的,所以默認模擬的并行執行緒有 16 個,這段代碼不同的運行環境,得到的結果會是不同的,關于 Flink 程式并行執行的數量,可以通過設定“并行度”(Parallelism)來進行配置,我們會在后續詳細講解這些內容,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/447003.html
標籤:Java
上一篇:Hibernate 學習筆記
