反壓機制
- Flink在1.5版本之前是基于TCP的流量控制和反壓的,
缺點:一個TaskManager執行的一個Task觸發反壓,該TaskManager和上游TaskManager的Socket就不能傳輸資料,從而影響到其他Task,也會影響到Barrier的流動,導致作業雪崩, - 在1.5版本之后,Flink是基于信用值的流量控制和反壓的,接收端會給發送端授予一定信用值,發送端一但接受到信用通知,就會在信用值所允許的范圍內盡可能多的向接收端發送資料,且附帶上發送端的積壓量,接收端會依據發送端的積壓量資訊來計算所有發送端在下一輪的信用值,然后再傳遞給發送端,
- 優點:精確的在Task粒度控制流量,輕量級且高效,
任務鏈機制

Flink是將一個Job拆成多Task,Task拆成一個或多個SubTask,然后將SubTask放到TaskManager中的Slot中進行運行,根據并行度的不同,一個算子會有一個或多個SubTask,為了使得程式運行的更快,Flink會將多個SubTask融合到一起放到一個Slot里且在同一執行緒里運行,這就是Flink中的任務鏈,而如何判斷SubTask是否能在一個任務鏈里,取決于以下幾個條件,
底層原始碼(1.12.0)
StreamingJobGraphGenerator類中createJobGraph->setChaining->createChain->isChainable->isChainableInput
private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled())) {
return false;
}
// check that we do not have a union operation, because unions currently only work
// through the network/byte-channel stack.
// we check that by testing that each "type" (which means input position) is used only once
for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
return false;
}
}
return true;
}
判斷條件:
1)上下游算子實體處于同一個SlotSharingGroup中;
2)下游算子的鏈接策略(ChainingStrategy)為ALWAYS——既可以與上游鏈接,也可以與下游鏈接,我們常見的map()、filter()等都屬此類;
3)上游算子的鏈接策略為HEAD或ALWAYS,HEAD策略表示只能與下游鏈接,這在正常情況下是Source算子的專屬;
4)兩個算子間的物理磁區邏輯是ForwardPartitioner(Flink DataStream有八種物理磁區邏輯);
5)兩個算子間的shuffle方式不是批處理模式;
6)上下游算子實體的并行度相同;
7)沒有禁用算子鏈,
處理槽共享組機制(Slot-Sharing Group)
Flink中每個算子都會屬于一個處理槽共享組,一個處理槽共享組所需要的處理槽數等于它內部算子的最大并行度,不同處理槽共享組內的算子,其任務會交給不同的處理槽進行,所有,一個Flink的Job所需要的處理槽等于每一個處理槽共享組內算子最大并行度相加,
默認情況下,所有算子都屬于“default”處理槽共享組,如果一個算子的所有輸入都屬于同一個處理槽共享組,則此算子會繼承這個處理槽共享組,如果一個算子的所有輸入都屬于不同一個處理槽共享組,則此算子會默認加入“default”這個處理槽共享組,
磁區機制
磁區機制就是為了使得每一個subtask知道我的資料將要發往哪一個subtask,Flink一共有8中磁區策略(詳情可查看原始碼,idea雙擊Shift,搜索磁區策略名稱就可以),
- GlobalPartitioner
將資料分發到下游算子第一個subtask - ShufferPartitioner
將資料隨機發放到下游算子的某一個subtask - BroadcastPartitioner
將資料發放給下游算子的每一個subtask - RebalancePartitioner
將資料以回圈的方式發放給下游一個個subtask(第一個資料會隨機發放給某一個subtask), - RescalePartitioner
基于上下游并行度,如果上游并行度為2,下游并行度為4,則上游一個subtask對應下游兩個subtask,然后把資料回圈發送給下游對應的subtask,如果上游并行度為4,下游并行度為2,則上游的兩個subtask對應下游的一個subtask,然后把資料發送給下游對應的subtask, - ForwardPartitioner
將資料分發到下游算子第一個subtask(此時上游的一個subtask對應下游的一個subtask,上游和下游的并行度之比肯定是1:1,要不然就不是ForwardPartitioner) - KeyGroupStreamPartitioner
根據key的分組索引選擇相應的subtask - CustomPartitionerWarpper
自定義磁區器,實作Partitioner介面,重寫partition方法進行自定義磁區,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423711.html
標籤:其他
