目錄
- 一、DataStream API概述
- 二、什么是DataStream ?
- 三、DataStream 資料處理程序
- 1)Data Sources(資料源)
- 1、Data Sources 原理
- 2、Data Sources 實作方式
- 1)基于檔案
- 2)基于套接字
- 3)基于集合
- 4)自定義
- 2)DataStream Transformations(資料流轉換//處理/算子)
- 1、資料流轉換
- 2、物理磁區
- 3、算子鏈和資源組
- 3)Data Sinks(資料輸出)
- 旁路輸出(分流)
- 2)Flink 程式剖析(scala)
- 1、 獲取一個執行環境(execution environment)
- 2、加載/創建初始資料
- 3、指定資料相關的轉換
- 4、指定計算結果的存盤位置
- 5、觸發程式執行
- 1)Data Sources(資料源)
- 四、什么是DataSet?
- 五、DataSet 資料處理程序
- 1)Data Sources (資料源)
- 1、基于檔案
- 2、基于集合
- 3、通用型
- 2)DataSet Transformations(資料集轉換//處理/算子)
- 3)Data Sinks(資料輸出)
- 1)Data Sources (資料源)
一、DataStream API概述
Flink 中的 DataStream 程式是對資料流(例如過濾、更新狀態、定義視窗、聚合)進行轉換的常規程式,資料流的起始是從各種源(例如訊息佇列、套接字流、檔案)創建的,結果通過 sink 回傳,例如可以將資料寫入檔案或標準輸出(例如命令列終端),Flink 程式可以在各種背景關系中運行,可以獨立運行,也可以嵌入到其它程式中,任務執行可以運行在本地 JVM 中,也可以運行在多臺機器的集群上,
二、什么是DataStream ?
- DataStream API 得名于特殊的 DataStream 類,該類用于表示 Flink 程式中的資料集合,你可以認為 它們是可以包含重復項的不可變資料集合,這些資料可以是有界(有限)的,也可以是無界(無限)的,但用于處理它們的API是相同的,
- DataStream 在用法上類似于常規的 Java 集合,但在某些關鍵方面卻大不相同,它們是不可變的,這意味著一旦它們被創建,你就不能添加或洗掉元素,你也不能簡單地察看內部元素,而只能使用 DataStream API 操作來處理它們,DataStream API 操作也叫作轉換(transformation),
- 你可以通過在 Flink 程式中添加 source 創建一個初始的 DataStream,然后,你可以基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流連接在一起,
三、DataStream 資料處理程序

1)Data Sources(資料源)
1、Data Sources 原理
官方檔案
一個資料 source 包括三個核心組件:分片(Splits)、分片列舉器(SplitEnumerator) 以及 源閱讀器(SourceReader),
-
分片(Split) 是對一部分 source 資料的包裝,如一個檔案或者日志磁區,分片是 source 進行任務分配和資料并行讀取的基本粒度,
-
源閱讀器(SourceReader) 會請求分片并進行處理,例如讀取分片所表示的檔案或日志磁區,SourceReader 在
TaskManagers上的 SourceOperators 并行運行,并產生并行的事件流/記錄流, -
分片列舉器(SplitEnumerator) 會生成分片并將它們分配給 SourceReader,該組件在
JobManager上以單并行度運行,負責對未分配的分片進行維護,并以均衡的方式將其分配給 reader,SplitEnumerator 被認為是整個 Source 的“大腦”,

2、Data Sources 實作方式
1)基于檔案
Source 是你的程式從中讀取其輸入的地方,你可以用
StreamExecutionEnvironment.addSource(sourceFunction)將一個 source 關聯到你的程式,Flink 自帶了許多預先實作的 source functions,不過你仍然可以通過實作 SourceFunction 介面撰寫自定義的非并行 source,也可以通過實作 ParallelSourceFunction 介面或者繼承 RichParallelSourceFunction 類撰寫自定義的并行 sources, 通過 StreamExecutionEnvironment 可以訪問多種預定義的 stream source,source 連接器,請查看連接器檔案,
readTextFile(path):讀取文本檔案,readFile(fileInputFormat, path)- 按照指定的檔案輸入格式讀取(一次)檔案,readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo):這是前兩個方法內部呼叫的方法,它基于給定的 fileInputFormat 讀取路徑 path 上的檔案,根據提供的watchType的不同,source 可能定期(每 interval 毫秒)監控路徑上的新資料(watchType 為 FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次當前路徑中的資料然后退出(watchType 為 FileProcessingMode.PROCESS_ONCE),使用 pathFilter,用戶可以進一步排除正在處理的檔案,
2)基于套接字
socketTextStream:套接字讀取,元素可以由分隔符分隔,
3)基于集合
-
fromCollection(Collection):從 Java Java.util.Collection 創建資料流,集合中的所有元素必須屬于同一型別, -
fromCollection(Iterator, Class):從迭代器創建資料流,class 引數指定迭代器回傳元素的資料型別, -
fromElements(T ...):從給定的物件序列中創建資料流,所有的物件必須屬于同一型別, -
fromParallelCollection(SplittableIterator, Class):從迭代器并行創建資料流,class 引數指定迭代器回傳元素的資料型別, -
generateSequence(from, to):基于給定間隔內的數字序列并行生成資料流,
4)自定義
addSource:關聯一個新的 source function,例如,你可以使用 addSource(new FlinkKafkaConsumer<>(...)) 來從 Apache Kafka 獲取資料,更多詳細資訊見連接器,
2)DataStream Transformations(資料流轉換//處理/算子)
【溫馨提示】是用戶通過
算子能將一個或多個 DataStream 轉換成新的 DataStream,在應用程式中可以將多個資料轉換算子合并成一個復雜的資料流拓撲,這部分內容將描述 Flink DataStream API 中基本的資料轉換API,資料轉換后各種資料磁區方式,以及算子的鏈接策略,
官方檔案
1、資料流轉換
| 算子 | 資料轉換 | 解釋 | 示例 |
|---|---|---|---|
| Map | DataStream → DataStream | 獲取一個元素并生成一個元素,將輸入流的值加倍的映射函式 | dataStream.map { x => x * 2 } |
| FlatMap | DataStream → DataStream | 獲取一個元素并生成零個、一個或多個元素,將句子拆分為單詞的flatmap函式 | dataStream.flatMap { str => str.split(" ") } |
| Filter | DataStream → DataStream | 為每個元素計算布爾函式,并保留該函式回傳true的元素,過濾掉零值的過濾器 | dataStream.filter { _ != 0 } |
| KeyBy | DataStream → KeyedStream | 在邏輯上將流劃分為不相交的磁區,具有相同密鑰的所有記錄都被分配到同一磁區,在內部,keyBy()是通過哈希磁區實作的,類似于mysql里面的group by,有不同的方法來指定鍵 | dataStream.keyBy(.someKey) dataStream.keyBy(._1) |
| Reduce | KeyedStream → DataStream | 鍵控資料流上的“滾動”減少,將當前元素與上次減少的值合并,并發出新值,創建部分和流的reduce函式 | keyedStream.reduce { _ + _ } |
| Window | KeyedStream → WindowedStream | 可以在已磁區的KeyedStreams上定義視窗,Windows根據某些特征(例如,在過去5秒內到達的資料)對每個鍵中的資料進行分組,有關windows的完整說明,請參見windows, | dataStream .keyBy(_._1) .window(TumblingEventTimeWindows.of(Time.seconds(5))) |
| WindowAll | DataStream → AllWindowedStream | 可以在常規資料流上定義視窗,Windows根據某些特征(例如,過去5秒內到達的資料)對所有流事件進行分組,有關windows的完整說明,請參見windows, | dataStream .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) |
| Window Apply | WindowedStream → DataStream ;AllWindowedStream → DataStream | 將常規功能應用于整個視窗,下面是一個手動求和視窗元素的函式,如果使用的是windowAll轉換,則需要使用AllWindowFunction, |
windowedStream.apply { WindowFunction } // applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply { AllWindowFunction } |
| WindowReduce | WindowedStream → DataStream | 將reduce函式應用于視窗并回傳減少的值, | windowedStream.reduce { _ + _ } |
| Union | DataStream* → DataStream | 兩個或多個資料流的合并,創建一個包含所有流中所有元素的新流,注意:如果將一個資料流與其自身合并,則在結果流中會得到兩次每個元素, | dataStream.union(otherStream1, otherStream2, ...); |
| Window Join | DataStream,DataStream → DataStream | 在給定的密鑰和公共視窗上連接兩個資料流, | dataStream.join(otherStream) .where( .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply { ... } |
| Interval Join | KeyedStream,KeyedStream → DataStream | 在給定的時間間隔內,將兩個密鑰流的兩個元素e1和e2與一個公共密鑰連接,因此 e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound | // this will join the two streams so that // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2 keyedStream.intervalJoin(otherKeyedStream) .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound .upperBoundExclusive(true) // optional .lowerBoundExclusive(true) // optional .process(new IntervalJoinFunction() {...}) |
| Window CoGroup | DataStream,DataStream → DataStream | 在給定的鍵和公共視窗上對兩個資料流進行協組, | dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply {} |
| Connect | DataStream,DataStream → ConnectedStream | “連接”兩個保持其型別的資料流,連接允許兩個流之間的共享狀態, | someStream : DataStream[Int] = ... otherStream : DataStream[String] = ... val connectedStreams = someStream.connect(otherStream) |
| CoMap, CoFlatMap | ConnectedStream → DataStream | 類似于連接資料流上的map和flatMap | connectedStreams.map( (_ : Int) => true, (_ : String) => false) ) connectedStreams.flatMap( (_ : Int) => true, (_ : String) => false ) |
| Iterate | DataStream → IterativeStream → ConnectedStream | 通過將一個運算子的輸出重定向到前一個運算子,在流中創建一個“反饋”回圈,這對于定義不斷更新模型的演算法特別有用,下面的代碼從一個流開始,并連續地應用迭代體,大于0的元素被發送回反饋通道,其余的元素被下游轉發, | initialStream.iterate { iteration => { val iterationBody = iteration.map {/do something/} (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0)) } } |
2、物理磁區
Flink 也提供以下方法讓用戶根據需要在資料轉換完成后對資料磁區進行更細粒度的配置,
| 磁區 | 資料轉換 | 解釋 | 示例 |
|---|---|---|---|
| Custom Partitioning | DataStream → DataStream | 使用用戶定義的Partitioner為每個元素選擇目標任務, | dataStream.partitionCustom(partitioner, "someKey") dataStream.partitionCustom(partitioner, 0) |
| Random Partitioning | DataStream → DataStream | 根據均勻分布隨機劃分元素, | dataStream.shuffle() |
| Rescaling | DataStream → DataStream | 回圈地將元素磁區到下游操作的一個子集, | dataStream.rescale() |
| Broadcasting | DataStream → DataStream | 將元素廣播到每個磁區, |
3、算子鏈和資源組
將兩個算子鏈接在一起能使得它們在同一個執行緒中執行,從而提升性能,Flink 默認會將能鏈接的算子盡可能地進行鏈接(例如, 兩個 map 轉換操作),此外, Flink 還提供了對鏈接更細粒度控制的 API 以滿足更多需求,
如果想對整個作業禁用算子鏈,可以呼叫
StreamExecutionEnvironment.disableOperatorChaining(),下列方法還提供了更細粒度的控制,需要注 意的是,這些方法只能在 DataStream 轉換操作后才能被呼叫,因為它們只對前一次資料轉換生效,例如,可以 someStream.map(...).startNewChain() 這樣呼叫,而不能 someStream.startNewChain()這樣,
| 算子鏈操作 | 解釋 | 示例 |
|---|---|---|
| Start New Chain | 開始一個新的鏈,從這個運算子開始,這兩個映射器將被鏈接,過濾器將不會鏈接到第一個映射器, | someStream.filter(...).map(...).startNewChain().map(...) |
| Disable Chaining | 不要鏈接map運算子, | someStream.map(...).disableChaining() |
| Set Slot Sharing Group | 設定操作的槽位共享組,Flink將把具有相同槽共享組的操作放在相同槽中,而將沒有槽共享組的操作放在其他槽中,這可以用來隔離槽,如果所有的輸入操作都在同一個槽位共享組中,則從輸入操作繼承槽位共享組,默認槽位共享組的名稱為“default”,可以通過呼叫slotSharingGroup(“default”)顯式地將操作放入該組, |
someStream.filter(...).slotSharingGroup("name") |
3)Data Sinks(資料輸出)
sink 連接器,請查看連接器檔案,
Data sinks 使用 DataStream 并將它們轉發到檔案、套接字、外部系統或列印它們,Flink 自帶了多種內置的輸出格式,這些格式相關的實作封裝在 DataStreams 的算子里:
-
writeAsText() / TextOutputFormat: 將元素按行寫成字串,通過呼叫每個元素的 toString() 方法獲得字串, -
writeAsCsv(...) / CsvOutputFormat:將元組寫成逗號分隔值檔案,行和欄位的分隔符是可配置的,每個欄位的值來自物件的 toString() 方法, -
print() / printToErr():在標準輸出/標準錯誤流上列印每個元素的 toString() 值, 可選地,可以提供一個前綴(msg)附加到輸出,這有助于區分不同的 print 呼叫,如果并行度大于1,輸出結果將附帶輸出任務識別符號的前綴, -
writeUsingOutputFormat() / FileOutputFormat:自定義檔案輸出的方法和基類,支持自定義 object 到 byte 的轉換, -
writeToSocket:根據 SerializationSchema 將元素寫入套接字, -
addSink: 呼叫自定義 sink function,Flink 捆綁了連接到其他系統(例如 Apache Kafka)的連接器,這些連接器被實作為 sink functions,
【溫馨提示】DataStream 的 write*() 方法主要用于除錯目的,它們不參與 Flink 的 checkpointing,這意味著這些函式通常具有至少有一次語意,重繪到目標系統的資料取決于 OutputFormat 的實作,這意味著并非所有發送到 OutputFormat 的元素都會立即顯示在目標系統中,此外,在失敗的情況下,這些記錄可能會丟失,
為了將流可靠地、精準一次地傳輸到檔案系統中,請使用 StreamingFileSink,此外,通過 .addSink(...) 方法呼叫的自定義實作也可以參與 Flink 的 checkpointing,以實作精準一次的語意,
旁路輸出(分流)
旁路輸出在Flink中叫作SideOutput,用途類似于DataStream#split,本質上是一個資料流的切分行為,按照條件將DataStream切分為多個子資料流,子資料流叫作旁路輸出資料流,每個旁路輸出資料流可以有自己的下游處理邏輯,
使用旁路輸出時,首先需要定義用于標識旁路輸出流的 OutputTag:
val outputTag = OutputTag[String]("side-output")
可以通過以下方法將資料發送到旁路輸出:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
【示例】
package com
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object myOutputTag {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.readTextFile("flink/data/hello.txt")
val outputTag = OutputTag[String]("side-output")
val mainDataStream = input
.process(new ProcessFunction[String, String] {
override def processElement(
value: String,
ctx: ProcessFunction[String, String]#Context,
out: Collector[String]): Unit = {
// 發送資料到主要的輸出
out.collect(value)
// 發送資料到旁路輸出
ctx.output(outputTag, "sideout-" + value)
}
})
// 獲取outputTag并輸出
mainDataStream.getSideOutput(outputTag).print()
// 必須呼叫execute或者executeAsync(),下面會講
env.execute("test OutputTag")
}
}

【問題】
Caused by: java.lang.ClassNotFoundException: org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream
【解決】在pom.xml添加下面依賴
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
2)Flink 程式剖析(scala)
Flink 程式看起來像一個轉換 DataStream 的常規程式,每個程式由相同的基本部分組成:
- 獲取一個執行環境(execution environment);
- 加載/創建初始資料;
- 指定資料相關的轉換;
- 指定計算結果的存盤位置;
- 觸發程式執行,
1、 獲取一個執行環境(execution environment)
val env = StreamExecutionEnvironment.getExecutionEnvironment
2、加載/創建初始資料
為了指定 data sources,執行環境提供了一些方法,支持使用各種方法從檔案中讀取資料:你可以直接逐行讀取資料,像讀 CSV 檔案一樣,或使用任何第三方提供的 source,下面是將一個文本檔案作為一個行的序列來讀,
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 加載資料源
val input: DataStream[String] = env.readTextFile("file:///path/to/file")
3、指定資料相關的轉換
val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.readTextFile("file:///path/to/file")
// 例如一個 map 的轉換如下:
val mapped = input.map { x => x.toInt }
4、指定計算結果的存盤位置
一旦你有了包含最終結果的 DataStream,你就可以通過創建 sink 把它寫到外部系統,下面是一些用于創建 sink 的示例方法:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.readTextFile("flink/data/source")
// 例如一個 map 的轉換如下:
val mapped = input.map { x => x.toInt }
// 存盤到檔案,當然還可以執行更多的sink
// writeAsText第二個引數來定義輸出模式,它有以下兩個可選值:
// WriteMode.NO_OVERWRITE:當指定路徑上不存在任何檔案時,才執行寫出操作;
// WriteMode.OVERWRITE:不論指定路徑上是否存在檔案,都執行寫出操作;如果原來已有檔案,則進行覆寫,
mapped.writeAsText("flink/data/sink", FileSystem.WriteMode.OVERWRITE)
5、觸發程式執行
-
一旦指定了完整的程式,需要呼叫
StreamExecutionEnvironment的execute()方法來觸發程式執行,根據 ExecutionEnvironment 的型別,執行會在你的本地機器上觸發,或將你的程式提交到某個集群上執行,execute() 方法將等待作業完成,然后回傳一個 JobExecutionResult,其中包含執行時間和累加器結果, -
如果不想等待作業完成,可以通過呼叫 StreamExecutionEnvironment 的
executeAsync()方法來觸發作業異步執行,它會回傳一個 JobClient,你可以通過它與剛剛提交的作業進行通信,如下是使用 executeAsync() 實作 execute() 語意的示例,
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
完整示例程式(官網示例)
【問題一】
【溫馨提示】如果出現這種報錯,一般就是IDEA 對scope為provided,這是IDEA的bug:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/scala/typeutils/CaseClassTypeInfo
【解決】
- 【第一種方式】把依賴范圍調大或者直接去掉都行,不清楚的可以看我之前的Java-Maven詳解,但是記住在打包的時候得加上,
- 【第二種方式】Run->Edit Configurations,設定如下:

【問題二】
【問題】
Caused by: java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: org.apache.commons.math3.stat.descriptive.rank.Percentile.withNaNStrategy(Lorg/apache/commons/math3/stat/ranking/NaNStrategy;)Lorg/apache/commons/math3/stat/descriptive/rank/Percentile;hadoop-common中的commons-math3沖突導致,
【解決】排除hadoop-common中的commons-math3,設定如此:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.1</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
</exclusions>
</dependency>
先啟動服務
$ nc -lk 9999
WindowWordCount原始碼如下:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)
counts.print()
env.execute("Window Stream WordCount")
}
}

四、什么是DataSet?
Flink用DataStream 表示無界資料集,用DataSet表示有界資料集,前者用于流處理應用程式,后者用于批處理應用程式,從操作形式上看,DataStream 和 DataSet 與集合 Collection 有些相似,但兩者有著本質的區別:
- DataStream 和 DataSet 是不可變的資料集合,因此不可以想操作集合那樣增加或者洗掉 DataStream 和 DataSet 中的元素,也不可以通過諸如下標等方式訪問某個元素,
- Flink 應用程式通過 Source 創建 DataStream 物件和 DataSet 物件,通過轉換操作產生新的 DataStream 物件和 DataSet 物件,
- 運行時是應用程式被調度執行時的背景關系環境,通過
StreamExecutionEnvironment或ExecutionEnvironment方法會根據當前環境自動選擇本地或者集群運行時環境,
五、DataSet 資料處理程序

1)Data Sources (資料源)
資料源創建初始資料集,比如從檔案或Java集合創建資料集,創建資料集的一般機制抽象在InputFormat后面,Flink提供了幾種內置格式,可以從常見的檔案格式創建資料集,它們中的許多在ExecutionEnvironment上都有快捷方法,
官方檔案
1、基于檔案
readTextFile(path) / TextInputFormat:讀取文本檔案,readTextFileWithValue(path) / TextValueInputFormat: 讀取檔案,并將它們作為StringValues回傳,StringValues是可變字串,readCsvFile(path) / CsvInputFormat:決議帶有逗號(或其他字符)分隔欄位的檔案,回傳由元組或pojo組成的資料集,支持基本java型別及其對應值作為欄位型別,readFileOfPrimitives(path, Class) / PrimitiveInputFormat:決議以新行(或另一個字符序列)分隔的原始資料型別(如String或Integer)的檔案,readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat:使用給定的分隔符決議以新行(或另一個字符序列)分隔的原始資料型別(如String或Integer)的檔案,
2、基于集合
fromCollection(Collection):從Java.util.Collection創建一個資料集,集合中的所有元素必須具有相同的型別,fromCollection(Iterator, Class):從迭代器創建資料集,該類指定迭代器回傳的元素的資料型別,fromElements(T …):根據給定的物件序列創建一個資料集,所有物件必須是相同的型別,fromParallelCollection(SplittableIterator, Class):并行地從迭代器創建資料集,該類指定迭代器回傳的元素的資料型別,generateSequence(from, to):并行生成給定區間內的數字序列,
3、通用型
readFile(inputFormat, path) / FileInputFormat:接受檔案輸入格式,createInput(inputFormat) / InputFormat:接受通用輸入格式,
2)DataSet Transformations(資料集轉換//處理/算子)
資料轉換將一個或多個資料集轉換為新的資料集,程式可以將多個轉換組合成復雜的程式集,
| 算子 | 解釋 | 示例 |
|---|---|---|
| Map | 獲取一個元素并生成一個元素,將輸入流的值加倍的映射函式, | data.map { x => x.toInt } |
| FlatMap | 獲取一個元素并生成零個、一個或多個元素,將句子拆分為單詞的flatmap函式, | data.flatMap { str => str.split(" ") } |
| MapPartition | 在單個函式呼叫中轉換并行磁區,該函式以Iterable流的形式獲取磁區,并可以生成任意數量的結果值,每個磁區中的元素數量取決于并行度和之前的操作, | data.mapPartition { in => in map { (_, 1) } } |
| Filter | 為每個元素計算布爾函式,并保留該函式回傳true的元素,過濾掉零值的過濾器, | data.filter { _ > 1000 } |
| Reduce | 通過重復地將兩個元素組合成一個元素,將一組元素組合成一個元素,Reduce可以應用于完整的資料集或分組的資料集, | data.reduce { _ + _ } |
| ReduceGroup | 將一組元素組合成一個或多個元素,ReduceGroup可以應用于完整的資料集,也可以應用于分組的資料集, | data.reduceGroup { elements => elements.sum } |
| Aggregate | 將一組值聚合為一個值,聚合函式可以看作是內置的reduce函式,聚合可以應用于完整的資料集,也可以應用于分組的資料集, | val input: DataSet[(Int, String, Double)] = // [...] val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2) |
| Distinct | 回傳資料集的不同元素,對于元素的所有欄位或欄位的子集,它將從輸入資料集中洗掉重復的條目, | data.distinct() |
| Join | 通過創建鍵值相等的所有元素對來連接兩個資料集,可選地使用JoinFunction將這對元素轉換為單個元素,或使用FlatJoinFunction將這對元素轉換為任意多個(包括沒有)元素,參見鍵部分了解如何定義連接鍵, | val result = input1.join(input2).where(0).equalTo(1) |
| OuterJoin | 對兩個資料集執行左、右或完全外部連接,外部連接類似于常規(內部)連接,它創建的所有元素對的鍵值相等,此外,如果在另一側沒有找到匹配的鍵,則保存外部的記錄(如果是完整的,則為左、右或兩者),匹配的元素對(或一個元素和另一個輸入的空值)被賦給一個JoinFunction以將這對元素轉換為單個元素,或者賦給一個FlatJoinFunction以將這對元素轉換為任意多個(包括沒有)元素,參見鍵部分了解如何定義連接鍵, | val joined = left.leftOuterJoin(right).where(0).equalTo(1) { (left, right) => val a = if (left == null) "none" else left._1 (a, right) } |
| CoGroup | 簡化運算的二維變體,對一個或多個欄位上的每個輸入進行分組,然后合并組,每對組呼叫一個變換函式,請參閱鍵部分以了解如何定義coGroup鍵, | data1.coGroup(data2).where(0).equalTo(1) |
| Cross | 構建兩個輸入的笛卡爾積(叉積),創建所有的元素對,可選地使用CrossFunction將這對元素轉換為單個元素, | val data1: DataSet[Int] = // [...] val data2: DataSet[String] = // [...] val result: DataSet[(Int, String)] = data1.cross(data2) |
| Union | 生成兩個資料集的并集, | data.union(data2) |
| Rebalance | 均勻地重新平衡資料集的并行磁區,以消除資料傾斜,只有類似map的轉換可以遵循rebalance轉換, | val data1: DataSet[Int] = // [...] val result: DataSet[(Int, String)] = data1.rebalance().map(...) |
| Hash-Partition | 哈希磁區一個給定鍵的資料集,鍵可以指定為位置鍵、運算式鍵和鍵選擇器函式, | val in: DataSet[(Int, String)] = // [...] val result = in.partitionByHash(0).mapPartition { ... } |
| Range-Partition | 根據給定的鍵對資料集進行范圍磁區,鍵可以指定為位置鍵、運算式鍵和鍵選擇器函式, | val in: DataSet[(Int, String)] = // [...] val result = in.partitionByRange(0).mapPartition { ... } |
| Custom Partitioning | 使用自定義Partitioner函式,根據鍵將記錄分配到特定的磁區,該鍵可以指定為位置鍵、運算式鍵和選擇鍵函式,注意:此方法只適用于單個欄位鍵, | val in: DataSet[(Int, String)] = // [...] val result = in .partitionCustom(partitioner, key).mapPartition { ... } |
| Sort Partitioning | 按照指定的順序在本地對指定欄位上的資料集的所有磁區進行排序,欄位可以指定為元組位置或欄位運算式,對多個欄位進行排序是通過鏈接sortPartition()呼叫來完成的, | val in: DataSet[(Int, String)] = // [...] val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... } |
| First-N | 回傳資料集的前n個(任意的)元素,First-n可以應用于常規資料集、分組資料集或分組排序資料集,分組鍵可以指定為鍵選擇器函式或欄位位置鍵, | val in: DataSet[(Int, String)] = // [...] // regular data set val result1 = in.first(3) // grouped data set val result2 = in.groupBy(0).first(3) // grouped-sorted data set val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3) |
| MinBy / MaxBy | 從一個或多個欄位值為最小(最大值)的元組中選擇一個元組,用于比較的欄位必須是有效的關鍵欄位,即可比性,如果多個元組具有最小(最大)欄位值,則回傳這些元組中的任意一個元組,MinBy (MaxBy)可以應用于完整的資料集或分組的資料集, | val in: DataSet[(Int, Double, String)] = // [...] // a data set with a single tuple with minimum values for the Int and String fields. val out: DataSet[(Int, Double, String)] = in.minBy(0, 2) // a data set with one tuple for each group with the minimum value for the Double field. val out2: DataSet[(Int, Double, String)] = in.groupBy(2).minBy(1) |
| Specifying Keys | 一些轉換(join、coGroup、groupBy)要求在元素集合上定義鍵,其他轉換(Reduce、groureduce、Aggregate)允許在應用資料之前對資料進行分組, | DataSet<...> input = // [...] DataSet<...> reduced = input .groupBy(/define key here/) .reduceGroup(/do something/); |
| Define keys for Tuples | 最簡單的情況是在元組的一個或多個欄位上分組元組, | val input: DataSet[(Int, String, Long)] = // [...] val keyed = input.groupBy(0) //val input: DataSet[(Int, String, Long)] = // [...] val grouped = input.groupBy(0,1) |
3)Data Sinks(資料輸出)
資料接收器使用資料集,并用于存盤或回傳它們,使用OutputFormat描述資料接收器操作,Flink提供了多種內置的輸出格式,這些格式封裝在DataSet上的操作后面:
writeAsText() / TextOutputFormat:按行方式將元素寫入字串,字串是通過呼叫每個元素的toString()方法獲得的,writeAsFormattedText() / TextOutputFormat:將元素按行撰寫為字串,字串是通過為每個元素呼叫用戶定義的format()方法獲得的,writeAsCsv(…) / CsvOutputFormat:將元組寫入逗號分隔的值檔案,行和欄位分隔符是可配置的,每個欄位的值來自物件的toString()方法,print() / printToErr() / print(String msg) / printToErr(String msg) -列印出標準輸出/標準錯誤流中每個元素的toString()值,可選地,可以提供一個前綴(msg),作為輸出的前綴,這有助于區分不同的列印呼叫,如果并行度大于1,輸出也會被添加產生輸出的任務的識別符號,write() / FileOutputFormat:方法和基類用于自定義檔案輸出,支持自定義物件到位元組的轉換,output()/ OutputFormat:大多數通用輸出方法,用于非基于檔案的資料接收器(例如將結果存盤在資料庫中),
一個資料集可以被輸入到多個操作,程式可以寫或列印一個資料集,同時在它們上運行額外的轉換,
【示例】
package com
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode
object DataSetTest001 {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// text data
val textData: DataSet[String] = env.readTextFile("flink/data/s1")
// write DataSet to a file on the local file system
// textData.writeAsText("flink/data/sink01")
// write DataSet to a file on an HDFS with a namenode running at nnHost:nnPort
// 先創建目錄:hadoop fs -mkdir -p hdfs://hadoop-node1:8082/flink/DataSet/
// 操作添加依賴
/*<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.1</version>
<scope>provided</scope>
</dependency>*/
textData.writeAsText("hdfs://hadoop-node1:8082/flink/DataSet/sink02")
//
// // write DataSet to a file and overwrite the file if it exists
// textData.writeAsText("flink/data/sink03", WriteMode.OVERWRITE)
//
// // tuples as lines with pipe as the separator "a|b|c"
// val values: DataSet[(String, Int, Double)] = // [...]
// values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")
//
// // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
// values.writeAsText("file:///path/to/the/result/file")
// this writes values as strings using a user-defined formatting
// values map { tuple => tuple._1 + " - " + tuple._2 }
// .writeAsText("file:///path/to/the/result/file")
env.execute("dataset test")
}
}

【示例】WordCount
package com
import org.apache.flink.api.scala._
object WordCount {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?")
val counts = text
.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
counts.print()
}
}

未完待續,更多大資料知識,請耐心等待~
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/471852.html
標籤:其他
上一篇:JDBC(1)eclipse連接MySQL 8.0.29.0
下一篇:如何在覆寫小部件上覆寫小部件
