本文是閱讀 LinkedIn 公司2020年發表的論文 Magnet: Push-based Shuffle Service for Large-scale Data Processing 一點筆記,
什么是Shuffle

以上圖為例,在一個DAG的執行圖中,節點與節點之間的資料交換就是Shuffle的程序,雖然Shuffle的程序很簡單,但是不同的引擎有不同的實作,
以shuffle資料傳輸的介質來看
- 有基于磁盤的shuffle,例如Map/Reduce ,Spark,Flink Batch中,上下游之前的資料都是需要落盤后來進行傳輸,這類通常是離線處理框架,對延遲不敏感,基于磁盤更加可靠穩定,
- 有基于記憶體的pipeline模式的shuffle方案,例如Presto/Flink Streaming中,主要是對時延比較敏感的場景,基于記憶體Shuffle,通過網路rpc直接傳輸記憶體資料
而基于本地磁盤的Shuffle實作中又有很多種不同的實作
- 有基于Hash的方案,每個map端的task為每個reduce task 產生一個 shuffle檔案
- 有基于Sort方案,每個map端的task按照 partitionId + hash(key) 排序,并最終merge成一個檔案以及一個index檔案,在reduce端讀取時根據每個task的index檔案來讀取相應segment的資料
以部署方式來看
- 有基于worker的本地shuffle的方案,直接通過worker來提供讀寫的功能
- 有基于external shuffle的實作,通常托管于資源管理框架,在Yarn框架中就可以實作這種輔助服務,這樣就可以及時的釋放worker計算資源
- 有基于Remote shuffle的實作,在云計算時代逐漸成為主流,因為其存算分離的架構往往能帶來更好的可擴展性并且網路帶寬的提高使得
co-locate_也許_不再那么重要,
Spark Shuffle實作

這里再大致介紹下spark原生的external sort shuffle的詳細流程
- 每個spark executor啟動后和本地節點的external shuffle service注冊,同一個機器的多個executor會共享這個機器上的shuffle service服務,
- map stage處理完資料之后會產出兩個檔案 shuffle data 和 index檔案,map task會按照partition key 來進行排序,屬于同一個reduce 的資料作為一個Shuffle Block,而index檔案中則會記錄不同的Shuffle Block 之間的邊界offset,輔助下游讀取
- 當下游reduce task開始運行,首先會查詢Spark driver 得到input shuffle blocks的位置資訊,然后開始和spark ESS建立鏈接開始讀取資料,讀取資料時就會根據index檔案來skip讀取自己task那個shuffle blocks
痛點
在LinkedIn公司主要采用了Spark自帶的基于Yarn的External sorted shuffle實作,主要遇到痛點:
All-To-All Connections
map 和 reduce task之間需要維護all-to-all 的鏈接,以M個Map端task,R和Reducer端task為例,理論上就會建立M * R 個connection,
在實際實作中,一個executor上的reducer可以共享一個和ess的tcp鏈接,因此實際上的鏈接數是和executor個數 E 和ess節點數 S相關,但是在生產集群中 E 和 S 可能都會達到上千,這時鏈接數就會非常的客觀,很容易帶來穩定性的問題,如果建立鏈接失敗可能會導致相關stage進行重跑,失敗代價很高,
Random IO
從上面的讀取流程我們可以看到因為多個reduce task資料在同一個檔案中,很容易產生隨機讀取的問題,并且從linkedin公司觀察到的這些block通常都比較小,平均只有10KB,而LinkedIn shuffle集群主要使用的HDD磁盤,這個問題就會更大,并且隨機讀取以及大量的網路小包會帶來性能的損失,
也許我們會想到說是否可以有辦法來通過調參來讓Shuffle Block 變大而減輕隨機小IO的問題呢?比如把reduce task端的并發調小,這樣每個task的資料量必然就變大了,
論文中也對此做了闡述,沒法通過簡單的調整reduce task的并發來增大shuffle block size的大小,
假設有一個M個mapper,R個reducer的任務,總的shuffle資料量為D,為了保持每個task處理的資料量恒定,當總資料量增長的時候,map和reduce的并發都要等比增長,
而shuffle block 大小就是 , 為什么 是
呢,從上面的流程中可以看到每個map端可以近似看做是維護了R個reduce的block,所以總的block數是
,
那么當資料量增長時,并且為了保證每個task處理的資料量恒定,即性能不下降,那么shuffle block size必然會減小,最后也因為reduce端資料分散在所有的map端的task,導致不太能利用data locality的特性,
Magent 設計概要

總體架構
Push Merge Shuffle
Mapper 端的shuffle資料會push到遠程的 shuffle service,并按照reduce端合并成一個檔案,這樣shuffle 檔案的大小就可以提高到MB級別,
這里Magnet主要考慮盡可能避免給shuffle service帶來過大的壓力(為了穩定性和可擴展性考慮),因此在Magent中,在mapper端,依然會將shuffle資料,首先保存到本地,然后再按照以下的演算法,將shuffle blocks打包成一個個chunks發送到shuffle service,

計算blocks劃分到chunks演算法
這個演算法的含義如下:
- 按照
計算 第 i 個 reduce 資料所應該發送的shuffle service的下標,
表示每臺shuffle service機器所需要分配的Reduce task的數量,當其大于 k 時表示需要發送到下一個機器,則更新 k 的值為
k++ - 當chunk長度沒有超過限制L,將
(長度為
)append到chunk中,并將chunk長度更新為
- 當chunk長度超過了限制L,那么就把
append 到 下一個 chunk中,并將chunk 長度置為
, shuffe service 機器還是為 k,
演算法最終輸出的是每個 shuffle service 機器和對應的所需要接收的chunk的集合,
這個演算法保證,每個chunks只包含一個shuffle file中連續的不同shuffle partition 的 shuffle blocks,當達到一定大小后會另外創建一個chunk,但是不同mapper上的同一個shuffle parititon的資料最侄訓路由到同一個shuffle service節點上,
并且為了避免同時mapper端都按照同一順序往shuffle service 節點寫資料造成擠兌和merge時的檔案并發鎖,所以在mapper端處理chunk的順序上做了隨機化,
在完成打包chunk和隨機化之后,就交由一個專門的執行緒池來將資料從按照chunk順序從本地磁盤load出來,所以這里就是順序的讀取本地磁盤再push到遠程的shuffle service,Push操作是和Mapper端的task解耦的,push操作失敗不會影響map端的task,
Magnet Metadata
當magnet收到打包發送來的chunks,首先會根據block的元資料獲取他的磁區資訊,然后根據shuffle service本地維護的元資料做處理,shuffle service本地為每個Shuffle partition (reduce partition)維護了以下元資訊
- bitmap 存盤了以及merge的mapper的id
- position offset 記錄了merge 檔案中最近一次成功merge的 offset
- currentMapId 記錄了當前正在merge的 mapper的 shuffle block id

這樣首先可以根據發送來的shuffle blocks的元資料判斷資料是否已經merge過了,避免重復存盤,通過currentMapId來避免多個mapper端資料同時往一個檔案merge的問題,而position offset 則可以用作在merge 失敗的時候可以依舊保持檔案能讀到最近一次成功的位置,下一次重寫的時候會依舊從position offset進行覆寫寫入,通過這幾個元資料管理,就可以很優雅的處理在檔案merge程序中的寫重復,寫沖突和寫失敗的問題,
Best effort
在Magent的設計中,push/merge的失敗,并不會影響整個任務的流程,可以fallback到讀取mapper端未merge的資料,
- 如果map task 在寫入本地shuffle資料完成之前失敗了,那么map端task會進行重跑
- 如果map端push/merge失敗,那么這部分資料就會直接從mapper端讀取
- 如果reduce fetch merge block失敗,那么也會fallback到從mapper端讀取
我理解要實作這樣的目的,原始資料就需要被保留,所以可以看到在架構圖中Magent Shuffle Service實際上會和executor一起部署(還支持其他的部署形式),在executor端作為external shuffle service的角色存在,mapper端的資料產出完之后就由本地的shuffle service 節點托管了,所以他可以在以上2、3兩種失敗場景下提供fallback的讀取能力,
同時資料是否Merge完的資訊是在Spark Driver中通過MapStatus和 MergeStatus兩個結構來進行維護的,下游讀取資料時就是由driver來進行是否fallback的邏輯,
從整體上看Push/Merge 的操作可以理解為完全由Magent Shuffle Service節點托管的資料搬遷合并的動作(將各個mapper處的資料搬遷合并成redcuer端的資料),通過資料寫兩次的行為使得mapper端寫資料和合并解耦,并且在fault tolerance的設計中也利用了寫兩次這個行為所帶來的備份的好處,
同時我們需要關注到雖然通過這個操作,將mapper端的隨機讀取轉化成了順序讀取,但是在shuffle service時merge時,其實還是random write,這在資料重組的程序中是必然的,但是由于os cache 和 disk buffer的存在,會使得random write的吞吐比random read的吞吐大很多,
Flexible Deployment Strategy
Magnet支持兩種模式的部署
- on-perm 表示和Spark計算集群一起部署,作為external shuffle service的方式存在,
- cloud-based 表示以存算分離的模式部署,這樣就是以Remote shuffle service的方式部署,
在on-perm的集群中,Spark driver可以很好的利用data locality的特性,在push/merge節點結束后,可以將reduce task盡可能調度到資料所在的節點上,可以直接讀取本地資料,效率更高,減少了網路的傳輸也不容易失敗,
Handling Stragglers and Data Skews

因為Spark計算引擎是BSP模型,所以在map端階段全部完成之前reduce端不會開始計算,因此在Push/Megre階段,為了防止部分Push/Merge較慢影響下游reduce task開始執行,Magnet支持了最大的超時機制,利用上面提到的fallback行為,在超時之后就標記該map端的磁區為unmerged,這樣就跳過了這部分慢節點,直接開始reduce階段,
而針對資料傾斜場景,為了避免reduce端合并的檔案過大,這時Magent的解法是和Spark的Adaptive execution 相結合,根據運行時采集到的每個block的大小,當block 大于某個閾值時,就在合并chunk的階段跳過這種block,還是通過fallback行為直接讀取原來mapper端較大的資料塊
Parallelizing Data Transfer and Task Execution

在Hadoop的Map-Reduce模型中,通過 "Slow start" 技術可以在Map task都完成之前,部分Reduce task可以先開始進行資料預拉,實作了比較有限的并行化
而在Spark中,通過資料拉取和資料處理的執行緒解耦,這兩者有點類似于一組生產者和消費者,
而在Magnet中也采用了類似的技術,在mapper端Push task 和 mapper task解耦,但是這里不太理解這個mapper端解耦的收益,因為本身就是在mapper task結束之后才開始進行push task,也就不存在計算執行緒和io執行緒并行的說法,可以理解的是可以通過這個方式和mapper task的框架執行緒解耦,
然后在reduce端,為了最大化并行讀取的能力,不會將reduce端的資料只合并成一個檔案,而是切成多個MB大小的slice,然后reduce task可以發起并行讀取的請求最大化的提高吞吐,
小結
從上面可以看出Magent的幾個設計宗旨
- 盡可能的避免給shuffle service 增大負載
- 所有的排序的動作只會發生在mapper端或者reducer端,所以排序占用的資源是executor節點的
- merge時不會有資料buffer的動作,資料buffer在executor端完成,在Shuffle Service側只要直接進行資料appen,
- 盡力而為,資料備份讀取提供更好的容錯特性,并很好的利用了這兩份資料做了更多的設計
- 盡管如今普遍都是存算分離的架構,但是在Magent的設計中data locality的特性還是占據的很重要的位置
How to evaluate
很多系統設計最后對于系統的測驗設計其實也很有看點,在論文里提到了Magent采用了模擬和生產集群兩個模式來最終衡量新的Shuffle Service的效果,
Magnet 開發了一個分布式的壓測框架,主要可以模擬以下幾個維度
- 模擬shuffle service集群所會創建的總的連接數
- 每個block塊的大小
- 總的shuffle的資料量
并且可以模擬fetch和push的請求
- fetch請求會從一個Shuffle serice節點將block發送到多個客戶端
- push請求會從多個客戶端將資料發送到一個shuffle service節點
那衡量的指標有哪些
- 在不同的block大小下, Magnet完成Push Merge和Reduce fetch的時間已經Spark 原生Shuffle Service完成fetch的時間比較
- Disk IO 衡量在fetch 和 push的場景下,不同的block大小對于磁盤吞吐能力的影響
- Shuffle Service的資源開銷 主要是測驗單機的shuffle service,這里看到一個比較驚奇的資料,在測驗的程序中的資源消耗為0.5c 300M,開銷的確很小,
其他的指標資料就不一一列舉了,可以查看原文相關章節獲取
最后上線后的優化效果

Figure 1: Shuffle locality ratio increase over past 6 months

參考
https://mp.weixin.qq.com/s/8Fhn24vbZdt6zmCZRvhdOg Magent shuffle 解讀
https://zhuanlan.zhihu.com/p/397391514 Magnet shuffle解讀
https://zhuanlan.zhihu.com/p/67061627 spark shuffle 發展
https://mp.weixin.qq.com/s/2yT4QGIc7XTI62RhpYEGjw
https://mp.weixin.qq.com/s/2yT4QGIc7XTI62RhpYEGjw
https://www.databricks.com/session_na21/magnet-shuffle-service-push-based-shuffle-at-linkedin
https://issues.apache.org/jira/browse/SPARK-30602
https://www.linkedin.com/pulse/bringing-next-gen-shuffle-architecture-data-linkedin-scale-min-shen
本文來自博客園,作者:Aitozi,轉載請注明原文鏈接:https://www.cnblogs.com/Aitozi/p/16813183.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/518861.html
標籤:其他
