Spark 3.0 為我們帶來了許多令人期待的特性,動態磁區裁剪(dynamic partition pruning)就是其中之一,本文將通過圖文的形式來帶大家理解什么是動態磁區裁剪,
Spark 中的靜態磁區裁剪
在介紹動態磁區裁剪之前,有必要對 Spark 中的靜態磁區裁剪進行介紹,在標準資料庫術語中,裁剪意味著優化器將避免讀取不包含我們正在查找的資料的檔案,例如我們有以下的查詢 SQL:
Select * from iteblog.Students where subject = 'English';
在這個簡單的查詢中,我們試圖匹配和識別 Students 表中 subject = English 的記錄,比較愚蠢的做法是先把資料全部 scan 出來,然后再使用 subject = 'English' 去過濾,如下圖所示:
比較好的實作是查詢優化器將過濾器下推到資料源,以便能夠避免掃描整個資料集,Spark 就是這么來做的,如下圖所示:
在靜態磁區裁剪技術中,我們的表首先是磁區的,磁區過濾下推的思想和上面的 filter push down 一致,因為在這種情況下,如果我們的查詢有一個針對磁區列的過濾,那么在實際的查詢中可以跳過很多不必要的磁區,從而大大減少資料的掃描,減少磁盤I/O,從而提升計算的性能,
然而,在現實中,我們的查詢陳述句不會是這么簡單的,通常情況下,我們會有多張維表,小表需要與大的事實表進行 join,因此,在這種情況下,我們不能再應用靜態磁區裁剪,因為 filter 條件在 join 表的一側,而對裁剪有用的表在 Join 的另一側,比如我們有以下的查詢陳述句:
Select * from iteblog.Students join iteblog.DailyRoutine
where iteblog.DailyRoutine.subject = 'English';
對于上面的查詢,比較垃圾的查詢引擎最后的執行計劃如下:
它把兩張表的資料進行關聯,然后再過濾,在資料量比較大的情況下效率可想而知,一些比較好的計算引擎可以進行一些優化,比如:
其能夠在一張表里面先過濾一些無用的資料,再進行 Join,效率自然比前面一種好,但是如果是我們人來弄,其實我們可以把 subject = 'English' 過濾條件下推到 iteblog.Students 表里面,這個正是 Spark 3.0 給我們帶來的動態磁區裁剪優化,
動態磁區裁剪
在 Spark SQL 中,用戶通常用他們喜歡的編程語言并選擇他們喜歡的 API 來提交查詢,這也就是為什么有 DataFrames 和 DataSet,Spark 將這個查詢轉化為一種易于理解的形式,我們稱它為查詢的邏輯計劃(logical plan),在此階段,Spark 通過應用一組基于規則(rule based)的轉換(如列修剪、常量折疊、算子下推)來優化邏輯計劃,然后,它才會進入查詢的實際物理計劃(physical planning),在物理規劃階段 Spark 生成一個可執行的計劃(executable plan),該計劃將計算分布在集群中,本文我將解釋如何在邏輯計劃階段實作動態磁區修剪,然后,我們將研究如何在物理計劃階段中進一步優化它,
邏輯計劃階段優化
假設我們有一個具有多個磁區的事實表(fact table),為了方便說明,我們用不同顏色代表不同的磁區,另外,我們還有一個比較小的維度表(dimension table),我們的維度表不是磁區表,然后我們在這些資料集上進行典型的掃描操作,在我們的例子里面,假設我們只讀取維度表里面的兩行資料,而這兩行資料其實對于另外一張表的兩個磁區,所以最后執行 Join 操作時,帶有磁區的事實表只需要讀取兩個磁區的資料就可以,
因此,我們不需要實際掃描整個事實表,為了做到這種優化,一種簡單的方法是通過維度表構造出一個過濾子查詢(比如上面例子為 select subject from iteblog.DailyRoutine where subject = 'English'),然后在掃描事實表之前加上這個過濾子查詢,
通過這種方式,我們在邏輯計劃階段就知道事實表需要掃描哪些磁區,
但是,上面的物理計劃執行起來還是比較低效,因為里面有重復的子查詢,我們需要找出一種方法來消除這個重復的子查詢,為了做到這一點,Spark 在物理計劃階段做了一些優化,
物理計劃階段優化
如果維度表很小,那么 Spark 很可能會以 broadcast hash join 的形式執行這個 Join,Broadcast Hash Join 的實作是將小表的資料廣播(broadcast)到 Spark 所有的 Executor 端,這個廣播程序和我們自己去廣播資料沒什么區別,先利用 collect 算子將小表的資料從 Executor 端拉到 Driver 端,然后在 Driver 端呼叫 sparkContext.broadcast 廣播到所有 Executor 端;另一方面,大表也會構建 hash table(稱為 build relation),之后在 Executor 端這個廣播出去的資料會和大表的對應的磁區進行 Join 操作,這種 Join 策略避免了 Shuffle 操作,具體如下:
我們已經知道了 broadcast hash join 實作原理,其實動態磁區裁剪優化就是在 broadcast hash join 中大表進行 build relation 的時候拿到維度表的廣播結果(broadcast results),然后在 build relation 的時候(Scan 前)進行動態過濾,從而達到避免掃描無用的資料效果,具體如下:
好了,以上就是動態磁區裁剪在邏輯計劃和物理計劃的優化,
動態磁區裁剪適用條件
并不是什么查詢都會啟用動態裁剪優化的,必須滿足以下幾個條件:
?spark.sql.optimizer.dynamicPartitionPruning.enabled 引數必須設定為 true,不過這個值默認就是啟用的;?需要裁減的表必須是磁區表,而且磁區欄位必須在 join 的 on 條件里面;?Join 型別必須是 INNER, LEFT SEMI (左表是磁區表), LEFT OUTER (右表是磁區表), or RIGHT OUTER (左表是磁區表),?滿足上面的條件也不一定會觸發動態磁區裁減,還必須滿足 spark.sql.optimizer.dynamicPartitionPruning.useStats 和 spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio 兩個引數綜合評估出一個進行動態磁區裁減是否有益的值,滿足了才會進行動態磁區裁減,評估函式實作請參見 org.apache.spark.sql.dynamicpruning.PartitionPruning#pruningHasBenefit,
本文主要翻譯自:https://blog.knoldus.com/dynamic-partition-pruning-in-spark-3-0/
Java與大資料架構
7年老碼農,10W+關注者,【Java與大資料架構】全面分享Java編程、Spark、Flink、Kafka、Elasticsearch、資料湖等干貨,歡迎掃碼關注!
CSDN認證博客專家
過往記憶大資料
大資料
iteblog
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/246505.html
標籤:AI
下一篇:微信、支付寶簡單問題解決鏈接
