歡迎訪問我的GitHub
https://github.com/zq2599/blog_demos
內容:所有原創文章分類匯總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
歡迎訪問我的GitHub
這里分類和匯總了欣宸的全部原創(含配套原始碼):https://github.com/zq2599/blog_demos
Flink處理函式實戰系列鏈接
- 深入了解ProcessFunction的狀態操作(Flink-1.10);
- ProcessFunction;
- KeyedProcessFunction類;
- ProcessAllWindowFunction(視窗處理);
- CoProcessFunction(雙流處理);
關于ProcessFunction狀態的疑惑
學習Flink的ProcessFunction程序中,官方檔案中涉及狀態處理的時候,不止一次提到只適用于keyed stream的元素,如下圖紅框所示:

之前寫過一些flink應用,keyed stream常用但不是必須用的,所以產生了疑問:
- 為何只有keyed stream的元素能讀寫狀態?
- 每個key對應的狀態是如何操作的?
Flink的"狀態"
先去回顧Flink"狀態"的知識點:
- 官方檔案說就兩種狀態:keyed state和operator state:

- 如上圖,keyed stream的元素是具有key的特征,與ProcessFunction的操作狀態時要求匹配,其他steam的元素由于沒有key的特征,所以也就沒有狀態一說了;
- 另一種狀態是Operator State,如下圖,這是和多并行度計算時的算子實體系結的,例如當前算子消費kafka的某個磁區的最新offset,而ProcessFunction是用來處理stream元素的,不會涉及到Operator State:

官方demo
為了學習ProcessFunction就去看官方demo,地址是:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html ,簡單說說這個demo的功能:
- 資料源在不間斷的產生單詞,每個單詞對應一個Tuple2<String,String>的實體;
- 資料源被keyBy方法轉成KeyedStream,key是Tuple2實體的f0欄位;
- 一個KeyedProcessFunction的子類CountWithTimeoutFunction,被用來處理KeyedStream的每個元素,處理的邏輯:為每個key維護一個狀態,狀態的內容是這個key的出現次數和最后一次出現時間;
- 如果那個key連續一分鐘沒有出現,KeyedProcessFunction就向下游發送這個元素;
以上就是官方demo的功能,本來是想通過demo來加深認識,結果看完不但沒有明白,反而更暈了,下圖是我對demo代碼的疑惑:

從上圖可見我的疑惑,這里再復述一下:
5. 入參value是Tuple2型別,假設其f0欄位等于aaa,那么processElement方法的作用,就是取出aaa的狀態,更新后保存;
6. 從代碼上看,state.value()回傳了aaa的狀態,這個value方法并沒有將aaa作為入參,那怎么做到回傳aaa的狀態呢?如果下一個入參value的f0欄位等于bbb了,這個state.value()能回傳bbb的狀態嗎?
7. 對更新狀態的代碼state.update(current)也是同樣的疑惑;
8. 然后又產生了新的疑惑:成員變數state難道是一直在變?每執行一次processElement,都會變成該key對應的state實體?
先反思為何會有上述疑惑
- 上述疑惑產生的原因,應該是受到平時使用HashMap的影響,HashMap獲取值就是在呼叫get方法時指定key,設定值也是在put時指定key,所以看到state.value()方法沒有用key做入參就不習慣了
- 要消除這種不適應,要做的第一件事就是提醒自己:processElement是在框架內運行的,很多資料在之前已經由框架準備好了;
- 接下來要做的,就是把框架準備資料的邏輯看一遍,除了弄明白自己的問題,由于ProcessFunction屬于最低階抽象(如下圖的最下方位置),看懂了這些,其實也是在了解DataStream/DataSet API的設計思路:

跟蹤原始碼
- 如下圖,讓我們從一個斷點的堆疊開始吧,這是在執行上面demo中的processElement方法之前的一個斷點,可見根源是個執行緒的run方法,也就是KeyedProcessFunction對應的算子執行任務的執行緒:

- 上面的堆疊不必每一層都細看,只關注重要的部分,下圖這段很重要:StreamTask.run方法中,有個無限回圈(猜測是每次執行processInput方法都處理KeyedStream的一個元素):

- 如下圖,StreamOneInputProcessor.processInput方法取出KeyedStream的一個元素,呼叫processElement方法,并將此元素作為入參,再結合上一幅圖可以看出:在撰寫KeyedProcessFunction子類的時候,KeyedStream的每個元素都會作為入參,在呼叫你重寫的processElement方法時傳進去;這一點,在做ProcessFunction和KeyedProcessFunction開發時都是要格外注意的:

- 接下來到了最關鍵的地方了,下圖紅框中的streamOperator.setKeyContextElement1(record)會解答我前面的疑惑,一定要進去看個清楚,(后面的黃線上的代碼,您應該猜到了,里面其實就是呼叫demo中的processElement方法)

- 下圖中,AbstractStreamOperator.setKeyContextElement給出了答案:對于KeyedStream的每個元素,都會在這里算出key,再呼叫setCurrentKey保存這個key:

- 展開setCurrentKey,如下圖,發現key的保存和當前狀態的存盤策略(StateBackend)有關,我這里是默認策略HeapKeyedStateBackend:

- 最終,根據當前元素得到的key會在StateBackend的keyContext物件中找地方保存,StateBackend的具體實作和Flink設定有關,我這里是保存到了InternalKeyContextImpl實體的currentKey變數中:

- 代碼讀到這里,對我前面的疑惑,您應該能推測出答案了:state.value()里面會通過StateBackend的keyContext取出剛才保存的key,接下來就能像HashMap那樣根據key查出該key的狀態了,接下來是愉快的印證我們推測的程序;
- 在state.value()代碼位置打斷點一次看個明白,如下圖,果然,state里面有StateBackend的keyContext物件的參考,訪問剛才保存的key就不成問題了:

- 展開state.value()方法如下,簡單明了,直接拿keyContext保存的key作為入參去取對應的狀態:

- 再展開上面的get方法,可見最終是從stateMap中取得的,而這個stateMap的具體實作是CopyOnWriteStateMap型別的實體:

- 代碼讀到這里,只剩最后一處需要印證了:更新狀態的state.update(current)方法,應該也是以StateBackend的keyContext中的key作為自己的key,再將入參的current作為value,更新到stateMap中,來吧,一起印證這個推測;
- 展開方法,看到的是stateTable.put方法(前面剛看過stateTable的get方法,穩了):

- stateTable.put方法里面和前面的get方法一樣,直接拿keyContext保存的key作為自己的key:

- 最終是呼叫了stateMap.put方法,將資料保存在CopyOnWriteStateMap實體中:

- 得益于Flink代碼自身規范、清晰的設計和實作,再加上IDEA強大的debug功能,整個閱讀和分析程序十分順利,這其中的識訓會逐漸在今后深入學習DataStreamAPI的程序中見效;
最后,根據上面的分析程序繪制了一幅簡陋的流程圖,希望能幫助您加快理解:

歡迎關注公眾號:程式員欣宸
微信搜索「程式員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/224703.html
標籤:其他
上一篇:python 爬蟲
