主頁 > 資料庫 > 大資料Hadoop之——Flink DataStream API 和 DataSet API

大資料Hadoop之——Flink DataStream API 和 DataSet API

2022-05-10 07:53:11 資料庫

目錄
  • 一、DataStream API概述
  • 二、什么是DataStream ?
  • 三、DataStream 資料處理程序
    • 1)Data Sources(資料源)
      • 1、Data Sources 原理
      • 2、Data Sources 實作方式
        • 1)基于檔案
        • 2)基于套接字
        • 3)基于集合
        • 4)自定義
    • 2)DataStream Transformations(資料流轉換//處理/算子)
      • 1、資料流轉換
      • 2、物理磁區
      • 3、算子鏈和資源組
    • 3)Data Sinks(資料輸出)
      • 旁路輸出(分流)
    • 2)Flink 程式剖析(scala)
      • 1、 獲取一個執行環境(execution environment)
      • 2、加載/創建初始資料
      • 3、指定資料相關的轉換
      • 4、指定計算結果的存盤位置
      • 5、觸發程式執行
  • 四、什么是DataSet?
  • 五、DataSet 資料處理程序
    • 1)Data Sources (資料源)
      • 1、基于檔案
      • 2、基于集合
      • 3、通用型
    • 2)DataSet Transformations(資料集轉換//處理/算子)
    • 3)Data Sinks(資料輸出)

一、DataStream API概述

Flink 中的 DataStream 程式是對資料流(例如過濾、更新狀態、定義視窗、聚合)進行轉換的常規程式,資料流的起始是從各種源(例如訊息佇列、套接字流、檔案)創建的,結果通過 sink 回傳,例如可以將資料寫入檔案或標準輸出(例如命令列終端),Flink 程式可以在各種背景關系中運行,可以獨立運行,也可以嵌入到其它程式中,任務執行可以運行在本地 JVM 中,也可以運行在多臺機器的集群上,

二、什么是DataStream ?

  • DataStream API 得名于特殊的 DataStream 類,該類用于表示 Flink 程式中的資料集合,你可以認為 它們是可以包含重復項的不可變資料集合,這些資料可以是有界(有限)的,也可以是無界(無限)的,但用于處理它們的API是相同的,
  • DataStream 在用法上類似于常規的 Java 集合,但在某些關鍵方面卻大不相同,它們是不可變的,這意味著一旦它們被創建,你就不能添加或洗掉元素,你也不能簡單地察看內部元素,而只能使用 DataStream API 操作來處理它們,DataStream API 操作也叫作轉換(transformation),
  • 你可以通過在 Flink 程式中添加 source 創建一個初始的 DataStream,然后,你可以基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流連接在一起,

三、DataStream 資料處理程序

1)Data Sources(資料源)

1、Data Sources 原理

官方檔案
一個資料 source 包括三個核心組件:分片(Splits)分片列舉器(SplitEnumerator) 以及 源閱讀器(SourceReader)

  • 分片(Split) 是對一部分 source 資料的包裝,如一個檔案或者日志磁區,分片是 source 進行任務分配和資料并行讀取的基本粒度,

  • 源閱讀器(SourceReader) 會請求分片并進行處理,例如讀取分片所表示的檔案或日志磁區,SourceReader 在 TaskManagers 上的 SourceOperators 并行運行,并產生并行的事件流/記錄流,

  • 分片列舉器(SplitEnumerator) 會生成分片并將它們分配給 SourceReader,該組件在 JobManager 上以單并行度運行,負責對未分配的分片進行維護,并以均衡的方式將其分配給 reader,SplitEnumerator 被認為是整個 Source 的“大腦”,

2、Data Sources 實作方式

1)基于檔案

Source 是你的程式從中讀取其輸入的地方,你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 將一個 source 關聯到你的程式,Flink 自帶了許多預先實作的 source functions,不過你仍然可以通過實作 SourceFunction 介面撰寫自定義的非并行 source,也可以通過實作 ParallelSourceFunction 介面或者繼承 RichParallelSourceFunction 類撰寫自定義的并行 sources, 通過 StreamExecutionEnvironment 可以訪問多種預定義的 stream source,source 連接器,請查看連接器檔案,

  • readTextFile(path):讀取文本檔案,
  • readFile(fileInputFormat, path) - 按照指定的檔案輸入格式讀取(一次)檔案,
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) :這是前兩個方法內部呼叫的方法,它基于給定的 fileInputFormat 讀取路徑 path 上的檔案,根據提供的 watchType 的不同,source 可能定期(每 interval 毫秒)監控路徑上的新資料(watchType 為 FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次當前路徑中的資料然后退出(watchType 為 FileProcessingMode.PROCESS_ONCE),使用 pathFilter,用戶可以進一步排除正在處理的檔案,
2)基于套接字
  • socketTextStream:套接字讀取,元素可以由分隔符分隔,
3)基于集合
  • fromCollection(Collection) :從 Java Java.util.Collection 創建資料流,集合中的所有元素必須屬于同一型別,

  • fromCollection(Iterator, Class) :從迭代器創建資料流,class 引數指定迭代器回傳元素的資料型別,

  • fromElements(T ...) :從給定的物件序列中創建資料流,所有的物件必須屬于同一型別,

  • fromParallelCollection(SplittableIterator, Class) :從迭代器并行創建資料流,class 引數指定迭代器回傳元素的資料型別,

  • generateSequence(from, to) :基于給定間隔內的數字序列并行生成資料流,

4)自定義
  • addSource:關聯一個新的 source function,例如,你可以使用 addSource(new FlinkKafkaConsumer<>(...)) 來從 Apache Kafka 獲取資料,更多詳細資訊見連接器,

2)DataStream Transformations(資料流轉換//處理/算子)

【溫馨提示】是用戶通過算子能將一個或多個 DataStream 轉換成新的 DataStream,在應用程式中可以將多個資料轉換算子合并成一個復雜的資料流拓撲,這部分內容將描述 Flink DataStream API 中基本的資料轉換API,資料轉換后各種資料磁區方式,以及算子的鏈接策略,

官方檔案

1、資料流轉換

算子 資料轉換 解釋 示例
Map DataStream → DataStream 獲取一個元素并生成一個元素,將輸入流的值加倍的映射函式 dataStream.map { x => x * 2 }
FlatMap DataStream → DataStream 獲取一個元素并生成零個、一個或多個元素,將句子拆分為單詞的flatmap函式 dataStream.flatMap { str => str.split(" ") }
Filter DataStream → DataStream 為每個元素計算布爾函式,并保留該函式回傳true的元素,過濾掉零值的過濾器 dataStream.filter { _ != 0 }
KeyBy DataStream → KeyedStream 在邏輯上將流劃分為不相交的磁區,具有相同密鑰的所有記錄都被分配到同一磁區,在內部,keyBy()是通過哈希磁區實作的,類似于mysql里面的group by,有不同的方法來指定鍵 dataStream.keyBy(.someKey)
dataStream.keyBy(
._1)
Reduce KeyedStream → DataStream 鍵控資料流上的“滾動”減少,將當前元素與上次減少的值合并,并發出新值,創建部分和流的reduce函式 keyedStream.reduce { _ + _ }
Window KeyedStream → WindowedStream 可以在已磁區的KeyedStreams上定義視窗,Windows根據某些特征(例如,在過去5秒內到達的資料)對每個鍵中的資料進行分組,有關windows的完整說明,請參見windows, dataStream
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
WindowAll DataStream → AllWindowedStream 可以在常規資料流上定義視窗,Windows根據某些特征(例如,過去5秒內到達的資料)對所有流事件進行分組,有關windows的完整說明,請參見windows, dataStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
Window Apply WindowedStream → DataStream ;AllWindowedStream → DataStream 將常規功能應用于整個視窗,下面是一個手動求和視窗元素的函式,如果使用的是windowAll轉換,則需要使用AllWindowFunction windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
WindowReduce WindowedStream → DataStream 將reduce函式應用于視窗并回傳減少的值, windowedStream.reduce { _ + _ }
Union DataStream* → DataStream 兩個或多個資料流的合并,創建一個包含所有流中所有元素的新流,注意:如果將一個資料流與其自身合并,則在結果流中會得到兩次每個元素, dataStream.union(otherStream1, otherStream2, ...);
Window Join DataStream,DataStream → DataStream 在給定的密鑰和公共視窗上連接兩個資料流, dataStream.join(otherStream)
.where().equalTo()
.window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply { ... }
Interval Join KeyedStream,KeyedStream → DataStream 在給定的時間間隔內,將兩個密鑰流的兩個元素e1和e2與一個公共密鑰連接,因此 e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound // this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2))
// lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {...})
Window CoGroup DataStream,DataStream → DataStream 在給定的鍵和公共視窗上對兩個資料流進行協組, dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
Connect DataStream,DataStream → ConnectedStream “連接”兩個保持其型別的資料流,連接允許兩個流之間的共享狀態, someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap ConnectedStream → DataStream 類似于連接資料流上的map和flatMap connectedStreams.map(
(_ : Int) => true,
(_ : String) => false)
)

connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
Iterate DataStream → IterativeStream → ConnectedStream 通過將一個運算子的輸出重定向到前一個運算子,在流中創建一個“反饋”回圈,這對于定義不斷更新模型的演算法特別有用,下面的代碼從一個流開始,并連續地應用迭代體,大于0的元素被發送回反饋通道,其余的元素被下游轉發, initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/do something/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}

2、物理磁區

Flink 也提供以下方法讓用戶根據需要在資料轉換完成后對資料磁區進行更細粒度的配置,

磁區 資料轉換 解釋 示例
Custom Partitioning DataStream → DataStream 使用用戶定義的Partitioner為每個元素選擇目標任務, dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
Random Partitioning DataStream → DataStream 根據均勻分布隨機劃分元素, dataStream.shuffle()
Rescaling DataStream → DataStream 回圈地將元素磁區到下游操作的一個子集, dataStream.rescale()
Broadcasting DataStream → DataStream 將元素廣播到每個磁區,

3、算子鏈和資源組

將兩個算子鏈接在一起能使得它們在同一個執行緒中執行,從而提升性能,Flink 默認會將能鏈接的算子盡可能地進行鏈接(例如, 兩個 map 轉換操作),此外, Flink 還提供了對鏈接更細粒度控制的 API 以滿足更多需求,

如果想對整個作業禁用算子鏈,可以呼叫 StreamExecutionEnvironment.disableOperatorChaining(),下列方法還提供了更細粒度的控制,需要注 意的是, 這些方法只能在 DataStream 轉換操作后才能被呼叫,因為它們只對前一次資料轉換生效,例如,可以 someStream.map(...).startNewChain() 這樣呼叫,而不能 someStream.startNewChain()這樣,

算子鏈操作 解釋 示例
Start New Chain 開始一個新的鏈,從這個運算子開始,這兩個映射器將被鏈接,過濾器將不會鏈接到第一個映射器, someStream.filter(...).map(...).startNewChain().map(...)
Disable Chaining 不要鏈接map運算子, someStream.map(...).disableChaining()
Set Slot Sharing Group 設定操作的槽位共享組,Flink將把具有相同槽共享組的操作放在相同槽中,而將沒有槽共享組的操作放在其他槽中,這可以用來隔離槽,如果所有的輸入操作都在同一個槽位共享組中,則從輸入操作繼承槽位共享組,默認槽位共享組的名稱為“default”,可以通過呼叫slotSharingGroup(“default”)顯式地將操作放入該組, someStream.filter(...).slotSharingGroup("name")

3)Data Sinks(資料輸出)

sink 連接器,請查看連接器檔案,

Data sinks 使用 DataStream 并將它們轉發到檔案、套接字、外部系統或列印它們,Flink 自帶了多種內置的輸出格式,這些格式相關的實作封裝在 DataStreams 的算子里:

  • writeAsText() / TextOutputFormat : 將元素按行寫成字串,通過呼叫每個元素的 toString() 方法獲得字串,

  • writeAsCsv(...) / CsvOutputFormat :將元組寫成逗號分隔值檔案,行和欄位的分隔符是可配置的,每個欄位的值來自物件的 toString() 方法,

  • print() / printToErr():在標準輸出/標準錯誤流上列印每個元素的 toString() 值, 可選地,可以提供一個前綴(msg)附加到輸出,這有助于區分不同的 print 呼叫,如果并行度大于1,輸出結果將附帶輸出任務識別符號的前綴,

  • writeUsingOutputFormat() / FileOutputFormat :自定義檔案輸出的方法和基類,支持自定義 object 到 byte 的轉換,

  • writeToSocket :根據 SerializationSchema 將元素寫入套接字,

  • addSink : 呼叫自定義 sink function,Flink 捆綁了連接到其他系統(例如 Apache Kafka)的連接器,這些連接器被實作為 sink functions,

【溫馨提示】DataStream 的 write*() 方法主要用于除錯目的,它們不參與 Flink 的 checkpointing,這意味著這些函式通常具有至少有一次語意,重繪到目標系統的資料取決于 OutputFormat 的實作,這意味著并非所有發送到 OutputFormat 的元素都會立即顯示在目標系統中,此外,在失敗的情況下,這些記錄可能會丟失,

為了將流可靠地、精準一次地傳輸到檔案系統中,請使用 StreamingFileSink,此外,通過 .addSink(...) 方法呼叫的自定義實作也可以參與 Flink 的 checkpointing,以實作精準一次的語意,

旁路輸出(分流)

旁路輸出在Flink中叫作SideOutput,用途類似于DataStream#split,本質上是一個資料流的切分行為,按照條件將DataStream切分為多個子資料流,子資料流叫作旁路輸出資料流,每個旁路輸出資料流可以有自己的下游處理邏輯,

使用旁路輸出時,首先需要定義用于標識旁路輸出流的 OutputTag:

val outputTag = OutputTag[String]("side-output")

可以通過以下方法將資料發送到旁路輸出:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • KeyedCoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

【示例】

package com

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object myOutputTag {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input: DataStream[String] = env.readTextFile("flink/data/hello.txt")
    val outputTag = OutputTag[String]("side-output")

    val mainDataStream = input
      .process(new ProcessFunction[String, String] {
        override def processElement(
                                     value: String,
                                     ctx: ProcessFunction[String, String]#Context,
                                     out: Collector[String]): Unit = {
          // 發送資料到主要的輸出
          out.collect(value)
          // 發送資料到旁路輸出
          ctx.output(outputTag, "sideout-" + value)
        }
      })
    
    // 獲取outputTag并輸出
    mainDataStream.getSideOutput(outputTag).print()
    // 必須呼叫execute或者executeAsync(),下面會講
    env.execute("test OutputTag")
  }

}


【問題】Caused by: java.lang.ClassNotFoundException: org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream

【解決】在pom.xml添加下面依賴

<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-compress</artifactId>
	<version>1.21</version>
</dependency>

Flink 程式看起來像一個轉換 DataStream 的常規程式,每個程式由相同的基本部分組成:

  1. 獲取一個執行環境(execution environment);
  2. 加載/創建初始資料;
  3. 指定資料相關的轉換;
  4. 指定計算結果的存盤位置;
  5. 觸發程式執行,

1、 獲取一個執行環境(execution environment)

val env = StreamExecutionEnvironment.getExecutionEnvironment

2、加載/創建初始資料

為了指定 data sources,執行環境提供了一些方法,支持使用各種方法從檔案中讀取資料:你可以直接逐行讀取資料,像讀 CSV 檔案一樣,或使用任何第三方提供的 source,下面是將一個文本檔案作為一個行的序列來讀,

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 加載資料源
val input: DataStream[String] = env.readTextFile("file:///path/to/file")

3、指定資料相關的轉換

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.readTextFile("file:///path/to/file")

// 例如一個 map 的轉換如下:
val mapped = input.map { x => x.toInt }

4、指定計算結果的存盤位置

一旦你有了包含最終結果的 DataStream,你就可以通過創建 sink 把它寫到外部系統,下面是一些用于創建 sink 的示例方法:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.readTextFile("flink/data/source")

// 例如一個 map 的轉換如下:
val mapped = input.map { x => x.toInt }

// 存盤到檔案,當然還可以執行更多的sink
// writeAsText第二個引數來定義輸出模式,它有以下兩個可選值:
// WriteMode.NO_OVERWRITE:當指定路徑上不存在任何檔案時,才執行寫出操作;
// WriteMode.OVERWRITE:不論指定路徑上是否存在檔案,都執行寫出操作;如果原來已有檔案,則進行覆寫,
mapped.writeAsText("flink/data/sink", FileSystem.WriteMode.OVERWRITE)

5、觸發程式執行

  • 一旦指定了完整的程式,需要呼叫 StreamExecutionEnvironmentexecute()方法來觸發程式執行,根據 ExecutionEnvironment 的型別,執行會在你的本地機器上觸發,或將你的程式提交到某個集群上執行,execute() 方法將等待作業完成,然后回傳一個 JobExecutionResult,其中包含執行時間和累加器結果,

  • 如果不想等待作業完成,可以通過呼叫 StreamExecutionEnvironment 的 executeAsync() 方法來觸發作業異步執行,它會回傳一個 JobClient,你可以通過它與剛剛提交的作業進行通信,如下是使用 executeAsync() 實作 execute() 語意的示例,

final JobClient jobClient = env.executeAsync();

final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

完整示例程式(官網示例)

【問題一】

【溫馨提示】如果出現這種報錯,一般就是IDEA 對scope為provided,這是IDEA的bug:Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/scala/typeutils/CaseClassTypeInfo

【解決】

  • 【第一種方式】把依賴范圍調大或者直接去掉都行,不清楚的可以看我之前的Java-Maven詳解,但是記住在打包的時候得加上,
  • 【第二種方式】Run->Edit Configurations,設定如下:

【問題二】

【問題】Caused by: java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: org.apache.commons.math3.stat.descriptive.rank.Percentile.withNaNStrategy(Lorg/apache/commons/math3/stat/ranking/NaNStrategy;)Lorg/apache/commons/math3/stat/descriptive/rank/Percentile;hadoop-common中的commons-math3沖突導致,

【解決】排除hadoop-common中的commons-math3,設定如此:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.3.1</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
        </exclusion>
    </exclusions>
</dependency>

先啟動服務

$ nc -lk 9999

WindowWordCount原始碼如下:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)

    counts.print()

    env.execute("Window Stream WordCount")
  }
}

四、什么是DataSet?

Flink用DataStream 表示無界資料集,用DataSet表示有界資料集,前者用于流處理應用程式,后者用于批處理應用程式,從操作形式上看,DataStream 和 DataSet 與集合 Collection 有些相似,但兩者有著本質的區別:

  • DataStream 和 DataSet 是不可變的資料集合,因此不可以想操作集合那樣增加或者洗掉 DataStream 和 DataSet 中的元素,也不可以通過諸如下標等方式訪問某個元素,
  • Flink 應用程式通過 Source 創建 DataStream 物件和 DataSet 物件,通過轉換操作產生新的 DataStream 物件和 DataSet 物件,
  • 運行時是應用程式被調度執行時的背景關系環境,通過StreamExecutionEnvironmentExecutionEnvironment方法會根據當前環境自動選擇本地或者集群運行時環境,

五、DataSet 資料處理程序

1)Data Sources (資料源)

資料源創建初始資料集,比如從檔案或Java集合創建資料集,創建資料集的一般機制抽象在InputFormat后面,Flink提供了幾種內置格式,可以從常見的檔案格式創建資料集,它們中的許多在ExecutionEnvironment上都有快捷方法,

官方檔案

1、基于檔案

  • readTextFile(path) / TextInputFormat :讀取文本檔案,
  • readTextFileWithValue(path) / TextValueInputFormat: 讀取檔案,并將它們作為StringValues回傳,StringValues是可變字串,
  • readCsvFile(path) / CsvInputFormat:決議帶有逗號(或其他字符)分隔欄位的檔案,回傳由元組或pojo組成的資料集,支持基本java型別及其對應值作為欄位型別,
  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat:決議以新行(或另一個字符序列)分隔的原始資料型別(如String或Integer)的檔案,
  • readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat:使用給定的分隔符決議以新行(或另一個字符序列)分隔的原始資料型別(如String或Integer)的檔案,

2、基于集合

  • fromCollection(Collection):從Java.util.Collection創建一個資料集,集合中的所有元素必須具有相同的型別,
  • fromCollection(Iterator, Class):從迭代器創建資料集,該類指定迭代器回傳的元素的資料型別,
  • fromElements(T …) :根據給定的物件序列創建一個資料集,所有物件必須是相同的型別,
  • fromParallelCollection(SplittableIterator, Class):并行地從迭代器創建資料集,該類指定迭代器回傳的元素的資料型別,
  • generateSequence(from, to):并行生成給定區間內的數字序列,

3、通用型

  • readFile(inputFormat, path) / FileInputFormat :接受檔案輸入格式,
  • createInput(inputFormat) / InputFormat:接受通用輸入格式,

2)DataSet Transformations(資料集轉換//處理/算子)

資料轉換將一個或多個資料集轉換為新的資料集,程式可以將多個轉換組合成復雜的程式集,

算子 解釋 示例
Map 獲取一個元素并生成一個元素,將輸入流的值加倍的映射函式, data.map { x => x.toInt }
FlatMap 獲取一個元素并生成零個、一個或多個元素,將句子拆分為單詞的flatmap函式, data.flatMap { str => str.split(" ") }
MapPartition 在單個函式呼叫中轉換并行磁區,該函式以Iterable流的形式獲取磁區,并可以生成任意數量的結果值,每個磁區中的元素數量取決于并行度和之前的操作, data.mapPartition { in => in map { (_, 1) } }
Filter 為每個元素計算布爾函式,并保留該函式回傳true的元素,過濾掉零值的過濾器, data.filter { _ > 1000 }
Reduce 通過重復地將兩個元素組合成一個元素,將一組元素組合成一個元素,Reduce可以應用于完整的資料集或分組的資料集, data.reduce { _ + _ }
ReduceGroup 將一組元素組合成一個或多個元素,ReduceGroup可以應用于完整的資料集,也可以應用于分組的資料集, data.reduceGroup { elements => elements.sum }
Aggregate 將一組值聚合為一個值,聚合函式可以看作是內置的reduce函式,聚合可以應用于完整的資料集,也可以應用于分組的資料集, val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)
Distinct 回傳資料集的不同元素,對于元素的所有欄位或欄位的子集,它將從輸入資料集中洗掉重復的條目, data.distinct()
Join 通過創建鍵值相等的所有元素對來連接兩個資料集,可選地使用JoinFunction將這對元素轉換為單個元素,或使用FlatJoinFunction將這對元素轉換為任意多個(包括沒有)元素,參見鍵部分了解如何定義連接鍵, val result = input1.join(input2).where(0).equalTo(1)
OuterJoin 對兩個資料集執行左、右或完全外部連接,外部連接類似于常規(內部)連接,它創建的所有元素對的鍵值相等,此外,如果在另一側沒有找到匹配的鍵,則保存外部的記錄(如果是完整的,則為左、右或兩者),匹配的元素對(或一個元素和另一個輸入的空值)被賦給一個JoinFunction以將這對元素轉換為單個元素,或者賦給一個FlatJoinFunction以將這對元素轉換為任意多個(包括沒有)元素,參見鍵部分了解如何定義連接鍵, val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
(left, right) =>
val a = if (left == null) "none" else left._1
(a, right)
}
CoGroup 簡化運算的二維變體,對一個或多個欄位上的每個輸入進行分組,然后合并組,每對組呼叫一個變換函式,請參閱鍵部分以了解如何定義coGroup鍵, data1.coGroup(data2).where(0).equalTo(1)
Cross 構建兩個輸入的笛卡爾積(叉積),創建所有的元素對,可選地使用CrossFunction將這對元素轉換為單個元素, val data1: DataSet[Int] = // [...]
val data2: DataSet[String] = // [...]
val result: DataSet[(Int, String)] = data1.cross(data2)
Union 生成兩個資料集的并集, data.union(data2)
Rebalance 均勻地重新平衡資料集的并行磁區,以消除資料傾斜,只有類似map的轉換可以遵循rebalance轉換, val data1: DataSet[Int] = // [...]
val result: DataSet[(Int, String)] = data1.rebalance().map(...)
Hash-Partition 哈希磁區一個給定鍵的資料集,鍵可以指定為位置鍵、運算式鍵和鍵選擇器函式, val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByHash(0).mapPartition { ... }
Range-Partition 根據給定的鍵對資料集進行范圍磁區,鍵可以指定為位置鍵、運算式鍵和鍵選擇器函式, val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByRange(0).mapPartition { ... }
Custom Partitioning 使用自定義Partitioner函式,根據鍵將記錄分配到特定的磁區,該鍵可以指定為位置鍵、運算式鍵和選擇鍵函式,注意:此方法只適用于單個欄位鍵, val in: DataSet[(Int, String)] = // [...]
val result = in
.partitionCustom(partitioner, key).mapPartition { ... }
Sort Partitioning 按照指定的順序在本地對指定欄位上的資料集的所有磁區進行排序,欄位可以指定為元組位置或欄位運算式,對多個欄位進行排序是通過鏈接sortPartition()呼叫來完成的, val in: DataSet[(Int, String)] = // [...]
val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }
First-N 回傳資料集的前n個(任意的)元素,First-n可以應用于常規資料集、分組資料集或分組排序資料集,分組鍵可以指定為鍵選擇器函式或欄位位置鍵, val in: DataSet[(Int, String)] = // [...]
// regular data set
val result1 = in.first(3)
// grouped data set
val result2 = in.groupBy(0).first(3)
// grouped-sorted data set
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
MinBy / MaxBy 從一個或多個欄位值為最小(最大值)的元組中選擇一個元組,用于比較的欄位必須是有效的關鍵欄位,即可比性,如果多個元組具有最小(最大)欄位值,則回傳這些元組中的任意一個元組,MinBy (MaxBy)可以應用于完整的資料集或分組的資料集, val in: DataSet[(Int, Double, String)] = // [...]
// a data set with a single tuple with minimum values for the Int and String fields.
val out: DataSet[(Int, Double, String)] = in.minBy(0, 2)
// a data set with one tuple for each group with the minimum value for the Double field.
val out2: DataSet[(Int, Double, String)] = in.groupBy(2).minBy(1)
Specifying Keys 一些轉換(join、coGroup、groupBy)要求在元素集合上定義鍵,其他轉換(Reduce、groureduce、Aggregate)允許在應用資料之前對資料進行分組, DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/define key here/)
.reduceGroup(/do something/);
Define keys for Tuples 最簡單的情況是在元組的一個或多個欄位上分組元組, val input: DataSet[(Int, String, Long)] = // [...]
val keyed = input.groupBy(0)

//val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)

3)Data Sinks(資料輸出)

資料接收器使用資料集,并用于存盤或回傳它們,使用OutputFormat描述資料接收器操作,Flink提供了多種內置的輸出格式,這些格式封裝在DataSet上的操作后面:

  • writeAsText() / TextOutputFormat:按行方式將元素寫入字串,字串是通過呼叫每個元素的toString()方法獲得的,
  • writeAsFormattedText() / TextOutputFormat:將元素按行撰寫為字串,字串是通過為每個元素呼叫用戶定義的format()方法獲得的,
  • writeAsCsv(…) / CsvOutputFormat:將元組寫入逗號分隔的值檔案,行和欄位分隔符是可配置的,每個欄位的值來自物件的toString()方法,print() / printToErr() / print(String msg) / printToErr(String msg) -列印出標準輸出/標準錯誤流中每個元素的toString()值,可選地,可以提供一個前綴(msg),作為輸出的前綴,這有助于區分不同的列印呼叫,如果并行度大于1,輸出也會被添加產生輸出的任務的識別符號,
  • write() / FileOutputFormat:方法和基類用于自定義檔案輸出,支持自定義物件到位元組的轉換,
  • output()/ OutputFormat:大多數通用輸出方法,用于非基于檔案的資料接收器(例如將結果存盤在資料庫中),

一個資料集可以被輸入到多個操作,程式可以寫或列印一個資料集,同時在它們上運行額外的轉換,

【示例】

package com

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode

object DataSetTest001 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // text data
    val textData: DataSet[String] = env.readTextFile("flink/data/s1")

    // write DataSet to a file on the local file system
//    textData.writeAsText("flink/data/sink01")


    // write DataSet to a file on an HDFS with a namenode running at nnHost:nnPort
    // 先創建目錄:hadoop fs -mkdir -p hdfs://hadoop-node1:8082/flink/DataSet/
    // 操作添加依賴
    /*<dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>3.3.1</version>
      <scope>provided</scope>
    </dependency>*/

    textData.writeAsText("hdfs://hadoop-node1:8082/flink/DataSet/sink02")

//
//    // write DataSet to a file and overwrite the file if it exists
//    textData.writeAsText("flink/data/sink03", WriteMode.OVERWRITE)
//
//    // tuples as lines with pipe as the separator "a|b|c"
//    val values: DataSet[(String, Int, Double)] = // [...]
//    values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")
//
//    // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
//    values.writeAsText("file:///path/to/the/result/file")

    // this writes values as strings using a user-defined formatting
//    values map { tuple => tuple._1 + " - " + tuple._2 }
//      .writeAsText("file:///path/to/the/result/file")

    env.execute("dataset test")
  }
}

【示例】WordCount

package com

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text
      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}

未完待續,更多大資料知識,請耐心等待~

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/471830.html

標籤:大數據

上一篇:403錯誤-ApacheVirtualHost-Ubuntu22.04

下一篇:SQL語言基礎

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more