目錄
- 第3章 Apache Flink架構
- 3.1 系統架構
- 3.1.1 搭建Flink所需的組件
- 3.1.1.1 JobManager
- 3.1.1.2 RecourceManager
- 3.1.1.3 TaskManager
- 3.1.1.4 Dispatcher
- 3.1.1.5 整體架構圖
- 3.1.2 應用部署
- 3.1.2.1 框架模式
- 3.1.2.2 庫模式
- 3.1.3 任務執行
- 3.1.4 高可用性設定
- 3.1.4.1 TaskManager故障
- 3.1.4.2 JobManager故障
- 3.1.1 搭建Flink所需的組件
- 3.2 Flink中的資料傳輸
- 3.2.1 基于信用值的流量控制
- 3.2.2 任務鏈接
- 3.3 事件時間處理
- 3.3.1 時間戳
- 3.3.2 水位線
- 3.3.3 水位線傳播和事件時間
- 3.3.4 時間戳分配和水位線生成
- 3.4 狀態管理
- 3.4.1 算子狀態
- 3.4.2 鍵值磁區狀態
- 3.4.3 狀態后端
- 3.4.4 有狀態的算子的擴縮容
- 3.4.4.1 帶有鍵值磁區狀態的算子擴縮容
- 3.4.4.2 帶有算子串列狀態的算子擴縮容
- 3.4.4.3 帶有算子聯合狀態的算子擴縮容
- 3.4.4.4 帶有算子廣播狀態的算子擴縮容
- 3.5 檢查點、保存點、狀態恢復
- 3.5.1 一致性檢查點
- 3.5.2 從一致性檢查點中恢復
- 3.5.3 Flink檢查點演算法
- 3.5.4 檢查點對性能的影響
- 3.5.5 保存點
- 3.5.5.1 保存點的使用
- 3.5.5.2 從保存點啟動應用
- 3.1 系統架構
第3章 Apache Flink架構
參考書籍
Stream Processing with Apache Flinkhttps://www.oreilly.com/library/view/stream-processing-with/9781491974285/
《基于Apache Flink的流處理》https://book.douban.com/subject/34912177/
注:本文主要是針對《基于Apache Flink的流處理》的筆記
1-8章筆記下載地址
在這一章中,我們對Flink的架構進行了一個高層次的介紹,并描述了Flink如何解決我們之前討論過的流處理相關問題,特別地,我們重點解釋Flink的分布式架構,展示它在流處理應用中是如何處理時間和狀態的,并討論了它的容錯機制,
3.1 系統架構
Flink是一個用于狀態化并行資料流處理的分布式系統,Flink設定由多個行程組成,這些行程通常分布在多臺機器上運行,
分布式系統需要解決的常見挑戰是
- 集群中計算資源的分配和管理
- 行程協調
- 持久和高可用性資料存盤
- 故障恢復
Flink本身并沒有實作所有這些功能,它只關注于其核心功能——分布式資料流處理,但是利用了很多現有的開源中間件和框架來實作其他非核心部分,
- Flink與集群資源管理器(如Apache Mesos、YARN和Kubernetes)集成得很好,但也可以配置為作為獨立集群運行,
- Flink不提供持久的分布式存盤,相反,它利用了像HDFS這樣的分布式檔案系統或S3這樣的物件存盤,
- 對于高可用設定中的領導選舉,Flink依賴于Apache ZooKeeper,
3.1.1 搭建Flink所需的組件
Flink的搭建由四個不同的組件組成,它們一起作業來執行流應用程式,這些組件是JobManager、ResourceManager、TaskManager和Dispatcher,由于Flink是用Java和Scala實作的,所以所有組件都運行在Java虛擬機(jvm)上,各組成部分的職責將在下面四個子小節分別介紹,
3.1.1.1 JobManager
應用管理
JobManager是控制 單個應用程式執行的主行程,每個應用程式由一個的JobManager控制,(一對一關系)
- JobManager負責接收要執行的應用程式,該應用程式由一個所謂的JobGraph(一個邏輯資料流圖)和一個JAR檔案組成(JAR檔案捆綁了該應用程式所有必需的類、庫和其他資源),
- JobManager將JobGraph轉換為名為ExecutionGraph的物理資料流圖,該資料流圖由可并行執行的任務組成,
- JobManager從ResourceManager請求必要的資源(TaskManager槽)來執行任務,一旦它接收到足夠數量的TaskManager槽,它就會將ExecutionGraph的任務分配給執行它們的TaskManager,
- 在執行期間,JobManager負責所有需要集中協調的操作,如檢查點的協調,
3.1.1.2 RecourceManager
資源管理
Flink為 不同的環境和資源提供者(如YARN、Mesos、Kubernetes和獨立部署)提供了多個資源管理器,
- ResourceManager負責管理Flink的處理資源單元---TaskManager槽,
- 當JobManager請求TaskManager槽時,ResourceManager會命令某個帶有空閑槽的TaskManager將它的空閑槽提供給JobManager,
- 如果ResourceManager沒有足夠的槽來滿足JobManager的請求,則ResourceManager可以與資源提供者對話,讓資源提供者嘗試啟動更多的TaskManager,
- ResourceManager還負責終止空閑的TaskManager以釋放計算資源,
3.1.1.3 TaskManager
作業行程,執行任務的
TaskManager是Flink的作業行程(worker process,工人)
- 通常,在一個Flink集群中有多個TaskManager在運行,
- 每個TaskManager提供一定數量的槽,槽的數量限制了TaskManager可以執行的任務數量,
- 在TaskManager啟動之后,TaskManager將它的槽注冊到ResourceManager,
- 當JobManager請求槽的時候,根據ResourceManager的指示,TaskManager向JobManager提供一個或多個槽,
- 然后JobManager可以將任務分配到槽中,讓TaskManager執行這些任務,
- 在執行期間,TaskManager與運行相同應用但是不同任務的其他TaskManager交換資料,
3.1.1.4 Dispatcher
與用戶直接對話
Dispatcher提供一個REST介面讓用戶提交要執行的應用,
- 應用提交執行后,它將啟動JobManager,并將應用交給它來執行,
- Dispatcher還運行一個web儀表盤來提供關于作業執行的資訊,
3.1.1.5 整體架構圖

3.1.2 應用部署
Flink應用程式可以以兩種不同的模式來部署,
3.1.2.1 框架模式
在這種模式下,Flink應用程式被打包到一個JAR檔案中,并由客戶端提交給一個正在運行的服務,該服務可以是Flink Dispatcher、Flink JobManager或YARN的ResourceManager,
- 如果應用程式被提交到JobManager,它將立即開始執行應用程式,
- 如果應用程式被提交給Dispatcher或YARN ResourceManager,它將啟動JobManager并移交應用程式,然后JobManager將開始執行應用程式,
3.1.2.2 庫模式
在這種模式下,Flink應用程式被系結在一個應用程式特定的容器鏡像中,比如Docker鏡像,
- 該鏡像還包括運行JobManager和ResourceManager的代碼,
- 當容器從鏡像啟動時,它會自動啟動ResourceManager和JobManager,并執行系結的應用程式,
- 第二個獨立于應用程式的鏡像用于部署TaskManager容器,
- 從這個鏡像啟動的容器會自動啟動TaskManager,它連接到ResourceManager并注冊它的槽,
- 通常,外部資源管理器(如Kubernetes)負責啟動鏡像,并負責在發生故障時重新啟動容器,
第一種模式比較傳統,第二種模式常用于微服務中,
3.1.3 任務執行
TaskManager可以同時執行多個任務,
這些任務可以
- 屬于同一算子(資料并行)、
- 不同算子(任務并行)的子任務
- 甚至是來自不同應用程式的子任務(應用并行),
TaskManager提供固定數量的處理槽來控制它能夠并發執行的任務的數量,一個處理槽可以執行應用程式的某個算子的一個并行任務,下圖是一個TaskManager、處理槽、任務以及算子關系的例子,

左側是一個JobGraph(應用程式的非并行表示,邏輯圖),
-
它由5個算子組成,
-
算子A和C是資料源,算子E是資料匯,
右側是一個ExecutionGraph(物理圖)
- 算子C和E的并行度為2,其他算子的并行度為4,
- 由于最大算子并行度是4個,應用程式至少需要4個可用的處理槽來執行,
- 給定兩個各有兩個處理槽的Taskmanager,就滿足了這個需求,
- JobManager將JobGraph擴展為一個ExecutionGraph,并將任務分配給四個可用插槽,
- 并行度為4的算子各自有4個并行任務,這些任務 被分配給每個槽,
- 運算子C和E的各自有兩個并行任務,分別被分配到槽1.1和2.1以及槽1.2和2.2,
- 將多個不同算子的任務 分配到同一個插槽的優點是這些任務可以在同一個行程中高效地交換資料,而不需要訪問網路,
每個TaskManager是一個JVM,而每個Slot是JVM中的一個執行緒,TaskManager在同一個JVM行程中以多執行緒方式執行它的任務,執行緒比單獨的行程更輕量,通信成本更低,但不會嚴格地將任務彼此隔離,因此,一個行為不正常的任務可以殺死整個TaskManager行程和運行在它上面的所有任務,
3.1.4 高可用性設定
流式應用程式通常設計為24x7運行,因此,即使內部行程失敗,也不能停止運行,
而要想從失敗中恢復
- 系統首先需要重新啟動失敗的行程
- 其次,重新啟動應用程式并恢復其狀態,
本小節主要學習如何重新啟動失敗的行程,
3.1.4.1 TaskManager故障
下面舉例說明TaskManager故障應該如何處理
-
假設我們的應用程式要以最大并行度為8來執行,那么四個TaskManager(每個TaskManager提供兩個插槽)可以滿足我們對并行度的需求,
-
如果其中一個TaskManager發生故障,可用插槽的數量將減少到6個,
-
在這種情況下,JobManager將請求ResourceManager提供更多處理槽,
-
如果請求失敗,JobManager會按照一定的時間間隔連續地重啟應用,直到重啟成功(有足夠多的空閑插槽就能重啟成功),
3.1.4.2 JobManager故障
比TaskManager失敗更具挑戰性的問題是JobManager失敗,
-
JobManager控制流應用程式的執行,并保存有關其執行的元資料,例如指向已完成檢查點的指標,
-
如果負責的JobManager行程失敗,流應用程式將無法繼續處理,
-
這使得JobManager成為Flink中的應用程式的一個單點失效組件(也就是如果這個組件失效,那么整個系統失效),
為了克服這個問題,Flink支持一種高可用模式,該模式可以在原始JobManager失效時將應用的管理權和應用的元資料 遷移到另一個JobManager,
Flink的高可用模式 基于 ZooKeeper
-
它是一個分布式系統,來提供分布式協調和共識服務,
-
Flink使用ZooKeeper進行領袖選舉,并將其作為一個高可用性和持久的資料存盤,
-
在高可用性模式下操作時,JobManager將JobGraph和所有必需的元資料(如應用程式的JAR檔案)寫入遠程持久存盤系統,
-
此外,JobManager將一個指向存盤位置的指標 寫入ZooKeeper的資料存盤中,
-
在應用程式執行期間,JobManager接收各個任務檢查點的狀態句柄(存盤位置),當檢查點完成后,JobManager將狀態寫入遠程存盤,并將指向此遠程存盤位置的指標寫入ZooKeeper,
-
因此,從JobManager故障中恢復所需的所有資料都存盤在遠程存盤中,而ZooKeeper持有指向存盤位置的指標,
-
圖3-3說明了這種設計,

當JobManager失敗時,接管它作業的新JobManager執行以下步驟:
- 它從ZooKeeper請求存盤位置,然后從遠程存盤中獲取JobGraph、JAR檔案和應用程式最后一個檢查點的存盤位置,
- 它向ResourceManager請求處理槽以繼續執行應用程式,
- 它將重新啟動應用程式,并將其所有任務的狀態重置為檢查點中的狀態值,
最后還有一個問題,當TaskManager或者JobManager失效時,誰會觸發它們的重啟?
- 在容器環境(如Kubernetes)中作為庫部署運行應用程式時,失敗的JobManager或TaskManager容器通常由容器編排服務自動重新啟動,
- 在YARN或Mesos上運行時,Flink的其余行程將觸發JobManager或TaskManager行程的重新啟動,
3.2 Flink中的資料傳輸
在運行程序中,應用的任務不斷地交換資料,TaskManager 負責將資料從發送任務發送到接收任務,TaskManager的網路組件在發送記錄之前在緩沖區中收集記錄,就是說,記錄不是一個一個發送的,而是先快取到緩沖區中然后一批一批發送,這種技術是有效使用網路資源和實作高吞吐量的基礎,
每個TaskManager都有一個 網路緩沖池(默認大小為32 KB)用于發送和接收資料,
- 如果發送方任務和接收方任務在不同的TaskManager行程中運行,則它們通過網路通信,
- 每對TaskManager維護一個永久的TCP連接來交換資料,
- 使用shuffle連接模式時,每個發送方任務都需要能夠向每個接收方任務發送資料,TaskManager需要為每個接收任務提供一個專用的網路緩沖區,此任務對應的發送方會向該緩沖區發送資料,
圖3-4顯示了這個架構,

- 在shuffle連接模式下,由于接收端的并行度為4,所以每個發送端都需要4個網路緩沖區來向接收端任務發送資料
- 由于發送端的并行度也是4,所以每個接收端也都需要4個網路緩沖區來接受發送端發送的資料
- 同一個TaskManager中的快取區會共用同一條網路連接
- 在shuffle模式或者broadcast模式下,需要的緩沖區的大小將是并行度的平方級
- Flink的網路緩沖區的默認配置對于中小型的設定是足夠的,
當發送方任務和接收方任務在同一個TaskManager行程中運行時
- 發送方任務將傳出的記錄序列化到緩沖區中,并在緩沖區填滿后將其放入佇列中,
- 接收任務從佇列中獲取緩沖區,并對傳入的記錄進行反序列化,
- 因此,在同一TaskManager上運行的任務之間的資料傳輸不會導致網路通信,
3.2.1 基于信用值的流量控制
通過網路連接發送單條記錄很低效,并且造成很大的開銷,緩沖是充分利用網路連接的帶寬的關鍵,在流處理背景關系中,緩沖的一個缺點是增加了延遲,因為記錄是在緩沖區中收集的,而不是立即發送的,
Flink實作了一個基于信用值的流控制機制,其作業原理如下,
- 接收任務向發送任務 授予一定的信用值,也就是告訴發送端為了接收其資料,我為你保留的緩沖區的大小
- 一旦發送方收到信用值通知,就會在信用值允許范圍內盡可能多的傳輸緩沖資料,并會附帶上積壓量大小(已經填滿準備傳輸的網路緩沖數目)
- 接收方使用預留的緩沖來處理發送的資料,同時依據各發送端的積壓量資訊來計算所有發送方在下一輪的信用值分別是多少,
基于信用值的好處
- 基于信用的流控制減少了延遲,因為一旦接收方有足夠的資源接受資料,發送方就可以發送資料,
- 此外,在資料分布不均的情況下,它是一種有效的分配網路資源的機制,因為信用是根據發送方的積壓的大小授予的,
- 因此,基于信用的流控制是Flink實作高吞吐和低延遲的重要一環,
3.2.2 任務鏈接
Flink提供了一種被稱為任務鏈接的優化技術,它可以減少特定條件下本地通信的開銷,
- 為了滿足任務鏈接的要求,被鏈接的所有算子必須配置相同的并行性,并通過本地轉發通道進行連接,
- 圖3-5所示的操作管道滿足這些要求,它由三個算子組成,它們都被配置為任務并行度為2,并與本地轉發連接連接,

圖3-6描述了如何在任務鏈接模式下執行管道,
- 多個算子函式被融合到單個任務中,由單個執行緒執行,
- 通過一個簡單的方法呼叫,一個函式產生的記錄被單獨地移交給下一個函式,
- 因此,在函式之間傳遞記錄基本上沒有序列化開銷和沒有通信開銷,

Flink在默認情況下會開啟任務鏈接,但是也可以通過配置關閉這個功能
3.3 事件時間處理
正如上一節所述,事件時間語意會生成可重復且一致性的結果,這是許多流應用的剛性需求,下面,我們將描述Flink如何在內部實作和處理事件時間戳和水位線,以支持具有事件時間語意的流應用,
3.3.1 時間戳
Flink事件時間流應用處理的所有記錄都必須帶時間戳,時間戳將記錄與特定的時間點關聯起來,通常是記錄所表示的事件發生的時間點,此外,在現實環境中,時間戳亂序幾乎不可避免,
當Flink以事件時間模式處理資料流時,它會根據記錄的事件時間戳來觸發基于時間的算子操作,
- 例如,時間視窗運算子根據相關的時間戳將記錄分配給視窗,
- Flink將時間戳編碼為8位元組長的Long值,并將它們作為元資料附加到記錄中,
- 然后內置算子或者用戶自定義的算子決議這個Long值就可以獲得事件時間,
3.3.2 水位線
水位線用于標注事件時間應用程式中每個任務當前的事件時間,
- 基于時間的運算子使用這段時間來觸發相關的計算計算并推動這個流進行,
- 例如,基于時間視窗的任務會在水位線超過視窗邊界的時候觸發計算并且發出結果
在Flink中,水位線被實作為一種帶時間戳的特殊記錄,如圖3-8所示,水位線像常規記錄一樣在資料流中移動,

水位線有兩個基本特性:
- 水位線必須是單調遞增的,以確保任務的事件時間時鐘是前進的,而不是向后的,
- 水位線與記錄的時間戳存在關系,一個時間戳為T的水位線表示:所有后續記錄的時間戳都應該大于T,
第二個屬性用于處理資料流中時間戳亂序的記錄,例如圖3-8中具有時間戳2和5的記錄,
- 基于時間的算子任務可能會處理帶有無序時間戳的記錄,每個任務都會維護一個自己的事件時鐘,并通過時間戳來更新這個時鐘,
- 任務有可能接收到違反水位線屬性且時間戳 小于先前接收的水位線的記錄,該記錄所屬的計算可能已經完成,這樣的記錄稱為遲到記錄,
水位線的一個意義是,它們允許應用控制結果的完整性和延遲,
3.3.3 水位線傳播和事件時間
在本節中,我們將討論算子如何處理水位線,
- Flink將水位線實作為算子任務 接收和發出的特殊記錄,
- 任務內部的時間服務會維護一些計時器(Timer),任務可以在計時器服務上注冊計時器,以便將來在特定的時間點執行計算,這些計時器依靠收到的水位線來激活,
- 例如,視窗運算子為每個活動視窗注冊一個計時器,當事件時間超過視窗的結束時間時,計時器將清除視窗的狀態,
當一個任務收到水位線時,會發生以下操作:
- 任務根據水位線的時間戳 更新其內部事件時間時鐘,
- 任務的時間服務根據更新后的時鐘來執行那些超時計時器的回呼,對于每個過期的計時器,任務將呼叫一個回呼函式,該函式可以執行計算并發出記錄,
- 任務根據更新后的時鐘向下游任務發送水位線,
考慮到任務并行,我們將詳細介紹一個任務如何將水位線發送到多個下游任務,以及它從多個上游任務獲取水位線之后如何推動事件時間時鐘前進,具體的方式如下
- 任務為每個輸入磁區 維護 磁區水位線,
- 當它從一個磁區接收到水位線時,它將相應的磁區水位線 更新為接收值和當前值的最大值,
- 隨后,任務將其內部事件時間時鐘 更新為所有磁區水印的最小值,
- 如果事件時間時鐘前進,任務處理所有觸發的計時器,最后通過向所有連接的輸出磁區 發出更新后的水位線,向所有下游任務廣播它的新事件時間,
下圖舉了一個有4個輸入磁區和3個輸出磁區的任務在接受到水位線之后是如何更新它的磁區水位線和事件時間時鐘的,

Flink的水位線傳播演算法確保算子任務所發出帶時間戳的記錄和水位線一定會對齊,
- 然而,它依賴于這樣一個事實,即所有的磁區都不斷地提供自增的水位線,
- 一旦一個磁區不推進它的水位線,或者變成完全空閑而不再發送任何記錄和水位線,任務的事件時間時鐘將不會推進,進而導致計時器不會觸發,
- 因此,如果一個任務沒有定期從所有輸入任務接收到新的水位線,那么任務的處理延遲和狀態大小會顯著增加,
對于具有兩個輸入流且水位線差距很大的算子,也會出現類似的效果,具有兩個輸入流的任務的事件時間時鐘將受制于較慢的流,通常較快的流的記錄或中間結果將處于緩沖狀態,直到事件時間時鐘允許處理它們,
3.3.4 時間戳分配和水位線生成
下面介紹時間戳和水位線是如何產生的,
時間戳和水位線通常是在流應用接收資料流時 分配和生成的,Flink DataStream應用可以通過三種方式完成該作業
- 在資料源完成:當一個流被讀入到一個應用中時,資料源算子將產生帶有時間戳的記錄流,水位線可以作為特殊記錄在任何時間點發出,如果資料源暫時不再發出水位線了,可以將自己宣告為空閑,Flink會在后續算子計算水位線時將那些來自空閑資料源的流磁區排除在外,
- 周期性分配器(Periodic Assigner):這個Assigner可以從每個記錄中提取一個時間戳,并定期查詢當前的水位線,提取到的時間戳被分配給相應的記錄,所查詢的水印被加入到流中,
- 定點分配器(Punctuated Assigner):它可以用于根據特殊輸入記錄來生成水位線
3.4 狀態管理
大多數流應用是有狀態的,許多算子不斷讀取和更新某種狀態,不管是內置狀態還是用戶自定義狀態,Flink的處理方式都是一樣的,
在本節中,我們將討論
- Flink支持的不同型別的狀態,
- 狀態后端如何存盤和維護狀態
- 有狀態應用程式如何通過進行狀態再分配來實作擴縮容,
通常,需要任務去維護并用于計算結果的資料都屬于任務的狀態,圖3-10顯示了任務與其狀態之間的典型互動,

- 任務接收一些輸入資料,
- 在處理資料時,任務可以讀取和更新其狀態,
- 并根據其輸入資料和狀態計算其結果,
然而,高效可靠的狀態管理更具挑戰性,這包括處理非常大的狀態(可能超過記憶體),并確保在發生故障時不會丟失任何狀態,所有與狀態一致性、故障處理、高效存盤和訪問相關的問題都由Flink處理,以便開發人員能夠將重點放在應用程式的邏輯上,
在Flink中,狀態總是與一個特定的算子相關聯,為了讓Flink的運行時知道算子有哪些狀態,算子需要對其狀態進行注冊,根據作用域的不同,有兩種型別的狀態:算子狀態和鍵值磁區狀態
3.4.1 算子狀態
算子狀態的作用域為算子的單個任務,這意味著由同一并行任務之內的記錄都可以訪問同一狀態,算子狀態不能被其他任務訪問,如下圖

Flink為算子狀態提供了三類原語
- 串列狀態:將狀態表示為一個條目串列
- 聯合串列狀態:同樣將狀態表示為一個條目串列,但是,在出現故障或從保存點啟動應用程式時,它的恢復方式與常規串列狀態不同,
- 廣播狀態:專門為哪些需要保證算子的每個任務狀態都相同的場景而設計
3.4.2 鍵值磁區狀態
鍵值磁區狀態是根據算子輸入記錄中定義的鍵來維護和訪問的,Flink為每個鍵維護一個狀態實體,該狀態實體總是位于那個處理對應鍵值記錄的任務上,當任務處理一個記錄時,它自動將狀態訪問范圍限制到當前記錄的鍵,因此,具有相同鍵值磁區的所有記錄都訪問相同的狀態,圖3-12顯示了任務如何與鍵值磁區狀態互動,

鍵值磁區狀態是一個在算子的所有并行任務上進行磁區的分布式鍵值映射,鍵值磁區狀態原語如下
- 單值狀態:為每個鍵存盤一個任意型別的值,該值可以是一個任意復雜的資料結構,
- 串列狀態:為每個鍵儲存一個串列,串列條目可以是任意型別,
- 映射狀態:為每個鍵存盤鍵值映射,映射的鍵和值可以是任意型別,
3.4.3 狀態后端
為了確保快速的狀態訪問,每個并行任務都在本地維護其狀態,至于狀態的具體存盤、訪問和維護,則由一個稱為狀態后端的可拔插組件來完成,
狀態后端負責兩件事:
- 本地狀態管理
- 將狀態以檢查點的形式寫入遠程存盤
對于本地狀態管理,Flink提供兩種實作
- 第一種狀態后端,將狀態作為存盤在JVM堆記憶體資料結構中的物件進行管理,
- 第二種狀態后端,序列化狀態物件并將它們放入RocksDB中,這種方式是基于硬碟的,
- 雖然第一種實作提供非常快的訪問速度,但它受到記憶體空間大小的限制,訪問RocksDB會比較慢,但是空間大,
狀態檢查點很重要,因為Flink是一個分布式系統,狀態只能在本地維護,TaskManager行程可能在任何時間點失敗,因此,它的存盤必須被認為是易失的,狀態后端負責將任務的狀態檢查點指向遠程和持久存盤,用于檢查點的遠程存盤可以是分布式檔案系統或資料庫系統,狀態后端在狀態檢查點的方式上有所不同,例如,RocksDB狀態后端支持增量檢查點,這可以顯著減少非常大的狀態的檢查點開銷,
3.4.4 有狀態的算子的擴縮容
流應用的一個基本需求是根據輸入速率的增加或減少而調整算子的并行性,有狀態算子,調整并行度比較難,因為我們需要把狀態重新分組,分配到與之前數量不等的并行任務上,
3.4.4.1 帶有鍵值磁區狀態的算子擴縮容
帶有鍵值磁區狀態的算子可以通過將鍵重新劃分來進行任務的擴縮容,但是,為了提高效率,Flink不會以鍵為單位來進行劃分,相反,Flink以鍵組作為單位來重新分配,每個鍵組里面包含了多個鍵,

3.4.4.2 帶有算子串列狀態的算子擴縮容
帶有算子串列狀態的算子在擴縮容時會對串列中的條目進行重新分配,理論上來說,所有并行任務的串列項會被統一收集起來,并再均勻重新分配,如果串列項的數量少于算子的新并行度,一些任務將以空狀態開始,圖3-14顯示了運算子串列狀態的重新分配,

3.4.4.3 帶有算子聯合狀態的算子擴縮容
帶有算子聯合狀態的算子會在擴縮容時把狀態串列中的全部條目 廣播到全部任務中,然后,任務自己來選擇使用哪些項和丟棄哪些項,如圖3-15顯示,

3.4.4.4 帶有算子廣播狀態的算子擴縮容
帶有算子廣播狀態的算子在擴縮容時會把狀態拷貝到全部新任務上,這樣做是因為廣播狀態要確保所有任務具有相同的狀態,在縮容的情況下,直接簡單地停掉多余的任務即可,如圖3-16顯示,

3.5 檢查點、保存點、狀態恢復
Flink是一個分布式的資料處理系統,且任務在本地維護它們的狀態,Flink必須確保這種狀態不會丟失,并且在發生故障時保持一致,
在本節中,我們將介紹Flink的檢查點和故障恢復機制,看一下它們是如何提供精確一次的狀態一致性保障,此外,我們還討論了Flink獨特的保存點(savepoint)功能,它就像一把瑞士軍刀,解決了運行流式應用程序中的諸多難題,
3.5.1 一致性檢查點
有狀態流應用程式的一致檢查點是在所有任務都處理完等量的原始輸出后對全部任務狀態進行的一個拷貝,我們可以通過一個樸素演算法來對應用建立一致性檢查點的程序進行解釋,樸素演算法的步驟為:
- 暫停接收所有輸入流,
- 等待所有流入系統的資料被完全處理,即所有任務已經處理完所有的輸入資料,
- 將所有任務的狀態復制到遠程持久存盤,生成檢查點,當所有任務拷貝完成后,檢查點就完成了
- 恢復接收所有輸入流,
下圖展示了一個一致性檢查點的例子,這個演算法讀取資料,然后對奇數和偶數分別求和

3.5.2 從一致性檢查點中恢復
在流應用執行期間,Flink周期性為應用程式生成檢查點,一旦發生故障,Flink會使用最新的檢查點將應用狀態恢復到某個一致性的點并重啟應用,圖3-18顯示了恢復程序,

應用程式恢復分為三個步驟:
- 重啟整個應用程式,
- 將所有狀態重置為最新的檢查點,
- 恢復所有任務的處理,
假設所有算子都將它們的狀態寫入檢查點并從中恢復,并且所有輸入流的消費位置都能重置到檢查點生成那一刻,那么這種檢查點和恢復機制可以為整個應用提供精確一次的一致性保障,輸入流是否可以重置,取決于它的具體實作以及所消費外部系統是否提供相關介面,例如,像Apache Kafka這樣的事件日志可以從之前的某個偏移讀取記錄,相反,如果是從socket消費而來則無法重置,因為socket一旦消耗了資料就會丟棄資料,
我們必須指出,Flink的檢查點和恢復機制只能重置流應用內部的狀態,根據應用所采用的資料匯算子,在恢復期間,某些結果記錄可能被多次發送到下游系統,例如事件日志、檔案系統或資料庫,對于某些存盤系統,Flink提供的資料匯可以保證了精確一次輸出,
3.5.3 Flink檢查點演算法
Flink基于Chandy-Lamport的分布式快照演算法來實作檢查點,該演算法并不會暫停整個應用程式,在部分任務持久化狀態的程序中,其他任務可以繼續執行,
Flink的檢查點演算法使用一種稱為檢查點分隔符的特殊型別的記錄,它與水位線類似,檢查點分隔符攜帶一個檢查點ID來標識它所屬的檢查點,分隔符從邏輯上將流分割為兩個部分,由檢查點之前的記錄 引起的所有狀態修改都包含在分隔符對應的檢查點中,而由屏障之后的記錄引起的所有修改都不包含在分隔符對應的檢查點中,
下面我們通過一個簡單的例子來解釋這個演算法
我們使用一個簡單的流應用程式示例逐步解釋該演算法,應用程式由兩個資料源任務組成,每個資料源任務消耗一個不斷增長的數字流,資料源任務的分別輸出奇數磁區和偶數磁區,每個磁區都由一個任務處理,該任務計算所有接收到的數字的總和,并將更新后的總和發送給下游資料匯,該應用程式如圖3-19所示,

JobManager通過向每個資料源任務 發送一個新的帶有檢查點編號的訊息來啟動檢查點生成流程,如圖3-20所示,

當資料源任務接收到檢查點訊息時,
- 它暫停處理資料流,并利用狀態后端 生成本地狀態的檢查點,并發送到遠程存盤
- 把該檢查點分隔符廣播至所有下游任務,
- 狀態后端會在檢查點保存好之后通知TaskManager,TaskManager會給JobManager發送確認訊息,
- 在發出了分隔符之后,資料源將恢復正常的作業狀態,
- 如下圖所示

資料源發出的檢查點分隔符被廣播給下游任務,當下游任務接收到新的檢查點分隔符時,將繼續等待來自所有其他上游任務的分隔符到達檢查點,在等待期間,它繼續處理那些尚未提供分隔符的上游任務的記錄,而那些提供了分隔符的上游任務的記錄會被快取,等待稍后處理,等待所有檢查點到達的程序稱為檢查點對齊,如圖3-22所示,

一旦一個任務從它的所有上游任務收到分隔符,它就會讓狀態后端生成一個檢查點,并將檢查點分隔符廣播給它的所有下游任務,如圖3-23所示,

在發出檢查點分隔符后,任務就開始處理緩沖的記錄,在處理完所有緩沖記錄之后,任務會繼續處理其輸入流,圖3-24顯示了此時的應用程式,

最后,檢查點分隔符到達資料匯,當資料匯接收到分割符時,會先進行對齊操作,然后將自身狀態寫入檢查點,并向JobManager確認接收到該分隔符,一旦應用的所有任務都發送了檢查點確認,JobManager就會將應用程式的檢查點記錄為已完成,圖3-25顯示了檢查點演算法的最后一步,如前所述,已完成的檢查點可用于從故障中恢復應用,

3.5.4 檢查點對性能的影響
Flink的檢查點演算法從流應用中產生一致的分布式檢查點,而不會停止整個應用,但是,它會增加應用的處理延遲,Flink實作了一些調整,可以在某些條件下減輕性能影響,
任務在將其狀態寫入檢查點的程序中,將被阻塞,一種好的方法是先將檢查點寫入本地,然后任務繼續執行它的常規處理,另一個行程負責將檢查點傳到遠端存盤,
此外,還可以在分隔符對齊的程序中不快取那些已經收到分隔符所對應磁區的記錄,而是直接處理,但這會讓一致性保證從精確一次降低到至少一次
3.5.5 保存點
Flink最有價值和最獨特的功能之一是保存點,原則上,保存點的生成演算法與檢查點生成演算法一樣,因此可以把保存點看作是帶有一些額外元資料的檢查點,Flink不會自動生成保存點,而是需要用戶顯式的呼叫來生成保存點,
3.5.5.1 保存點的使用
給定一個應用和一個兼容的保存點,我們可以從該保存點啟動應用,這將把應用的狀態初始化為保存點的狀態,并從獲取保存點的位置運行應用,
保存點可以用在很多情況
- 可以從保存點啟動一個不同但兼容的應用程式,這意味著可以修復一些小bug之后從保存點重啟
- 可以使用不同的并行度啟動原應用
- 可以在不同的集群上啟動原應用
- 可以使用保存點暫停應用程式并在稍后恢復它,這樣就可以為其他高優先級的應用騰出集群資源
- 可以用保存點來完成歸檔操作
3.5.5.2 從保存點啟動應用
在本節中,我們將描述Flink在從保存點啟動時如何去初始化應用狀態,
一個典型的應用程式包含多個狀態,它們分布在不同算子的不同任務上,
下圖顯示了一個具有三個算子的應用程式,每個算子各運行兩個任務,其中一個算子(OP-1)有一個算子狀態(OS-1),另一個算子(OP-2)有兩個鍵值磁區狀態(KS-1和KS-2),當生成保存點時,所有任務的狀態都會被復制到一個持久化存盤位置上,

保存點中的狀態副本會按照算子識別符號和狀態名稱進行組織,該算子識別符號和狀態名需要能夠將保存點的狀態資料映射到應用啟動后的狀態上,當從保存點啟動應用程式時,Flink將保存點資料重新分發給相應算子的任務,
如果應用發生了修改,只有那些算子識別符號和狀態名稱沒變的狀態副本才能被成功還原,默認情況下,Flink會分配唯一的算子識別符號,但是,算子的識別符號是基于其前面算子的識別符號生成的,這樣,假如上游的算子識別符號發生了變化,那么下游的算子也會變化,因此,我們強烈建議為運算子手動分配唯一識別符號,而不依賴于Flink的默認賦值,
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/228947.html
標籤:大數據
