1:搭建Flintk所需的組件:
這些組件是:JobManager、ResourceManager、TaskManager和Dispatcher, (JVM)

JobManager:
作為主行程(masterprocess) , JobManager控制著單個應用程式的執行,換句話說,每個應用都由一個不同的JobManager掌控,(JobManager還要負責所有需要集中協調的操作,如創建檢查點,建立檢查點、保存點及狀態恢復)
簡單業務描述:JM從RM申請執行任務的必要資源(TM處理槽)),一旦JM收到了足夠的TM處理槽,就會將ExcecutionGraph(執行任務圖,JobGraph有JM轉換為ExecutionGraph)中的任務分發給TM來執行

ResourcManager:
負責管理Flink的處理資源單元一一TaskManager處理槽,(針對不同的環境和資源提供者(如YARN、Mesos、Kubernetes或獨立部署), Flink提供了不同的ResourceManager)
簡單業務描述:當JM向RM申請執行資源時,RM會指示擁有的空閑TM處理槽將提供給JM,如果RM的處理槽數量無法滿足JM請求時,則RM可以喝資源給提供者通信,讓他們提供額外容器來啟動TM行程,RM還負責終止空閑的TM以釋放計算資源

TaskManager:
TaskManager是Flink的作業行程(workerprocess),通常在Flink搭建程序中要啟動多個TaskManager,每個TaskManager提供一定數量的處理槽,處理槽的數目限制了一個TaskManager可執行的任務數
業務簡單描述:TM向RM注冊處理槽,當接收到RM的指示時,TM向JM提供一個或者多個處理槽,JM向處理槽分配執行任務,同一應用不同任務的TM之間進行資料交換

Dispatcher:
Dispatcher會跨多個作業運行
業務簡單描述:它提供了一個REST介面來讓我們提交需要執行的應用,一旦某個應用提交執行,Dispatcher會啟動一個JobManager并將應用轉交給它,
**REST介面代表:**Dispatcher集群的HTTP人口可以受到防火墻的保護,
**Dispatcher同時還會啟動一個WebUI:**用來提供有關作業執行的資訊,
某些應用提交執行的方式(我們會在“應用部署”一節討論)可能用不到Dispatcher,

也可有多個組件運行在同一個JVM行程中;獨立集群設定下沒有資源提供者,因此RM只能分配現有的TM吹超無法自己啟動新的TM;DP可以啟動JM
2:Flink中的資料傳輸
在運行程序中,應用的任務會持續進行資料交換,TaskManager負責將資料從發送任務傳輸至接收任務,它的網路模塊在記錄傳輸前會先將它們收集到緩沖區中,換言之,記錄并非逐個發送的,而是在緩沖區中以批次形式發送,
作用:該技術是1:有效利用網路資源:2:實作高吞吐的基礎,它的機制類似于網路以及磁盤I/O協議中的緩沖技術,
每個TaskManager都有一個用于收發資料的網路緩沖池(每個緩沖默認32KB大小),
- 如果發送端和接收端的任務運行在不同的TaskManager行程中,它們就要用到作業系統的網路堆疊進行通信,
- 當發送任務和接收任務處于同一個TaskManager行程時,發送任務會將要發送的記錄序列化到一個位元組緩沖區中,一旦該緩沖區占滿就會被放到一個佇列里,接收任務會從這個佇列里獲取緩沖區并將其中的記錄反序列化,這意味著罔一個TaskManager內不同任務之間的資料傳輸不會涉及網路通信,
流式應用需要以流水線方式交換資料,因此每對TaskManager之間都要維護一個或多個永久的TCP連接來執行資料交換,在Shuffle(隨機)連接模式下,每個發送端任務都需要向任意一個接收任務傳輸資料,對于每一個接收任務,TaskManager都要提供一個專用的網路緩沖區,用于接收其他任務發來的資料

1:Flink采用多種技術來降低任務之間的通信開銷
基于信用值的流量控制
**問題:**通過網路連接逐條發送記錄不但低效,還會導致很多額外開銷,若想充分利用網路連接帶寬,就需要對資料進行緩沖,在流處理環境下,緩沖的一個明顯缺點是會增加延遲,因為記錄首先要收集到緩沖區中而不會立即發送,
**流量控制機制:**接收任務會給發送任務授予一定的信用值,其實就是保留一些用來接收它資料的網路緩沖,一旦發送端收到信用通知,就會在信用值所限定的范圍內盡可能多地傳輸緩沖資料,并會附帶上積壓量(已經填滿準備傳輸的網路緩沖數目)大小,接收端使用保留的緩沖來處理收到的資料,同時依據各發送端的積壓量資訊來計算所有相連的發送端在下一輪的信用優先級,
**補充:**由于發送端可以在接收端有足夠資源時立即傳輸資料,所以基于信用值的流量控制可以有效降低延遲,
如圖所示在 Flink 層面實作反壓機制,就是每一次 ResultSubPartition 向 InputChannel 發送訊息的時候都會發送一個 backlog size 告訴下游準備發送多少訊息,下游就會去計算有多少的 Buffer 去接收訊息,算完之后如果有充足的 Buffer 就會返還給上游一個 Credit 告知他可以發送訊息(圖上兩個 ResultSubPartition 和 InputChannel 之間是虛線是因為最侄訓是要通過 Netty 和 Socket 去通信),下面我們看一個具體示例,

假設我們上下游的速度不匹配,上游發送速率為 2,下游接收速率為 1,可以看到圖上在 ResultSubPartition 中累積了兩條訊息,10 和 11, backlog 就為 2,這時就會將發送的資料 <8,9> 和 backlog = 2 一同發送給下游,下游收到了之后就會去計算是否有 2 個 Buffer 去接收,可以看到 InputChannel 中已經不足了這時就會從 Local BufferPool 和 Network BufferPool 申請,好在這個時候 Buffer 還是可以申請到的,

過了一段時間后由于上游的發送速率要大于下游的接受速率,下游的 TaskManager 的 Buffer 已經到達了申請上限,這時候下游就會向上游回傳 Credit = 0,ResultSubPartition 接收到之后就不會向 Netty 去傳輸資料,上游 TaskManager 的 Buffer 也很快耗盡,達到反壓的效果,這樣在 ResultSubPartition 層就能感知到反壓,不用通過 Socket 和 Netty 一層層地向上反饋,降低了反壓生效的延遲,同時也不會將 Socket 去阻塞,解決了由于一個 Task 反壓導致 TaskManager 和 TaskManager 之間的 Socket 阻塞的問題,

任務鏈接
Flink采用一種名為任務鏈接的優化技術來降低某些情況下的本地通信開銷,任務鏈接的前提條件是,多個算子必須有相同的并行度且通過本地轉發通道( local forward channel)相連,(流水線應用反而不希望用到任務鏈接)

圖3-6展示了流水線如何在任務鏈接模式下執行,多個算子的函式被“融合”到同一個任務中,在同一個執行緒內執行,函式生成的記錄只需通過簡單的方法呼叫就可以分別發往各自的下游函式,因此函式之間的記錄傳輸基本上不存在序列化及通信開銷,

雖然任務鏈接可以有效地降低本地任務之間的通信開銷,但有的流水線應用反而不希望用到它,舉例而言,有時候我們需要對過長任務鏈接進行切分或者將兩個計算量大的函式分配到不同的處理槽中,

3:事件時間處理
處理時間是基于處理機器的本地時間,相對容易理解,但它會產生一些較為隨意、不一致且無法重現的結果,相反,事件時間語意會生成可重現且一致性的結果,這也是很多流處理用例的剛性需求,但和基于處理時間語意的應用相比,基于事件時間的應用需要一些額外的配置,此外,相比純粹使用處理時間的引擎,支持事件時間的流處理引擎內部要更加復雜,
Flink不僅針對常見的事件時間操作提供了直觀易用的原語,還支持一些表達能力很強API,允許使用者以自定義算子的方式實作更高級的事件時間處理應用,在面對這些高級應用時,充分理解Flink內部事件處理機制通常會有所幫助,有時候更是必要的,
接下面我們會介紹Flink內部如何實作和處理時間戳及水位線以支持事件時間語意的流式應用,
1:時間戳::
在事件時間模式下,Flink流式應用處理的所有記錄都必須包含時間戳,時間戳將記錄和特定時間點進行關聯,這些時間點通常是記錄所對應事件的發生時間,但實際上應用可以自由選擇時間戳的含義,只要保證流記錄的時間戳會隨著資料流的前進大致遞增即可,
注:這里有兩種方式來分配時間戳和生成水印:
1:直接在資料流源中進行,
:2:通過timestamp assigner和watermark generator生成:在Flink中,timestamp分配器也定義了用來發射的水印,
2:水位線
1:當Flink以事件時間模式處理資料流時,會根據記錄的時間戳觸發時間相關算子的計算,例如,時間視窗算子會根據記錄關聯的時間戳將其分配到視窗中,Flink內部采用8位元組的Long值對時間戳進行編碼,并將它們以元資料(metadata)的形式附加在記錄上,內置算子會將這個Long值決議為毫秒精度的Unix時間戳(自1970-01-01-00:00:00.000以來的毫秒數),但自定義算子可以有自己的時間戳決議機制,如將精度調整為微秒,
2:除了記錄的時間戳,Flink基于事件時間的應用還必須提供水位線(watermark),水位線用于在事件時間應用中推斷每個任務當前的事件時間,基于時間的算子會使用這個時間來觸發計算并推動進度前進,
例如:基于時間視窗的任務會在其事件時間超過視窗結束邊界時進行最終的視窗計算井發出結果,
在Flink中,水位線是利用一些包含Long值時間戳的特殊記錄來實作的,如圖3-8所示,它們像帶有額外時間戳的常規記錄一樣在資料流中移動,
Watermark是一種告訴Flink一個訊息延遲多少的方式,它定義了什么時候不再等待更早的資料,
可以把Watermarks理解為一個水位線,這個Watermarks在不斷的變化,Watermark實際上作為資料流的一部分隨資料流流動,
當Flink中的運算子接收到Watermarks時,它明白早于該時間的訊息已經完全抵達計算引擎,即假設不會再有時間小于水位線的事件到達,
這個假設是觸發視窗計算的基礎,只有水位線越過視窗對應的結束時間,視窗才會關閉和進行計算

3:
水位線擁有兩個基本屬性:
1.必須單調遞增,這是為了確保任務中的事件時間時鐘正確前進,不會倒退,
2.和記錄的時間戳存在聯系,一個時間戳為T的水位線表示,接下來所有記錄的時間戳一定都大于T,
第二個屬性可用來處理資料流中時間戳亂序的記錄,例如圖3-8中的時間戳為3和5的記錄,對基于時間的算子任務而言,其收集和處理的記錄可能會包含亂序的時間戳,這些算子只有當自己的事件時間時鐘(由接收的水位線驅動)指示不必再等那些包含相關時間戳的記錄時,才會最終觸發計算,當任務收到一個違反水位線屬性,即時間戳小于或等于前一個水位線的記錄時,該記錄本應參與的計算可能已經完成,我們稱此類記錄為遲到記錄(laterecord),
水位線的意義之一在于它允許應用控制結果的完整性和延遲,
3:水位線傳播和事件時間
主要討論算子對水位線的處理方式:
Flink內部將水位線實作為特殊的記錄,它們可以通過算子任務進行接收和發送,任務內部的時間服務(timeservice)會維護一些計時器(timer),它們依靠接收到水位線來激活,這些計時器是由任務在時間服務內注冊,并在將來的某個時間點執行計算,例如:視窗算子會為每個活動視窗注冊一個計時器,它們會在事件時間超過視窗的結束時間時清理視窗狀態,
當任務接收到一個水位線時會執行以下操作:
1.基于水位線記錄的時間戳更新內部事件時間時鐘,
2.任務的時間服務會找出所有觸發時間小于更新后事件時間的計時器,對于每個到期的計時器,呼叫回呼函式,利用它來執行計算或發出記錄,
3.任務根據更新后的事件時間將水位線發出,
任務在收到一個新的水位線之后,將如何發送水位線和更新其內部事件時間時鐘?
Flink會將資料流劃分為不同的磁區,并將它們交由不同的算子任務來并行執行,每個磁區作為一個資料流,都會包含帶有時間戳的記錄以及水位線,根據算子的上下游連接情況,其任務可能需要同時接收來自多個輸入磁區的記錄和水位線,也可能需要將它們發送到多個輸出磁區,
任務如何將水位線發送至多個輸出任務,以及它從多個輸入任務獲取水位線后如何推動事件時間時鐘前進?
一個任務會為它的每個輸入磁區都維護一個磁區水位線(partitionwatermark),當收到某個磁區傳來的水位線后,任務會以接收值和當前值中較大的那個去更新對應磁區水位線的值,隨后,任務會把事件時間時鐘調整為所有磁區水位線中最小的那個值,如果事件時間時鐘向前推動,任務會先處理因此而觸發的所有計時器,之后才會把對應的水位線發往所有連接的輸出磁區,以實作事件時間到全部下游任務的廣播,
重點圖:



4:檢查點、保存點及狀態恢復
重點:檢查點的演算法,保存點的創建
Flink是一個分布式的資料處理系統,因此必須能夠處理一些故障,例如:行程被強制關閉、機器故障以及網路連接中斷,由于每個任務會把狀態、維護在本地,Flink要保證發生故障時狀態不丟不錯,
介紹Fli礎的檢查點(checkpoint)及故障恢復機制,看一下它們如何提供精確一次的狀態一致性保障,
1:一致性檢查點(資料備份,并行運行)
Flink的故障恢復機制需要基于應用狀態的一致性檢查點,有狀態的流式應用的一致性檢查點是對全部任務狀態進行的一個拷貝,可以通過一個樸素算怯對應用建立一致性檢查點的程序進行解釋,
樸素演算法的步驟包括:
1:暫停接收所有輸入流,
2:等待已經流入系統的資料被完全處理,即所有任務已經處理完所有的輸入資料,
3:將所有任務的狀態拷貝到遠程持久化存盤(時間長的原因),生成檢查點,在所有任務完成自己的拷貝作業后,檢查點生成完畢,(經過網路傳輸)
4:恢復所有資料流的接收,(注:Flink沒有實作這種樸素策略,而是使用了一種更加復雜的檢查點演算法)
該應用有一個資料源任務,負責從一個遞增數字(1、2、3…)流中讀取資料,數字流會被分成奇數流和偶數流,求和算子的兩個任務會分別對它們求和,資料源算子的任務會把輸入流的當前偏移量存為狀態;求和算子的任務會把當前和值存為狀態,在圖中,Flink會在輸入偏移到達5的時候生成一個檢查點,此時兩個和值分別為6和9.
input offset:輸入偏移量

2:從一致性檢查點中恢復
在流式應用執行程序中,Flink會周期性地為應用狀態生成檢查點,一旦發生故障,Flink會利用最新的檢查點將應用狀態恢復到某個一致性的點并重啟處理行程,整個恢復程序如下圖,

恢復到創建檢查點時的狀態—距離最近檢查點的狀態


應用恢復要經過3個步驟:
1:重啟整個應用,
2:利用最新的檢查點重置任務狀態,
3:恢復所有任務的處理,
如果所有算子都將它們全部的狀態寫入檢查點并從中恢復,并且所有輸入流的消費位置都能重置到檢查點生成那一刻,那么該檢查點和恢復機制就能為整個應用的狀態提供精確一次的一致性保障(精確一次,每個資料只處理一次,每個狀態被影響只有一次)
資料源能否重置其輸入流取決于它的具體實作以及所消費外部系統是否提供相關介面,
例如,類似ApacheKafka的事件日志系統就允許從之前的某個偏移讀取記錄,相反,如果資料流是從套接字(socket)消費而來則無法重置,因為套接字會在資料被取走后將包們丟棄,
應用從檢查點恢復以后,它的內部狀態會和生成檢查點的時候完全一致,隨后應用就會重新消費并處理那些從之前檢查點完成開始,到發生系統故障之間已經處理過的資料,雖然這意味著Flink會重復處理部分訊息,但上述機制仍然可以實作精確一次的狀態一致性,(故障時看似不精確,本質上精確,狀態處理一次)因為所有算子的狀態都會重置到過去還沒有處理過那些資料的時間點,
注:Flink的檢查點和恢復機制僅能重置流式應用內部的狀態,根據應用所采用的資料匯算子,在恢復期間,某些結果記錄可能會向下游系統(如事件日志系統、檔案系統或資料庫)發送多次,
對于某些存盤系統,Flink提供的資料匯函式支持精確一次輸出,例如在檢查點完成后才會把寫出的記錄正式提交,另一種適用于很多存盤系統的方法是冪等更新,
暫停執行,生成檢查點,然后恢復應用
3:Flink檢查點演算法
樸素方法中的“停止一切”的行為對于那些具有中等延遲要求的應用很不切實際,而Flink的檢查點是基于Chandy-Lamport分布式快照算怯來實作的,該演算法不會暫停整個應用,而是會把生成檢查點的程序和處理程序分離,這樣在部分任務持久化狀態的程序中,其他任務還可以繼續執行,
Flink的檢查點演算法中會用到一類名為檢查點分隔符(checkpointbarrier)的特殊記錄,和水位線類似,這些檢查點分隔符會通過資料源算子注入到常規的記錄流中,相對其他記錄,它們在流中的位置無法提前或延后,
為了標識所屬的檢查點,每個檢查點分隔符都會帶有一個檢查點編號,這樣就把一條資料流從邏輯上分成了兩個部分,所有先于分隔符的記錄所引起的狀態更改都會被包含在分隔符所對應的檢查點之中;而所有晚于分隔符的記錄所引起的狀態更改者目會被納入之后的檢查點中,
應用包含了兩個資料源任務,每個任務都會各自消費一條自增數字流,資料源任務的輸出會被分成奇數流和偶數流兩個部分,每一部分都會有一個任務負責對收到的全部數字求和,并將結果值更新至下游資料匯,

JobManager會向每個資料擁任務發送一個新的檢查點編號,以此來啟動檢查點生成流程,

當一個資料源任務收到訊息后,會暫停發出記錄,利用狀態后端觸發生成本地狀態的檢查點,(此時的資料源,接收新的資料)并把該檢查點分隔符連同檢查點編號廣播至所有傳出的資料流磁區,狀態后端會在狀態存為檢查點完成后通知任務,隨后任務會給JobManager發送確認訊息,在將所有分隔符發出后,資料源將恢復正常作業,、

資料源任務發出的檢查點分隔符會傳輸到與之相連的任務,和水位線類似,檢查點分隔符總是以廣播形式發送,從而可以確保每個任務能從它們的每個輸入都收到一個分隔符,
當任務收到一個新檢查點的分隔符時,會繼續等待所有其他輸入磁區也發來這個檢查點的分隔符,在等待程序中,它會繼續處理那些從還未提供分隔符的磁區發來的資料,對于已經提供分隔符的磁區,它們新到來的記錄會被緩沖起來,不能處理,這個等待所有分隔符到達的程序稱為分隔符對齊,

任務在收齊全部輸入磁區發送的分隔符后,就會通知狀態后端開始生成檢查點,同時把檢查點分隔符廣播到下游相連的任務,

任務在發出所有的檢查點分隔符后就會開始處理緩沖的記錄,待所有緩沖的記錄處理完后,任務就會繼續處理輸入流,

任務在發出所有的檢查點分隔符后就會開始處理緩沖的記錄,待所有緩沖的記錄處理完后,任務就會繼續處理輸入流,

最終檢查點分隔符到達資料匯任務,資料匯任務在收到分隔符后會依次執行分隔符對齊,將自身狀態寫入檢查點,向JobManager確認已接收分隔符等一系列動作,JobManager在接收到所有應用任務回傳的檢查點確認訊息后,就會將此次檢查點標記為完成,

4:檢查點對性能的影響
雖然Flink的檢查點演算法能夠在不停止整個應用的情況下為流式應用生成一致的分布式檢查點,但它仍會增加應用處理延遲,Flink實作了一些調整策略,可以減輕某些條件下對性能的影響,
影響延遲的因素:
任務在將其狀態存入檢查點的程序中,會處于阻塞狀態,此時的輸入會進人緩沖區,由于狀態可能會很大,而且生成檢查點需要把這些資料通過網路寫入遠程存盤系統,該程序可能持續數秒,甚至數分鐘,
解決方式:本地拷貝、增量檢查點,(還有另外一種情況:不緩沖)
5:保存點
Flink的故障恢復算怯是基于狀態的檢查點來完成的,檢查點會周期性地生成,而且會根據配置的策略自動丟棄,檢查點的目的是保證應用在出現故障的時候可以順利重啟,因此當應用被手動停止后,檢查點也會隨之洗掉,
Flink最具價值且獨具一格的功能之一是保存點,原則上,保存點的生成演算法和檢查點完全一樣,因此可以把保存點看做包含一些額外元資料的檢查點,保存點的生成不是由Flink自動完成,而是需要由用戶(或外部調度器)顯式觸發,同時,Flink也不會自動清理保存點,

所有之前提到的保存點相關用例都遵循同一個模式,首先為正在運行的應用生成一個保存點,然后在應用啟動時用它去初始化狀態,

每個應用都會包含很多算子,而每個算子又可以定義一個或多個的鍵值或算子狀態,算子會在一個或多個任務上并行執行,因此一個典型的應用會包含多個狀態,它們分布在不同TaskManager行程內的算子任務上

保存點中的狀態副本會按照算子標識和狀態名稱進行組織,該算子標識和狀態名需要能將保存點的狀態資料映射到應用啟動后的算子狀態上,當應用從保存點啟動時,Flink會將保存點的資料分發到對應算子的任務上,
如果應用在從保存點啟動的時候發生過改動,那么保存點中的狀態只有在應用還保留著那些含有對應標識和狀態名稱的算子時才可以成功映射,默認情況下,Flink會給每個算子分配一個唯一標識,但該標識是根據前置算子的標識按照某種確定規則生成的,這意味著任何一個前置算子發生改變(例如添加或洗掉某個算子)都會導致該標識發生變化,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/286558.html
標籤:其他
上一篇:Docker實作SpringBoot專案的快速構建(一)
下一篇:再次學習vue的識訓(基礎篇)
