Checkpoint
- 目的
為了保證程式發生故障時狀態不丟也不錯,它是保證狀態一致性而不是資料一致性, - 原理
使用異步屏障快照Asynchronous Barrier Snapshotting(簡稱 ABS)演算法(依賴于Chandy-Lamport演算法的變種)實作分布式快照, - 流程
1)JobManager周期性產生Barrier,并廣播給所有Source算子,
2)Source算子收到Barrier后,生成自己的狀態快照(包含資料源對應的offset/partition等資訊),然后把Barrier廣播給下游所有非Source算子
3)非Source算子收到某條上游算子的Barrier后,會阻塞此上游算子的輸入流,把再Barrier之后流過來的資料先放到算子的緩沖區,等收到上游所有算子的Barrier后,此時才會進行生成自己的狀態快照,然后把此算子的Barrier廣播給下游所有非Source算子,(這里是Barrier對齊機制,保證資料不會被重復處理,當然,如果為了效率,也可以不進行對齊,此時資料會至少處理一次,可能導致資料被重復處理,對于資料的EXACTLY_ONCE來說,在1.11版本對于Barrier對齊機制進行了優化,因為對齊機制會導致checkpoint時間過長以及當作業出現反壓時,從而加重作業的反壓,此時引入了Unaligned Checkpoint機制,這個機制會導致接受到第一個Barrier時,不會阻塞此流后續資料的計算,這種方法也由壞處就是要持久化一部分快取資料)
4)當所有Sink算子完成checkpoint后,且向JobManager發送確認訊息后,該次checkpoint完成,
State
- 狀態型別
1)原生狀態(Raw State)
Raw State是開發者自己管理的,需要自己序列化,
2)托管狀態(Managed State)
Managed State是由Flink管理的,Flink幫忙存盤、恢復和優化,Managed State再進行細分,由兩種型別:Keyed State和Operator State,
Keyed State:一個SubTask有多個State,每一個Key對應一個State,有ValueState,ListState,MapState等
Operator State:一個SubTask有一個State,有ListState,BroadcastState等 - 狀態后端
Flink 內置了以下這些開箱即用的 state backends :
①HashMapStateBackend:狀態資料以 Java 物件的形式存盤在堆中,
②EmbeddedRocksDBStateBackend:狀態資料保存在 RocksDB 資料庫中,資料被以序列化位元組陣列的方式存盤,RocksDB 資料庫默認將資料存盤在 TaskManager 的資料目錄,
如果不設定,默認使用 HashMapStateBackend,
在Flink1.13版本對狀態后端進行了改進,幫助用戶更好理解本地狀態存盤和 checkpoint 存盤的區分,
1)MemoryStateBackend
舊版本的 MemoryStateBackend 等價于使用 HashMapStateBackend 和 JobManagerCheckpointStorage,
2)FsStateBackend
舊版本的 FsStateBackend 等價于使用 HashMapStateBackend 和 FileSystemCheckpointStorage,
3)RocksDBStateBackend
舊版本的 RocksDBStateBackend 等價于使用 EmbeddedRocksDBStateBackend 和 FileSystemCheckpointStorage.
Time
- 時間語意
Flink在1.12版本后默認使用Event Time
1)處理時間(Process Time)資料進入Flink被處理的系統時間(Operator處理資料的系統時間)
2)事件時間(Event Time)資料在資料源產生的時間,一般由事件中的時間戳描述,比如用戶日志中的TimeStamp,
3)攝取時間(Ingestion Time)資料進入Flink的時間,記錄被Source節點觀察到的系統時間, - 水位線
flink1.11中對flink的水印生成介面進行了重構,創建watermark主要有以下三種方式
1)使用createWatermarkGenerator 創建watermark,
2)使用固定延時策略生成水印,呼叫WatermarkStrategy中的靜態方法forBoundedOutOfOrderness,
3)使用單調遞增的方式生成水印,呼叫WatermarkStrategy中的靜態方法forMonotonousTimestamps,
Window
- 分類
1)Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
2)Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
- 生命周期
開始:當應該屬于該視窗的第一個元素到達時,就會創建一個視窗,
結束:當時間超過其結束時間戳加上用戶指定的允許延遲時,該視窗將被完全洗掉,
每個視窗都有一個觸發器和一個函式,函式是用于視窗內資料的計算,觸發器是決定此視窗的函式多會進行計算的條件, - 型別
1)Tumbling Windows(滾動視窗)
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
2)Sliding Windows(滑動視窗)
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
3)Session Windows(會話視窗)
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
4)Global Windows(全域視窗)
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423619.html
標籤:其他
