文章目錄
- 前言
- 一、離線資料的主要挑戰:“資料傾斜”
- 二、Hive 的優化
- 三、Join 無關的優化
- 3.1 group by 引起的傾斜優化
- 3.2 count distinct 優化
- 四、大表 Join 小表優化
- 五、大表 Join 大表優化
- 5.1 問題場景
- 方案 1:轉化為 mapjoin
- 方案 2:join 時用 case when 陳述句
- 方案 3:倍數B表,再取模join
- 方案 4:動態一分為二
前言
1024,1GB,一級棒!程式仔們節日快樂!

指尖流動的 1024 行代碼,到底是什么?
是10行的迷茫?
是101行的叛逆?
是202行的理性思考?
是307行對渴望的沖動?
還是404行對未知的追尋?
你心中,一定會有答案!
前面,我們陸陸續續聊過了 Hadoop原理實戰、 Hive 的底層原理實踐,今天就來聊一聊大家最關心的 Hive 優化實踐,
實際搞過離線資料處理的同學都知道,Hive SQL 的各種優化方法都是和資料傾斜密切相關的,所以我會先來聊一聊 “資料傾斜” 的基本概念,然后再在此基礎上為大家介紹各種場景下的 Hive 優化方案,
Hive 的優化分為 join 相關的優化 和 join 無關的優化 ,
從專案實際來說, join 相關的優化其實占據了 Hive 優化的大部分內容,而 join 相關的優化又分為 mapjoin 可以解決的 join 優化和 mapjoin 無法解決的 join 優化,
一、離線資料的主要挑戰:“資料傾斜”
首先介紹 “資料傾斜” 的概念,
“傾斜”應該來自于統計學里的的偏態分布,資料處理種的傾斜和此相關,
對于分布式資料處理來說,我們希望資料平均分布到每個處理節點,如下圖所示:

但是實際上由于業務資料本身的問題或者分布演算法的問題,每個節點分配到的資料量很可能并不是我們預想的那樣,比如:

甚至還會出現更極端的情況:

也就是說,只有待分到最多資料的節點處理完資料,整個資料處理任務才能完成,時分布式的意義就大打折扣 ,想想那個卡死的 99% ,
實際上,即使每個節點分配到的資料量大致相同,資料仍可能傾斜,比如考慮統計詞頻的極端問題,如果某個節點分配到的詞都是一個詞,那么顯此節點需要的耗時將很長,即使其資料量和其他節點的資料量相同,
Hive 的優化正是采用各種措施和方法對上述場景的傾斜問題進行優化和處理,
二、Hive 的優化
其實在實際 Hive SQL 開發的程序中, Hive SQL 性能的問題上實際只有一小部分和資料傾相關,
很多時候, Hive SQL 運行得慢是由開發人員對于使用的資料了解不夠以及一些不良的使用習慣引起的,
開發人員 要確定以下幾點:
- 需要計算的指標真的需要從資料倉庫的公共明細層來自行匯總么? 是不是資料公共層團隊開發的公共匯總層已經可以滿足自己的需求?對于大眾的、 KPI 相關的指標等通常設計良好的資料倉庫公共層肯定已經包含了,直接使用即可,
- 真的需要掃描這么多磁區么? 比如對于銷售明細事務表來說,掃描一年的磁區和掃描一周的磁區所帶來的計算、 IO 開銷完全是兩個量級,所耗費的時間肯定也是不同的,作為開發人員,我們需要仔細考慮業務的需求,盡量不要浪費計算和存盤資源!
- 盡量不要使用
select * from your_table這樣的方式,用到哪些列就指定哪些列, 如select coll, col2 from your_table,另外, where 條件中也盡量添加過濾條件,以去掉無關的資料行,從而減少整個 MapReduce 任務中需要處理、分發的資料量, - 輸入檔案不要是大量的小檔案, Hive 的默認 Input Split 是 128MB (可配置),小檔案可先合并成大檔案,
在保證了上述幾點之后,有的時候發現 Hive SQL 還是要運行很長時間,甚至運行不出來, 這時就需要真正的 Hive 優化技術了!

三、Join 無關的優化
Hive SQL 性能問題基本上大部分都和 join 相關,對于和 join 無關的問題主要有 groupby 相關的傾斜和 count distinct 相關的優化,
3.1 group by 引起的傾斜優化
group by 引起的傾斜主要是輸入資料行按照 group by 列分布不均勻 引起的,
比如,假設按照供應商對銷售明細事實表來統計訂單數,那么部分大供應商的訂單量顯然非常多,而多數供應商的訂單量就一般,由于 group by 的時候是按照供應商的 ID 分發到每個 Reduce Task ,那么此時分配到大供應商的 Reduce Task 就分配了更多的訂單,從而導致資料傾斜,
對于 group by 引起的傾斜,優化措施非常簡單,只需設定下面引數即可:
set hive.map.aggr = true
set hive.groupby.skewindata=true
此時 Hive 在資料傾斜的時候會進行負載均衡,生成的查詢計劃會有兩個 MapReduce Job,
- 第一個 MapReduce Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個Reduce 做部分聚合操作并輸出結果,這樣處理的結果是相同的 GroupBy Key 有可能被分布到不同的 Reduce 中,從而達到負載均衡的目的;
- 第二個 MapReduce Job 再根據預處理的資料結果,按照 GroupBy Key 分布到 Reduce 中(這程序可以保證相同的GroupBy Key被分布到同一個Reduce中),最后完成最終的聚合操作,
3.2 count distinct 優化
在 Hive 開發程序中,應該小心使用 count distinct ,因為很容易引起性能問題,比如下面的 SQL:
select count(distinct user) from some_table
由于必須去重,因此 Hive 將會把 Map 階段的輸出全部分布到 Reduce Task 上,此時很容易引起性能問題,對于這種情況,可以通過先 group by 再 count 的方式來優化,優化后的 SQL 如下:
select count(*) from (
select user
from some_table
group by user
) tmp;
原理為:先利用 group by 去重,再統計 group by 的行數目,
四、大表 Join 小表優化
join 相關的優化主要分為 mapjoin 可以解決的優化 ( 即大表 join 小表) 和 mapjoin 無法解決的優化( 即大表 join 大表 ), 大表 join 小表相對容易解決,大表 join 大表相對復雜和難以解決,但也不是不可解決的,只是相對比較麻煩而已,
首先介紹大表 join 小表優化 ,仍以銷售明細事實表為例來說明大表 join 小表的場景,
假如供應商會進行評級,比如(五星、四星、 兩星、 一星),此時業務人員希望能夠分析各供應商星級的每天銷售情況及其占比,
開發人員一般會寫出如下 SQL:
select Seller_srar, count(order_id) as ordre_cnt
from (
select order_id,seller_id
from dwd_sls_fact_detail_table
where partition_value ='20170101'
) a
Left outer join(
select seller_id,seller_star
from dim_seller
where partition_value='20170101'
) b
on a.seller_id = b.seller_id
group by b.seller_star;
但正如上述所言,現實世界的二八準則將導致訂單集中在部分供應商上,而好的供應商的評級通常會更高,此時更加劇了資料傾斜的程度,如果不加以優化,上述 SQL 將會耗費很長時間,甚至運行不出結果!
通常來說,供應商是有限的,比如上千家、上萬家,資料量不會很大,而銷售明細事實表比較大,這就是典型的大表 join 小表問題,可以通過 mapjoin 的方式來優化,只需添加 mapjoin hint 即可,優化后的 SQL 如下:
select /*+mapjoin(b)*/ Seller_srar, count(order_id) as ordre_cnt
from (
select order_id,seller_id
from dwd_sls_fact_detail_table
where partition_value ='20170101'
) a
Left outer join(
select seller_id,seller_star
from dim_seller
where partition_value='20170101'
) b
on a.seller_id = b.seller_id
group by b.seller_star;
/*+mapjoin(b)*/ 即 mapjoin himt,如果需要 mapjoin 多個表,則格式為/*+mapjoin(b,c,d)*/ ,
Hive 對于 mapjoin 是默認開啟的,設定引數為:
Set hive.auto.convert.join=ture;
mapjoin 優化是在 Map 階段進行 join ,而不是像通常那樣在 Reduce 階段按照 join 列進行分發后在每個 Reduce 任務節點上進行 join ,不需要分發也就沒有傾斜的問題,相反 Hive 會將小表全量復制到每個 Map 任務節點(對于本例是 dim_seller ,當然僅全量復制 b表 sql 指定的列),然后每個 Map 任務節點執行 lookup 小表即可,
從上述分析可以看出,小表不能太大,否則全量復制分發得不償失,
-
實際上 Hive 根據引數
hive.mapjoin.smalltable.filesize( 0.11.0 本后是hive.auto.convert.join.noconditionaltask.size)來確定小表的大小是否滿足條件(默認 25M), -
實際中此引數值所允許的最大值可以修改,但是一般最大不能超過 1GB (太大的話 Map 任務所在的節點記憶體會撐爆, Hive 會報錯 ,另外需要注意的是, HDFS 顯示的檔案大小是壓縮后的大小, 當實際加載到記憶體的時候,容量會增大很多,很多場景下可能會膨脹 10 倍),
五、大表 Join 大表優化
如果上述 mapjoin 中小表 dim_seller 很大呢?比如超過了 1GB 的大小?這種就是大表join 大表的問題 ,
這類問題相對比較復雜,我們首先引入具體的問題場景,然后基于此介紹各種優化方案,
5.1 問題場景
我們先假設一個問題場景:
A 表為一個匯總表,匯總的是賣家買家最近 N 天交易匯總資訊,即對于每個賣家最近 N 天,其每個買家共成交了多少單、總金額是多少,我們這里 N 先只取 90 天,匯總值僅取成交單數 ,A 表的欄位有:
buyer_id 、seller_id 和 pay_cnt_90d,
B 表為賣家基本資訊表,其中包含賣家的一個分層評級資訊,比如把賣家分為 6 個級別: S0、S1、S2、S3、S4、S5、S6 ,
要獲得的結果是每個買家在各個級別賣家的成交比例資訊,比如:
某買家 S0:10%; S1:20%; S2:20%; S3:10%; S4:20%; S4:10%; S5:10%,
B表的欄位有: seller_id 和 s_level,
正如 mapjoin 中的例子一樣,我們的第一反應是直接 join 表并統計:
select
m.buyer_id
,sum(pay_cnt_90d) as pay_cnt_90d
,sum(case when m.s_level=O then pay_cnt_90d end) as pay_cnt_90d_s0
,sum(case when m.s_level=l then pay_cnt_90d end) as pay_cnt_90d_sl
,sum(case when m.s_level=2 then pay_cnt_90d end) as pay_cnt_90d_s2
,sum(case when m.s level=3 then pay cnt 90d end) as pay_cnt_90d_s3
,sum(case when m.s_level=4 then pay_cnt_90d end) as pay_cnt_90d_s4
,sum(case when m.s_level=S then pay_cnt_90d end) as pay_cnt_90d_s5
from
(
select
a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d
from
(
select buyer_id ,seller_id,pay_cnt_90d
from table A
) a
join
(
select seller_id,s_level
from table B
) b
on a.seller_id=b.seller_id
) m
group by m.buyer_id
但是此 SQL 會引起資料傾斜,原因在于賣家的二八準則,某些賣家 90 天內會有幾百萬甚至上千萬的買家,但是大部分賣家 90 天內的買家數目并不多, join table_A 和table_B 的時候 ODPS 會按照 Seller_id 進行分發, table_A 的大賣家引起了資料傾斜,
但是本資料傾斜問題無法用 mapjoin table_B 解決,因為賣家有超過千萬條、檔案大小幾個GB ,超過了 mapjoin 表最大 1GB 的限制,
方案 1:轉化為 mapjoin
大表無法直接mapjoin,那么是否可以間接呢?實際上此思路有兩種途徑:限制行和限制列,
- 限制行: 不需要join B全表,只需要join其在A表中存在的,對于本問題場景,就是過濾掉 90 天內沒有成交的賣家,
- 限制列: 只取需要的欄位,
select
m.buyer_id
,sum(pay_cnt_90d) as pay_cnt_90d
,sum(case when m.s_level=O then pay_cnt_90d end) as pay_cnt_90d_s0
,sum(case when m.s_level=l then pay_cnt_90d end) as pay_cnt_90d_sl
,sum(case when m.s_level=2 then pay_cnt_90d end) as pay_cnt_90d_s2
,sum(case when m.s level=3 then pay cnt 90d end) as pay_cnt_90d_s3
,sum(case when m.s_level=4 then pay_cnt_90d end) as pay_cnt_90d_s4
,sum(case when m.s_level=S then pay_cnt_90d end) as pay_cnt_90d_s5
from
(
select /*+mapjoin(b)*/
a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d
from
(
select buyer_id ,seller_id,pay_cnt_90d
from table_A
) a
join
(
select b0.seller id,s_level
from table_B b0
join
(select seller_id from table_A group by seller_id) a0
on b0.seller_id=a0.seller_id
) b
on a.seller_id=b.seller_id
) m
group by m.buyer_id
此方案在一些情況下可以起作用,但很多時候還是無法解決上述問題,因為大部分賣家盡管 90 買家不多 ,但還是有一些的,過濾后的 B
表仍然很大,
方案 2:join 時用 case when 陳述句
應用場景為: 傾斜的值是明確的而且數量很少,比如null值引起的傾斜,
將這些引起傾斜的值隨機分發到Reduce,其主要核心邏輯在于 join 時對這些特殊值concat 亂數,從而達到隨機分發的目的,核心邏輯如下:
Select a.user_id,a.order_id,b.user_id
From table_a a
Join table_b b
On (case when a.user_id is null then concat ('hive' ,rand()) else a.user_id end)=b.user_id
Hive已對此進行了優化,不需要修改SQL,只需要設定引數;比如 table_B 的值 “0” 和 “1” 引起傾斜,只需要如下設定:
set hive.optimize.skewinfo=table_B:(seller_id)[("0")("1")];
set hive.optimize.skewjoin=true;
但是方案二還是不能解決上述問題,因為傾斜的賣家大量存在而且動態變化,
方案 3:倍數B表,再取模join
- 通用方案
是建立一個numbers表,其值只有一列int行,比如從1到10(具體根據傾斜程度確定),然后放大B表10倍,再取模join,
select
m,buer_id
,sum(pay_cnt_90d) as pay_cnt_90d
,sum(case when m.s_level=O then pay_cnt_90d end) as pay cnt 90d so
,sum(case when m.s_level=l then pay cnt 90d end) as pay cnt 90d_sl
,sum(case when m.s_level=2 then pay_cnt_90d end) as pay_cnt_90d s2
,sum(case when m.s_level=3 then pay_cnt_90d end) as pay_cnt_90d_s3
,sum(case when m.s_level=4 then pay_cnt_90d end) as pay cnt 90d s4
,sum(case when m.s level=S then pay cnt 90d end) as pay cnt 90d s5
from
(
select
a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d
from
(
select buyer_id,seller_id,pay_cnt_90d
from table_A
) a
JOin
(
select /*+mapjoin(members)*/
seller_id,s_level,member
from table_B
join
numbers
) b
on a.seller_id=b.seller_id
and mod(a.pay_cnt_90d,10)+1=b.number
) m
group by m.buyer_id
思路核心在于:既然按照seller_id分發會傾斜,那么再人工增加一列進行分發,這樣之前傾斜的值的傾斜程度會減少為原來的1/10,可以通過配置numbers表修改放大倍數來降低傾斜程度,但弊端就是B表會膨脹N倍,
- 專有方案
通用方案思路是把B表的每條資料都放大了相同的倍數,實際上只需要把大賣家放大倍數即可,
首先需要知道大賣家的名單,即先建立一個臨時表動態存放每日最新的大賣家(比如dim_big_seller),同時此表的大賣家要膨脹預先設定的倍數(比如1000倍),
在A表和 B表中分別新建一個 join 列,其邏輯為:如果是大賣家,那么 concat 一個隨
機分配正整數(0到預定義的倍數之間,本例為0~1000 );如果不是,保持不變,

相比通用方案,專用方案的運行效率明顯好了很多,因為只是將B表中大賣家的行數放大了 1000 倍,其他賣家的行數保持不變,但同時也可以看到代碼也復雜了很多,而且必須首先建立大賣家表,
方案 4:動態一分為二
實際上方案 2 和 3 都用到了一分為二的思想,但是都不徹底,對于 mapjoin 不能解決的
問題,終極解決方案就是動態一分為 ,即對傾斜的鍵值和不傾斜的鍵值分開處理,不傾
斜的正常 join 即可,傾斜的把它們找出來然后做 mapjoin ,最后 union all 其結果即可,
但是此種解決方案比較麻煩,代碼會變得復雜而且需要一個臨時表存放傾斜的鍵值,

-- 對于 90 天買家數超過 10000 的賣家直接 map join ,對于其他賣家正常 join 即可
select
m.buyer_id
,surn(pay_cnt_90d) as pay_cnt_90d
,surn(case when rn.s_level=O then pay_cnt_90d end) as pay_cnt_90d_s0
,surn(case when rn.s_level=l then pay_cnt_90d end) as pay_cnt_90d_sl
,surn(case when rn.s_level=2 then pay_cnt_90d end) as pay_cnt_90d_s2
,surn(case when rn.s_level=3 then pay_cnt_90d end) as pay_cnt_90d_s3
,surn(case when rn.s_level=4 then pay_cnt_90d end) as pay_cnt_90d_s4
,surn(case when rn.s_level=S then pay_cnt_90d end) as pay_cnt_90d_s5
from
(
select
a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d
from
(
select buyer_id,seller_id,pay_cnt_90d
from table_A
) a
join
(
select seller_id ,a.s_level
from table_A a
left outer join tmp_table_B b
on a.user_id = b.seller_id
where b.seller_id is null
) b
on a.seller id=b.seller id
union all
select /*+mapjoin(b)*/
a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d
from
select buyer_id,seller_id,pay_cnt_90d
from table A
) a
join
select seller_id,s_level
from table B
) b
on a.seller id=b.seller id
) m group by m.buyer_id
) m
group by m.byer_id
總結起來,方案 1、2 以及方案 3 中的通用方案不能保證解決大表 join 大表問題,因為它們都存在種種不同的限制和特定的使用場景,
而方案 3 的專用方案和方案 4 是比較推薦的優化方案,但是它們都需要新建一個臨時表來存放每日動態變化的大賣家 ,
相對方案 4 來說,方案 3 的專用方案不需要對代碼框架進行修改,但是 B 表會被放大,所以一定要是維度表,不然統計結果會是錯誤的 , 方案 4 的解決方案最通用,自由度最高,但是對代碼的更改也最大,甚至需要更改代碼框架,可作為終極方案來使用,
我是「云祁」,一枚熱愛技術、會寫詩的大資料開發猿,
CSDN認證博客專家
Flink
Spark
資料中臺
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/191014.html
標籤:其他
