主頁 > 資料庫 > DataLeap的Catalog系統近實時訊息同步能力優化

DataLeap的Catalog系統近實時訊息同步能力優化

2022-09-21 11:15:05 資料庫

更多技術交流、求職機會,歡迎關注位元組跳動資料平臺微信公眾號,回復【1】進入官方交流群

 

摘要

位元組資料中臺DataLeap的Data Catalog系統通過接收MQ中的近實時訊息來同步部分元資料,Apache Atlas對于實時訊息的消費處理不滿足性能要求,內部使用Flink任務的處理方案在ToB場景中也存在諸多限制,所以團隊自研了輕量級異步訊息處理框架,很好的支持了位元組內部和火山引擎上同步元資料的訴求,本文定義了需求場景,并詳細介紹框架的設計與實作,

背景

動機

位元組資料中臺DataLeap的Data Catalog系統基于Apache Atlas搭建,其中Atlas通過Kafka獲取外部系統的元資料變更訊息,在開源版本中,每臺服務器支持的Kafka Consumer數量有限,在每日百萬級訊息體量下,經常有長延時等問題,影響用戶體驗,

在2020年底,我們針對Atlas的訊息消費部分做了重構,將訊息的消費和處理從后端服務中剝離出來,并撰寫了Flink任務承擔這部分作業,比較好的解決了擴展性和性能問題,然而,到2021年年中,團隊開始重點投入私有化部署和火山公有云支持,對于Flink集群的依賴引入了可維護性的痛點,

在仔細的分析了使用場景和需求,并調研了現成的解決方案后,我們決定投入人力自研一個訊息處理框架,當前這個框架很好的支持了位元組內部以及ToB場景中Data Catalog對于訊息消費和處理的場景,

本文會詳細介紹框架解決的問題,整體的設計,以及實作中的關鍵決定,

需求定義

使用下面的表格將具體場景定義清楚,

相關作業

在啟動自研之前,我們評估了兩個比較相關的方案,分別是Flink和Kafka Streaming,

Flink是我們之前生產上使用的方案,在能力上是符合要求的,最主要的問題是長期的可維護性,在公有云場景,那個階段Flink服務在火山云上還沒有發布,我們自己的服務又有嚴格的時間線,所以必須考慮替代;在私有化場景,我們不確認客戶的環境一定有Flink集群,即使部署的資料底座中帶有Flink,后續的維護也是個頭疼的問題,另外一個角度,作為通用流式處理框架,Flink的大部分功能其實我們并沒有用到,對于單條訊息的流轉路徑,其實只是簡單的讀取和處理,使用Flink有些“殺雞用牛刀”了,

另外一個比較標準的方案是Kafka Streaming,作為Kafka官方提供的框架,對于流式處理的語意有較好的支持,也滿足我們對于輕量的訴求,最終沒有采用的主要考慮點是兩個:

  • 對于Offset的維護不夠靈活:我們的場景不能使用自動提交(會丟訊息),而對于同一個Partition中的資料又要求一定程度的并行處理,使用Kafka Streaming的原生介面較難支持,

  • 與Kafka強系結:大部分場景下,我們團隊不是元資料訊息佇列的擁有者,也有團隊使用RocketMQ等提供元資料變更,在應用層,我們希望使用同一套框架兼容,

設計

概念說明

  • MQ Type:Message Queue的型別,比如Kafka與RocketMQ,后續內容以Kafka為主,設計一定程度兼容其他MQ,

  • Topic:一批訊息的集合,包含多個Partition,可以被多個Consumer Group消費,

  • Consumer Group:一組Consumer,同一Group內的Consumer資料不會重復消費,

  • Consumer:消費訊息的最小單位,屬于某個Consumer Group,

  • Partition:Topic中的一部分資料,同一Partition內訊息有序,同一Consumer Group內,一個Partition只會被其中一個Consumer消費,

  • Event:由Topic中的訊息轉換而來,部分屬性如下,

    Event Type:訊息的型別定義,會與Processor有對應關系;

    Event Key:包含訊息Topic、Partition、Offset等元資料,用來對訊息進行Hash操作;

  • Processor:訊息處理的單元,針對某個Event Type定制的業務邏輯,

  • Task:消費訊息并處理的一條Pipeline,Task之間資源是相互獨立的,

框架架構

整個框架主要由MQ Consumer, Message Processor和State Manager組成,

  • MQ Consumer:負責從Kafka Topic拉取訊息,并根據Event Key將訊息投放到內部佇列,如果訊息需要延時消費,會被投放到對應的延時佇列;該模塊還負責定時查詢State Manager中記錄的訊息狀態,并根據回傳提交訊息Offset;上報與訊息消費相關的Metric,

  • Message Processor:負責從佇列中拉取訊息并異步進行處理,它會將訊息的處理結果更新給State Manager,同時上報與訊息處理相關的Metric,

  • State Manager:負責維護每個Kafka Partition的訊息狀態,并暴露當前應提交的Offset資訊給MQ Consumer,

實作

執行緒模型

 

 

每個Task可以運行在一臺或多臺實體,建議部署到多臺機器,以獲得更好的性能和容錯能力,

每臺實體中,存在兩組執行緒池:

  • Consumer Pool:負責管理MQ Consumer Thread的生命周期,當服務啟動時,根據配置拉起一定規模的執行緒,并在服務關閉時確保每個Thread安全退出或者超時停止,整體有效Thread的上限與Topic的Partition的總數有關,

  • Processor Pool:負責管理Message Processor Thread的生命周期,當服務啟動時,根據配置拉起一定規模的執行緒,并在服務關閉時確保每個Thread安全退出或者超時停止,可以根據Event Type所需要處理的并行度來靈活配置,

兩類Thread的性質分別如下:

  • Consumer Thread:每個MQ Consumer會封裝一個Kafka Consumer,可以消費0個或者多個Partition,根據Kafka的機制,當MQ Consumer Thread的個數超過Partition的個數時,當前Thread不會有實際流量,

  • Processor Thread:唯一對應一個內部的佇列,并以FIFO的方式消費和處理其中的訊息,

StateManager

 

 

在State Manager中,會為每個Partition維護一個優先佇列(最小堆),佇列中的資訊是Offset,兩個優先佇列的職責如下:

  • 處理中的佇列:一條訊息轉化為Event后,MQ Consumer會呼叫StateManager介面,將訊息Offset 插入該佇列,

  • 處理完的佇列:一條訊息處理結束或最終失敗,Message Processor會呼叫StateManager介面,將訊息Offset插入該佇列,

  1. MQ Consumer會周期性的檢查當前可以Commit的Offset,情況列舉如下:

  • 處理中的佇列堆頂 < 處理完的佇列堆頂或者處理完的佇列為空:代表當前消費回來的訊息還在處理程序中,本輪不做Offset提交,

  • 處理中的佇列堆頂 = 處理完的佇列堆頂:表示當前訊息已經處理完,兩邊同時出隊,并記錄當前堆頂為可提交的Offset,重復檢查程序,

  • 處理中的佇列堆頂 > 處理完的佇列堆頂:例外情況,通常是資料回放到某些中間狀態,將處理完的佇列堆頂出堆,

注意:當發生Consumer的Rebalance時,需要將對應Partition的佇列清空

KeyBy與Delay Processing的支持

因源頭的Topic和訊息格式有可能不可控制,所以MQ Consumer的職責之一是將訊息統一封裝為Event,

根據需求,會從原始訊息中拼裝出Event Key,對Key取Hash后,相同結果的Event會進入同一個佇列,可以保證磁區內的此類事件處理順序的穩定,同時將訊息的消費與處理解耦,支持增大內部佇列數量來增加吞吐,

Event中也支持設定是否延遲處理屬性,可以根據Event Time延遲固定時間后處理,需要被延遲處理的事件會被發送到有界延遲佇列中,有界延遲佇列的實作繼承了DelayQueue,限制DelayQueue長度, 達到限定值入隊會被阻塞,

例外處理

Processor在訊息處理程序中,可能遇到各種例外情況,設計框架的動機之一就是為業務邏輯的撰寫者屏蔽掉這種復雜度,Processor相關框架的邏輯會與State Manager協作,處理例外并充分暴露狀態,比較典型的例外情況以及處理策略如下:

  • 處理訊息失敗:自動觸發重試,重試到用戶設定的最大次數或默認值后會將訊息失敗狀態通知State Manager,

  • 處理訊息超時:超時對于吞吐影響較大,且通常重試的效果不明顯,因此當前策略是不會對訊息重試,直接通知State Manager 訊息處理失敗,

  • 處理訊息較慢:上游Topic存在Lag,Message Consumer消費速率大于Message Processor處理速率時,訊息會堆積在佇列中,達到佇列最大長度,Message Consumer 會被阻塞在入隊操作,停止拉取訊息,類似Flink框架中的背壓,

監控

為了方便運維,在框架層面暴露了一組監控指標,并支持用戶自定義Metrics,其中默認支持的Metrics如下表所示:

線上運維Case舉例

實際生產環境運行時,偶爾需要做些運維操作,其中最常見的是訊息堆積和訊息重放,

  1. 對于Conusmer Lag這類問題的處理步驟大致如下:

  • 查看Enqueue Time,Queue Length的監控確定服務內佇列是否有堆積,

  • 如果佇列有堆積,查看Process Time指標,確定是否是某個Processor處理慢,如果是,根據指標中的Tag 確定事件型別等屬性特征,判斷業務邏輯或者Key設定是否合理;全部Processor 處理慢,可以通過增加Processor并行度來解決,

  • 如果佇列無堆積,排除網路問題后,可以考慮增加Consumer并行度至Topic Partition 上限,

訊息重放被觸發的原因通常有兩種,要么是業務上需要重放部分資料做補全,要么是遇到了事故需要修復資料,為了應對這種需求,我們在框架層面支持了根據時間戳重置Offset的能力,具體操作時的步驟如下:

  • 使用服務測暴露的API,啟動一臺實體使用新的Consumer GroupId: {newConsumerGroup} 從某個startupTimestamp開始消費

  • 更改全部配置中的 Consumer GroupId 為 {newConsumerGroup}

  • 分批重啟所有實體

總結

為了解決位元組資料中臺DataLeap中Data Catalog系統消費近實時元資料變更的業務場景,我們自研了輕量級訊息處理框架,當前該框架已在位元組內部生產環境穩定運行超過1年,并支持了火山引擎上的資料地圖服務的元資料同步場景,滿足了我們團隊的需求,

下一步會根據優先級排期支持RocketMQ等其他訊息佇列,并持續優化配置動態更新,監控報警,運維自動化等方面,

 

立即跳轉火山引擎大資料研發治理套件DataLeap官網了解詳情

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/509201.html

標籤:大數據

上一篇:Java學習-第一部分-第三階段-第三節:MySQL基礎

下一篇:開源直播課丨高效穩定易用的資料集成框架——ChunJun類加載原理與實作

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more