主頁 >  其他 > Magnet: Push-based Shuffle Service for Large-scale Data Processing

Magnet: Push-based Shuffle Service for Large-scale Data Processing

2022-10-22 07:59:45 其他

本文是閱讀 LinkedIn 公司2020年發表的論文 Magnet: Push-based Shuffle Service for Large-scale Data Processing 一點筆記,

什么是Shuffle

image.png
以上圖為例,在一個DAG的執行圖中,節點與節點之間的資料交換就是Shuffle的程序,雖然Shuffle的程序很簡單,但是不同的引擎有不同的實作,
以shuffle資料傳輸的介質來看

  • 有基于磁盤的shuffle,例如Map/Reduce ,Spark,Flink Batch中,上下游之前的資料都是需要落盤后來進行傳輸,這類通常是離線處理框架,對延遲不敏感,基于磁盤更加可靠穩定,
  • 有基于記憶體的pipeline模式的shuffle方案,例如Presto/Flink Streaming中,主要是對時延比較敏感的場景,基于記憶體Shuffle,通過網路rpc直接傳輸記憶體資料

而基于本地磁盤的Shuffle實作中又有很多種不同的實作

  • 有基于Hash的方案,每個map端的task為每個reduce task 產生一個 shuffle檔案
  • 有基于Sort方案,每個map端的task按照 partitionId + hash(key) 排序,并最終merge成一個檔案以及一個index檔案,在reduce端讀取時根據每個task的index檔案來讀取相應segment的資料

以部署方式來看

  • 有基于worker的本地shuffle的方案,直接通過worker來提供讀寫的功能
  • 有基于external shuffle的實作,通常托管于資源管理框架,在Yarn框架中就可以實作這種輔助服務,這樣就可以及時的釋放worker計算資源
  • 有基于Remote shuffle的實作,在云計算時代逐漸成為主流,因為其存算分離的架構往往能帶來更好的可擴展性并且網路帶寬的提高使得co-locate_也許_不再那么重要,

Spark Shuffle實作

image.png
這里再大致介紹下spark原生的external sort shuffle的詳細流程

  1. 每個spark executor啟動后和本地節點的external shuffle service注冊,同一個機器的多個executor會共享這個機器上的shuffle service服務,
  2. map stage處理完資料之后會產出兩個檔案 shuffle data 和 index檔案,map task會按照partition key 來進行排序,屬于同一個reduce 的資料作為一個Shuffle Block,而index檔案中則會記錄不同的Shuffle Block 之間的邊界offset,輔助下游讀取
  3. 當下游reduce task開始運行,首先會查詢Spark driver 得到input shuffle blocks的位置資訊,然后開始和spark ESS建立鏈接開始讀取資料,讀取資料時就會根據index檔案來skip讀取自己task那個shuffle blocks

痛點

在LinkedIn公司主要采用了Spark自帶的基于Yarn的External sorted shuffle實作,主要遇到痛點:

All-To-All Connections

map 和 reduce task之間需要維護all-to-all 的鏈接,以M個Map端task,R和Reducer端task為例,理論上就會建立M * R 個connection,
在實際實作中,一個executor上的reducer可以共享一個和ess的tcp鏈接,因此實際上的鏈接數是和executor個數 E 和ess節點數 S相關,但是在生產集群中 E 和 S 可能都會達到上千,這時鏈接數就會非常的客觀,很容易帶來穩定性的問題,如果建立鏈接失敗可能會導致相關stage進行重跑,失敗代價很高,

Random IO

從上面的讀取流程我們可以看到因為多個reduce task資料在同一個檔案中,很容易產生隨機讀取的問題,并且從linkedin公司觀察到的這些block通常都比較小,平均只有10KB,而LinkedIn shuffle集群主要使用的HDD磁盤,這個問題就會更大,并且隨機讀取以及大量的網路小包會帶來性能的損失,

也許我們會想到說是否可以有辦法來通過調參來讓Shuffle Block 變大而減輕隨機小IO的問題呢?比如把reduce task端的并發調小,這樣每個task的資料量必然就變大了,
論文中也對此做了闡述,沒法通過簡單的調整reduce task的并發來增大shuffle block size的大小,

假設有一個M個mapper,R個reducer的任務,總的shuffle資料量為D,為了保持每個task處理的資料量恒定,當總資料量增長的時候,map和reduce的并發都要等比增長,
而shuffle block 大小就是 , 為什么 是 呢,從上面的流程中可以看到每個map端可以近似看做是維護了R個reduce的block,所以總的block數是
那么當資料量增長時,并且為了保證每個task處理的資料量恒定,即性能不下降,那么shuffle block size必然會減小,最后也因為reduce端資料分散在所有的map端的task,導致不太能利用data locality的特性,

Magent 設計概要

image.png
總體架構

Push Merge Shuffle

Mapper 端的shuffle資料會push到遠程的 shuffle service,并按照reduce端合并成一個檔案,這樣shuffle 檔案的大小就可以提高到MB級別,
這里Magnet主要考慮盡可能避免給shuffle service帶來過大的壓力(為了穩定性和可擴展性考慮),因此在Magent中,在mapper端,依然會將shuffle資料,首先保存到本地,然后再按照以下的演算法,將shuffle blocks打包成一個個chunks發送到shuffle service,
image.png
計算blocks劃分到chunks演算法
這個演算法的含義如下:

  1. 按照 計算 第 i 個 reduce 資料所應該發送的shuffle service的下標,表示每臺shuffle service機器所需要分配的Reduce task的數量,當其大于 k 時表示需要發送到下一個機器,則更新 k 的值為 k++
  2. 當chunk長度沒有超過限制L,將(長度為 )append到chunk中,并將chunk長度更新為
  3. 當chunk長度超過了限制L,那么就把 append 到 下一個 chunk中,并將chunk 長度置為 , shuffe service 機器還是為 k,

演算法最終輸出的是每個 shuffle service 機器和對應的所需要接收的chunk的集合,

這個演算法保證,每個chunks只包含一個shuffle file中連續的不同shuffle partition 的 shuffle blocks,當達到一定大小后會另外創建一個chunk,但是不同mapper上的同一個shuffle parititon的資料最侄訓路由到同一個shuffle service節點上,

并且為了避免同時mapper端都按照同一順序往shuffle service 節點寫資料造成擠兌和merge時的檔案并發鎖,所以在mapper端處理chunk的順序上做了隨機化,

在完成打包chunk和隨機化之后,就交由一個專門的執行緒池來將資料從按照chunk順序從本地磁盤load出來,所以這里就是順序的讀取本地磁盤再push到遠程的shuffle service,Push操作是和Mapper端的task解耦的,push操作失敗不會影響map端的task,

Magnet Metadata

當magnet收到打包發送來的chunks,首先會根據block的元資料獲取他的磁區資訊,然后根據shuffle service本地維護的元資料做處理,shuffle service本地為每個Shuffle partition (reduce partition)維護了以下元資訊

  • bitmap 存盤了以及merge的mapper的id
  • position offset 記錄了merge 檔案中最近一次成功merge的 offset
  • currentMapId 記錄了當前正在merge的 mapper的 shuffle block id

image.png

這樣首先可以根據發送來的shuffle blocks的元資料判斷資料是否已經merge過了,避免重復存盤,通過currentMapId來避免多個mapper端資料同時往一個檔案merge的問題,而position offset 則可以用作在merge 失敗的時候可以依舊保持檔案能讀到最近一次成功的位置,下一次重寫的時候會依舊從position offset進行覆寫寫入,通過這幾個元資料管理,就可以很優雅的處理在檔案merge程序中的寫重復,寫沖突和寫失敗的問題,

Best effort

在Magent的設計中,push/merge的失敗,并不會影響整個任務的流程,可以fallback到讀取mapper端未merge的資料,

  1. 如果map task 在寫入本地shuffle資料完成之前失敗了,那么map端task會進行重跑
  2. 如果map端push/merge失敗,那么這部分資料就會直接從mapper端讀取
  3. 如果reduce fetch merge block失敗,那么也會fallback到從mapper端讀取

我理解要實作這樣的目的,原始資料就需要被保留,所以可以看到在架構圖中Magent Shuffle Service實際上會和executor一起部署(還支持其他的部署形式),在executor端作為external shuffle service的角色存在,mapper端的資料產出完之后就由本地的shuffle service 節點托管了,所以他可以在以上2、3兩種失敗場景下提供fallback的讀取能力,
同時資料是否Merge完的資訊是在Spark Driver中通過MapStatusMergeStatus兩個結構來進行維護的,下游讀取資料時就是由driver來進行是否fallback的邏輯,
從整體上看Push/Merge 的操作可以理解為完全由Magent Shuffle Service節點托管的資料搬遷合并的動作(將各個mapper處的資料搬遷合并成redcuer端的資料),通過資料寫兩次的行為使得mapper端寫資料和合并解耦,并且在fault tolerance的設計中也利用了寫兩次這個行為所帶來的備份的好處,
同時我們需要關注到雖然通過這個操作,將mapper端的隨機讀取轉化成了順序讀取,但是在shuffle service時merge時,其實還是random write,這在資料重組的程序中是必然的,但是由于os cache 和 disk buffer的存在,會使得random write的吞吐比random read的吞吐大很多,

Flexible Deployment Strategy

Magnet支持兩種模式的部署

  • on-perm 表示和Spark計算集群一起部署,作為external shuffle service的方式存在,
  • cloud-based 表示以存算分離的模式部署,這樣就是以Remote shuffle service的方式部署,

在on-perm的集群中,Spark driver可以很好的利用data locality的特性,在push/merge節點結束后,可以將reduce task盡可能調度到資料所在的節點上,可以直接讀取本地資料,效率更高,減少了網路的傳輸也不容易失敗,

Handling Stragglers and Data Skews

image.png
因為Spark計算引擎是BSP模型,所以在map端階段全部完成之前reduce端不會開始計算,因此在Push/Megre階段,為了防止部分Push/Merge較慢影響下游reduce task開始執行,Magnet支持了最大的超時機制,利用上面提到的fallback行為,在超時之后就標記該map端的磁區為unmerged,這樣就跳過了這部分慢節點,直接開始reduce階段,
而針對資料傾斜場景,為了避免reduce端合并的檔案過大,這時Magent的解法是和Spark的Adaptive execution 相結合,根據運行時采集到的每個block的大小,當block 大于某個閾值時,就在合并chunk的階段跳過這種block,還是通過fallback行為直接讀取原來mapper端較大的資料塊

Parallelizing Data Transfer and Task Execution

image.png
在Hadoop的Map-Reduce模型中,通過 "Slow start" 技術可以在Map task都完成之前,部分Reduce task可以先開始進行資料預拉,實作了比較有限的并行化
而在Spark中,通過資料拉取和資料處理的執行緒解耦,這兩者有點類似于一組生產者和消費者,
而在Magnet中也采用了類似的技術,在mapper端Push task 和 mapper task解耦,但是這里不太理解這個mapper端解耦的收益,因為本身就是在mapper task結束之后才開始進行push task,也就不存在計算執行緒和io執行緒并行的說法,可以理解的是可以通過這個方式和mapper task的框架執行緒解耦,
然后在reduce端,為了最大化并行讀取的能力,不會將reduce端的資料只合并成一個檔案,而是切成多個MB大小的slice,然后reduce task可以發起并行讀取的請求最大化的提高吞吐,

小結

從上面可以看出Magent的幾個設計宗旨

  • 盡可能的避免給shuffle service 增大負載
    • 所有的排序的動作只會發生在mapper端或者reducer端,所以排序占用的資源是executor節點的
    • merge時不會有資料buffer的動作,資料buffer在executor端完成,在Shuffle Service側只要直接進行資料appen,
  • 盡力而為,資料備份讀取提供更好的容錯特性,并很好的利用了這兩份資料做了更多的設計
  • 盡管如今普遍都是存算分離的架構,但是在Magent的設計中data locality的特性還是占據的很重要的位置

How to evaluate

很多系統設計最后對于系統的測驗設計其實也很有看點,在論文里提到了Magent采用了模擬和生產集群兩個模式來最終衡量新的Shuffle Service的效果,

Magnet 開發了一個分布式的壓測框架,主要可以模擬以下幾個維度

  • 模擬shuffle service集群所會創建的總的連接數
  • 每個block塊的大小
  • 總的shuffle的資料量

并且可以模擬fetch和push的請求

  • fetch請求會從一個Shuffle serice節點將block發送到多個客戶端
  • push請求會從多個客戶端將資料發送到一個shuffle service節點

那衡量的指標有哪些

  • 在不同的block大小下, Magnet完成Push Merge和Reduce fetch的時間已經Spark 原生Shuffle Service完成fetch的時間比較
  • Disk IO 衡量在fetch 和 push的場景下,不同的block大小對于磁盤吞吐能力的影響
  • Shuffle Service的資源開銷 主要是測驗單機的shuffle service,這里看到一個比較驚奇的資料,在測驗的程序中的資源消耗為0.5c 300M,開銷的確很小,

其他的指標資料就不一一列舉了,可以查看原文相關章節獲取

最后上線后的優化效果
image.png
Figure 1: Shuffle locality ratio increase over past 6 months

image.png

參考

https://mp.weixin.qq.com/s/8Fhn24vbZdt6zmCZRvhdOg Magent shuffle 解讀
https://zhuanlan.zhihu.com/p/397391514 Magnet shuffle解讀
https://zhuanlan.zhihu.com/p/67061627 spark shuffle 發展
https://mp.weixin.qq.com/s/2yT4QGIc7XTI62RhpYEGjw
https://mp.weixin.qq.com/s/2yT4QGIc7XTI62RhpYEGjw
https://www.databricks.com/session_na21/magnet-shuffle-service-push-based-shuffle-at-linkedin
https://issues.apache.org/jira/browse/SPARK-30602
https://www.linkedin.com/pulse/bringing-next-gen-shuffle-architecture-data-linkedin-scale-min-shen

本文來自博客園,作者:Aitozi,轉載請注明原文鏈接:https://www.cnblogs.com/Aitozi/p/16813183.html

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/518861.html

標籤:其他

上一篇:reportportal 集成 robotframework 自動化執行及結果可視化

下一篇:微服務組件--限流框架Spring Cloud Hystrix分析

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more