1. 前言
計算機的基本作業就是處理資料,包括磁盤檔案中的資料,通過網路傳輸的資料流或資料包,資料庫中的結構化資料等,隨著互聯網、物聯網等技術得到越來越廣泛的應用,資料規模不斷增加,TB、PB量級成為常態,對資料的處理已無法由單臺計算機完成,而只能由多臺機器共同承擔計算任務,而在分布式環境中進行大資料處理,除了與存盤系統打交道外,還涉及計算任務的分工,計算負荷的分配,計算機之間的資料遷移等作業,并且要考慮計算機或網路發生故障時的資料安全,情況要復雜得多,
舉一個簡單的例子,假設我們要從銷售記錄中統計各種商品銷售額,在單機環境中,我們只需把銷售記錄掃描一遍,對各商品的銷售額進行累加即可,如果銷售記錄存放在關系資料庫中,則更省事,執行一個SQL陳述句就可以了,現在假定銷售記錄實在太多,需要設計出由多臺計算機來統計銷售額的方案,為保證計算的正確、可靠、高效及方便,這個方案需要考慮下列問題:
- 如何為每臺機器分配任務,是先按商品種類對銷售記錄分組,不同機器處理不同商品種類的銷售記錄,還是隨機向各臺機器分發一部分銷售記錄進行統計,最后把各臺機器的統計結果按商品種類合并?
- 上述兩種方式都涉及資料的排序問題,應選擇哪種排序演算法?應該在哪臺機器上執行排序程序?
- 如何定義每臺機器處理的資料從哪里來,處理結果到哪里去?資料是主動發送,還是接收方申請時才發送?如果是主動發送,接收方處理不過來怎么辦?如果是申請時才發送,那發送方應該保存資料多久?
- 會不會任務分配不均,有的機器很快就處理完了,有的機器一直忙著?甚至,閑著的機器需要等忙著的機器處理完后才能開始執行?
- 如果增加一臺機器,它能不能減輕其他機器的負荷,從而縮短任務執行時間?
- 如果一臺機器掛了,它沒有完成的任務該交給誰?會不會遺漏統計或重復統計?
- 統計程序中,機器之間如何協調,是否需要專門的一臺機器指揮調度其他機器?如果這臺機器掛了呢?
- (可選)如果銷售記錄在源源不斷地增加,統計還沒執行完新記錄又來了,如何保證統計結果的準確性?能不能保證結果是實時更新的?再次統計時能不能避免大量重復計算?
- (可選)能不能讓用戶執行一句SQL就可以得到結果?
上述問題中,除了第1個外,其余的都與具體任務無關,在其他分布式計算的場合也會遇到,而且解決起來都相當棘手,即使第1個問題中的分組、統計,在很多資料處理場合也會涉及,只是具體方式不同,如果能把這些問題的解決方案封裝到一個計算框架中,則可大大簡化這類應用程式的開發,
2004年前后,Google先后發表三篇論文分別介紹分布式檔案系統GFS、并行計算模型MapReduce、非關系資料存盤系統BigTable,第一次提出了針對大資料分布式處理的可重用方案,在Google論文的啟發下,Yahoo的工程師Doug Cutting和Mike Cafarella開發了Hadoop,在借鑒和改進Hadoop的基礎上,又先后誕生了數十種應用于分布式環境的大資料計算框架,本文在參考業界慣例的基礎上,對這些框架按下列標準分類:
- 如果不涉及上面提出的第8、9兩個問題,則屬于批處理框架,批處理框架重點關心資料處理的吞吐量,又可分為非迭代式和迭代式兩類,迭代式包括DAG(有向無環圖)、圖計算等模型,
- 若針對第8個問題提出來應對方案,則分兩種情況:如果重點關心處理的實時性,則屬于流計算框架;如果側重于避免重復計算,則屬于增量計算框架,
- 如果重點關注的是第9個問題,則屬于互動式分析框架,
本文下面分別討論批處理、流計算、互動式分析三種類別的框架,然后簡要介紹大資料計算框架的一些發展趨勢,文章最后介紹這一領域的學習資料,

圖1. 大資料計算框架全景圖
2. 批處理框架
2.1. Hadoop
Hadoop最初主要包含分布式檔案系統HDFS和計算框架MapReduce兩部分,是從Nutch中獨立出來的專案,在2.0版本中,又把資源管理和任務調度功能從MapReduce中剝離形成YARN,使其他框架也可以像MapReduce那樣運行在Hadoop之上,與之前的分布式計算框架相比,Hadoop隱藏了很多繁瑣的細節,如容錯、負載均衡等,更便于使用,
Hadoop也具有很強的橫向擴展能力,可以很容易地把新計算機接入到集群中參與計算,在開源社區的支持下,Hadoop不斷發展完善,并集成了眾多優秀的產品如非關系資料庫HBase、資料倉庫Hive、資料處理工具Sqoop、機器學習演算法庫Mahout、一致性服務軟體ZooKeeper、管理工具Ambari等,形成了相對完整的生態圈和分布式計算事實上的標準,

圖2. Hadoop生態圈(刪減版)
MapReduce可以理解為把一堆雜亂無章的資料按照某種特征歸并起來,然后處理并得到最后的結果,基本處理步驟如下:
- 把輸入檔案按照一定的標準分片,每個分片對應一個map任務,一般情況下,MapReduce和HDFS運行在同一組計算機上,也就是說,每臺計算機同時承擔存盤和計算任務,因此分片通常不涉及計算機之間的資料復制,
- 按照一定的規則把分片中的內容決議成鍵值對,通常選擇一種預定義的規則即可,
- 執行map任務,處理每個鍵值對,輸出零個或多個鍵值對,
- MapReduce獲取應用程式定義的分組方式,并按分組對map任務輸出的鍵值對排序,默認每個鍵名一組,
- 待所有節點都執行完上述步驟后,MapReduce啟動Reduce任務,每個分組對應一個Reduce任務,
- 執行reduce任務的行程通過網路獲取指定組的所有鍵值對,
- 把鍵名相同的值合并為串列,
- 執行reduce任務,處理每個鍵對應的串列,輸出結果,

圖3. MapReduce處理程序
在上面的步驟中,應用程式主要負責設計map和reduce任務,其他作業均由框架負責,在定義map任務輸出資料的方式時,鍵的選擇至關重要,除了影響結果的正確性外,也決定資料如何分組、排序、傳輸,以及執行reduce任務的計算機如何分工,前面提到的商品銷售統計的例子,可選擇商品種類為鍵,MapReduce執行商品銷售統計的程序大致如下:
- 把銷售記錄分片,分配給多臺機器,
- 每條銷售記錄被決議成鍵值對,其中值為銷售記錄的內容,鍵可忽略,
- 執行map任務,每條銷售記錄被轉換為新的鍵值對,其中鍵為商品種類,值為該條記錄中商品的銷售額,
- MapReduce把map任務生成的資料按商品種類排序,
- 待所有節點都完成排序后,MapReduce啟動reduce任務,每個商品種類對應一個reduce任務,
- 執行reduce任務的行程通過網路獲取指定商品種類的各次銷售額,
- MapReduce把同一種商品下的各次銷售額合并到串列中,
- 執行reduce任務,累加各次銷售額,得到該種商品的總銷售額,
上面的程序還有優化的空間,在傳輸各種商品每次的銷售額資料前,可先在map端對各種商品的銷售額進行小計,由此可大大減少網路傳輸的負荷,MapReduce通過一個可選的combine任務支持該型別的優化,
2.2. DAG模型
現在假設我們的目標更進一步,希望知道銷售得最好的前10種商品,我們可以分兩個環節來計算:
- 統計各種商品的銷售額,通過MapReduce實作,這在前面已經討論過,
- 對商品種類按銷售額排名,可以通過一個排序程序完成,假定商品種類非常多,需要通過多臺計算機來加快計算速度的話,我們可以用另一個MapReduce程序來實作,其基本思路是把map和reduce分別當作小組賽和決賽,先計算各分片的前10名,匯總后再計算總排行榜的前10名,
從上面的例子可以看出,通過多個MapReduce的組合,可以表達復雜的計算問題,不過,組合程序需要人工設計,比較麻煩,另外,每個階段都需要所有的計算機同步,影響了執行效率,
為克服上述問題,業界提出了DAG(有向無環圖)計算模型,其核心思想是把任務在內部分解為若干存在先后順序的子任務,由此可更靈活地表達各種復雜的依賴關系,Microsoft Dryad、Google FlumeJava、Apache Tez是最早出現的DAG模型,Dryad定義了串接、全連接、融合等若干簡單的DAG模型,通過組合這些簡單結構來描述復雜的任務,FlumeJava、Tez則通過組合若干MapReduce形成DAG任務,

圖4. MapReduce(左)與Tez(右)
執行復雜任務時對比
MapReduce的另一個不足之處是使用磁盤存盤中間結果,嚴重影響了系統的性能,這在機器學習等需要迭代計算的場合更為明顯,加州大學伯克利分校AMP實驗室開發的Spark克服了上述問題,Spark對早期的DAG模型作了改進,提出了基于記憶體的分布式存盤抽象模型RDD(Resilient Distributed Datasets,可恢復分布式資料集),把中間資料有選擇地加載并駐留到記憶體中,減少磁盤IO開銷,與Hadoop相比,Spark基于記憶體的運算要快100倍以上,基于磁盤的運算也要快10倍以上,

圖5. MapReduce與Spark中間結果
保存方式對比
Spark為RDD提供了豐富的操作方法,其中map、 filter、 flatMap、 sample、groupByKey、 reduceByKey、union、join、cogroup、mapValues、sort、partionBy用于執行資料轉換,生成新的RDD,而count、collect、 reduce、lookup、save用于收集或輸出計算結果,如前面統計商品銷售額的例子,在Spark中只需要呼叫map和reduceByKey兩個轉換操作就可以實作,整個程式包括加載銷售記錄和保存統計結果在內也只需要寥寥幾行代碼,并且支持Java、Scala、Python、R等多種開發語言,比MapReduce編程要方便得多,下圖說明reduceByKey的內部實作,

圖6. RDD reduceByKey內部實作
RDD由于把資料存放在記憶體中而不是磁盤上,因此需要比Hadoop更多地考慮容錯問題,分布式資料集的容錯有兩種方式:資料檢查點和記錄資料的更新,處理海量資料時,資料檢查點操作成本很高, 因此Spark默認選擇記錄更新的方式,不過如果更新粒度太細太多,記錄更新成本也不低,因此,RDD只支持粗粒度轉換,即只記錄單個塊上執行的單個操作,然后將創建RDD的一系列變換序列記錄下來,類似于資料庫中的日志,
當RDD的部分磁區資料丟失時,Spark根據之前記錄的演變程序重新運算,恢復丟失的資料磁區,Spark生態圈的另一專案Alluxio(原名Tachyon)也采用類似的思路,使資料寫入速度比HDFS有數量級的提升,
下面總結Spark對MapReduce的改進:
- MapReduce抽象層次低,需要手工撰寫代碼完成;Spark基于RDD抽象,使資料處理邏輯的代碼非常簡短,
- MapReduce只提供了map和reduce兩個操作,表達力欠缺;Spark提供了很多轉換和動作,很多關系資料庫中常見的操作如JOIN、GROUP BY已經在RDD中實作,
- MapReduce中,只有map和reduce兩個階段,復雜的計算需要大量的組合,并且由開發者自己定義組合方式;Spark中,RDD可以連續執行多個轉換操作,如果這些操作對應的RDD磁區不變的話,還可以放在同一個任務中執行,
- MapReduce處理邏輯隱藏在代碼中,不直觀;Spark代碼不包含操作細節,邏輯更清晰,
- MapReduce中間結果放在HDFS中;Spark中間結果放在記憶體中,記憶體放不下時才寫入本地磁盤而不是HDFS,這顯著提高了性能,特別是在迭代式資料處理的場合,
- MapReduce中,reduce任務需要等待所有map任務完成后才可以開始;在Spark中,磁區相同的轉換構成流水線放到同一個任務中運行,
3. 流計算框架
3.1. 流計算概述
在大資料時代,資料通常都是持續不斷動態產生的,在很多場合,資料需要在非常短的時間內得到處理,并且還要考慮容錯、擁塞控制等問題,避免資料遺漏或重復計算,流計算框架則是針對這一類問題的解決方案,流計算框架一般采用DAG(有向無環圖)模型,圖中的節點分為兩類:一類是資料的輸入節點,負責與外界互動而向系統提供資料;另一類是資料的計算節點,負責完成某種處理功能如過濾、累加、合并等,從外部系統不斷傳入的實時資料則流經這些節點,把它們串接起來,如果把資料流比作水的話,輸入節點好比是噴頭,源源不斷地出水,計算節點則相當于水管的轉介面,如下圖所示,

圖7. 流計算DAG模型示意圖
為提高并發性,每一個計算節點對應的資料處理功能被分配到多個任務(相同或不同計算機上的執行緒),在設計DAG時,需要考慮如何把待處理的資料分發到下游計算節點對應的各個任務,這在實時計算中稱為分組(Grouping),最簡單的方案是為每個任務復制一份,不過這樣效率很低,更好的方式是每個任務處理資料的不同部分,隨機分組能達到負載均衡的效果,應優先考慮,不過在執行累加、資料關聯等操作時,需要保證同一屬性的資料被固定分發到對應的任務,這時應采用定向分組,在某些情況下,還需要自定義分組方案,

圖8. 流計算分組
由于應用場合的廣泛性,目前市面上已經有不少流計算平臺,包括Google MillWheel、Twitter Heron和Apache專案Storm、Samza、S4、Flink、Apex、Gearpump,
3.2. Storm及Trident
在流計算框架中,目前人氣最高,應用最廣泛的要數Storm,這是由于Storm具有簡單的編程模型,且支持Java、Ruby、Python等多種開發語言,Storm也具有良好的性能,在多節點集群上每秒可以處理上百萬條訊息,Storm在容錯方面也設計得很優雅,下面介紹Storm確保訊息可靠性的思路,
在DAG模型中,確保訊息可靠的難點在于,原始資料被當前的計算節點成功處理后,還不能被丟棄,因為它生成的資料仍然可能在后續的計算節點上處理失敗,需要由該訊息重新生成,而如果要對訊息在各個計算節點的處理情況都作跟蹤記錄的話,則會消耗大量資源,
Storm的解決思路,是為每條訊息分派一個ID作為唯一性標識,并在訊息中包含原始輸入訊息的ID,同時用一個回應中心(Acker)維護每條原始輸入訊息的狀態,狀態的初值為該原始輸入訊息的ID,每個計算節點成功執行后,則把輸入和輸出訊息的ID進行異或,再異或對應的原始輸入訊息的狀態,由于每條訊息在生成和處理時分別被異或一次,則成功執行后所有訊息均被異或兩次,對應的原始輸入訊息的狀態為0,因此當狀態為0后可安全清除原始輸入訊息的內容,而如果超過指定時間間隔后狀態仍不為0,則認為處理該訊息的某個環節出了問題,需要重新執行,

圖9. Storm保證訊息可靠性程序示意圖
Storm還實作了更高層次的抽象框架Trident,Trident以微批處理的方式處理資料流,比如每次處理100條記錄,Trident提供了過濾、分組、連接、視窗操作、聚合、狀態管理等操作,支持跨批次進行聚合處理,并對執行程序進行優化,包括多個操作的合并、資料傳輸前的本地聚合等,以微批處理方式處理資料流的框架還有Spark Streaming,

(1) 實時流處理

(2) 微批處理
圖10. 實時流處理與微批處理比較
下面是Storm、Trident與另外幾種流計算框架的對比:

4. 互動式分析框架
4.1. 概述
在解決了大資料的可靠存盤和高效計算后,如何為資料分析人員提供便利日益受到關注,而最便利的分析方式莫過于互動式查詢,這幾年互動式分析技術發展迅速,目前這一領域知名的平臺有十余個,包括Google開發的Dremel和PowerDrill,Facebook開發的Presto, Hadoop服務商Cloudera和HortonWorks分別開發的Impala和Stinger,以及Apache專案Hive、Drill、Tajo、Kylin、MRQL等,
一些批處理和流計算平臺如Spark和Flink也分別內置了互動式分析框架,由于SQL已被業界廣泛接受,目前的互動式分析框架都支持用類似SQL的語言進行查詢,早期的互動式分析平臺建立在Hadoop的基礎上,被稱作SQL-on-Hadoop,后來的分析平臺改用Spark、Storm等引擎,不過SQL-on-Hadoop的稱呼還是沿用了下來,SQL-on-Hadoop也指為分布式資料存盤提供SQL查詢功能,
4.2. Hive
Apache Hive是最早出現的架構在Hadoop基礎之上的大規模資料倉庫,由Facebook設計并開源,Hive的基本思想是,通過定義模式資訊,把HDFS中的檔案組織成類似傳統資料庫的存盤系統,Hive 保持著 Hadoop 所提供的可擴展性和靈活性,Hive支持熟悉的關系資料庫概念,比如表、列和磁區,包含對非結構化資料一定程度的 SQL 支持,它支持所有主要的原語型別(如整數、浮點數、字串)和復雜型別(如字典、串列、結構),它還支持使用類似 SQL 的宣告性語言 Hive Query Language (HiveQL) 表達的查詢,任何熟悉 SQL 的人都很容易理解它,HiveQL被編譯為MapReduce程序執行,下圖說明如何通過MapReduce實作JOIN和GROUP BY,

(1) 實作JOIN

(2) 實作GROUP BY
圖11. 部分HiveQL操作的實作方式
Hive與傳統關系資料庫對比如下:

Hive的主要弱點是由于建立在MapReduce的基礎上,性能受到限制,很多互動式分析平臺基于對Hive的改進和擴展,包括Stinger、Presto、Kylin等,其中Kylin是中國團隊提交到Apache上的專案,其與眾不同的地方是提供多維分析(OLAP)能力,Kylin對多維分析可能用到的度量進行預計算,供查詢時直接訪問,由此提供快速查詢和高并發能力,Kylin在eBay、百度、京東、網易、美團均有應用,
4.3. SQL引擎Calcite
對于互動式分析,SQL查詢引擎的優劣對性能的影響舉足輕重,Spark開發了自己的查詢引擎Catalyst,而包括Hive、Drill、Kylin、Flink在內的很多互動式分析平臺及資料倉庫使用Calcite(原名optiq)作為SQL引擎,Calcite是一個Apache范訓專案,其創建者Julian Hyde曾是Oracle資料庫SQL引擎的主要開發者,Calcite具有下列幾個技術特點:
- 支持標準SQL語言,
- 支持OLAP,
- 支持對流資料的查詢,
- 獨立于編程語言和資料源,可以支持不同的前端和后端,
- 支持關系代數、可定制的邏輯規劃規則和基于成本模型優化的查詢引擎,
- 支持物化視圖(materialized view)的管理,
由于分布式場景遠比傳統的資料存盤環境更復雜,Calcite和Catalyst都還處于向Oracle、MySQL等經典關系資料庫引擎學習的階段,在性能優化的道路上還有很長的路要走,
5. 其他型別的框架
除了上面介紹的幾種型別的框架外,還有一些目前還不太熱門但具有重要潛力的框架型別,圖計算是DAG之外的另一種迭代式計算模型,它以圖論為基礎對現實世界建模和計算,擅長表達資料之間的關聯性,適用于PageRank計算、社交網路分析、推薦系統及機器學習,這一類框架有Google Pregel、Apache Giraph、Apache Hama、PowerGraph、,其中PowerGraph是這一領域目前最杰出的代表,很多圖資料庫也內置圖計算框架,
另一類是增量計算框架,探討如何只對部分新增資料進行計算來極大提升計算程序的效率,可應用到資料增量或周期性更新的場合,這一類框架包括Google Percolator、Microsoft Kineograph、阿里Galaxy等,
另外還有像Apache Ignite、Apache Geode(GemFire的開源版本)這樣的高性能事務處理框架,
6. 總結與展望
從Hadoop橫空出世到現在10余年的時間中,大資料分布式計算技術得到了迅猛發展,不過由于歷史尚短,這方面的技術遠未成熟,各種框架都還在不斷改進,并相互競爭,
性能優化毫無疑問是大資料計算框架改進的重點方向之一,而性能的提高很大程度上取決于記憶體的有效利用,這包括前面提到的記憶體計算,現已在各種型別的框架中廣泛采用,記憶體資源的分配管理對性能也有重要影響,JVM垃圾回收在給開發人員帶來便利的同時,也制約了記憶體的有效利用,另外,Java的物件創建及序列化也比較浪費資源,在記憶體優化方面做足功夫的代表是Flink,出于性能方面的考慮,Flink很多組件自行管理記憶體,無需依賴JVM垃圾回識訓制,Flink還用到開辟記憶體池、用二進制資料代替物件、量身定制序列化、定制快取友好的演算法等優化手段,Flink還在任務的執行方面進行優化,包括多階段并行執行和增量迭代,
擁抱機器學習和人工智能也是大資料計算的潮流之一,Spark和Flink分別推出機器學習庫Spark ML和Flink ML,更多的平臺在第三方大資料計算框架上提供機器學習,如Mahout、Oryx及一干Apache范訓專案SystemML、HiveMall、PredictionIO、SAMOA、MADLib,這些機器學習平臺一般都同時支持多個計算框架,如Mahout同時以Spark、Flink、H2O為引擎,SAMOA則使用S4、Storm、Samza,在深度學習掀起熱潮后,又有社區探索把深度學習框架與現有分布式計算框架結合起來,這樣的專案有SparkNet、Caffe on Spark、TensorFrames等,
在同一平臺上支持多種框架也是發展趨勢之一,尤其對于那些開發實力較為雄厚的社區,Spark以批處理模型為核心,實作了互動式分析框架Spark SQL、流計算框架Spark Streaming(及正在實作的Structured Streaming)、圖計算框架GraphX、機器學習庫Spark ML,而Flink在提供低延遲的流計算的同時,批處理、關系計算、圖計算、機器學習,一個也沒落下,目標直奔大資料通用計算平臺,Google的BEAM(意為Batch+strEAM)則試圖把Spark、Flink、Apex這樣的計算框架納入自己制定的標準之下,頗有號令江湖之意,

圖12. BEAM的統一模型
7. 學習資料
最后介紹一下大資料計算方面的學習資料,入門前的了解、知識面的拓展及知識的零散積累靠長期訪問相關的網站、論壇、微信訂閱號,問題解答則靠對搜索引擎的熟練駕馭,需要指出的是,網上的內容良萎不齊,很多資料是過時的,以訛傳訛也是常有的事,要注意鑒別,
論壇首推知乎、Quora、Stack Overflow,運氣好的話開發者親自給你解答,其他值得關注的網站或論壇包括煉數成金、人大經濟論壇、CSDN、博客園、云棲社區、360大資料、推酷、伯樂在線、小象學院等,微信訂閱號中,InfoQ是最權威的,其他還有THU資料派、大資料雜談、CSDN大資料、資料猿、Hadoop技術博文等,各人根據偏好取舍,
若要進行系統的學習,則首先應參考官方網站檔案,不少大資料平臺的官方檔案內容都比較詳實,勝過多數教材,另外,官方檔案與產品通常同步更新,這個優勢是其他資料無法做到的,不過要說可讀性,書籍或視頻教程要強得多,視頻資料可以從上文提到的部分網站論壇下載,
書籍方面,國外O'Reilly、Manning兩家出版社在大資料領域出版了不少優秀書籍,特別是Manning的In Action系列和O'Reilly的Definitive Guide系列,前者側重提高動手能力,后者則知識比較全面,In Action和Definitive Guide系列的書籍很多已翻譯為中文,一般分別譯為xxx實戰、xxx權威指南,另外一家出版社Packt也值得關注,Packt的書比較薄,適合入門,至于中文原創書籍,推薦張俊林的《大資料日知錄》,該書是對大資料存盤和處理技術的全面梳理,系統性強,其他書籍不逐一點評,若想購買或閱讀可參考豆瓣對該書的評分,

圖13. 部分推薦書籍
對希望對大資料框架內部機制有深入的理解的讀者,建議首先檢索相關論文來閱讀,
Google的那幾篇論文這里就不一一列出了,網上很容易搜到,其他推薦的論文如下:

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/48465.html
標籤:大數據
上一篇:Kylin構建Cube程序詳解
