Flink的狀態與容錯是這個框架很核心的知識點,其中一致檢查點也就是Checkpoints也是Flink故障恢復機制的核心,這篇文章將詳細介紹Flink的狀態管理和Checkpoints的概念以及在生產環境中的引數設定,
什么是State狀態?
- 在使用Flink進行視窗聚合統計,排序等操作的時候,資料流的處理離不開狀態管理
- 是一個Operator的運行的狀態/歷史值,在記憶體中進行維護
- 流程:一個算子的子任務接收輸入流,獲取對應的狀態,計算新的結果,然后把結果更新到狀態里面
![[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-su4PajFa-1635586788360)(picture/image-20211030165453846.png)]](https://img.uj5u.com/2021/10/31/279395310746133.png)
有狀態和無狀態介紹
-
無狀態計算: 同個資料進到算子里面多少次,都是一樣的輸出,比如 filter
-
有狀態計算:需要考慮歷史狀態,同個輸入會有不同的輸出,比如sum、reduce聚合操作
-
狀態管理分類
- ManagedState(用的多)
- Flink管理,自動存盤恢復
- 細分兩類
- Keyed State 鍵控狀態(用的多)
- 有KeyBy才用這個,僅限用在KeyStream中,每個key都有state ,是基于KeyedStream上的狀態
- 一般是用richFlatFunction,或者其他richfunction里面,在open()宣告周期里面進行初始化
- ValueState、ListState、MapState等資料結構
- Operator State 算子狀態(用的少,部分source會用)
- ListState、UnionListState、BroadcastState等資料結構
- Keyed State 鍵控狀態(用的多)
- RawState(用的少)
- 用戶自己管理和維護
- 存盤結構:二進制陣列
- ManagedState(用的多)
-
State資料結構(狀態值可能存在記憶體、磁盤、DB或者其他分布式存盤中)
- ValueState 簡單的存盤一個值(ThreadLocal / String)
- ValueState.value()
- ValueState.update(T value)
- ListState 串列
- ListState.add(T value)
- ListState.get() //得到一個Iterator
- MapState 映射型別
- MapState.get(key)
- MapState.put(key, value)
- ValueState 簡單的存盤一個值(ThreadLocal / String)
State狀態后端:存盤在哪里
-
Flink 內置了以下這些開箱即用的 state backends :
-
(新版)HashMapStateBackend、EmbeddedRocksDBStateBackend
- 如果沒有其他配置,系統將使用 HashMapStateBackend,
-
(舊版)MemoryStateBackend、FsStateBackend、RocksDBStateBackend
- 如果不設定,默認使用 MemoryStateBackend,
-
-
狀態詳解
-
HashMapStateBackend 保存資料在內部作為Java堆的物件,
- 鍵/值狀態和視窗運算子持有哈希表,用于存盤值、觸發器等
- 非常快,因為每個狀態訪問和更新都對 Java 堆上的物件進行操作
- 但是狀態大小受集群內可用記憶體的限制
- 場景:
- 具有大狀態、長視窗、大鍵/值狀態的作業,
- 所有高可用性設定,
-
EmbeddedRocksDBStateBackend 在RocksDB資料庫中保存狀態資料
- 該資料庫(默認)存盤在 TaskManager 本地資料目錄中
- 與HashMapStateBackend在java存盤 物件不同,資料存盤為序列化的位元組陣列
- RocksDB可以根據可用磁盤空間進行擴展,并且是唯一支持增量快照的狀態后端,
- 但是每個狀態訪問和更新都需要(反)序列化并可能從磁盤讀取,這導致平均性能比記憶體狀態后端慢一個數量級
- 場景
- 具有非常大狀態、長視窗、大鍵/值狀態的作業,
- 所有高可用性設定
-
舊版
MemoryStateBackend(記憶體,不推薦在生產場景使用) FsStateBackend(檔案系統上,本地檔案系統、HDFS, 性能更好,常用) RocksDBStateBackend (無需擔心 OOM 風險,是大部分時候的選擇) 代碼配置: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new EmbeddedRocksDBStateBackend()); env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir"); //或者 env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir")); -
什么是Checkpoint檢查點
-
Flink中所有的Operator的當前State的全域快照
-
默認情況下 checkpoint 是禁用的
-
Checkpoint是把State資料定時持久化存盤,防止丟失
-
手工呼叫checkpoint,叫 savepoint,主要是用于flink集群維護升級等
-
底層使用了Chandy-Lamport 分布式快照演算法,保證資料在分布式環境下的一致性
-
有狀態流應用的一致檢查點,其實就是所有任務的狀態,在某個時間點的一份 拷貝(一份快照);這個時間點,應該是所有任務都恰好處理完一個相同的輸入資料的時候
Flink 捆綁的些檢查點存盤型別:
- 作業管理器檢查點存盤 JobManagerCheckpointStorage
- 檔案系統檢查點存盤 FileSystemCheckpointStorage
端到端(end-to-end)狀態一致性
資料一致性保證都是由流處理器實作的,也就是說都是在Flink流處理器內部保證的
在真實應用中,了流處理器以外還包含了資料源(例如Kafka、Mysql)和輸出到持久化系統(Kafka、Mysql、Hbase、CK)
端到端的一致性保證,是意味著結果的正確性貫穿了整個流處理應用的各個環節,每一個組件都要保證自己的一致性,
- Source
- 需要外部資料源可以重置讀取位置,當發生故障的時候重置偏移量到故障之前的位置
- 內部
- 依賴Checkpoints機制,在發生故障的時可以恢復各個環節的資料
- Sink:
- 當故障恢復時,資料不會重復寫入外部系統,常見的就是 冪等和事務寫入(和checkpoint配合)
有關檢查點配置的常用引數配置介紹
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設定checkpoint的周期, 每隔1000 ms進行啟動一個檢查點
env.getCheckpointConfig().setCheckpointInterval(1000);
// 設定狀態級別模式為exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//超時時間,可能是保存太耗費時間或者是狀態后端的問題,任務同步執行不能一直阻塞
env.getCheckpointConfig().setCheckpointTimeout(60000L);
// 設定取消和故障時是否保留Checkpoint資料,這個設定較為重要,沒有正確的選擇好可能會導致檢查點資料失效
//有兩個引數可以設定
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作業時保留檢查點,必須在取消后手動清理檢查點狀態,
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作業時洗掉檢查點,只有在作業失敗時,檢查點狀態才可用,
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
實戰部分:
為了模擬生產環境中實時產生的訂單資料,這里我們自己定義一個資料源來源源不斷的產生模擬訂單資料
訂單類:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
private String tradeNo;
private String title;
private int money;
private int userId;
private Date createTime;
@Override
public String toString() {
return "VideoOrder{" +
"tradeNo='" + tradeNo + '\'' +
", title='" + title + '\'' +
", money=" + money +
", userId=" + userId +
", createTime=" + createTime +
'}';
}
}
public class VideoOrderSourceV2 extends RichParallelSourceFunction<VideoOrder> {
private volatile Boolean flag = true;
private Random random = new Random();
private static List<VideoOrder> list = new ArrayList<>();
static {
list.add(new VideoOrder("","java",10,0,null));
list.add(new VideoOrder("","spring boot",15,0,null));
}
/**
* run 方法呼叫前 用于初始化連接
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("-----open-----");
}
/**
* 用于清理之前
* @throws Exception
*/
@Override
public void close() throws Exception {
System.out.println("-----close-----");
}
/**
* 產生資料的邏輯
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<VideoOrder> ctx) throws Exception {
while (flag){
Thread.sleep(1000);
String id = UUID.randomUUID().toString().substring(30);
int userId = random.nextInt(10);
int videoNum = random.nextInt(list.size());
VideoOrder videoOrder = list.get(videoNum);
videoOrder.setUserId(userId);
videoOrder.setCreateTime(new Date());
videoOrder.setTradeNo(id);
System.out.println("產生:"+videoOrder.getTitle()+",價格:"+videoOrder.getMoney()+", 時間:"+ TimeUtil.format(videoOrder.getCreateTime()));
ctx.collect(videoOrder);
}
}
/**
* 控制任務取消
*/
@Override
public void cancel() {
flag = false;
}
}
產生資料的格式如下:
![[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-J8KdoB7o-1635586788363)(picture/image-20211030171800058.png)]](https://img.uj5u.com/2021/10/31/279395310746131.png)
主程式:使用reduce算子對資料進訂單價格進行滾動計算,并設定Checkpoint保證資料狀態可以存取
public class FlinkKeyByReduceApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//構建執行任務環境以及任務的啟動的入口, 存盤全域相關的引數
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(5000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//這是我本機的ip地址
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://192.168.192.100:8020/checkpoint"));
DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
});
SingleOutputStreamOperator<VideoOrder> reduce = videoOrderStringKeyedStream.reduce(new ReduceFunction<VideoOrder>() {
@Override
public VideoOrder reduce(VideoOrder value1, VideoOrder value2) throws Exception {
VideoOrder videoOrder = new VideoOrder();
videoOrder.setTitle(value1.getTitle());
videoOrder.setMoney(value1.getMoney() + value2.getMoney());
return videoOrder;
}
});
reduce.print();
env.execute("job");
}
}
在本地測驗運行結果,可以看到資料根據訂單分組不斷的進行滾動計算
![[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-D8NDkIa1-1635586788365)(picture/image-20211030172433842.png)]](https://img.uj5u.com/2021/10/31/279395310746134.png)
進入服務器的HDFS查看檢查點資料是否存在

之后將應用進行打包,上傳到服務器進行測驗,可以使用Flink的Web頁面進行手動提交jar包運行,也可以使用命令進行提交,之后可以看到程式運行程序中的相關日志輸出
./bin/flink run -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /xiaochan-flink.jar

模擬宕機
運行程式的時候我們可以在Flink看到任務進行的id號,這個時候我們手動的cancel掉或者是直接把服務kill掉,這個時候任務被強制暫停,
進入到HDFS可以看到我們設定的檢查點的資料依舊存在,我們使用如下命令,讓程式從上次宕機前的訂單計算狀態繼續往下計算,
-s : 指定檢查點的元資料的位置,這個位置記錄著宕機前程式的計算狀態
./bin/flink run -s /checkpoint/id號/chk-23/_metadata -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /root/xdclass-flink.jar

運行命令,進入WEB頁面進行查看,是否成功,
![[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-DDN7NI7I-1635586788372)(picture/image-20211014174041183.png)]](https://img.uj5u.com/2021/10/31/279395310746137.png)
可以看到出現一次close的時候,代表我們的程式以及停止,服務器已經宕機,這個時候訂單的計算結果如上圖的紅色方框,在我們運行了上面那條命令后再次查看日志的資料,從open開始可以看到這次就不是從訂單最初的狀態開始進行的了,而是從上一次宕機前計算的結果,繼續往下計算,到這里Checkponit的實戰應用測驗就完成了,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/342154.html
標籤:其他
上一篇:超詳細超簡單的搭建三臺虛擬機集群
