狀態編程是Flink最出色的功能沒有之一
一、什么是狀態?
在流式計算中有些操作一次處理一個獨立的事件(比如決議一個事件), 有些操作卻需要記住多個事件的資訊(比如視窗操作).
那些需要記住多個事件資訊的操作就是有狀態的.
流式計算分為無狀態計算和有狀態計算兩種情況
無狀態計算:無狀態的計算觀察每個獨立事件,并根據最后一個事件輸出結果,例如,流處理應用程式從傳感器接收水位資料,并在水位超過指定高度時發出警告
有狀態計算:有狀態的計算則會基于多個事件輸出結果,以下是一些例子,例如,計算過去一小時的平均水位,就是有狀態的計算,所有用于復雜事件處理的狀態機,
二、需要狀態的場景:
去重
資料流中的資料有重復,我們想對重復資料去重,需要記錄哪些資料已經流入過應用,當新資料流入時,根據已流入過的資料來判斷去重,
檢測
檢查輸入流是否符合某個特定的模式,需要將之前流入的元素以狀態的形式快取下來,比如,判斷一個溫度傳感器資料流中的溫度是否在持續上升,
聚合
對一個時間視窗內的資料進行聚合分析,分析一個小時內水位的情況
更新機器學習模型
在線機器學習場景下,需要根據新流入資料不斷更新機器學習的模型引數,
三、Flink的Failover(故障轉移機制)
Job 的重啟:每次運行,默認都是新的Job,沒法實作
Task的重啟:
有個算子在某個Task,這個Task拋出了例外,Flink可以采取failover(故障轉移機制)
重新找插槽,重新運行Task 可以實作,但不能保存原有狀態
注意:Flink默認開啟了故障時不重啟策略,我們使用故障轉移機制時需要將其關閉,不然會出現如下報錯
Recovery is suppressed by NoRestartBackoffTimeStrategy
設定故障轉移機制
//設定故障轉移,第一個引數最多重試重啟次數,第二個引數兩次重啟次數的時間間隔
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));
代碼實作
package net.cyan.state;
?
import net.cyan.POJO.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
?
import java.util.ArrayList;
import java.util.List;
?
?
/**
?
把收到的每個字串都存到一個list集合
并且希望達到當程式掛掉時,狀態可以自動恢復
?
Job 的重啟:每次運行,默認都是新的Job,沒法實作
Task的重啟:
有個算子在某個Task,這個Task拋出了例外,Flink可以采取failover(故障轉移機制)
重新找插槽,重新運行Task 可以實作
*/
public class Demo1_test {
public static void main(String[] args) {
//創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設定故障轉移,第一個引數最多重試重啟次數,第二個引數兩次重啟次數的時間間隔
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));
?
env.socketTextStream("hadoop103", 9999)
.map(new MyMapFunction())
.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
//模擬出現例外
if (value.contains("x")){
throw new RuntimeException("出例外了");
}
System.out.println(value);
}
});
?
try {
//啟動執行環境
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
?
?
}
public static class MyMapFunction implements MapFunction<String,String>{
private List<String> list=new ArrayList<>();
?
@Override
public String map(String value) throws Exception {
list.add(value);
return list.toString();
}
}
}
故障轉移機制受限于用戶所設定的重啟次數,一旦達到最大重啟次數后將不再進行重啟而是直接err,而且狀態會丟失,
我們可以通過另一種方法達到無限的Task重啟次數以及狀態持久化保存,開啟CheckPoint
//開啟checkpoint,每間隔2秒持久化到磁盤一次,可以實作無限重啟
env.enableCheckpointing(2000);
//設定持久化路徑,此路徑不設定會默認保存在idea檔案目錄下
env.getCheckpointConfig().setCheckpointStorage("file:///d:/ck");
開啟CheckPoint可以讓我們能夠無限次的重啟Task這樣來足以應對流式計算,但如何在Task重啟后恢復之前存檔的狀態呢?
那就是使用Flink提供的編程狀態Managed State
Flink中的狀態分類
Flink包括兩種基本型別的狀態Managed State和Raw State
| Raw State | ||
|---|---|---|
| 狀態管理方式 | Flink Runtime托管, 自動存盤, 自動恢復, 自動伸縮 | 用戶自己管理 |
| 狀態資料結構 | Flink提供多種常用資料結構, 例如:ListState, MapState等 | 位元組陣列: byte[] |
| 使用場景 | 絕大數Flink算子 | 所有算子 |
使用狀態編程需要實作CheckpointedFunction介面,重寫它的兩個方法
代碼如下
package net.cyan.state;
?
import net.cyan.POJO.MyUtil;
import net.cyan.POJO.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
?
import java.util.ArrayList;
import java.util.List;
?
?
/**
?
把收到的每個字串都存到一個list集合
并且希望達到當程式掛掉時,狀態可以自動恢復
?
Job 的重啟:每次運行,默認都是新的Job,沒法實作
Task的重啟:
有個算子在某個Task,這個Task拋出了例外,Flink可以采取failover(故障轉移機制)
重新找插槽,重新運行Task 可以實作
*/
public class Demo1_test {
public static void main(String[] args) {
//創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
//設定故障轉移,第一個引數最多重試重啟次數,第二個引數兩次重啟次數的時間間隔
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));
//開啟checkpoint,每間隔2秒持久化到磁盤一次,可以實作無限重啟
env.enableCheckpointing(2000);
//設定持久化路徑
env.getCheckpointConfig().setCheckpointStorage("file:///e:/ck");
?
env.socketTextStream("hadoop103", 9999)
.map(new MyMapFunction())
.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
//模擬出現例外
if (value.contains("x")){
throw new RuntimeException("出例外了");
}
System.out.println(value);
}
});
?
try {
//啟動執行環境
env.execute();
} catch (Exception e) {
e.printStackTrace();
}