在Flink中的每個函式和運算子都是有狀態的,在處理程序中可以用狀態來存盤資料,這樣可以利用狀態來構建復雜操作,為了讓狀態容錯,Flink需要設定checkpoint狀態,Flink程式是通過checkpoint來保證容錯,通過checkpoint機制,Flink可恢復作業的狀態和計算位置,
checkpoint檢查點
前提條件
Flink的checkpoin機制需要與流和狀態的持久化存盤互動,一般它要求:
- 一個持久化的資料源
- 當Flink程式出現問題時,可以通過checkpoint持久化存盤中恢復,然后從出錯的地方開始重新消費資料
- 該資料源可以在一定時間內重跑資料,例如:Kafka、RabbitMQ或者檔案系統HDFS、S3、…
- 狀態的持久存盤
- 狀態需要永久的保存下來,通常是分布式檔案系統(例如:HDFS、S3、GFS、…)
啟用和配置檢查點
默認情況,Flink是禁用檢查點,要啟用檢查點,呼叫
// 啟用檢查點// 單位:毫秒
env.enableCheckpointing(1000);
在啟用檢查點時,還可以配置檢查點的其他引數,
- exactly-one or at-least-once(僅一次或者至少一次)
- 大多數程式都是設定為exactly-once,只有在某些超低延遲的應用(例如:始終要求是毫秒級的應用)
- 通過查看原始碼,我們看到,Flink默認是 exactly-once
public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
- 檢查點超過規定的時間就會自動終止
- 檢查點之間的最小時間
- 下一個檢查點將在上一個檢查點完成后5秒鐘啟動
- 檢查點最小間隔時間不會受檢查點間隔更容易配置
- 檢查點的并發數目,默認情況一個檢查點在運行時不會觸發另一個檢查點,這樣可以確保Flink不會花太多時間在checkpoint上,并確保流可以有效進行,
- 可以設定多個重疊的checkpoint,這對容許有一定延遲,并希望較頻繁的檢查(100ms)來重新處理故障是有用的
- 外部檢查點
- 可以將檢查點設定為外部持久化,這樣檢查點的元資料將寫入持久存盤,并且但作業運行失敗是不會自動清理
- 這樣可以做雙重保險
- 檢查點執行發生錯誤,是否執行任務,
- 默認情況,如果checkpoint失敗,任務也將失敗
- 即時最近有更多的savepoint可用于恢復,flink依然會選擇使用最近一次的checkpoint來進行錯誤恢復
參考配置:
// --------
// 配置checkpoint
// 啟用檢查點
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);選擇狀態后端存盤
Flink的checkpoint機制可以存盤計時器和有狀態operation的所有快照,包括:連接器、視窗或者用戶自定義狀態,具體checkpoint存盤在哪兒(例如:是JobManager記憶體、檔案系統或者資料庫),依賴于狀態后端的配置,
默認情況,狀態保存在TaskManager的記憶體中,檢查點存盤在TM的記憶體中,為了適當地保存大狀態,Flink支持其他的存盤,我們可以通過:
StreamExecutionEnvironment.setStateBackend(…)
來指定存盤方式
Flink狀態管理
狀態的應用場景:
- 當應用程式想要按照某種模式搜索某些事件時,狀態可以保存迄今所有的事件序列
- 當每分鐘/小時/天需要對流資料進行聚合,狀態可以保存掛起的聚合
- 當在資料流上訓練機器學習模型時,狀態可以用來保存某一類引數的版本
- 當需要管理歷史資料時,狀態允許訪問過去歷史資料
Flink狀態可以保存在堆內、或者是堆外,Flink也可以管理應用程式的狀態,必要時也可以溢位到磁盤,如果應用要保持非常大的狀態,可以不修改程式邏輯情況下配置狀態后端存盤,
Flink狀態分類
Flink中有兩種基本的狀態:
- Keyed State
- Operator State
Keyed State
Keyed State通常和key相關,僅僅在KeyedStream的方法和算子中使用,可以把 Keyed State看作是磁區,而且每一個key僅出現在一個磁區內,邏輯上每個 keyed-state和唯一元組<算子并發實體, key>系結,由于每個key僅屬于算子的一個并發,因此可以簡化為<算子, key>
Operator State
對于 Operator State來說,每個Operator State和一個并發實體系結,Kafka connector是Flink中使用operator state的一個很好的示例,每個Kafka消費者的并發在Operator State中維護一個 topic partition到offset的映射關系,
Operator state在Flink作業的并發改變后,會重新分發狀態,分發的策略和keyed stated不一樣,
Raw State與Managed State
Keyed Stated和Operator State分別有兩種形式:managed 和 raw
Managed State是由Flink運行時管理的資料結構來表示的,例如:內部的Hash Table或者RocksDB,例如:ValueState、ListState等,Flink運行時會對這些狀態進行編碼并寫入Checkpoint,
Raw State則保存在自己的資料結構中,checkpoint的時候,Flink并不知道狀態里面具體的內容,僅僅寫入一串位元組序列到checkpoint中,
所有的DataStream的function都可以使用managed state,但raw state只能在實作算子時使用,由于Flink可以在修改并發時更好的分發狀態資料,并且能夠更好的管理記憶體,因為講義使用 managed state.
使用Managed Keyed State
Managed keyed state介面提供不同型別的狀態訪問介面,這些狀態都作用在當前輸入資料的key下,這些狀態僅可在KeyedStream上使用,可以通過 stream.keyBy(…)得到KeyedStream,
所有支持的狀態型別如下:
- ValueState<T>
- 保存一個可以更新和獲取的值,算子接收到的每個key都可能對應一個值
- 可以通過update(T)進行更新,通過value()獲取
- ListState<T>
- 保存一個元素的串列,可以往這個串列中追加資料,并在當前串列上檢索
- 可以通過 add(T)或者addAll(List<T>)進行追加元素
- 通過get()獲取整個串列
- 通過 update(List<T>)覆寫當前串列
- ReducingState<T>
- 保存一個單值,表示添加到狀態的所有值的聚合,介面與ListState類似
- AggregatingState<IN, OUT>
- 保存一個單值,表示添加到狀態的所有值的聚合
- 與ReducingState相反的是,聚合型別可能與添加到狀態的元素型別不同,介面與ListState類似
- FoldingState<T, ACC>(后續將過期)
- 保存一個單值,白搜狐添加到狀態的所有值的集合
- 與ReducingState相反的是,聚合型別可能與添加到狀態的元素型別不同,介面與ListState類似
- MapState<UK, UV>
- 維護一個映射串列,可以添加鍵值到狀態中,可以獲取當前映射的迭代器
- 使用put、putAll添加映射,使用 get檢索特定key
注意:
- 這些狀態物件僅用于狀態互動,狀態本身不一定存盤在記憶體中,還有可能保存在磁盤或者其他位置
- 從狀態中獲取的值取決于輸入元素說代表的key,因此,在不同key上呼叫同一個介面,可能得到不同的值
使用Managed Operator State
可以通過實作 CheckpointedFunction 或者 ListCheckpointed<T extends Serialized>介面來使用Managed Operator State,
CheckpointedFunction介面:
void snapshotState(FunctionSnapshotContext context) throws Exception;void initializeState(FunctionInitializationContext context) throws Exception;
在Flink進行checkpoint時,會呼叫snapshotstate(),用戶自定義函式初始化時會呼叫 initializeState,初始化包括第一次自定義函式初始化和從之前的 checkpoint 回復,因此,initializeState 中應該也包括狀態恢復的邏輯,
Managed Operator State以list的形式存在,這些狀態是一個可序列化物件的集合List,彼此獨立,方便在改變并發后進行狀態的重新分派,換句話說,這些物件是重新分配 non-keyed state的最細粒度,根據狀態的不同訪問方式,有以下兩種分配模式:
- Even-split redistribution
- 每個算子都存盤一個串列形式的狀態集合,整個狀態由所有的串列拼接而成
- 但作業恢復或者重新分配時,整個狀態按照算子的并行度均勻分配
- Union redistribution
- 每個算子保存一個串列形式的狀態集合,整個狀態由所有的串列拼接而成
- 但作業恢復或者重新分配時,每個算子都將獲得所有的狀態資料
ListCheckpointed介面:
ListCheckpointed介面是CheckpointedFunction介面的精簡版,僅支持 even-split redistribution的list state
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;void restoreState(List<T> state) throws Exception;
snapshotState()需要回傳一個將寫入到checkpoint的物件串列, restoreState則需要處理恢復回來的物件串列,
參考文獻:
Flink官方檔案:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/checkpointing.html
https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/ops/state/checkpoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/state.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/32513.html
標籤:大數據
