目錄
- ETL的架構
- ETL架構的優勢:
- 離線 ETL 的架構設計
- 離線 ETL 的模塊實作
- 資料分片(Split)
- 資料決議清洗(Read)
- 多檔案落地(Write)
- 檢測資料消費完整性 (Commit)
- 參考鏈接
ETL的架構
ETL架構的優勢:
ETL相對于EL-T架構可以實作更為復雜的資料轉化邏輯
ETL采用單獨的硬體服務器,可以分擔資料庫系統的負載
ETL與底層的資料庫資料存盤無關,可以保持所有的資料始終在資料庫當中,避免資料的加載和匯出,從而保證效率,提高系統的可監控性,
ELT主要通過資料庫引擎來實作系統的可擴展性(尤其是當資料加工程序在晚上時,可以充分利用資料庫引擎的資源)
ELT可以根據資料的分布情況進行并行處理優化,并可以利用資料庫的固有功能優化磁盤I/O,
ELT的可擴展性取決于資料庫引擎和其硬體服務器的可擴展性,
通過對相關資料庫進行性能調優,ETL程序獲得3到4倍的效率提升一般不是特別困難,
離線 ETL 的架構設計
離線 ETL 采用 MapReduce 框架處理清洗不同業務的資料,主要是采用了分而治之的思想,能夠水平擴展資料清洗的能力;
graph LR 1[Input] --> 2[Map] --> 3[Output]如上圖所示,離線 ETL 分為三個模塊:
- Input(InputFormat):主要對資料來源(Kafka 資料)進行決議分片,按照一定策略分配到不同的 Map 行程處理;創建 RecordReader,用于對分片資料讀取決議,生成 key-value 傳送給下游處理,
- Map(Mapper):對 key-value 資料進行加工處理,
- Output (OutputFormat):創建 RecordWriter 將處理過的 key-value 資料按照庫、表、磁區落地;最后在 commit 階段檢測訊息處理的完整性,
離線 ETL 的模塊實作
資料分片(Split)
我們從 kafka 獲取當前 topic&partition 最大的 offset 以及上次消費的截止 offset ,組成本次要消費的[beginOffset、endOffset]kafkaEvent,kafkaEvent 會打散到各個 Mapper 進行處理,最終這些 offset 資訊持久化到 mysql 表中,
那么如何保證資料不傾斜呢?首先通過配置自定義 mapper 個數,并創建對應個數的 ETLSplit,由于 kafkaEevent 包含了單個 topic&partition 之前消費的 Offset 以及將要消費的最大 Offset,即可獲得每個 kafkaEvent 需要消費的訊息總量,最后遍歷所有的 kafkaEevent,將當前 kafkaEevent 加入當前最小的 ETLSplit(通過比較需要消費的資料量總和,即可得出),通過這樣生成的 ETLSplit 能盡量保證資料均衡,
資料決議清洗(Read)
如上圖所示,首先每個分片會有對應的 RecordReader 去決議,RecordReade 內包含多個 KafkaConsumerReader ,就是對每個 KafkaEevent 進行消費,每個 KafkaEevent 會對應一個 KafkaConsumer,拉取了位元組資料訊息之后需要對此進行 decode 反序列化,此時就涉及到 MessageDecoder 的結構,MessageDecoder 目前支持三種格式:
| 格式 | 涉及 topic |
|---|---|
| Avro | android、ios、ad_sdk_android... |
| Json | app-server-meipai、anti-spam... |
| DelimiterText | app-server-youyan、app-server-youyan-im... |
MessageDecoder 接收到 Kafka 的 key 和 value 時會對它們進行反序列化,最后生成 ETLKey 和 ETLValue,同時 MessageDecoder 內包含了 Injector,它主要做了如下事情:
- 注入 Aid:針對 arachnia agent 采集的日志資料,決議 KafkaKey 注入日志唯一標識 Aid;
- 注入 GeoIP 資訊:根據 GeoIP 決議 ip 資訊注入地理資訊(如 country_id、province_id、city_id);
- 注入 SdkDeviceInfo: 本身實時流 ETL 會做注入 gid、is_app_new 等資訊,但是離線 ETL 檢測這些資訊是否完整,做進一步保障,
程序中還有涉及到 DebugFilter,它將 SDK 除錯設備的日志過濾,不落地到 HDFS,
多檔案落地(Write)
由于 MapReduce 本身的 RecordWriter 不支持單個落地多個檔案,需要特殊處理,并且 HDFS 檔案是不支持多個行程(執行緒)writer、append,于是我們將KafkaKey+ 業務磁區+ 時間磁區 + Kafka partition定義一個唯一的檔案,每個檔案都是會到帶上 kafka partition 資訊,同時對每個檔案創建一個RecordWriter,
每個 RecordWriter 包含多個 Writer ,每個 Writer 對應一個檔案,這樣可以避免同一個檔案多執行緒讀寫,目前是通過 guava cache 維護 writer 的數量,如果 writer 太多或者太長時間沒有寫訪問就會觸發 close 動作,待下批有對應目錄的 kafka 訊息在創建 writer 進行 append 操作,這樣我們可以做到在同一個 map 內對多個檔案進行寫入追加,
檢測資料消費完整性 (Commit)
MapReduce Counter 為提供我們一個視窗,觀察統計 MapReduce job 運行期的各種細節資料,并且它自帶了許多默認 Counter,可以檢測資料是否完整消費:
reader_records: 決議成功的訊息條數;
decode_records_error: 決議失敗的訊息條數;
writer_records: 寫入成功的訊息條數;
...
最后通過本次要消費 topic offset 數量、reader_records 以及 writer_records 數量是否一致,來確認訊息消費是否完整,
允許一定比例的臟資料,若超出限度會生成短信告警
參考鏈接
https://blog.csdn.net/javastart/article/details/113838240
美圖離線ETL實踐 - 掘金 (juejin.cn)
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/544528.html
標籤:其他
上一篇:獲取樹形資料的全路徑
