主頁 > 資料庫 > Flink-5-DataStreamAPI

Flink-5-DataStreamAPI

2020-12-03 07:52:40 資料庫

第5章 DataStreamAPI

參考書籍

  • Stream Processing with Apache Flinkhttps://www.oreilly.com/library/view/stream-processing-with/9781491974285/

  • 《基于Apache Flink的流處理》https://book.douban.com/subject/34912177/

注:本文主要是針對《基于Apache Flink的流處理》的筆記

1-8章筆記下載地址

本章介紹了Flink的DataStream API的基礎知識,我們將展示常用的Flink流應用程式的結構和組件,討論Flink的型別系統和支持的資料型別,并介紹資料轉換(data transformation)和磁區轉換(partitioning transformation),讀完這一章,你將知道如何實作一個具有基本功能的流處理應用程式

首先舉一個簡單的例子作為開始

/** 定義一個case class作為傳感器讀取資料的資料型別*/
/** Case class to hold the SensorReading data. */
case class SensorReading(id: String, timestamp: Long, temperature: Double)

/** 放主函式的object*/
/** Object that defines the DataStream program in the main() method */
object AverageSensorReadings {
    
  /** main() defines and executes the DataStream program */
  def main(args: Array[String]) {

    // 設定流式執行環境  
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
	
    // 在應用中使用事件時間  
    // use event time for the application
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    // 配置水位線
    // configure watermark interval
    env.getConfig.setAutoWatermarkInterval(1000L)

    // 從流式資料源中創建DataStream[SensorReading]物件  
    // ingest sensor stream
    val sensorData: DataStream[SensorReading] = env
      // 添加傳感器Source
      // SensorSource generates random temperature readings
      .addSource(new SensorSource)
      // 設定時間戳和水位線
      // assign timestamps and watermarks which are required for event time
      .assignTimestampsAndWatermarks(new SensorTimeAssigner)

    val avgTemp: DataStream[SensorReading] = sensorData
      // 將溫度從華氏溫度轉換為攝氏溫度
      // convert Fahrenheit to Celsius using an inlined map function
      .map( r =>
      SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)) )
      // 根據傳感器id來分組資料
      // organize stream by sensorId
      .keyBy(_.id)
      // 按照1秒的滾動視窗分組
      // group readings in 1 second windows
      .timeWindow(Time.seconds(1))
      // 使用用戶自定義函式來計算平均溫度
      // compute average temperature using a user-defined function
      .apply(new TemperatureAverager)

    // 列印到控制臺  
    // print result stream to standard out
    avgTemp.print()

    // 開始執行應用  
    // execute application
    env.execute("Compute average sensor temperature")
  }
}

構建一個典型的Flink流式程式需要以下幾步

  1. 設定執行環境
  2. 資料源中讀取一潭訓多條流
  3. 通過一系列流式轉換來實作應用邏輯
  4. 選擇性地將結果輸出到一個或多個資料匯
  5. 執行程式

5.1.1 設定執行環境

Flink應用程式需要做的第一件事是設定它的執行環境,執行環境確定程式是在本地機器上運行還是在集群上運行,在DataStream API中,應用程式的執行環境由StreamExecutionEnvironment表示,

有兩種設定執行環境的方式

  1. 呼叫靜態getExecutionEnvironment()方法來檢索執行環境,此方法回傳本地或遠程環境,具體取決于呼叫該方法的背景關系,如果通過連接到遠程集群從提交客戶端呼叫該方法,則回傳遠程執行環境,否則,它回傳一個本地環境,

  2. 也可以通過createxxx方法來顯式設定執行環境,具體代碼如下

    // 創建一個本地的流式執行環境
    val localEnv = StreamExecutionEnvironment.createLocalEnvironment()
    // 創建一個遠程的流式執行環境
    val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
    	"host", 				// JobManager的主機名
        1234,					// JobManager的埠號
        "path/to/jarFile.jar"	// 需要傳輸到JobManager的JAR包
    )
    

執行環境還提供了很多配置選項,比如

  1. 通過env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)指定當前應用使用事件時間語意
  2. 設定并行度
  3. 啟動容錯

5.1.2 讀取輸入流

StreamExecutionEnvironment提供了一系列創建流式資料源方法,用來將資料流讀取到應用中,這些資料流的來源可以是訊息佇列或者檔案,也可以動態生成

在實體中,讀取代碼如下

// 從流式資料源中創建DataStream[SensorReading]物件  
// ingest sensor stream
val sensorData: DataStream[SensorReading] = env
    // 添加傳感器Source
    // SensorSource generates random temperature readings
    .addSource(new SensorSource)
    // 設定時間戳和水位線
    // assign timestamps and watermarks which are required for event time
    .assignTimestampsAndWatermarks(new SensorTimeAssigner)

5.1.3 應用轉換(Apply Transformation)

當獲取到了DataStream,就可以對其應用轉換(we can apply a transformation on it),轉換的型別有很多:

  1. 有些轉換可以生成新的DataStream,并且可能是不同型別的(eg. DataStream[Int] => DataStream[String])
  2. 有些轉換不修改DataStream中的條目,而是通過磁區分組對其進行重新組織
  3. 應用程式的邏輯是通過一系列轉換定義的,

在實體中,轉換代碼如下

val avgTemp: DataStream[SensorReading] = sensorData
    // 將溫度從華氏溫度轉換為攝氏溫度
    // convert Fahrenheit to Celsius using an inlined map function
    .map( r =>
    SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)) )
    // 根據傳感器id來分組資料
    // organize stream by sensorId
    .keyBy(_.id)
    // 按照1秒的滾動視窗分組
    // group readings in 1 second windows
    .timeWindow(Time.seconds(1))
    // 使用用戶自定義函式來計算平均溫度
    // compute average temperature using a user-defined function
    .apply(new TemperatureAverager)

5.1.4 輸出結果

流應用程式通常將其結果發送到一些外部系統(external system),如Apache Kafka、檔案系統或資料庫,Flink提供了一組流式資料匯,可用于將資料寫入不同的系統,也可以實作自己的流式資料匯,還有一些應用程式不發出結果,而是通過Flink的可查詢狀態(queryable state)功能在內部保存結果,

在我們的示例中,會將DataStream[SensorReading]中的記錄作為結果輸出,每個記錄包含傳感器在5秒內的平均溫度,通過呼叫print()將結果流寫入標準輸出:

avgTemp.print()

5.1.5 執行

當應用定義完成后,可以通過呼叫StreamExecutionEnvironment.execute()來執行它:

env.execute("Compute average sensor temperature")

Flink程式都是通過延遲計算(lazily execute)的方式執行,

  • 也就是說,那些創建資料源和轉換操作的API呼叫不會立即觸發任何實際的資料處理,
  • 相反,這些API呼叫只是在執行環境中創建一個執行計劃,該計劃包括從環境創建的流式資料源以及應用于這些資料源之上的一系列轉換
  • 只有在呼叫execute()時,系統才會觸發程式的執行

構建完成的計劃會被轉換為JobGraph提交給JobManager執行,

  • 根據執行環境的型別,系統可能需要將JobGraph發送到作為本地執行緒啟動的JobManager,或將JobGraph發送到遠程JobManager,
  • 如果JobManager遠程運行,除了JobGraph之外,我們還需要提供一個包含應用程式的所有類和所需依賴項的JAR檔案

5.2 轉換操作

在本節中,我們將概述DataStream API中的基本轉換

  • 流式轉換以一個或多個資料流作為輸入,并將它們轉換為一個或多個輸出流,
  • 撰寫一個DataStream API程式本質上可以歸結為:通過組合不同的轉換創建一個滿足應用邏輯Dataflow圖

大多數流式轉換都基于用戶自定義的函式來完成,這些函式封裝了用戶的邏輯,指定了如何將輸入流的元素轉換為輸出流的元素,函式可以通過實作某個特定轉換的介面類來定義,例如下面的MapFunction

class MyMapFunction extends MapFunction[Int, Int] {
    override def map(value: Int): Int = value + 1
}

DataStream API為那些最常見的資料轉換操作都提供了對應的轉換抽象,我們將DataStreamAPI的轉換分為四類

  1. 作用于單個事件基本轉換
  2. 針對相同鍵值事件的KeyedStream轉換
  3. 將多條資料流合并為一潭訓將一條資料流拆分成多條流的轉換
  4. 對流中的事件進行重新組織分發轉換

5.2.1 基本轉換

基本轉換單獨處理每個事件,這意味著每個輸出記錄都是由單個輸入記錄生成的,常見的基本轉換函式有:簡單的值轉換、記錄拆分或過濾等,

5.2.1.1 Map

通過呼叫DataStream.map()方法可以指定map轉換產生一個新的DataStream,它將每個輸入事件傳遞給用戶自定義的映射器(user-defined mapper),映射器回傳一個輸出事件,這個輸出事件可能是不同型別的(eg, DataStream[Int] => DataStream[String]),圖5-1顯示了將每個正方形轉換為圓形的map轉換,

MapFunction的兩個型別引數分別是是輸入事件的型別和輸出事件的型別,MapFunctionmap()方法將每個輸入事件準確地轉換為一個輸出事件:

// T: 輸入元素的型別
// O: 輸出元素的型別
MapFunction[T,O] 
	> map(T): O

下面舉一個簡單的例子

val sensorIds: DataStream[String] = reading.map(new MyMapFunction)

class MyMapFunction extends MapFunction[SensorReading, String] {
    override def map(r: SensorReading): String = r.id
}

也可以用Lambda運算式進一步簡化

val sensorIds: DataStream[String] = reading.map(r => r.id)
5.2.1.2 Filter

fliter轉換通過一個回傳值為Boolean型別的函式決定事件的去留

  • 如果回傳值為true,那么它會保留輸入事件并且將其轉發到輸出,
  • 否則它會把事件丟棄,
  • 通過呼叫DataStream.filter()方法可以指定過濾器轉換,并生成與輸入DataStream相同型別的輸出DataStream,
  • 圖5-2顯示了一個只保留白色方塊的過濾操作,

FilterFunction型別引數輸入流的型別,它的filter()方法接收一個輸入事件回傳一個布林值:

FilterFunction[T]
	> filter(T): Boolean

下面舉個簡單的例子

var filteredSensors = readings.filter(r => r.temperature >= 25)
5.2.1.3 FlatMap

flatMap轉換與map類似,但是它可以為每個輸入事件生成零個一個多個 輸出事件

圖5-3顯示了一個基于傳入事件的顏色區分其輸出的flatMap操作,

  • 如果輸入是白色方塊,則不加改動直接輸出,
  • 將黑色方塊復制,
  • 將灰色方塊丟棄掉,

flatMap函式定義如下,可以通過向Collector物件傳遞資料的方式來回傳零個、一個或多個事件作為結果

// T: 輸入元素的型別
// O: 輸出元素的型別
FlatMapFunction[T, O]
	// 回傳值為Unit,也就是不回傳
	// Collector[O]作為輸出引數
	> flatMap(T, Collector[O]): Unit

flatMap函式還可以如下定義

FlatMapFunction[T, O]
	> flatMap(T):  TraversableOnce[O]

下面舉一個簡單的例子

val words = sensorData.flatMap(r => r.id.split(" "))

5.2.2 基于KeyedStream的轉換

KeyedStream抽象可以從邏輯上將事件按照鍵值分配到多個獨立的事件子流中,

KeyedStream可以根據鍵來維護內部狀態,所有具有相同鍵的事件可以訪問相同的狀態,

接下來先介紹keyBy轉換,它可以將要一個DataStream轉換為一個KeyedStream,然后介紹滾動聚合Reduce,它們可以作用在KeyedStream

5.2.2.1 keyBy

keyBy轉換通過將DataStream轉換為KeyedStream,資料流中的事件會根據不同的鍵被分配到不同的磁區(partition),具有相同鍵的所有事件都由下游算子同一個任務處理,

我們假設以輸入事件的顏色作為鍵,圖5-4將黑色事件分配給一個磁區,將所有其他事件分配給另一個磁區,

keyBy可以用多種方式來設定如何分類,如下所示

下面舉一個keyBy的例子

val readings: DataStream[SensorReading] = ...
val keyed: KeyedStream[SensorReading, String] = readings.keyBy(r => r.id)
5.2.2.2 滾動聚合

滾動聚合 應用于KeyedStream上,它生成一個包含聚合結果(如求和、最小值和最大值)的DataStream,

  • 滾動聚合運算子為每個鍵保存一個聚合值
  • 對于每個輸入事件,算子更新相應的聚合值,并將更新后的值作為輸出事件發送給下游
  • 滾動聚合操作需要接收一個用于指定聚合目標欄位的引數,該引數指定在哪個欄位上計算聚合,

DataStream API提供了以下滾動聚合方法:

名稱 描述
sum() 滾動計算輸入流在指定欄位上的
max() 滾動計算輸入流在指定欄位上的最大值
min() 滾動計算輸入流在指定欄位上的最小值
minBy() 滾動計算輸入流中迄今為止最小值,回傳該值所在事件
maxBy() 滾動計算輸入流中起勁為止最大值,回傳該值所在事件

注意:不能將多個滾動聚合方法組合使用,每次只能計算一個

例子:對一個Tuple3[Int, Int, Int]型別的資料在第一個欄位上按照鍵值分組,然后滾動計算第二個欄位的和

val inputStream: DataStream[(Int, Int, Int)] = env.fromElements(
    (1, 2, 2),
    (2, 3, 1),
    (2, 2, 4),
    (1, 5, 3))

val resultStream: DataStream[(Int, Int, Int)] = inputStream
	.keyBy(0)
	.sum(1)

"""output
(1, 2, 2)
(2, 3, 1)
(2, 5, 1)
(1, 7, 2)
第一個欄位是分組,第二個欄位是計算之后的和,第三個欄位沒有意義
"""
5.2.2.3 Reduce

reduce轉換是滾動聚合一般化(generalization),

  • 它在KeyedStream上應用了一個ReduceFunction,該函式將每個輸入事件當前的reduce結果進行一次組合,并輸出一個DataStream,
  • reduce不會改變DataStream的型別,輸出流的型別與輸入流的型別相同,

ReduceFunction介面定義如下

// T: 元素型別
ReduceFunction[T]
	> reduce(T, T): T

下面舉一個reduce轉換的例子,在下面的例子中,資料流是會以語言型別作為鍵來進行磁區,最終結果是針對每個語言產生一個不斷更新的單詞串列:

val inputStream: DataStream[(String, List[String])] = env.fromElements(
    ("en", List("tea")),
    ("fr", List("vin")),
    ("en", List("cake")))

val resultStream: DataStream[(String, List[String])] = inputstream
	.keyBy(0)
	.reduce((x, y) => (x._1, x._2 ::: y._2))

"""output
("en", List("tea"))
("fr", List("vin"))
("en", List("tea", "cake"))
"""

5.2.3 多流轉換

許多應用需要將多個輸入流聯合起來處理,還有一些應用需要將一條流分割成多條子流以應用不同的邏輯,下面,我們將討論那些同時處理多個輸入流或產生多個輸出流的DataStream API轉換,

5.2.3.1 Union

DataStream.union()方法可以合并兩個或多個相同型別DataStream,并生成一個新的型別相同的DataStream

圖5-5顯示了一個union操作,它將黑色和灰色事件合并到單個輸出流中,

union執行程序中,來自兩條流的事件會以FIFO的方式合并,其順序無法保證(The operator does not
produce a specific order of events.),此外,union運算子不會對資料進行去重,每個輸入事件都被發送到下游,

下面舉個把三條資料流合并為一條的例子

val parisStream: DataStream[SensorReading] = ...
val tokyoStream: DataStream[SensorReading] = ...
val rioStream: DataStream[SensorReading] = ...
val allCities: DataStream[SensorReading] = parisStream,union(tokyoStream, rioStream)
5.2.3.2 Connect,coMap,coFlatMap

考慮這樣一個應用,它監視森林區域,并在發生火災的風險很高時發出警報,應用從溫度傳感器和煙感傳感器上接收資料,當溫度超過給定的閾值 并且 煙霧水平很高時,應用程式會發出火災警報,這時,為了判斷兩者是否同時成立,我們需要合并兩條流來根據兩條流的資訊來綜合判斷

由此可見,合并兩個流的事件是流處理中非常常見的需求,下面來看看相關的API

DataStream.connect()方法接收一個DataStream并回傳一個ConnectedStreams物件,該物件表示兩個聯結在一起的流:

val first: DataStream[Int] = ...
val second: DataStream[String] = ...

val connected: ConnectedStreams[Int, String] = first.connect(second)

ConnectedStreams物件提供了map()和flatMap(),具體用法略

默認情況下,connect()不會在兩個流的事件之間 建立關系,因此兩個流的事件被隨機分配給算子任務,這種行為會產生不確定的結果,通常是不希望看到的,為了在ConnectedStreams上產生確定性結果,可以將connect()與keyBy()或broadcast()結合使用,

  • 當使用keyBy()時,connect()轉換會將兩條資料流中具有相同鍵的事件發送到同一個算子任務
  • 而當使用broadcast()時,兩條流中有一條被廣播,它的事件被分發給下游算子的所有任務上,這樣可以保證聯合處理這兩個輸入流的元素,
5.2.3.1 Split和Select

split是union的逆操作,它將輸入流 分割為與輸入流相同型別的兩個或多個輸出流每個輸入事件可以被發送給零個一個多個 輸出流,因此,split操作還可以用于過濾復制事件,

圖5-6顯示了一個split算子,它將所有白色事件與其他事件分開,發往不同的資料流,

split()方法以一個OutputSelector函式介面作為引數,

// IN: 同DataStream的元素型別
OutputSelector[IN]
	> select(IN): Iterable[String]

每個輸入事件到來時都會呼叫OutputSelector.select()方法,并隨即回傳一個java.lang.Iterable[String],回傳的這個String串列中的每個String是這個事件所屬的輸出流的名稱,

split()方法回傳一個SplitStream物件,這個物件提供一個select()方法,通過指定輸出名稱從SplitStream中選擇一潭訓多條流,

實體5-2:將一個數字流分成一個大數字流和一個小數字流

val inputStream: DataStream[(Int, String)] = ...
val splitted: SplitStream[(Int, String)] = inputStream
	.split(t => if (t._1 > 1000) Seq("large") else Seq("small"))

val large: DataStream[(Int, String)] = splitted.select("large")
val small: DataStream[(Int, String)] = splitted.select("small")
val all: DataStream[(Int, String)] = splitted.select("large", "small")

5.2.4 分發轉換

當使用DataStream API構建應用程式時,系統會根據操作語意和配置的并行度自動選擇資料磁區策略并將資料轉發到正確的目標,有時我們可能希望能夠手動選擇磁區策略,在本節中,我們將介紹DataStream中用于控制磁區策略或自定義磁區策略的方法,

下面是常見的磁區策略

名稱 描述
隨機 隨機資料交換策略由DataStream.shuffle()方法實作,該方法將事件隨機分配到下游算子的并行任務中,
輪流(Round-Robin) 輪流方法將輸入流的事件以輪流方式均勻分配給后繼任務,
重調(Rescale) rescale()也是以輪流的方式對事件進行分發,但是每個上游任務只與一部分下游任務建立發送通道,當上游任務數遠小于下游任務數時,這種方法比較好用,下圖展示了輪流和重調的區別
廣播(Broadcast) broadcast()將輸入流中的事件復制,并發送給發送給下游算子的所有并行任務,
全域(global) global()方法將輸入資料流的所有事件發送給下游運算子的第一個并行任務,必須小心使用這種磁區策略,因為將所有事件路由到同一任務可能會影回應用程式性能
自定義 如果所有預定義的磁區策略都不合適,你可以利用partitionCustom()方法來自定義磁區策略

自定義磁區例子如下

val numbers: DataStream[(Int)] = ...

numbers.partitionCustom(myPartitioner, 0)

object myPartitioner extends Partitioner[Int]{
    val r = scala.util.Random
    
    override def partition(key: Int, numPartitions: Int): Int = {
        if (key < 0) 0 else r.nextInt(numPartitions)
    }
}

5.3 設定并行度

算子的并行度可以在執行環境級別算子級別進行控制默認情況下,應用的所有算子的并行度被設定為應用執行環境的并行度,而執行環境的并行度根據應用啟動時所處的背景關系自動初始化

  • 如果應用程式在本地執行環境中運行,則將并行度設定為與CPU內核數量相等
  • 在向運行的Flink集群提交應用程式時,除非用戶顯示指定,否則環境并行度將設定為集群的默認并行度

最好將算子并行度設定為隨環境并行度變化的值而不要設定為定值,例如:假設環境并行度為x,可以設定算子并行度為y = x/2,當在本機運行時x=8, y=4,而在集群運行時x=32,y=16,這樣當運行環境變化時,算子并行度也可以隨之變化,

下面的例子演示了如何獲取環境并行度以及如何設定環境并行度

// 獲取環境并行度
val env = StreamExecutionEnvironment.getExecutionEnvironment
val defaultParallelism = env.getParallelism

// 設定環境并行度
env.setParallelism(32)

下面的例子演示了如何設定算子的并行度

// 獲取環境并行度
val env = StreamExecutionEnvironment.getExecutionEnvironment
val defaultParallelism = env.getParallelism

val result = env.addSource(new CustomSource)
	// 設定map的并行度為默認并行度的兩倍
	.map(new MyMapper).setParallelism(defalutParallelism * 2)
	// print資料匯的并行度固定為2
	.print().setParallelism(2)

5.4 型別

Flink DataStream應用所處理的事件以資料物件的形式存在,這些資料物件需要能夠被序列化反序列化,以通過網路發送它們,或將它們寫入狀態后端、檢查點和保存點,或從狀態后端讀取,Flink使用型別資訊(type information)的概念來表示資料型別,并為每種資料型別自動生成特定的序列化器、反序列化器和比較器

一般情況下,Flink都可以自動提取資料物件的型別資訊,但當自動提取器失效時,我們也需要手動指定型別資訊,

本節我們會討論Flink支持的型別,如何為資料型別創建型別資訊,以及當Flink無法自動推斷函式的回傳型別的時如何以提示的方式幫助型別系統,

5.4.1 支持的資料型別

Flink支持Java和Scala中可用的所有常見資料型別,可以分為以下類別

  • 原始型別
  • Java和Scala元組
  • Scala樣例類(case class)
  • POJO
  • 一些特殊型別:陣列、串列、映射、列舉等

對于POJO的解釋:如果一個類滿足如下條件,它會被Flink看作POJO

  • 是一個公有類
  • 有一個公有的無參默認建構式
  • 所有欄位都是公有的或者提供了相應的getter以及setter方法
  • 所有欄位型別都必須是Flink支持的

特殊型別的解釋:Flink支持多種特殊型別,比如

  • 原始或者物件型別的陣列;
  • Java的ArrayList、HashMap和Enum型別
  • Hadoop的Writable型別,
  • Scala的Either、Option和Try型別以及Flink內部實作的Java版本的Either型別

5.4.2 為資料型別創建型別資訊

在Flink的型別系統中,核心類是TypeInformation,它為系統生成序列化器比較器提供了必要的資訊,當應用提交執行時,Flink的型別系統嘗試為框架處理的每個資料型別自動推斷TypeInformation,因此,大多數情況下,我們都沒有必要手動指定型別資訊,但當自動推斷失靈時,就需要我們為特定資料型別手動生成TypeInformation了,

下面舉幾個生成TypeInformation的例子

// 原始型別的TypeInformation
val stringType: TypeInformation[String] = Types.STRING
// Scala元祖的TypeInformation
val tupleType: TypeInformation[(Int, Long)] = Types.TUPLE[(Int, Long)]
// case class的TypeInformation
val caseClassType: TypeInformation[Person] = Types.CASE_CLASS[Person]

5.4.3 顯式提供型別資訊

顯示提供TypeInformation的方式有兩種第一種是通過實作ResultTypeQueryable介面來擴展函式,如下面例子所示

class Tuple2ToPersonMapper extends MapFunction[(String, Int), Person] with ResultTypeQueryable[Person] {
    override def map(v: (String, Int)): Person = Person(v._1, v._2)
    
    //實作ResultTypeQueryable
    override def getProducedType: TypeInformation[Person] = Types.CASE_CLASS[Person]
}

第二種,在定義Dataflow時使用Java DataStream API中的returns()方法顯式指定某算子的回傳型別

persons = inputStream
	.map(t => new Person(t._1, t._2))
	.returns(Types.CASE_CLASS[Person])

5.5 定義鍵和參考欄位

在Flink中有很多需要使用鍵索引(key specification)和欄位參考(field reference)的地方,Flink采用各種各樣的靈活方式來定義鍵:通過元素的欄位位置來定義、通過基于字串的欄位運算式來定義、通過KeySelector函式來定義

5.5.1 欄位位置

如果資料型別是元組,則只需使用對應元組元素的欄位位置就可以定義鍵,

例如下面這個例子使用元組的第二個欄位作為輸入流的鍵值

val input: DataStream[(Int, String, Long)] = ...
val keyed = input.keyBy(1)

此外,還可以使用多個元組欄位來定義復合鍵

val keyed2 = input.keyBy(1, 2)

5.5.2 欄位運算式

另一種定義鍵和選擇欄位的方法是使用基于字串的欄位運算式,欄位運算式適用于元組、pojo和case類,

// 最簡單的欄位運算式
val keyedSensors = sensorStream.keyBy("id")

// 在元組型別上使用欄位運算式
val keyed = inputStream.keyBy("_1")

// 使用點運算子來嵌套POJO欄位為鍵值
val persons = inputStream.keyBy("address.zip")

// 使用通配符 `_` 來選擇元組中的全部欄位作為鍵
val keyed = inputStream.keyBy("birthday._")

5.5.3 KeySelector函式

第三種指定鍵的方式是使用KeySelector函式,它可以從輸入事件中提取鍵

// T: 輸入元素的型別
// KEY: 鍵值型別
KeySelector[IN, KEY]
	> getKey(IN): KEY

下面的例子會回傳元組中的最大欄位來作為鍵值

val input = DataStream[(Int, Int)] = ...
val keyedStream = input.keyBy(value =https://www.cnblogs.com/t0ugh/p/> math.max(value._1, value._2))

5.6 實作函式

在DataStream API中有很多地方需要使用自定義函式,本節將介紹Flink中定義函式的幾種方式

5.6.1 函式類

Flink中所有用戶自定義函式都是以介面或者抽象類的形式對外暴露的,如MapFunction、FilterFunction和ProcessFunction等

我們可以通過實作介面或者繼承抽象類的方式來定義函式,例如下面的例子

class MyFilter extends FilterFunction[String]{
    override def filter(value: String): Boolean = {
        value.contains("flink")
    }
}

val filted = sentences.filter(new MyFilter())

函式必須是可序列化的

Flink使用Java序列化來序列化所有函式物件,以便將它們發送給對應的算子任務,用戶函式中包含的所有內容都必須是可序列化的,如果您的函式需要一個非序列化的物件實體,您可以將其實作為一個富函式,并在open()方法中初始化非序列化欄位,或者重寫Java序列化和反序列化方法,

5.6.2 Lambda函式

也可以通過Lambda運算式的方式來定義函式

val filted = sentences.filter(_.contains("flink"))

5.6.3 富函式

有時,我們需要在函式 處理第一個記錄之前進行一些初始化作業或者取得函式執行相關的背景關系資訊,DataStream API提供了豐富的函式,它和我們之前見到的普通函式相比可以對外提供更多功能,

DataStream API中的所有轉換函式都有對應的富函式,富函式的使用位置和普通函式以及Lambda函式相同,富函式的名稱以Rich開頭,例如RichMapFunction、RichFlatMapFunction等,

當使用富函式時,你可以對應函式的生命周期實作兩個額外的方法:

  • open()方法是富函式的初始化方法,它在每個任務首次呼叫轉換方法之前呼叫一次

  • close()方法是富函式的終止方法,會在每個任務最后一次呼叫轉換方法呼叫一次,因此,它通常用于清理釋放資源,

  • 另外,還可以使用富函式自帶的getRuntimeContext()方法來從函式的Runtime中獲取一些資訊

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)]{
    var subTaskIndex = 0
    
    override def open(config: Configuration): Unit = {
        subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
        //進行一些初始化作業
    }
    
    override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
        //子任務的編號從0開始
        if(in % 2 == subTaskIndex){
            out.collect((subTaskIndex, in))
        }
        //做一些額外處理作業
    }
    
    override def close(): Unit = {
        //做一些清理作業
    }
}

5.7 匯入外部和Flink依賴

在實作Flink應用時經常需要添加一些外部依賴,應用在執行時,必須能夠訪問到所有依賴,默認情況下,Flink集群加載核心API依賴(DataStream和DataSet API),對于應用的其他依賴則必須顯式提供

兩種方法來確保所在執行應用時可以訪問到所有依賴:

  1. 將所有依賴打進應用的Jar包中,生成一個“胖Jar”
  2. 將依賴放到Flink的./lib目錄下,這樣在Flink行程啟動時就會將依賴加載到Classpath中

推薦使用第一種方式,

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/228948.html

標籤:大數據

上一篇:Flink-3-ApacheFlink架構

下一篇:mapreduce運行問題 :java.io.EOFException

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more