主頁 >  其他 > 大資料Hadoop之——Flink DataStream API 和 DataSet API

大資料Hadoop之——Flink DataStream API 和 DataSet API

2022-02-28 07:27:04 其他

文章目錄

    • 一、DataStream API概述
    • 二、什么是DataStream ?
    • 三、DataStream 資料處理程序
      • 1)Data Sources(資料源)
        • 1、Data Sources 原理
        • 2、Data Sources 實作方式
          • 1)基于檔案
          • 2)基于套接字
          • 3)基于集合
          • 4)自定義
      • 2)DataStream Transformations(資料流轉換//處理/算子)
        • 1、資料流轉換
        • 2、物理磁區
        • 3、算子鏈和資源組
      • 3)Data Sinks(資料輸出)
        • 旁路輸出(分流)
      • 2)Flink 程式剖析(scala)
        • 1、 獲取一個執行環境(execution environment)
        • 2、加載/創建初始資料
        • 3、指定資料相關的轉換
        • 4、指定計算結果的存盤位置
        • 5、觸發程式執行
    • 四、什么是DataSet?
    • 五、DataSet 資料處理程序
      • 1)Data Sources (資料源)
        • 1、基于檔案
        • 2、基于集合
        • 3、通用型
      • 2)DataSet Transformations(資料集轉換//處理/算子)
      • 3)Data Sinks(資料輸出)

一、DataStream API概述

Flink 中的 DataStream 程式是對資料流(例如過濾、更新狀態、定義視窗、聚合)進行轉換的常規程式,資料流的起始是從各種源(例如訊息佇列、套接字流、檔案)創建的,結果通過 sink 回傳,例如可以將資料寫入檔案或標準輸出(例如命令列終端),Flink 程式可以在各種背景關系中運行,可以獨立運行,也可以嵌入到其它程式中,任務執行可以運行在本地 JVM 中,也可以運行在多臺機器的集群上,

二、什么是DataStream ?

  • DataStream API 得名于特殊的 DataStream 類,該類用于表示 Flink 程式中的資料集合,你可以認為 它們是可以包含重復項的不可變資料集合,這些資料可以是有界(有限)的,也可以是無界(無限)的,但用于處理它們的API是相同的,
  • DataStream 在用法上類似于常規的 Java 集合,但在某些關鍵方面卻大不相同,它們是不可變的,這意味著一旦它們被創建,你就不能添加或洗掉元素,你也不能簡單地察看內部元素,而只能使用 DataStream API 操作來處理它們,DataStream API 操作也叫作轉換(transformation),
  • 你可以通過在 Flink 程式中添加 source 創建一個初始的 DataStream,然后,你可以基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流連接在一起,

三、DataStream 資料處理程序

在這里插入圖片描述

1)Data Sources(資料源)

1、Data Sources 原理

官方檔案
一個資料 source 包括三個核心組件:分片(Splits)分片列舉器(SplitEnumerator) 以及 源閱讀器(SourceReader)

  • 分片(Split) 是對一部分 source 資料的包裝,如一個檔案或者日志磁區,分片是 source 進行任務分配和資料并行讀取的基本粒度,

  • 源閱讀器(SourceReader) 會請求分片并進行處理,例如讀取分片所表示的檔案或日志磁區,SourceReader 在 TaskManagers 上的 SourceOperators 并行運行,并產生并行的事件流/記錄流,

  • 分片列舉器(SplitEnumerator) 會生成分片并將它們分配給 SourceReader,該組件在 JobManager 上以單并行度運行,負責對未分配的分片進行維護,并以均衡的方式將其分配給 reader,SplitEnumerator 被認為是整個 Source 的“大腦”,

在這里插入圖片描述

2、Data Sources 實作方式

1)基于檔案

Source 是你的程式從中讀取其輸入的地方,你可以用 StreamExecutionEnvironment.addSource(sourceFunction)將一個 source 關聯到你的程式,Flink 自帶了許多預先實作的 source functions,不過你仍然可以通過實作 SourceFunction 介面撰寫自定義的非并行 source,也可以通過實作 ParallelSourceFunction 介面或者繼承 RichParallelSourceFunction 類撰寫自定義的并行 sources, 通過 StreamExecutionEnvironment 可以訪問多種預定義的 stream source,source 連接器,請查看連接器檔案,

  • readTextFile(path):讀取文本檔案,
  • readFile(fileInputFormat, path) - 按照指定的檔案輸入格式讀取(一次)檔案,
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) :這是前兩個方法內部呼叫的方法,它基于給定的 fileInputFormat 讀取路徑 path 上的檔案,根據提供的 watchType 的不同,source 可能定期(每 interval 毫秒)監控路徑上的新資料(watchType 為 FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次當前路徑中的資料然后退出(watchType 為 FileProcessingMode.PROCESS_ONCE),使用 pathFilter,用戶可以進一步排除正在處理的檔案,
2)基于套接字
  • socketTextStream:套接字讀取,元素可以由分隔符分隔,
3)基于集合
  • fromCollection(Collection) :從 Java Java.util.Collection 創建資料流,集合中的所有元素必須屬于同一型別,

  • fromCollection(Iterator, Class) :從迭代器創建資料流,class 引數指定迭代器回傳元素的資料型別,

  • fromElements(T ...) :從給定的物件序列中創建資料流,所有的物件必須屬于同一型別,

  • fromParallelCollection(SplittableIterator, Class) :從迭代器并行創建資料流,class 引數指定迭代器回傳元素的資料型別,

  • generateSequence(from, to) :基于給定間隔內的數字序列并行生成資料流,

4)自定義
  • addSource:關聯一個新的 source function,例如,你可以使用 addSource(new FlinkKafkaConsumer<>(…)) 來從 Apache Kafka 獲取資料,更多詳細資訊見連接器,

2)DataStream Transformations(資料流轉換//處理/算子)

【溫馨提示】是用戶通過算子能將一個或多個 DataStream 轉換成新的 DataStream,在應用程式中可以將多個資料轉換算子合并成一個復雜的資料流拓撲,這部分內容將描述 Flink DataStream API 中基本的資料轉換API,資料轉換后各種資料磁區方式,以及算子的鏈接策略,

官方檔案

1、資料流轉換

算子資料轉換解釋示例
MapDataStream → DataStream獲取一個元素并生成一個元素,將輸入流的值加倍的映射函式dataStream.map { x => x * 2 }
FlatMapDataStream → DataStream獲取一個元素并生成零個、一個或多個元素,將句子拆分為單詞的flatmap函式dataStream.flatMap { str => str.split(" ") }
FilterDataStream → DataStream為每個元素計算布爾函式,并保留該函式回傳true的元素,過濾掉零值的過濾器dataStream.filter { _ != 0 }
KeyByDataStream → KeyedStream在邏輯上將流劃分為不相交的磁區,具有相同密鑰的所有記錄都被分配到同一磁區,在內部,keyBy()是通過哈希磁區實作的,類似于mysql里面的group by,有不同的方法來指定鍵dataStream.keyBy(.someKey)
dataStream.keyBy(
._1)
ReduceKeyedStream → DataStream鍵控資料流上的“滾動”減少,將當前元素與上次減少的值合并,并發出新值,創建部分和流的reduce函式keyedStream.reduce { _ + _ }
WindowKeyedStream → WindowedStream可以在已磁區的KeyedStreams上定義視窗,Windows根據某些特征(例如,在過去5秒內到達的資料)對每個鍵中的資料進行分組,有關windows的完整說明,請參見windows,dataStream
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
WindowAllDataStream → AllWindowedStream可以在常規資料流上定義視窗,Windows根據某些特征(例如,過去5秒內到達的資料)對所有流事件進行分組,有關windows的完整說明,請參見windows,dataStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
Window ApplyWindowedStream → DataStream ;AllWindowedStream → DataStream將常規功能應用于整個視窗,下面是一個手動求和視窗元素的函式,如果使用的是windowAll轉換,則需要使用AllWindowFunctionwindowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
WindowReduceWindowedStream → DataStream將reduce函式應用于視窗并回傳減少的值,windowedStream.reduce { _ + _ }
UnionDataStream* → DataStream兩個或多個資料流的合并,創建一個包含所有流中所有元素的新流,注意:如果將一個資料流與其自身合并,則在結果流中會得到兩次每個元素,dataStream.union(otherStream1, otherStream2, …);
Window JoinDataStream,DataStream → DataStream在給定的密鑰和公共視窗上連接兩個資料流,dataStream.join(otherStream)
.where().equalTo()
.window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply { … }
Interval JoinKeyedStream,KeyedStream → DataStream在給定的時間間隔內,將兩個密鑰流的兩個元素e1和e2與一個公共密鑰連接,因此 e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2))
// lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {…})
Window CoGroupDataStream,DataStream → DataStream在給定的鍵和公共視窗上對兩個資料流進行協組,dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
ConnectDataStream,DataStream → ConnectedStream“連接”兩個保持其型別的資料流,連接允許兩個流之間的共享狀態,someStream : DataStream[Int] = …
otherStream : DataStream[String] = …
val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMapConnectedStream → DataStream類似于連接資料流上的map和flatMapconnectedStreams.map(
(_ : Int) => true,
(_ : String) => false)
)

connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
IterateDataStream → IterativeStream → ConnectedStream通過將一個運算子的輸出重定向到前一個運算子,在流中創建一個“反饋”回圈,這對于定義不斷更新模型的演算法特別有用,下面的代碼從一個流開始,并連續地應用迭代體,大于0的元素被發送回反饋通道,其余的元素被下游轉發,initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/do something/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}

2、物理磁區

Flink 也提供以下方法讓用戶根據需要在資料轉換完成后對資料磁區進行更細粒度的配置,

磁區資料轉換解釋示例
Custom PartitioningDataStream → DataStream使用用戶定義的Partitioner為每個元素選擇目標任務,dataStream.partitionCustom(partitioner, “someKey”)
dataStream.partitionCustom(partitioner, 0)
Random PartitioningDataStream → DataStream根據均勻分布隨機劃分元素,dataStream.shuffle()
RescalingDataStream → DataStream回圈地將元素磁區到下游操作的一個子集,dataStream.rescale()
BroadcastingDataStream → DataStream將元素廣播到每個磁區,

3、算子鏈和資源組

將兩個算子鏈接在一起能使得它們在同一個執行緒中執行,從而提升性能,Flink 默認會將能鏈接的算子盡可能地進行鏈接(例如, 兩個 map 轉換操作),此外, Flink 還提供了對鏈接更細粒度控制的 API 以滿足更多需求,

如果想對整個作業禁用算子鏈,可以呼叫 StreamExecutionEnvironment.disableOperatorChaining(),下列方法還提供了更細粒度的控制,需要注 意的是, 這些方法只能在 DataStream 轉換操作后才能被呼叫,因為它們只對前一次資料轉換生效,例如,可以 someStream.map(…).startNewChain() 這樣呼叫,而不能 someStream.startNewChain()這樣,

算子鏈操作解釋示例
Start New Chain開始一個新的鏈,從這個運算子開始,這兩個映射器將被鏈接,過濾器將不會鏈接到第一個映射器,someStream.filter(…).map(…).startNewChain().map(…)
Disable Chaining不要鏈接map運算子,someStream.map(…).disableChaining()
Set Slot Sharing Group設定操作的槽位共享組,Flink將把具有相同槽共享組的操作放在相同槽中,而將沒有槽共享組的操作放在其他槽中,這可以用來隔離槽,如果所有的輸入操作都在同一個槽位共享組中,則從輸入操作繼承槽位共享組,默認槽位共享組的名稱為“default”,可以通過呼叫slotSharingGroup(“default”)顯式地將操作放入該組,someStream.filter(…).slotSharingGroup(“name”)

3)Data Sinks(資料輸出)

sink 連接器,請查看連接器檔案,

Data sinks 使用 DataStream 并將它們轉發到檔案、套接字、外部系統或列印它們,Flink 自帶了多種內置的輸出格式,這些格式相關的實作封裝在 DataStreams 的算子里:

  • writeAsText() / TextOutputFormat : 將元素按行寫成字串,通過呼叫每個元素的 toString() 方法獲得字串,

  • writeAsCsv(...) / CsvOutputFormat :將元組寫成逗號分隔值檔案,行和欄位的分隔符是可配置的,每個欄位的值來自物件的 toString() 方法,

  • print() / printToErr():在標準輸出/標準錯誤流上列印每個元素的 toString() 值, 可選地,可以提供一個前綴(msg)附加到輸出,這有助于區分不同的 print 呼叫,如果并行度大于1,輸出結果將附帶輸出任務識別符號的前綴,

  • writeUsingOutputFormat() / FileOutputFormat :自定義檔案輸出的方法和基類,支持自定義 object 到 byte 的轉換,

  • writeToSocket :根據 SerializationSchema 將元素寫入套接字,

  • addSink : 呼叫自定義 sink function,Flink 捆綁了連接到其他系統(例如 Apache Kafka)的連接器,這些連接器被實作為 sink functions,

【溫馨提示】DataStream 的 write*() 方法主要用于除錯目的,它們不參與 Flink 的 checkpointing,這意味著這些函式通常具有至少有一次語意,重繪到目標系統的資料取決于 OutputFormat 的實作,這意味著并非所有發送到 OutputFormat 的元素都會立即顯示在目標系統中,此外,在失敗的情況下,這些記錄可能會丟失,

為了將流可靠地、精準一次地傳輸到檔案系統中,請使用 StreamingFileSink,此外,通過 .addSink(…) 方法呼叫的自定義實作也可以參與 Flink 的 checkpointing,以實作精準一次的語意,

旁路輸出(分流)

旁路輸出在Flink中叫作SideOutput,用途類似于DataStream#split,本質上是一個資料流的切分行為,按照條件將DataStream切分為多個子資料流,子資料流叫作旁路輸出資料流,每個旁路輸出資料流可以有自己的下游處理邏輯,

使用旁路輸出時,首先需要定義用于標識旁路輸出流的 OutputTag:

val outputTag = OutputTag[String]("side-output")

可以通過以下方法將資料發送到旁路輸出:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • KeyedCoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

【示例】

package com

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object myOutputTag {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input: DataStream[String] = env.readTextFile("flink/data/hello.txt")
    val outputTag = OutputTag[String]("side-output")

    val mainDataStream = input
      .process(new ProcessFunction[String, String] {
        override def processElement(
                                     value: String,
                                     ctx: ProcessFunction[String, String]#Context,
                                     out: Collector[String]): Unit = {
          // 發送資料到主要的輸出
          out.collect(value)
          // 發送資料到旁路輸出
          ctx.output(outputTag, "sideout-" + value)
        }
      })
    
    // 獲取outputTag并輸出
    mainDataStream.getSideOutput(outputTag).print()
    // 必須呼叫execute或者executeAsync(),下面會講
    env.execute("test OutputTag")
  }

}

在這里插入圖片描述


【問題】Caused by: java.lang.ClassNotFoundException: org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream

【解決】在pom.xml添加下面依賴

<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-compress</artifactId>
	<version>1.21</version>
</dependency>

2)Flink 程式剖析(scala)

Flink 程式看起來像一個轉換 DataStream 的常規程式,每個程式由相同的基本部分組成:

  1. 獲取一個執行環境(execution environment);
  2. 加載/創建初始資料;
  3. 指定資料相關的轉換;
  4. 指定計算結果的存盤位置;
  5. 觸發程式執行,

1、 獲取一個執行環境(execution environment)

val env = StreamExecutionEnvironment.getExecutionEnvironment

2、加載/創建初始資料

為了指定 data sources,執行環境提供了一些方法,支持使用各種方法從檔案中讀取資料:你可以直接逐行讀取資料,像讀 CSV 檔案一樣,或使用任何第三方提供的 source,下面是將一個文本檔案作為一個行的序列來讀,

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 加載資料源
val input: DataStream[String] = env.readTextFile("file:///path/to/file")

3、指定資料相關的轉換

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.readTextFile("file:///path/to/file")

// 例如一個 map 的轉換如下:
val mapped = input.map { x => x.toInt }

4、指定計算結果的存盤位置

一旦你有了包含最終結果的 DataStream,你就可以通過創建 sink 把它寫到外部系統,下面是一些用于創建 sink 的示例方法:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.readTextFile("flink/data/source")

// 例如一個 map 的轉換如下:
val mapped = input.map { x => x.toInt }

// 存盤到檔案,當然還可以執行更多的sink
// writeAsText第二個引數來定義輸出模式,它有以下兩個可選值:
// WriteMode.NO_OVERWRITE:當指定路徑上不存在任何檔案時,才執行寫出操作;
// WriteMode.OVERWRITE:不論指定路徑上是否存在檔案,都執行寫出操作;如果原來已有檔案,則進行覆寫,
mapped.writeAsText("flink/data/sink", FileSystem.WriteMode.OVERWRITE)

5、觸發程式執行

  • 一旦指定了完整的程式,需要呼叫 StreamExecutionEnvironmentexecute()方法來觸發程式執行,根據 ExecutionEnvironment 的型別,執行會在你的本地機器上觸發,或將你的程式提交到某個集群上執行,execute() 方法將等待作業完成,然后回傳一個 JobExecutionResult,其中包含執行時間和累加器結果,

  • 如果不想等待作業完成,可以通過呼叫 StreamExecutionEnvironment 的 executeAsync() 方法來觸發作業異步執行,它會回傳一個 JobClient,你可以通過它與剛剛提交的作業進行通信,如下是使用 executeAsync() 實作 execute() 語意的示例,

final JobClient jobClient = env.executeAsync();

final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

完整示例程式(官網示例)

【問題一】

【溫馨提示】如果出現這種報錯,一般就是IDEA 對scope為provided,這是IDEA的bug:Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/scala/typeutils/CaseClassTypeInfo

【解決】

  • 【第一種方式】把依賴范圍調大或者直接去掉都行,不清楚的可以看我之前的Java-Maven詳解,但是記住在打包的時候得加上,
  • 【第二種方式】Run->Edit Configurations,設定如下:
    在這里插入圖片描述
    【問題二】

【問題】Caused by: java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: org.apache.commons.math3.stat.descriptive.rank.Percentile.withNaNStrategy(Lorg/apache/commons/math3/stat/ranking/NaNStrategy;)Lorg/apache/commons/math3/stat/descriptive/rank/Percentile;hadoop-common中的commons-math3沖突導致,

【解決】排除hadoop-common中的commons-math3,設定如此:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.3.1</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
        </exclusion>
    </exclusions>
</dependency>

先啟動服務

$ nc -lk 9999

WindowWordCount原始碼如下:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)

    counts.print()

    env.execute("Window Stream WordCount")
  }
}

在這里插入圖片描述

四、什么是DataSet?

Flink用DataStream 表示無界資料集,用DataSet表示有界資料集,前者用于流處理應用程式,后者用于批處理應用程式,從操作形式上看,DataStream 和 DataSet 與集合 Collection 有些相似,但兩者有著本質的區別:

  • DataStream 和 DataSet 是不可變的資料集合,因此不可以想操作集合那樣增加或者洗掉 DataStream 和 DataSet 中的元素,也不可以通過諸如下標等方式訪問某個元素,
  • Flink 應用程式通過 Source 創建 DataStream 物件和 DataSet 物件,通過轉換操作產生新的 DataStream 物件和 DataSet 物件,
  • 運行時是應用程式被調度執行時的背景關系環境,通過StreamExecutionEnvironmentExecutionEnvironment方法會根據當前環境自動選擇本地或者集群運行時環境,

五、DataSet 資料處理程序

在這里插入圖片描述

1)Data Sources (資料源)

資料源創建初始資料集,比如從檔案或Java集合創建資料集,創建資料集的一般機制抽象在InputFormat后面,Flink提供了幾種內置格式,可以從常見的檔案格式創建資料集,它們中的許多在ExecutionEnvironment上都有快捷方法,

官方檔案

1、基于檔案

  • readTextFile(path) / TextInputFormat :讀取文本檔案,
  • readTextFileWithValue(path) / TextValueInputFormat: 讀取檔案,并將它們作為StringValues回傳,StringValues是可變字串,
  • readCsvFile(path) / CsvInputFormat:決議帶有逗號(或其他字符)分隔欄位的檔案,回傳由元組或pojo組成的資料集,支持基本java型別及其對應值作為欄位型別,
  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat:決議以新行(或另一個字符序列)分隔的原始資料型別(如String或Integer)的檔案,
  • readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat:使用給定的分隔符決議以新行(或另一個字符序列)分隔的原始資料型別(如String或Integer)的檔案,

2、基于集合

  • fromCollection(Collection):從Java.util.Collection創建一個資料集,集合中的所有元素必須具有相同的型別,
  • fromCollection(Iterator, Class):從迭代器創建資料集,該類指定迭代器回傳的元素的資料型別,
  • fromElements(T …):根據給定的物件序列創建一個資料集,所有物件必須是相同的型別,
  • fromParallelCollection(SplittableIterator, Class):并行地從迭代器創建資料集,該類指定迭代器回傳的元素的資料型別,
  • generateSequence(from, to):并行生成給定區間內的數字序列,

3、通用型

  • readFile(inputFormat, path) / FileInputFormat :接受檔案輸入格式,
  • createInput(inputFormat) / InputFormat:接受通用輸入格式,

2)DataSet Transformations(資料集轉換//處理/算子)

資料轉換將一個或多個資料集轉換為新的資料集,程式可以將多個轉換組合成復雜的程式集,

算子解釋示例
Map獲取一個元素并生成一個元素,將輸入流的值加倍的映射函式,data.map { x => x.toInt }
FlatMap獲取一個元素并生成零個、一個或多個元素,將句子拆分為單詞的flatmap函式,data.flatMap { str => str.split(" ") }
MapPartition在單個函式呼叫中轉換并行磁區,該函式以Iterable流的形式獲取磁區,并可以生成任意數量的結果值,每個磁區中的元素數量取決于并行度和之前的操作,data.mapPartition { in => in map { (_, 1) } }
Filter為每個元素計算布爾函式,并保留該函式回傳true的元素,過濾掉零值的過濾器,data.filter { _ > 1000 }
Reduce通過重復地將兩個元素組合成一個元素,將一組元素組合成一個元素,Reduce可以應用于完整的資料集或分組的資料集,data.reduce { _ + _ }
ReduceGroup將一組元素組合成一個或多個元素,ReduceGroup可以應用于完整的資料集,也可以應用于分組的資料集,data.reduceGroup { elements => elements.sum }
Aggregate將一組值聚合為一個值,聚合函式可以看作是內置的reduce函式,聚合可以應用于完整的資料集,也可以應用于分組的資料集,val input: DataSet[(Int, String, Double)] = // […]
val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)
Distinct回傳資料集的不同元素,對于元素的所有欄位或欄位的子集,它將從輸入資料集中洗掉重復的條目,data.distinct()
Join通過創建鍵值相等的所有元素對來連接兩個資料集,可選地使用JoinFunction將這對元素轉換為單個元素,或使用FlatJoinFunction將這對元素轉換為任意多個(包括沒有)元素,參見鍵部分了解如何定義連接鍵,val result = input1.join(input2).where(0).equalTo(1)
OuterJoin對兩個資料集執行左、右或完全外部連接,外部連接類似于常規(內部)連接,它創建的所有元素對的鍵值相等,此外,如果在另一側沒有找到匹配的鍵,則保存外部的記錄(如果是完整的,則為左、右或兩者),匹配的元素對(或一個元素和另一個輸入的空值)被賦給一個JoinFunction以將這對元素轉換為單個元素,或者賦給一個FlatJoinFunction以將這對元素轉換為任意多個(包括沒有)元素,參見鍵部分了解如何定義連接鍵,val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
(left, right) =>
val a = if (left == null) “none” else left._1
(a, right)
}
CoGroup簡化運算的二維變體,對一個或多個欄位上的每個輸入進行分組,然后合并組,每對組呼叫一個變換函式,請參閱鍵部分以了解如何定義coGroup鍵,data1.coGroup(data2).where(0).equalTo(1)
Cross構建兩個輸入的笛卡爾積(叉積),創建所有的元素對,可選地使用CrossFunction將這對元素轉換為單個元素,val data1: DataSet[Int] = // […]
val data2: DataSet[String] = // […]
val result: DataSet[(Int, String)] = data1.cross(data2)
Union生成兩個資料集的并集,data.union(data2)
Rebalance均勻地重新平衡資料集的并行磁區,以消除資料傾斜,只有類似map的轉換可以遵循rebalance轉換,val data1: DataSet[Int] = // […]
val result: DataSet[(Int, String)] = data1.rebalance().map(…)
Hash-Partition哈希磁區一個給定鍵的資料集,鍵可以指定為位置鍵、運算式鍵和鍵選擇器函式,val in: DataSet[(Int, String)] = // […]
val result = in.partitionByHash(0).mapPartition { … }
Range-Partition根據給定的鍵對資料集進行范圍磁區,鍵可以指定為位置鍵、運算式鍵和鍵選擇器函式,val in: DataSet[(Int, String)] = // […]
val result = in.partitionByRange(0).mapPartition { … }
Custom Partitioning使用自定義Partitioner函式,根據鍵將記錄分配到特定的磁區,該鍵可以指定為位置鍵、運算式鍵和選擇鍵函式,注意:此方法只適用于單個欄位鍵,val in: DataSet[(Int, String)] = // […]
val result = in
.partitionCustom(partitioner, key).mapPartition { … }
Sort Partitioning按照指定的順序在本地對指定欄位上的資料集的所有磁區進行排序,欄位可以指定為元組位置或欄位運算式,對多個欄位進行排序是通過鏈接sortPartition()呼叫來完成的,val in: DataSet[(Int, String)] = // […]
val result = in.sortPartition(1, Order.ASCENDING).mapPartition { … }
First-N回傳資料集的前n個(任意的)元素,First-n可以應用于常規資料集、分組資料集或分組排序資料集,分組鍵可以指定為鍵選擇器函式或欄位位置鍵,val in: DataSet[(Int, String)] = // […]
// regular data set
val result1 = in.first(3)
// grouped data set
val result2 = in.groupBy(0).first(3)
// grouped-sorted data set
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
MinBy / MaxBy從一個或多個欄位值為最小(最大值)的元組中選擇一個元組,用于比較的欄位必須是有效的關鍵欄位,即可比性,如果多個元組具有最小(最大)欄位值,則回傳這些元組中的任意一個元組,MinBy (MaxBy)可以應用于完整的資料集或分組的資料集,val in: DataSet[(Int, Double, String)] = // […]
// a data set with a single tuple with minimum values for the Int and String fields.
val out: DataSet[(Int, Double, String)] = in.minBy(0, 2)
// a data set with one tuple for each group with the minimum value for the Double field.
val out2: DataSet[(Int, Double, String)] = in.groupBy(2).minBy(1)
Specifying Keys一些轉換(join、coGroup、groupBy)要求在元素集合上定義鍵,其他轉換(Reduce、groureduce、Aggregate)允許在應用資料之前對資料進行分組,DataSet<…> input = // […]
DataSet<…> reduced = input
.groupBy(/define key here/)
.reduceGroup(/do something/);
Define keys for Tuples最簡單的情況是在元組的一個或多個欄位上分組元組,val input: DataSet[(Int, String, Long)] = // […]
val keyed = input.groupBy(0)

//val input: DataSet[(Int, String, Long)] = // […]
val grouped = input.groupBy(0,1)

3)Data Sinks(資料輸出)

資料接收器使用資料集,并用于存盤或回傳它們,使用OutputFormat描述資料接收器操作,Flink提供了多種內置的輸出格式,這些格式封裝在DataSet上的操作后面:

  • writeAsText() / TextOutputFormat:按行方式將元素寫入字串,字串是通過呼叫每個元素的toString()方法獲得的,
  • writeAsFormattedText() / TextOutputFormat:將元素按行撰寫為字串,字串是通過為每個元素呼叫用戶定義的format()方法獲得的,
  • writeAsCsv(…) / CsvOutputFormat:將元組寫入逗號分隔的值檔案,行和欄位分隔符是可配置的,每個欄位的值來自物件的toString()方法,print() / printToErr() / print(String msg) / printToErr(String msg) -列印出標準輸出/標準錯誤流中每個元素的toString()值,可選地,可以提供一個前綴(msg),作為輸出的前綴,這有助于區分不同的列印呼叫,如果并行度大于1,輸出也會被添加產生輸出的任務的識別符號,
  • write() / FileOutputFormat:方法和基類用于自定義檔案輸出,支持自定義物件到位元組的轉換,
  • output()/ OutputFormat:大多數通用輸出方法,用于非基于檔案的資料接收器(例如將結果存盤在資料庫中),

一個資料集可以被輸入到多個操作,程式可以寫或列印一個資料集,同時在它們上運行額外的轉換,

【示例】

package com

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode

object DataSetTest001 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // text data
    val textData: DataSet[String] = env.readTextFile("flink/data/s1")

    // write DataSet to a file on the local file system
//    textData.writeAsText("flink/data/sink01")


    // write DataSet to a file on an HDFS with a namenode running at nnHost:nnPort
    // 先創建目錄:hadoop fs -mkdir -p hdfs://hadoop-node1:8082/flink/DataSet/
    // 操作添加依賴
    /*<dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>3.3.1</version>
      <scope>provided</scope>
    </dependency>*/

    textData.writeAsText("hdfs://hadoop-node1:8082/flink/DataSet/sink02")

//
//    // write DataSet to a file and overwrite the file if it exists
//    textData.writeAsText("flink/data/sink03", WriteMode.OVERWRITE)
//
//    // tuples as lines with pipe as the separator "a|b|c"
//    val values: DataSet[(String, Int, Double)] = // [...]
//    values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")
//
//    // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
//    values.writeAsText("file:///path/to/the/result/file")

    // this writes values as strings using a user-defined formatting
//    values map { tuple => tuple._1 + " - " + tuple._2 }
//      .writeAsText("file:///path/to/the/result/file")

    env.execute("dataset test")
  }
}

在這里插入圖片描述

【示例】WordCount

package com

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text
      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}

在這里插入圖片描述
未完待續,更多大資料知識,請耐心等待~

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/433385.html

標籤:其他

上一篇:R語言使用dim函式查看資料維度、例如、使用dim函式查看dataframe資料有多少行多少列

下一篇:Kafka網路模型之Producer

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more