原文:Flink 流式聚合性能調優指南
SQL 是資料分析中使用最廣泛的語言,Flink Table API 和 SQL 使用戶能夠以更少的時間和精力定義高效的流分析應用程式,此外,Flink Table API 和 SQL 是高效優化過的,它集成了許多查詢優化和算子優化,但并不是所有的優化都是默認開啟的,因此對于某些作業負載,可以通過打開某些選項來提高性能,
這里將介紹一些實用的優化選項以及流式聚合的內部原理,它們在某些情況下能帶來很大的提升,
注意:(1)目前,這里提到的優化選項僅支持 Blink planner,(2)目前,流聚合優化僅支持無界聚合,視窗聚合優化將在未來支持,
默認情況下,無界聚合算子是逐條處理輸入的記錄,即:(1)從狀態中讀取累加器,(2)累加/撤回記錄至累加器,(3)將累加器寫回狀態,(4)下一條記錄將再次從(1)開始處理,這種處理模式可能會增加 StateBackend 開銷(尤其是對于 RocksDB StateBackend ),此外,生產中非常常見的資料傾斜會使這個問題惡化,并且容易導致 job 發生反壓,
MiniBatch 聚合
MiniBatch 聚合的核心思想是將一組輸入的資料快取在聚合算子內部的緩沖區中,當輸入的資料被觸發處理時,每個 key 只需一個操作即可訪問狀態,這樣可以大大減少狀態開銷并獲得更好的吞吐量,但是,這可能會增加一些延遲,因為它會緩沖一些記錄而不是立即處理它們,這是吞吐量和延遲之間的權衡,
下圖說明了 mini-batch 聚合如何減少狀態操作,
Flink 流式聚合性能調優指南
默認情況下 mini-batch 優化是被禁用的,開啟這項優化,需要設定選項
table.exec.mini-batch.enabled、
table.exec.mini-batch.allow-latency 和
table.exec.mini-batch.size,
下面的例子顯示如何啟用這些選項,
// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
configuration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
Local-Global 聚合
Local-Global 聚合是為解決資料傾斜問題提出的,通過將一組聚合分為兩個階段,首先在上游進行本地聚合,然后在下游進行全域聚合,類似于 MapReduce 中的 Combine + Reduce 模式,例如,就以下 SQL 而言:
SELECT color, sum(id)
FROM T
GROUP BY color
資料流中的記錄可能會傾斜,因此某些聚合算子的實體必須比其他實體處理更多的記錄,這會產生熱點問題,本地聚合可以將一定數量具有相同 key 的輸入資料累加到單個累加器中,全域聚合將僅接收 reduce 后的累加器,而不是大量的原始輸入資料,這可以大大減少網路 shuffle 和狀態訪問的成本,每次本地聚合累積的輸入資料量基于 mini-batch 間隔,這意味著 local-global 聚合依賴于啟用了 mini-batch 優化,
下圖顯示了 local-global 聚合如何提高性能,
Flink 流式聚合性能調優指南
下面的例子顯示如何啟用 local-global 聚合,
// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
拆分 distinct 聚合
Local-Global 優化可有效消除常規聚合的資料傾斜,例如 SUM、COUNT、MAX、MIN、AVG,但是在處理 distinct 聚合時,其性能并不令人滿意,
例如,如果我們要分析今天有多少唯一用戶登錄,我們可能有以下查詢:
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day
如果 distinct key (即 user_id)的值分布稀疏,則 COUNT DISTINCT 不適合減少資料,即使啟用了 local-global 優化也沒有太大幫助,因為累加器仍然包含幾乎所有原始記錄,并且全域聚合將成為瓶頸(大多數繁重的累加器由一個任務處理,即同一天),
這個優化的想法是將不同的聚合(例如 COUNT(DISTINCT col))分為兩個級別,第一次聚合由 group key 和額外的 bucket key 進行 shuffle,bucket key 是使用 HASH_CODE(distinct_key) % BUCKET_NUM 計算的,BUCKET_NUM 默認為1024,可以通過
table.optimizer.distinct-agg.split.bucket-num 選項進行配置,第二次聚合是由原始 group key 進行 shuffle,并使用 SUM 聚合來自不同 buckets 的 COUNT DISTINCT 值,由于相同的 distinct key 將僅在同一 bucket 中計算,因此轉換是等效的,bucket key 充當附加 group key 的角色,以分擔 group key 中熱點的負擔,bucket key 使 job 具有可伸縮性來解決不同聚合中的資料傾斜/熱點,
拆分 distinct 聚合后,以上查詢將被自動改寫為以下查詢:
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
下圖顯示了拆分 distinct 聚合如何提高性能(假設顏色表示 days,字母表示 user_id),
Flink 流式聚合性能調優指南
注意:上面是可以從這個優化中受益的最簡單的示例,除此之外,Flink 還支持拆分更復雜的聚合查詢,例如,多個具有不同 distinct key (例如 COUNT(DISTINCT a), SUM(DISTINCT b) )的 distinct 聚合,可以與其他非 distinct 聚合(例如 SUM、MAX、MIN、COUNT )一起使用,
注意 當前,拆分優化不支持包含用戶定義的 AggregateFunction 聚合,
下面的例子顯示了如何啟用拆分 distinct 聚合優化,
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig() // access high-level configuration
.getConfiguration() // set low-level key-value options
.setString("table.optimizer.distinct-agg.split.enabled", "true"); // enable distinct agg split
在 distinct 聚合上使用 FILTER 修飾符
在某些情況下,用戶可能需要從不同維度計算 UV(獨立訪客)的數量,例如來自 Android 的 UV、iPhone 的 UV、Web 的 UV 和總 UV,很多人會選擇 CASE WHEN,例如:
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day
但是,在這種情況下,建議使用 FILTER 語法而不是 CASE WHEN,因為 FILTER 更符合 SQL 標準,并且能獲得更多的性能提升,FILTER 是用于聚合函式的修飾符,用于限制聚合中使用的值,將上面的示例替換為 FILTER 修飾符,如下所示:
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
Flink SQL 優化器可以識別相同的 distinct key 上的不同過濾器引數,例如,在上面的示例中,三個 COUNT DISTINCT 都在 user_id 一列上,Flink 可以只使用一個共享狀態實體,而不是三個狀態實體,以減少狀態訪問和狀態大小,在某些作業負載下,可以獲得顯著的性能提升,
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/4556.html
標籤:大數據
上一篇:Kafka訊息送達語意說明
