Flink Source Code Series: How Flink Determines the Window Start Time
- Preface:
- Test data
- Test code
- The pitfall
- Reading the source
Preface:
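Flink recently released version 1.12, which adds the upsert-kafka connector and a more complete unified stream/batch mechanism, so I could not wait to try it. I had been busy for a while and had not touched Flink in a long time. A recent requirement happened to need windows, so I decided to review Flink and catch up with 1.12 at the same time. I opened the official site, read up on the release, and went back over the window mechanism along the way.
My knowledge of windows had never gone past the usage level, so this time I took the chance to dig deeper, and this post is the result. I think anyone who wants to go deep with Flink cannot avoid windows and watermarks. Over the weekend I wrote a few demos as a review, and it really was a case of learning something new from old material: earlier requirements were always rushed, so I never had time to look closely, and this time I found many details I had missed before.
First, here is the test case.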
Test data
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771480000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771481000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771482000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771483000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771484000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771485000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771486000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771487000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771488000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771489000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771490000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771491000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771492000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771493000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771494000"}
{"orderId":"10001","shopName":"TestData","userID":"10007","amount":10.00,"sum":1,"orderStatus":1,"orderTime":"1607771495000"}
Test code
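import java.time.Duration;
import java.util.Properties;

// Assumption: ObjectMapper is Jackson's; the original listing does not show its import.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// GlobalPublicVariables, FlinkContextUtil and OrderInformation are my own classes (not shown here).
public class FlinkDemo02 {
    public static final String KAFKA_GROUP_ID = "test";
    public static final String KAFKA_TOPIC = "flink";

    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.setProperty("bootstrap.servers", GlobalPublicVariables.KAFKA_SERVER);
        pro.setProperty("group.id", KAFKA_GROUP_ID);
        final StreamExecutionEnvironment env = FlinkContextUtil.getFlinkEnv(true, true);
        DataStreamSource<String> input = env.addSource(new FlinkKafkaConsumer<>(KAFKA_TOPIC, new SimpleStringSchema(), pro).setStartFromLatest());
        // TODO: unfinished
        // Parse each JSON line, then use orderTime as the event timestamp.
        SingleOutputStreamOperator<OrderInformation> data = input.map((line) -> {
            return new ObjectMapper().readValue(line, OrderInformation.class);
        }).returns(TypeInformation.of(OrderInformation.class)).
                assignTimestampsAndWatermarks(WatermarkStrategy
                        .<OrderInformation>forBoundedOutOfOrderness(Duration.ofMillis(1000))
                        .withTimestampAssigner((orderInformation, timestamp) -> Long.parseLong(orderInformation.getOrderTime()))).startNewChain();
        // Requirement: total the amount every five seconds.
        // Each result covers exactly one five-second span, i.e. a tumbling event-time window.
        SingleOutputStreamOperator<OrderInformation> sum = data.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(5000)))
                .sum("amount").startNewChain();
        sum.print();
        try {
            // Print the execution plan
            System.out.println(env.getExecutionPlan());
            env.execute("TestFLink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}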
The pitfall
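I had always assumed that a Flink event-time window starts at the timestamp of the first record that arrives, but my test showed otherwise. Here is what I ran into. The code above uses a five-second window with a watermark that tolerates out-of-order data, and the condition for closing a window is:
eventTime - allowed out-of-orderness (I write 3s here, though the listing above passes Duration.ofMillis(1000)) >= window end time
Once that holds, the window closes. Then I fed in the data and got the following output: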
// When the data starts at 1607771480000, the amount in the output is 50
OrderInformation(orderId=10001, shopName=TestData, userID=10007, amount=50.0, sum=1, orderStatus=1, orderTime=1607771480000)
// When the data starts at 1607771481000, the amount in the output is 40, which seemed very odd
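The closing condition stated above can be sketched in plain Java, with no Flink dependency. This is only a model under stated assumptions: the watermark trails the largest timestamp seen by the out-of-orderness bound plus 1 ms, and a window [start, end) fires once the watermark reaches end - 1, which is how Flink's bounded-out-of-orderness generator and event-time trigger behave as far as I know. The class and method names are made up for illustration, and real Flink emits watermarks periodically rather than per record, so the actual firing can lag slightly.

```java
public class WatermarkTriggerSketch {

    // Watermark assumed after seeing maxTimestamp: bound plus 1 ms behind it.
    static long watermark(long maxTimestamp, long outOfOrdernessMs) {
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    // A window [start, end) is assumed to fire once the watermark reaches end - 1.
    static boolean fires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    // Feeds one in-order record per second starting at firstTs and returns the
    // timestamp of the first record that closes the window, or -1 if none does.
    static long firstClosingEvent(long firstTs, int count, long windowEnd, long outOfOrdernessMs) {
        for (int i = 0; i < count; i++) {
            long ts = firstTs + i * 1000L;
            if (fires(watermark(ts, outOfOrdernessMs), windowEnd)) {
                return ts;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long windowEnd = 1607771485000L; // end of the window being watched
        // Bound of 1000 ms, as in Duration.ofMillis(1000)
        System.out.println(firstClosingEvent(1607771480000L, 16, windowEnd, 1000L));
        // Bound of 3000 ms, as in the 3s written in the condition
        System.out.println(firstClosingEvent(1607771480000L, 16, windowEnd, 3000L));
    }
}
```

Under these assumptions the window ending at 1607771485000 closes on the record stamped 1607771486000 with a 1000 ms bound, and on the record stamped 1607771488000 with a 3 s bound.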

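Reading the source
As I saw it, the window should have fired once the timestamps reached the one ending in 9000, and the result should have been 50 as well. That is not what I got, so I searched Baidu. Either few people care about this question, or my search keywords were wrong, or everyone else simply understood it correctly from the start and never shared my misreading. So I went and read the source.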

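The assigner's code shows that when Flink assigns a record to a window it first computes a start time (in Flink 1.12 this is TimeWindow.getWindowStartWithOffset, called from TumblingEventTimeWindows.assignWindows). The start is computed as timestamp - (timestamp - offset + windowSize) % windowSize, so it depends only on the record's timestamp, the offset and the window size, not on which record arrived first.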
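As a plain-Java sketch of that start-time calculation, the method below uses the same arithmetic as the test in this post, timestamp - (timestamp - offset + windowSize) % windowSize. The class name WindowStartSketch is hypothetical, and the sketch assumes non-negative timestamps; it is meant to show the alignment behavior, not to stand in for the Flink source.

```java
public class WindowStartSketch {

    // Start of the window that contains `timestamp`, for windows of `windowSize` ms
    // shifted by `offset` ms from the epoch.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = 5000L;
        // Print the window start for one record per second across a window boundary.
        for (long ts = 1607771480000L; ts <= 1607771485000L; ts += 1000L) {
            System.out.println(ts + " -> " + windowStart(ts, 0L, size));
        }
        // A 1000 ms offset shifts every window boundary by one second.
        System.out.println(windowStart(1607771481000L, 1000L, size));
    }
}
```

With offset 0, the records stamped 1607771480000 through 1607771484000 all map to the window starting at 1607771480000, and 1607771485000 opens the next window. With an offset of 1000 ms, the record stamped 1607771481000 sits at the start of its own window.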

Let's try out the logic above:
@Test
public void testFlinkWindowTimeStamp() {
    long timestamp = 1607771481000L;
    long offset = 0;
    long windowSize = 5000;
    // Start of the window that the record stamped 1607771481000 falls into
    System.out.println((timestamp - (timestamp - offset + windowSize) % windowSize));
}
// Output: 1607771480000
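To tie this back to the 50 versus 40 result, here is a small plain-Java replay of both runs. It is a sketch under assumptions: one record per second with amount 10.0, as in the test data, bucketed with the same start-time formula. TumblingSumSketch and sumPerWindow are hypothetical names, and watermarks are ignored because the records arrive in order.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TumblingSumSketch {

    // Same start-time arithmetic as in the test above.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Sums `amount` per window for one record per second from firstTs to lastTs inclusive.
    // Keys are window start times, in order of first appearance.
    static Map<Long, Double> sumPerWindow(long firstTs, long lastTs, double amount,
                                          long offset, long windowSize) {
        Map<Long, Double> sums = new LinkedHashMap<>();
        for (long ts = firstTs; ts <= lastTs; ts += 1000L) {
            sums.merge(windowStart(ts, offset, windowSize), amount, Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long last = 1607771495000L;
        // Run 1: data starts at 1607771480000
        System.out.println(sumPerWindow(1607771480000L, last, 10.0, 0L, 5000L));
        // Run 2: data starts at 1607771481000
        System.out.println(sumPerWindow(1607771481000L, last, 10.0, 0L, 5000L));
    }
}
```

In both runs the first window starts at 1607771480000. Run 1 puts five records in it (total 50.0), while run 2 only has four records left for it (total 40.0), which matches the output seen earlier. If the windows really should line up with a particular second, the `TumblingEventTimeWindows.of(Time size, Time offset)` overload takes an offset; in this sketch an offset of 1000 ms makes run 2's first window start at 1607771481000 and total 50.0.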
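After running this, all I want to tell myself is: when learning something, dig as deep as you can instead of reading an article and using the thing straight away. Be rigorous. That is the end of this post; if it helped you, that is the greatest help to me.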
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qita/234221.html