概要
資料湖的業務場景主要包括對資料庫、日志、檔案的分析,而管理資料湖有兩點比較重要:寫入的吞吐量和查詢性能,這里主要說明以下問題:
1、為了獲得更好的寫入吞吐量,通常把資料直接寫入檔案中,這種情況下會產生很多小的資料檔案,雖然小檔案的使用可以增加寫入的并行度,且能夠并行讀取檔案以提高讀取速度,但會出現一個資料量很小,需要從多個小檔案中讀取資料,增加了很多IO,
2、資料按照進入資料湖的方式寫入到檔案中,在同一個檔案上,資料區域性不是最佳的, 資料之間,與傳入批次相關,相近的批次的資料會相關聯,而不是與經常要查詢的資料相關聯,所以小檔案的大小和缺乏資料區域性會降低查詢性能,
3、此外,許多檔案系統(包括 hdfs),當有很多小檔案時,性能會下降,
hudi clustering
hudi支持clustering功能,在不影響查詢性能的情況下提高寫入吞吐量,該功能可以以不同方式重寫資料:
1、資料先寫入小檔案,在滿足某些條件后(例如經過的時間、小檔案數量、commit次數等),將小檔案拼接成大檔案,
2、通過對不同列上的資料進行排序,來更改磁盤上的資料布局,已提高資料間的相關性,可以提高查詢性能,
實作
(用戶可以將小檔案的限制 hoodie.parquet.small.file.limit 配置為 0,這樣可以強制將資料進入新的檔案組,)
cow表的timeline

在上面的示例流程圖中,顯示了隨時間(t5 到 t9)的磁區狀態, 主要有以下步驟:
- 在 t5,表中的一個磁區有 5 個檔案組 f0、f1、f2、f3、f4,分別在 t0、t1、t2、t3、t4時刻被創建, 假設每個檔案組為 100MB, 所以磁區中的總資料為 500MB,
- 在 t6 請求 clustering 操作, 與壓縮類似,我們在帶有“ClusteringPlan”的元資料中創建了一個“t6.clustering.requested”檔案,其中包含跨所有磁區的集群操作涉及的所有檔案組,例如:{ partitionPath: {“datestr”}, oldfileGroups: [ {fileId: “f0”, time: “t0”}, { fileId: “f1”, time: “t1”}, ... ], newFileGroups: [“c1”, “c2”] }
- 假設clustering后的最大檔案大小配置為 250MB, 集群會將磁區中的所有資料重新分配到兩個檔案組中:c1、c2, 此時這些檔案組是“虛假”的,在 t8 clustering 完成之前,對查詢不可見,
- 請注意,檔案組中的記錄可以拆分為多個檔案組, 在此示例中,來自 f4 檔案組的一些記錄同時轉到了新檔案組 c1、c2,
- 當集群正在進行時(t6 到 t8),任何涉及到這些檔案組的更新插入都會被拒絕,
- 在寫入新的資料檔案 c1-t6.parquet 和 c2-t6.parquet 后,如果配置了全域索引,我們會在記錄級索引中為所有具有新位置的鍵添加條目, 新的索引條目對其他寫入將不可見,因為還沒有關聯的提交,
- 最后,我們創建一個提交元資料檔案“t6.commit”,其中包含由此次提交修改的檔案組(f0、f1、f2、f3、f4),
- 注:檔案組(f0 到 f4)不會立即從磁盤中洗掉, cleaner 會在歸檔 t6.commit 之前清理這些檔案, 并且,clustering 還會更新所有視圖和源資料檔案,
mor表的時間線

這種方法同樣支持mor表,且程序與cow 表非常相似,
clustering 的為 parquet 格式檔案,
Clustering 操作步驟
總體來說,需要兩步:
- clustering 調度:創建 clustering 計劃
- 執行 clustering:執行計劃,創建新的檔案,并替換舊的檔案,
clustering 調度
- 識別符合集群條件的檔案
- 過濾特定磁區(根據配置優先考慮最新磁區或舊磁區)
- 任何大小 > targetFileSize 的檔案都不符合條件
- 任何有待定壓縮/clustering計劃的檔案都不符合條件
- 任何具有日志檔案的檔案組都不符合集群條件(該限制以后可能會被取消)
- 根據特定條件對符合聚類條件的檔案進行分組, 每個組的資料大小預計是“targetFileSize”的倍數, 分組是作為計劃中定義的“策略”的一部分完成的:
- 根據記錄鍵范圍對檔案進行分組,因為鍵值范圍存盤在parquet footer中,這個可用于某些查詢/更新,
- 根據提交時間對檔案進行分組,
- 對自定義列,且具有重疊值的檔案進行分組(指定列進行排序)
- 分組隨機檔案
- 我們可以限制組大小以提高并行性
- 根據特定條件過濾組(類似于 CompactionStrategy 中的 orderAndFilter)
- 最后,clustering計劃被保存到timeline中,
執行 clustering
- 讀取clustering計劃,查看“clusteringGroups”的數量(用于并行性),
- 創建 inflight狀態的 clustering 檔案
- 對于每組:
- 使用 strategyParams 實體化適當的策略類(例如:sortColumns)
- 策略類定義了磁區器,我們可以用它來創建桶并寫入資料,
- 創建 replacecommit:
- operationType 設定為“clustering”,
- 擴展元資料,并存盤附加欄位以跟蹤重要資訊(策略類可以回傳這些額外的元資料資訊)
- 用于合并檔案的策略
- 跟蹤替換檔案
【參考】
https://hudi.apache.org/docs/next/configurations/#hoodieclusteringplanstrategyclass
https://cwiki.apache.org/confluence/display/HUDI/RFC+-+19+Clustering+data+for+freshness+and+query+performance
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/355442.html
標籤:大數據
