主頁 > 資料庫 > 5個互聯網大廠實時數倉建設實體,例例皆經典

5個互聯網大廠實時數倉建設實體,例例皆經典

2022-03-17 07:33:47 資料庫

一、實時數倉建設背景

1. 實時需求日趨迫切

目前各大公司的產品需求和內部決策對于資料實時性的要求越來越迫切,需要實時數倉的能力來賦能,傳統離線數倉的資料時效性是 T+1,調度頻率以天為單位,無法支撐實時場景的資料需求,即使能將調度頻率設定成小時,也只能解區域分時效性要求不高的場景,對于實效性要求很高的場景還是無法優雅的支撐,因此實時使用資料的問題必須得到有效解決,

2. 實時技術日趨成熟

實時計算框架已經經歷了三代發展,分別是:Storm、SparkStreaming、Flink,計算框架越來越成熟,一方面,實時任務的開發已經能通過撰寫 SQL 的方式來完成,在技術層面能很好地繼承離線數倉的架構設計思想;另一方面,在線資料開發平臺所提供的功能對實時任務開發、除錯、運維的支持也日漸趨于成熟,開發成本逐步降低,有助于去做這件事,

二、實時數倉建設目的

1. 解決傳統數倉的問題

從目前數倉建設的現狀來看,實時數倉是一個容易讓人產生混淆的概念,根據傳統經驗分析,數倉有一個重要的功能,即能夠記錄歷史,通常,數倉都是希望從業務上線的第一天開始有資料,然后一直記錄到現在,但實時流處理技術,又是強調當前處理狀態的一個技術,結合當前一線大廠的建設經驗和滴滴在該領域的建設現狀,我們嘗試把公司內實時數倉建設的目的定位為,以數倉建設理論和實時技術,解決由于當前離線數倉資料時效性低解決不了的問題,

現階段我們要建設實時數倉的主要原因是:

  • 公司業務對于資料的實時性越來越迫切,需要有實時資料來輔助完成決策;
  • 實時資料建設沒有規范,資料可用性較差,無法形成數倉體系,資源大量浪費;
  • 資料平臺工具對整體實時開發的支持也日漸趨于成熟,開發成本降低,

2. 實時數倉的應用場景

  • 實時 OLAP 分析;
  • 實時資料看板;
  • 實時業務監控;
  • 實時資料介面服務,

三、實時數倉建設方案

接下來我們分析下目前實時數倉建設比較好的幾個案例,希望這些案例能夠給大家帶來一些啟發,

1. 滴滴順風車實時數倉案例

滴滴資料團隊建設的實時數倉,基本滿足了順風車業務方在實時側的各類業務需求,初步建立起順風車實時數倉,完成了整體資料分層,包含明細資料和匯總資料,統一了 DWD 層,降低了大資料資源消耗,提高了資料復用性,可對外輸出豐富的資料服務,

數倉具體架構如下圖所示:

從資料架構圖來看,順風車實時數倉和對應的離線數倉有很多類似的地方,例如分層結構;比如 ODS 層,明細層,匯總層,乃至應用層,他們命名的模式可能都是一樣的,但仔細比較不難發現,兩者有很多區別:

  • 與離線數倉相比,實時數倉的層次更少一些:
  • 從目前建設離線數倉的經驗來看,數倉的資料明細層內容會非常豐富,處理明細資料外一般還會包含輕度匯總層的概念,另外離線數倉中應用層資料在數倉內部,但實時數倉中,app 應用層資料已經落入應用系統的存盤介質中,可以把該層與數倉的表分離;
  • 應用層少建設的好處:實時處理資料的時候,每建一個層次,資料必然會產生一定的延遲;
  • 匯總層少建的好處:在匯總統計的時候,往往為了容忍一部分資料的延遲,可能會人為的制造一些延遲來保證資料的準確,舉例,在統計跨天相關的訂單事件中的資料時,可能會等到 00:00:05 或者 00:00:10 再統計,確保 00:00 前的資料已經全部接受到位了,再進行統計,所以,匯總層的層次太多的話,就會更大的加重人為造成的資料延遲,
  • 與離線數倉相比,實時數倉的資料源存盤不同:
  • 在建設離線數倉的時候,目前滴滴內部整個離線數倉都是建立在 Hive 表之上,但是,在建設實時數倉的時候,同一份表,會使用不同的方式進行存盤,比如常見的情況下,明細資料或者匯總資料都會存在 Kafka 里面,但是像城市、渠道等維度資訊需要借助 Hbase,mysql 或者其他 KV 存盤等資料庫來進行存盤,

接下來,根據順風車實時數倉架構圖,對每一層建設做具體展開:

1) ODS 貼源層建設

根據順風車具體場景,目前順風車資料源主要包括訂單相關的 binlog 日志,冒泡和安全相關的 public 日志,流量相關的埋點日志等,這些資料部分已采集寫入 kafka 或 ddmq 等資料通道中,部分資料需要借助內部自研同步工具完成采集,最侄訓于順風車數倉 ods 層建設規范分主題統一寫入 kafka 存盤介質中,

命名規范:ODS 層實時資料源主要包括兩種,

一種是在離線采集時已經自動生產的 DDMQ 或者是 Kafka topic,這型別的資料命名方式為采集系統自動生成規范為:cn-binlog-資料庫名-資料庫名 eg:cn-binlog-ihap_fangyuan-ihap_fangyuan

一種是需要自己進行采集同步到 kafka topic 中,生產的 topic 命名規范同離線類似:ODS 層采用:realtime_ods_binlog_{源系統庫/表名}/ods_log_{日志名} eg: realtime_ods_binlog_ihap_fangyuan

2)DWD 明細層建設

根據順風車業務程序作為建模驅動,基于每個具體的業務程序特點,構建最細粒度的明細層事實表;結合順風車分析師在離線側的資料使用特點,將明細事實表的某些重要維度屬性欄位做適當冗余,完成寬表化處理,之后基于當前順風車業務方對實時資料的需求重點,重點建設交易、財務、體驗、安全、流量等幾大模塊;該層的資料來源于 ODS 層,通過大資料架構提供的 Stream SQL 完成 ETL 作業,對于 binlog 日志的處理主要進行簡單的資料清洗、處理資料漂移和資料亂序,以及可能對多個 ODS 表進行 Stream Join,對于流量日志主要是做通用的 ETL 處理和針對順風車場景的資料過濾,完成非結構化資料的結構化處理和資料的分流;該層的資料除了存盤在訊息佇列 Kafka 中,通常也會把資料實時寫入 Druid 資料庫中,供查詢明細資料和作為簡單匯總資料的加工資料源,

命名規范:DWD 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 40 個字符,并且應遵循下述規則:realtime_dwd_{業務/pub}_{資料域縮寫}_[{業務程序縮寫}]_[{自定義表命名標簽縮寫}]

  • {業務/pub}:參考業務命名
  • {資料域縮寫}:參考資料域劃分部分
  • {自定義表命名標簽縮寫}:物體名稱可以根據資料倉庫轉換整合后做一定的業務抽象的名稱,該名稱應該準確表述物體所代表的業務含義
  • 樣例:realtime_dwd_trip_trd_order_base

3) DIM 層

  • 公共維度層,基于維度建模理念思想,建立整個業務程序的一致性維度,降低資料計算口徑和演算法不統一風險;
  • DIM 層資料來源于兩部分:一部分是 Flink 程式實時處理 ODS 層資料得到,另外一部分是通過離線任務出倉得到;
  • DIM 層維度資料主要使用 MySQL、Hbase、fusion(滴滴自研 KV 存盤) 三種存盤引擎,對于維表資料比較少的情況可以使用 MySQL,對于單條資料大小比較小,查詢 QPS 比較高的情況,可以使用 fusion 存盤,降低機器記憶體資源占用,對于資料量比較大,對維表資料變化不是特別敏感的場景,可以使用 HBase 存盤,

命名規范:DIM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 30 個字符,并且應遵循下述規則:dim_{業務/pub}_{維度定義}[_{自定義命名標簽}]:

  • {業務/pub}:參考業務命名
  • {維度定義}:參考維度命名
  • {自定義表命名標簽縮寫}:物體名稱可以根據資料倉庫轉換整合后做一定的業務抽象的名稱,該名稱應該準確表述物體所代表的業務含義
  • 樣例:dim_trip_dri_base

4) DWM 匯總層建設

在建設順風車實時數倉的匯總層的時候,跟順風車離線數倉有很多一樣的地方,但其具體技術實作會存在很大不同,

第一:對于一些共性指標的加工,比如 pv,uv,訂單業務程序指標等,我們會在匯總層進行統一的運算,確保關于指標的口徑是統一在一個固定的模型中完成,對于一些個性指標,從指標復用性的角度出發,確定唯一的時間欄位,同時該欄位盡可能與其他指標在時間維度上完成拉齊,例如行中例外訂單數需要與交易域指標在事件時間上做到拉齊,

第二:在順風車匯總層建設中,需要進行多維的主題匯總,因為實時數倉本身是面向主題的,可能每個主題會關心的維度都不一樣,所以需要在不同的主題下,按照這個主題關心的維度對資料進行匯總,最后來算業務方需要的匯總指標,在具體操作中,對于 pv 類指標使用 Stream SQL 實作 1 分鐘匯總指標作為最小匯總單位指標,在此基礎上進行時間維度上的指標累加;對于 uv 類指標直接使用 druid 資料庫作為指標匯總容器,根據業務方對匯總指標的及時性和準確性的要求,實作相應的精確去重和非精確去重,

第三:匯總層建設程序中,還會涉及到衍生維度的加工,在順風車券相關的匯總指標加工中我們使用 Hbase 的版本機制來構建一個衍生維度的拉鏈表,通過事件流和 Hbase 維表關聯的方式得到實時資料當時的準確維度

命名規范:DWM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 40 個字符,并且應遵循下述規則:realtime_dwm_{業務/pub}_{資料域縮寫}_{資料主粒度縮寫}_[{自定義表命名標簽縮寫}]_{統計時間周期范圍縮寫}:

  • {業務/pub}:參考業務命名
  • {資料域縮寫}:參考資料域劃分部分
  • {資料主粒度縮寫}:指資料主要粒度或資料域的縮寫,也是聯合主鍵中的主要維度
  • {自定義表命名標簽縮寫}:物體名稱可以根據資料倉庫轉換整合后做一定的業務抽象的名稱,該名稱應該準確表述物體所代表的業務含義
  • {統計時間周期范圍縮寫}:1d:天增量;td:天累計(全量);1h:小時增量;th:小時累計(全量);1min:分鐘增量;tmin:分鐘累計(全量)
  • 樣例:realtime_dwm_trip_trd_pas_bus_accum_1min

(5)APP 應用層

該層主要的作業是把實時匯總資料寫入應用系統的資料庫中,包括用于大屏顯示和實時 OLAP 的 Druid 資料庫(該資料庫除了寫入應用資料,也可以寫入明細資料完成匯總指標的計算)中,用于實時資料介面服務的 Hbase 資料庫,用于實時資料產品的 mysql 或者 redis 資料庫中,

命名規范:基于實時數倉的特殊性不做硬性要求,

2. 快手實時數倉場景化案例

1) 目標及難點

① 目標

首先由于是做數倉,因此希望所有的實時指標都有離線指標去對應,要求實時指標和離線指標整體的資料差異在 1% 以內,這是最低標準,

其次是資料延遲,其 SLA 標準是活動期間所有核心報表場景的資料延遲不能超過 5 分鐘,這 5 分鐘包括作業掛掉之后和恢復時間,如果超過則意味著 SLA 不達標,

最后是穩定性,針對一些場景,比如作業重啟后,我們的曲線是正常的,不會因為作業重啟導致指標產出一些明顯的例外,

②難點

第一個難點是資料量大,每天整體的入口流量資料量級大概在萬億級,在活動如春晚的場景,QPS 峰值能達到億 / 秒,

第二個難點是組件依賴比較復雜,可能這條鏈路里有的依賴于 Kafka,有的依賴 Flink,還有一些依賴 KV 存盤、RPC 介面、OLAP 引擎等,我們需要思考在這條鏈路里如何分布,才能讓這些組件都能正常作業,

第三個難點是鏈路復雜,目前我們有 200+ 核心業務作業,50+ 核心資料源,整體作業超過 1000,

2) 實時數倉 - 分層模型

基于上面三個難點,來看一下數倉架構:

如上所示:

最下層有三個不同的資料源,分別是客戶端日志、服務端日志以及 Binlog 日志;在公共基礎層分為兩個不同的層次,一個是 DWD 層,做明細資料,另一個是 DWS 層,做公共聚合資料,DIM 是我們常說的維度,我們有一個基于離線數倉的主題預分層,這個主題預分層可能包括流量、用戶、設備、視頻的生產消費、風控、社交等,DWD 層的核心作業是標準化的清洗;DWS 層是把維度的資料和 DWD 層進行關聯,關聯之后生成一些通用粒度的聚合層次,再往上是應用層,包括一些大盤的資料,多維分析的模型以及業務專題資料;最上面是場景,整體程序可以分為三步:

第一步是做業務資料化,相當于把業務的資料接進來;第二步是資料資產化,意思是對資料做很多的清洗,然后形成一些規則有序的資料;第三步是資料業務化,可以理解資料在實時資料層面可以反哺業務,為業務資料價值建設提供一些賦能,

3) 實時數倉 - 保障措施

基于上面的分層模型,來看一下整體的保障措施:

保障層面分為三個不同的部分,分別是質量保障,時效保障以及穩定保障,

我們先看藍色部分的質量保障,針對質量保障,可以看到在資料源階段,做了如資料源的亂序監控,這是我們基于自己的 SDK 的采集做的,以及資料源和離線的一致性校準,研發階段的計算程序有三個階段,分別是研發階段、上線階段和服務階段,研發階段可能會提供一個標準化的模型,基于這個模型會有一些 Benchmark,并且做離線的比對驗證,保證質量是一致的;上線階段更多的是服務監控和指標監控;在服務階段,如果出現一些例外情況,先做 Flink 狀態拉起,如果出現了一些不符合預期的場景,我們會做離線的整體資料修復,

第二個是時效性保障,針對資料源,我們把資料源的延遲情況也納入監控,在研發階段其實還有兩個事情:首先是壓測,常規的任務會拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任務延遲的情況;通過壓測之后,會有一些任務上線和重啟性能評估,相當于按照 CP 恢復之后,重啟的性能是什么樣子,

最后一個是穩定保障,這在大型活動中會做得比較多,比如切換演練和分級保障,我們會基于之前的壓測結果做限流,目的是保障作業在超過極限的情況下,仍然是穩定的,不會出現很多的不穩定或者 CP 失敗的情況,之后我們會有兩種不同的標準,一種是冷備雙機房,另外一種是熱備雙機房,冷備雙機房是:當一個單機房掛掉,我們會從另一個機房去拉起;熱備雙機房:相當于同樣一份邏輯在兩個機房各部署一次,以上就是我們整體的保障措施,

4)快手場景問題及解決方案

① PV/UV 標準化

  •  場景

第一個問題是 PV/UV 標準化,這里有三個截圖:

第一張圖是春晚活動的預熱場景,相當于是一種玩法,第二和第三張圖是春晚當天的發紅包活動和直播間截圖,

在活動進行程序中,我們發現 60~70% 的需求是計算頁面里的資訊,如:

這個頁面來了多少人,或者有多少人點擊進入這個頁面;

活動一共來了多少人;

頁面里的某一個掛件,獲得了多少點擊、產生了多少曝光,

  • 方案

抽象一下這個場景就是下面這種 SQL:

簡單來說,就是從一張表做篩選條件,然后按照維度層面做聚合,接著產生一些 Count 或者 Sum 操作,

基于這種場景,我們最開始的解決方案如上圖右邊所示,

我們用到了 Flink SQL 的 Early Fire 機制,從 Source 資料源取資料,之后做了 DID 的分桶,比如最開始紫色的部分按這個做分桶,先做分桶的原因是防止某一個 DID 存在熱點的問題,分桶之后會有一個叫做 Local Window Agg 的東西,相當于資料分完桶之后把相同型別的資料相加,Local Window Agg 之后再按照維度進行 Global Window Agg 的合桶,合桶的概念相當于按照維度計算出最終的結果,Early Fire 機制相當于在 Local Window Agg 開一個天級的視窗,然后每分鐘去對外輸出一次,

這個程序中我們遇到了一些問題,如上圖左下角所示,

在代碼正常運行的情況下是沒有問題的,但如果整體資料存在延遲或者追溯歷史資料的情況,比如一分鐘 Early Fire 一次,因為追溯歷史的時候資料量會比較大,所以可能導致 14:00 追溯歷史,直接讀到了 14:02 的資料,而 14:01 的那個點就被丟掉了,丟掉了以后會發生什么?

在這種場景下,圖中上方的曲線為 Early Fire 回溯歷史資料的結果,橫坐標是分鐘,縱坐標是截止到當前時刻的頁面 UV,我們發現有些點是橫著的,意味著沒有資料結果,然后一個陡增,然后又橫著的,接著又一個陡增,而這個曲線的預期結果其實是圖中下方那種平滑的曲線,

為了解決這個問題,我們用到了 Cumulate Window 的解決方案,這個解決方案在 Flink 1.13 版本里也有涉及,其原理是一樣的,

資料開一個大的天級視窗,大視窗下又開了一個小的分鐘級視窗,資料按資料本身的 Row Time 落到分鐘級視窗,

Watermark 推進過了視窗的 event_time,它會進行一次下發的觸發,通過這種方式可以解決回溯的問題,資料本身落在真實的視窗, Watermark 推進,在視窗結束后觸發,此外,這種方式在一定程度上能夠解決亂序的問題,比如它的亂序資料本身是一個不丟棄的狀態,會記錄到最新的累計資料,最后是語意一致性,它會基于事件時間,在亂序不嚴重的情況下,和離線計算出來的結果一致性是相當高的,以上是 PV/UV 一個標準化的解決方案,

② DAU 計算

  • 背景介紹

下面介紹一下 DAU 計算:

我們對于整個大盤的活躍設備、新增設備和回流設備有比較多的監控,

活躍設備指的是當天來過的設備;新增設備指的是當天來過且歷史沒有來過的設備;回流設備指的是當天來過且 N 天內沒有來過的設備,但是我們計算程序之中可能需要 5~8 個這樣不同的 Topic 去計算這幾個指標,

我們看一下離執行緒序中,邏輯應該怎么算,

首先我們先算活躍設備,把這些合并到一起,然后做一個維度下的天級別去重,接著再去關聯維度表,這個維度表包括設備的首末次時間,就是截止到昨天設備首次訪問和末次訪問的時間,

得到這個資訊之后,我們就可以進行邏輯計算,然后我們會發現新增和回流的設備其實是活躍設備里打的一個子標簽,新增設備就是做了一個邏輯處理,回流設備是做了 30 天的邏輯處理,基于這樣的解決方案,我們能否簡單地寫一個 SQL 去解決這個問題?

其實我們最開始是這么做的,但遇到了一些問題:

第一個問題是:資料源是 6~8 個,而且我們大盤的口徑經常會做微調,如果是單作業的話,每次微調的程序之中都要改,單作業的穩定性會非常差;第二個問題是:資料量是萬億級,這會導致兩個情況,首先是這個量級的單作業穩定性非常差,其次是實時關聯維表的時候用的 KV 存盤,任何一個這樣的 RPC 服務介面,都不可能在萬億級資料量的場景下保證服務穩定性;第三個問題是:我們對于時延要求比較高,要求時延小于一分鐘,整個鏈路要避免批處理,如果出現了一些任務性能的單點問題,我們還要保證高性能和可擴容,

  • 技術方案

針對以上問題,介紹一下我們是怎么做的:

如上圖的例子,第一步是對 A B C 這三個資料源,先按照維度和 DID 做分鐘級別去重,分別去重之后得到三個分鐘級別去重的資料源,接著把它們 Union 到一起,然后再進行同樣的邏輯操作,

這相當于我們資料源的入口從萬億變到了百億的級別,分鐘級別去重之后再進行一個天級別的去重,產生的資料源就可以從百億變成了幾十億的級別,

在幾十億級別資料量的情況下,我們再去關聯資料服務化,這就是一種比較可行的狀態,相當于去關聯用戶畫像的 RPC 介面,得到 RPC 介面之后,最終寫入到了目標 Topic,這個目標 Topic 會匯入到 OLAP 引擎,供給多個不同的服務,包括移動版服務,大屏服務,指標看板服務等,

這個方案有三個方面的優勢,分別是穩定性、時效性和準確性,

首先是穩定性,松耦合可以簡單理解為當資料源 A 的邏輯和資料源 B 的邏輯需要修改時,可以單獨修改,第二是任務可擴容,因為我們把所有邏輯拆分得非常細粒度,當一些地方出現了如流量問題,不會影響后面的部分,所以它擴容比較簡單,除此之外還有服務化后置和狀態可控,其次是時效性,我們做到毫秒延遲,并且維度豐富,整體上有 20+ 的維度做多維聚合,最后是準確性,我們支持資料驗證、實時監控、模型出口統一等,此時我們遇到了另外一個問題 - 亂序,對于上方三個不同的作業,每一個作業重啟至少會有兩分鐘左右的延遲,延遲會導致下游的資料源 Union 到一起就會有亂序,

  • 延遲計算方案

遇到上面這種有亂序的情況下,我們要怎么處理?

我們總共有三種處理方案:

第一種解決方案是用 “did + 維度 + 分鐘” 進行去重,Value 設為 “是否來過”,比如同一個 did,04:01 來了一條,它會進行結果輸出,同樣的,04:02 和 04:04 也會進行結果輸出,但如果 04:01 再來,它就會丟棄,但如果 04:00 來,依舊會進行結果輸出,

這個解決方案存在一些問題,因為我們按分鐘存,存 20 分鐘的狀態大小是存 10 分鐘的兩倍,到后面這個狀態大小有點不太可控,因此我們又換了解決方案 2,

第二種解決方案,我們的做法會涉及到一個假設前提,就是假設不存在資料源亂序的情況,在這種情況下,key 存的是 “did + 維度”,Value 為 “時間戳”,它的更新方式如上圖所示,04:01 來了一條資料,進行結果輸出,04:02 來了一條資料,如果是同一個 did,那么它會更新時間戳,然后仍然做結果輸出,04:04 也是同樣的邏輯,然后將時間戳更新到 04:04,如果后面來了一條 04:01 的資料,它發現時間戳已經更新到 04:04,它會丟棄這條資料,這樣的做法大幅度減少了本身所需要的一些狀態,但是對亂序是零容忍,不允許發生任何亂序的情況,由于我們不好解決這個問題,因此我們又想出了解決方案 3,

方案 3 是在方案 2 時間戳的基礎之上,加了一個類似于環形緩沖區,在緩沖區之內允許亂序,

比如 04:01 來了一條資料,進行結果輸出;04:02 來了一條資料,它會把時間戳更新到 04:02,并且會記錄同一個設備在 04:01 也來過,如果 04:04 再來了一條資料,就按照相應的時間差做一個位移,最后通過這樣的邏輯去保障它能夠容忍一定的亂序,

綜合來看這三個方案:

方案 1 在容忍 16 分鐘亂序的情況下,單作業的狀態大小在 480G 左右,這種情況雖然保證了準確性,但是作業的恢復和穩定性是完全不可控的狀態,因此我們還是放棄了這個方案;

方案 2 是 30G 左右的狀態大小,對于亂序 0 容忍,但是資料不準確,由于我們對準確性的要求非常高,因此也放棄了這個方案;

方案 3 的狀態跟方案 1 相比,它的狀態雖然變化了但是增加的不多,而且整體能達到跟方案 1 同樣的效果,方案 3 容忍亂序的時間是 16 分鐘,我們正常更新一個作業的話,10 分鐘完全足夠重啟,因此最終選擇了方案 3,

③ 運營場景

  •  背景介紹

運營場景可分為四個部分:

第一個是資料大屏支持,包括單直播間的分析資料和大盤的分析資料,需要做到分鐘級延遲,更新要求比較高;

第二個是直播看板支持,直播看板的資料會有特定維度的分析,特定人群支持,對維度豐富性要求比較高;

第三個是資料策略榜單,這個榜單主要是預測熱門作品、爆款,要求的是小時級別的資料,更新要求比較低;

第四個是 C 端實時指標展示,查詢量比較大,但是查詢模式比較固定,

下面進行分析這 4 種不同的狀態產生的一些不同的場景,

前 3 種基本沒有什么差別,只是在查詢模式上,有的是特定業務場景,有的是通用業務場景,

針對第 3 種和第 4 種,它對于更新的要求比較低,對于吞吐的要求比較高,程序之中的曲線也不要求有一致性,第 4 種查詢模式更多的是單物體的一些查詢,比如去查詢內容,會有哪些指標,而且對 QPS 要求比較高,

  •  技術方案

針對上方 4 種不同的場景,我們是如何去做的?

首先看一下基礎明細層 (圖中左方),資料源有兩條鏈路,其中一條鏈路是消費的流,比如直播的消費資訊,還有觀看 / 點贊 / 評論,經過一輪基礎清洗,然后做維度管理,上游的這些維度資訊來源于 Kafka,Kafka 寫入了一些內容的維度,放到了 KV 存盤里邊,包括一些用戶的維度,

這些維度關聯了之后,最終寫入 Kafka 的 DWD 事實層,這里為了做性能的提升,我們做了二級快取的操作,

如圖中上方,我們讀取 DWD 層的資料然后做基礎匯總,核心是視窗維度聚合生成 4 種不同粒度的資料,分別是大盤多維匯總 topic、直播間多維匯總 topic、作者多維匯總 topic、用戶多維匯總 topic,這些都是通用維度的資料,

如圖中下方,基于這些通用維度資料,我們再去加工個性化維度的資料,也就是 ADS 層,拿到了這些資料之后會有維度擴展,包括內容擴展和運營維度的拓展,然后再去做聚合,比如會有電商實時 topic,機構服務實時 topic 和大 V 直播實時 topic,

分成這樣的兩個鏈路會有一個好處:一個地方處理的是通用維度,另一個地方處理的是個性化的維度,通用維度保障的要求會比較高一些,個性化維度則會做很多個性化的邏輯,如果這兩個耦合在一起的話,會發現任務經常出問題,并且分不清楚哪個任務的職責是什么,構建不出這樣的一個穩定層,

如圖中右方,最終我們用到了三種不同的引擎,簡單來說就是 Redis 查詢用到了 C 端的場景,OLAP 查詢用到了大屏、業務看板的場景,

3. 騰訊看點實時數倉案例

騰訊看點業務為什么要構建實時數倉,因為原始的上報資料量非常大,一天上報峰值就有上萬億條,而且上報格式混亂,缺乏內容維度資訊、用戶畫像資訊,下游沒辦法直接使用,而我們提供的實時數倉,是根據騰訊看點資訊流的業務場景,進行了內容維度的關聯,用戶畫像的關聯,各種粒度的聚合,下游可以非常方便的使用實時資料,

1) 方案選型

那就看下我們多維實時資料分析系統的方案選型,選型我們對比了行業內的領先方案,選擇了最符合我們業務場景的方案,

第一塊是實時數倉的選型,我們選擇的是業界比較成熟的 Lambda 架構,他的優點是靈活性高、容錯性高、成熟度高和遷移成本低;缺點是實時、離線資料用兩套代碼,可能會存在一個口徑修改了,另一個沒改的問題,我們每天都有做資料對賬的作業,如果有例外會進行告警,

第二塊是實時計算引擎選型,因為 Flink 設計之初就是為了流處理,SparkStreaming 嚴格來說還是微批處理,Strom 用的已經不多了,再看 Flink 具有 Exactly-once 的準確性、輕量級 Checkpoint 容錯機制、低延時高吞吐和易用性高的特點,我們選擇了 Flink 作為實時計算引擎,

第三塊是實時存盤引擎,我們的要求就是需要有維度索引、支持高并發、預聚合、高性能實時多維 OLAP 查詢,可以看到,Hbase、Tdsql 和 ES 都不能滿足要求,Druid 有一個缺陷,它是按照時序劃分 Segment,無法將同一個內容,存放在同一個 Segment 上,計算全域 TopN 只能是近似值,所以我們選擇了最近兩年大火的 MPP 資料庫引擎 ClickHouse,

2) 設計目標與設計難點

我們多維實時資料分析系統分為三大模塊:

  • 實時計算引擎
  • 實時存盤引擎
  • App 層

難點主要在前兩個模塊:實時計算引擎和實時存盤引擎,

  • 千萬級/s 的海量資料如何實時接入,并且進行極低延遲維表關聯,
  • 實時存盤引擎如何支持高并發寫入、高可用分布式和高性能索引查詢,是比較難的,

這幾個模塊的具體實作,看一下我們系統的架構設計,

3) 架構設計

前端采用的是開源組件 Ant Design,利用了 Nginx 服務器,部署靜態頁面,并反向代理了瀏覽器的請求到后臺服務器上,

后臺服務是基于騰訊自研的 RPC 后臺服務框架寫的,并且會進行一些二級快取,

實時數倉部分,分為了接入層、實時計算層和實時數倉存盤層,

  • 接入層主要是從千萬級/s 的原始訊息佇列中,拆分出不同行為資料的微佇列,拿看點的視頻來說,拆分過后,資料就只有百萬級/s 了;
  • 實時計算層主要負責,多行行為流水資料進行行轉列,實時關聯用戶畫像資料和內容維度資料;
  • 實時數倉存盤層主要是設計出符合看點業務的,下游好用的實時訊息佇列,我們暫時提供了兩個訊息佇列,作為實時數倉的兩層,一層 DWM 層是內容 ID-用戶 ID 粒度聚合的,就是一條資料包含內容 ID-用戶 ID 還有 B 側內容資料、C 側用戶資料和用戶畫像資料;另一層是 DWS 層,是內容 ID 粒度聚合的,一條資料包含內容 ID,B 側資料和 C 側資料,可以看到內容 ID-用戶 ID 粒度的訊息佇列流量進一步減小到十萬級/s,內容 ID 粒度的更是萬級/s,并且格式更加清晰,維度資訊更加豐富,

實時存盤部分分為實時寫入層、OLAP 存盤層和后臺介面層,

  • 實時寫入層主要是負責 Hash 路由將資料寫入;
  • OLAP 存盤層利用 MPP 存盤引擎,設計符合業務的索引和物化視圖,高效存盤海量資料;
  • 后臺介面層提供高效的多維實時查詢介面,

4) 實時計算

這個系統最復雜的兩塊,實時計算和實時存盤,

先介紹實時計算部分:分為實時關聯和實時數倉,

① 實時高性能維表關聯

實時維表關聯這一塊難度在于 百萬級/s 的實時資料流,如果直接去關聯 HBase,1 分鐘的資料,關聯完 HBase 耗時是小時級的,會導致資料延遲嚴重,

我們提出了幾個解決方案:

  • 第一,在 Flink 實時計算環節,先按照 1 分鐘進行了視窗聚合,將視窗內多行行為資料轉一行多列的資料格式,經過這一步操作,原本小時級的關聯耗時下降到了十幾分鐘,但是還是不夠的,
  • 第二,在訪問 HBase 內容之前設定一層 Redis 快取,因為 1000 條資料訪問 HBase 是秒級的,而訪問 Redis 是毫秒級的,訪問 Redis 的速度基本是訪問 HBase 的 1000 倍,為了防止過期的資料浪費快取,快取過期時間設定成 24 小時,同時通過監聽寫 HBase Proxy 來保證快取的一致性,這樣將訪問時間從十幾分鐘變成了秒級,
  • 第三,上報程序中會上報不少非常規內容 ID,這些內容 ID 在內容 HBase 中是不存盤的,會造成快取穿透的問題,所以在實時計算的時候,我們直接過濾掉這些內容 ID,防止快取穿透,又減少一些時間,
  • 第四,因為設定了定時快取,會引入一個快取雪崩的問題,為了防止雪崩,我們在實時計算中,進行了削峰填谷的操作,錯開設定快取的時間,

可以看到,優化前后,資料量從百億級減少到了十億級,耗時從小時級減少到了數十秒,減少 99%,

②下游提供服務

實時數倉的難度在于:它處于比較新的領域,并且各個公司各個業務差距比較大,怎么能設計出方便,好用,符合看點業務場景的實時數倉是有難度的,

先看一下實時數倉做了什么,實時數倉對外就是幾個訊息佇列,不同的訊息佇列里面存放的就是不同聚合粒度的實時資料,包括內容 ID、用戶 ID、C 側行為資料、B 側內容維度資料和用戶畫像資料等,

我們是怎么搭建實時數倉的,就是上面介紹的實時計算引擎的輸出,放到訊息佇列中保存,可以提供給下游多用戶復用,

我們可以看下,在我們建設實時資料倉庫前后,開發一個實時應用的區別,沒有數倉的時候,我們需要消費千萬級/s 的原始佇列,進行復雜的資料清洗,然后再進行用戶畫像關聯、內容維度關聯,才能拿到符合要求格式的實時資料,開發和擴展的成本都會比較高,如果想開發一個新的應用,又要走一遍這個流程,有了數倉之后,如果想開發內容 ID 粒度的實時應用,就直接申請 TPS 萬級/s 的 DWS 層的訊息佇列,開發成本變低很多,資源消耗小很多,可擴展性也強很多,

看個實際例子,開發我們系統的實時資料大屏,原本需要進行如上所有操作,才能拿到資料,現在只需要消費 DWS 層訊息佇列,寫一條 Flink SQL 即可,僅消耗 2 個 CPU 核心,1G 記憶體,

可以看到,以 50 個消費者為例,建立實時數倉前后,下游開發一個實時應用,可以減少 98%的資源消耗,包括計算資源,存盤資源,人力成本和開發人員學習接入成本等等,并且消費者越多,節省越多,就拿 Redis 存盤這一部分來說,一個月就能省下上百萬人民幣,

5) 實時存盤

介紹完實時計算,再來介紹實時存盤,

這塊分為四個部分來介紹:

  • 分布式-高可用
  • 海量資料-寫入
  • 高性能-查詢
  • 擴容

① 分布式-高可用

我們這里聽取的是 Clickhouse 官方的建議,借助 ZK 實作高可用的方案,資料寫入一個分片,僅寫入一個副本,然后再寫 ZK,通過 ZK 告訴同一個分片的其他副本,其他副本再過來拉取資料,保證資料一致性,

這里沒有選用訊息佇列進行資料同步,是因為 ZK 更加輕量級,而且寫的時候,任意寫一個副本,其它副本都能夠通過 ZK 獲得一致的資料,而且就算其它節點第一次來獲取資料失敗了,后面只要發現它跟 ZK 上記錄的資料不一致,就會再次嘗試獲取資料,保證一致性,

②海量資料-寫入

資料寫入遇到的第一個問題是,海量資料直接寫入 Clickhouse 的話,會導致 ZK 的 QPS 太高,解決方案是改用 Batch 方式寫入,Batch 設定多大呢,Batch 太小的話緩解不了 ZK 的壓力,Batch 也不能太大,不然上游記憶體壓力太大,通過實驗,最終我們選用了大小幾十萬的 Batch,

第二個問題是,隨著資料量的增長,單 QQ 看點的視頻內容每天可能寫入百億級的資料,默認方案是寫一張分布式表,這就會造成單臺機器出現磁盤的瓶頸,尤其是 Clickhouse 底層運用的是 Mergetree,原理類似于 HBase、RocketsDB 的底層 LSM-Tree,在合并的程序中會存在寫放大的問題,加重磁盤壓力,峰值每分鐘幾千萬條資料,寫完耗時幾十秒,如果正在做 Merge,就會阻塞寫入請求,查詢也會非常慢,我們做的兩個優化方案:一是對磁盤做 Raid,提升磁盤的 IO;二是在寫入之前進行分表,直接分開寫入到不同的分片上,磁盤壓力直接變為 1/N,

第三個問題是,雖然我們寫入按照分片進行了劃分,但是這里引入了一個分布式系統常見的問題,就是區域的 Top 并非全域 Top 的問題,比如同一個內容 ID 的資料落在了不同的分片上,計算全域 Top100 閱讀的內容 ID,有一個內容 ID 在分片 1 上是 Top100,但是在其它分片上不是 Top100,導致匯總的時候,會丟失一部分資料,影響最終結果,我們做的優化是在寫入之前加上一層路由,將同一個內容 ID 的記錄,全部路由到同一個分片上,解決了該問題,

介紹完寫入,下一步介紹 Clickhouse 的高性能存盤和查詢,

③ 高性能-存盤-查詢

Clickhouse 高性能查詢的一個關鍵點是稀疏索引,稀疏索引這個設計就很有講究,設計得好可以加速查詢,設計不好反而會影響查詢效率,我根據我們的業務場景,因為我們的查詢大部分都是時間和內容 ID 相關的,比如說,某個內容,過去 N 分鐘在各個人群表現如何?我按照日期,分鐘粒度時間和內容 ID 建立了稀疏索引,針對某個內容的查詢,建立稀疏索引之后,可以減少 99%的檔案掃描,

還有一個問題就是,我們現在資料量太大,維度太多,拿 QQ 看點的視頻內容來說,一天流水有上百億條,有些維度有幾百個類別,如果一次性把所有維度進行預聚合,資料量會指數膨脹,查詢反而變慢,并且會占用大量記憶體空間,我們的優化,針對不同的維度,建立對應的預聚合物化視圖,用空間換時間,這樣可以縮短查詢的時間,

分布式表查詢還會有一個問題,查詢單個內容 ID 的資訊,分布式表會將查詢下發到所有的分片上,然后再回傳查詢結果進行匯總,實際上,因為做過路由,一個內容 ID 只存在于一個分片上,剩下的分片都在空跑,針對這類查詢,我們的優化是后臺按照同樣的規則先進行路由,直接查詢目標分片,這樣減少了 N-1/N 的負載,可以大量縮短查詢時間,而且由于我們是提供的 OLAP 查詢,資料滿足最終一致性即可,通過主從副本讀寫分離,可以進一步提升性能,

我們在后臺還做了一個 1 分鐘的資料快取,針對相同條件查詢,后臺就直接回傳了,

④ 擴容

這里再介紹一下我們的擴容的方案,調研了業內的一些常見方案,

比如 HBase,原始資料都存放在 HDFS 上,擴容只是 Region Server 擴容,不涉及原始資料的遷移,但是 Clickhouse 的每個分片資料都是在本地,是一個比較底層存盤引擎,不能像 HBase 那樣方便擴容,

Redis 是哈希槽這種類似一致性哈希的方式,是比較經典分布式快取的方案,Redis slot 在 Rehash 的程序中雖然存在短暫的 ask 讀不可用,但是總體來說遷移是比較方便的,從原 h[0]遷移到 h[1],最后再洗掉 h[0],但是 Clickhouse 大部分都是 OLAP 批量查詢,不是點查,而且由于列式存盤,不支持洗掉的特性,一致性哈希的方案不是很適合,

目前擴容的方案是,另外消費一份資料,寫入新 Clickhouse 集群,兩個集群一起跑一段時間,因為實時資料就保存 3 天,等 3 天之后,后臺服務直接訪問新集群,

4. 有贊實時數倉案例

1) 分層設計

傳統離線數倉的分層設計大家都很熟悉,為了規范的組織和管理資料,層級劃分會比較多,在一些復雜邏輯處理場景還會引入臨時層落地中間結果以方便下游加工處理,實時數倉考慮到時效性問題,分層設計需要盡量精簡,降低中間流程出錯的可能性,不過總體而言,實時數倉還是會參考離線數倉的分層思想來設計,

實時數倉分層架構如下圖所示 :

① ODS(實時資料接入層)

ODS 層,即實時資料接入層,通過資料采集工具收集各個業務系統的實時資料,對非結構化的資料進行結構化處理,保存原始資料,幾乎不過濾資料;該層資料的主要來源有三個部分:第一部分是業務方創建的 NSQ 訊息,第二部分是業務資料庫的 Binlog 日志,第三部分是埋點日志和應用程式日志,以上三部分的實時資料最終統一寫入 Kafka 存盤介質中,

ODS 層表命名規范:部門名稱.應用名稱.數倉層級主題域前綴資料庫名/訊息名

例如:接入業務庫的 Binlog

實時數倉表命名:deptname.appname.ods_subjectname_tablename

例如:接入業務方的 NSQ 訊息

實時數倉表命名:deptname.appname.ods_subjectname_msgname

② DWS(實時明細中間層)

DWS 層,即實時明細中間層,該層以業務程序作為建模驅動,基于每個具體的業務程序事件來構建最細粒度的明細層事實表;比如交易程序,有下單事件、支付事件、發貨事件等,我們會基于這些獨立的事件來進行明細層的構建,在這層,事實明細資料同樣是按照離線數倉的主題域來進行劃分,也會采用維度建模的方式組織資料,對于一些重要的維度欄位,會做適當冗余,基于有贊實時需求的場景,重點建設交易、營銷、客戶、店鋪、商品等主題域的資料,該層的資料來源于 ODS 層,通過 FlinkSQL 進行 ETL 處理,主要作業有規范命名、資料清洗、維度補全、多流關聯,最終統一寫入 Kafka 存盤介質中,

DWS 層表命名規范:部門名稱.應用名稱.數倉層級_主題域前綴_數倉表命名

例如:實時事件 A 的中間層

實時數倉表命名:deptname.appname.dws_subjectname_tablename_eventnameA

例如:實時事件 B 的中間層

實時數倉表命名:deptname.appname.dws_subjectname_tablename_eventnameB

③ DIM(實時維表層)

DIM 層,即實時維表層,用來存放維度資料,主要用于實時明細中間層寬化處理時補全維度使用,目前該層的資料主要存盤于 HBase 中,后續會基于 QPS 和資料量大小提供更多合適型別的存盤介質,

DIM 層表命名規范:應用名稱_數倉層級_主題域前綴_數倉表命名

例如:HBase 存盤,實時維度表

實時數倉表命名:appname_dim_tablename

④ DWA(實時匯總層)

DWA 層,即實時匯總層,該層通過 DWS 層資料進行多維匯總,提供給下游業務方使用,在實際應用程序中,不同業務方使用維度匯總的方式不太一樣,根據不同的需求采用不同的技術方案去實作,第一種方式,采用 FlinkSQL 進行實時匯總,將結果指標存入 HBase、MySQL 等資料庫,該種方式是我們早期采用的方案,優點是實作業務邏輯比較靈活,缺點是聚合粒度固化,不易擴展;第二種方式,采用實時 OLAP 工具進行匯總,該種方式是我們目前常用的方案,優點是聚合粒度易擴展,缺點是業務邏輯需要在中間層預處理,

DWA 層表命名規范:應用名稱_數倉層級_主題域前綴_聚合粒度_資料范圍

例如:HBase 存盤,某域當日某粒度實時匯總表

實時數倉表命名:appname_dwa_subjectname_aggname_daily

⑤ APP(實時應用層)

APP 層,即實時應用層,該層資料已經寫入應用系統的存盤中,例如寫入 Druid 作為 BI 看板的實時資料集;寫入 HBase、MySQL 用于提供統一資料服務介面;寫入 ClickHouse 用于提供實時 OLAP 服務,因為該層非常貼近業務,在命名規范上實時數倉不做統一要求,

2) 實時 ETL

實時數倉 ETL 處理程序所涉及的組件比較多,接下來盤點構建實時數倉所需要的組件以及每個組件的應用場景,如下圖所示:

具體實時 ETL 處理流程如下圖所示:

① 維度補全

創建呼叫 Duboo 介面的 UDF 函式在實時流里補全維度是最便捷的使用方式,但如果請求量過大,對 Duboo 介面壓力會過大,在實際應用場景補全維度首選還是關聯維度表,但關聯也存在一定概率的丟失問題,為了彌補這種丟失,可以采用 Duboo 介面呼叫兜底的方式來補全,偽代碼如下:

create function call_dubbo as 'XXXXXXX';

create function get_json_object as 'XXXXXXX';

case

when cast( b.column as bigint) is not null

then cast( b.column as bigint)

else cast(coalesce(cast(get_json_object(call_dubbo('clusterUrl'

,'serviceName'

,'methodName'

,cast(concat('[',cast(a.column as varchar),']') as varchar)

,'key'

,'rootId')

as bigint)

,a.column)

as bigint) end

② 冪等處理

實時任務在運行程序中難免會遇到執行例外的情況,當任務例外重啟的時候會導致部分訊息重新發送和消費,從而引發下游實時統計資料不準確,為了有效避免這種情況,可以選擇對實時訊息流做冪等處理,當消費完一條訊息,將這條訊息的 Key 存入 KV,如果任務例外重啟導致訊息重新發送的時候,先從 KV 判斷該訊息是否已被消費,如果已消費就不再往下發送,偽代碼如下:

create function idempotenc as 'XXXXXXX';

insert into table

select

order_no

from

select

a.orderNo as order_no

, idempotenc('XXXXXXX', coalesce( order_no, '') ) as rid

from

table1

) t

where

t.rid = 0;

③ 資料驗證

由于實時數倉的資料是無邊界的流,相比于離線數倉固定不變的資料更難驗收,基于不同的場景,我們提供了 2 種驗證方式,分別是:抽樣驗證與全量驗證,如圖 3.3 所示

  • 抽樣驗證方案

該方案主要應用在資料準確性驗證上,實時匯總結果是基于存盤在 Kafka 的實時明細中間層計算而來,但 Kafka 本身不支持按照特定條件檢索,不支持寫查詢陳述句,再加上訊息的無邊界性,統計結果是在不斷變化的,很難尋找參照物進行比對,鑒于此,我們采用了持久化訊息的方法,將訊息落盤到 TiDB 存盤,基于 TiDB 的能力對落盤的訊息進行檢索、查詢、匯總,撰寫固定時間邊界的測驗用例與相同時間邊界的業務庫資料或者離線數倉資料進行比對,通過以上方式,抽樣核心店鋪的資料進行指標準確性驗證,確保測驗用例全部通過,

  • 全量驗證方案

該方案主要應用在資料完整性和一致性驗證上,在實時維度表驗證的場景使用最多,大體思路:將存盤實時維度表的在線 HBase 集群中的資料同步到離線 HBase 集群中,再將離線 HBase 集群中的資料匯入到 Hive 中,在限定實時維度表的時間邊界后,通過資料平臺提供的資料校驗功能,比對實時維度表與離線維度表是否存在差異,最終確保兩張表的資料完全一致,

④ 資料恢復

實時任務一旦上線就要求持續不斷的提供準確、穩定的服務,區別于離線任務按天調度,如果離線任務出現 Bug,會有充足的時間去修復,如果實時任務出現 Bug,必須按照提前制定好的流程,嚴格按照步驟執行,否則極易出現問題,造成 Bug 的情況有非常多,比如代碼 Bug、例外資料 Bug、實時集群 Bug,如下圖展示了修復實時任務 Bug 并恢復資料的流程,

5. 騰訊全場景實時數倉建設案例

在數倉體系中會有各種各樣的大資料組件,譬如 Hive/HBase/HDFS/S3,計算引擎如 MapReduce、Spark、Flink,根據不同的需求,用戶會構建大資料存盤和處理平臺,資料在平臺經過處理和分析,結果資料會保存到 MySQL、Elasticsearch 等支持快速查詢的關系型、非關系型資料庫中,接下來應用層就可以基于這些資料進行 BI 報表開發、用戶畫像,或基于 Presto 這種 OLAP 工具進行互動式查詢等,

1) Lambda 架構的痛點

在整個程序中我們常常會用一些離線的調度系統,定期的(T+1 或者每隔幾小時)去執行一些 Spark 分析任務,做一些資料的輸入、輸出或是 ETL 作業,離線資料處理的整個程序中必然存在資料延遲的現象,不管是資料接入還是中間的分析,資料的延遲都是比較大的,可能是小時級也有可能是天級別的,另外一些場景中我們也常常會為了一些實時性的需求去構建一個實時處理程序,比如借助 Flink+Kafka 去構建實時的流處理系統,

整體上,數倉架構中有非常多的組件,大大增加了整個架構的復雜性和運維的成本,

如下圖,這是很多公司之前或者現在正在采用的 Lambda 架構,Lambda 架構將數倉分為離線層和實時層,相應的就有批處理和流處理兩個相互獨立的資料處理流程,同一份資料會被處理兩次以上,同一套業務邏輯代碼需要適配性的開發兩次,Lambda 架構大家應該已經非常熟悉了,下面我就著重介紹一下我們采用 Lambda 架構在數倉建設程序中遇到的一些痛點問題,

例如在實時計算一些用戶相關指標的實時場景下,我們想看到當前 pv、uv 時,我們會將這些資料放到實時層去做一些計算,這些指標的值就會實時呈現出來,但同時想了解用戶的一個增長趨勢,需要把過去一天的資料計算出來,這樣就需要通過批處理的調度任務來實作,比如凌晨兩三點的時候在調度系統上起一個 Spark 調度任務把當天所有的資料重新跑一遍,

很顯然在這個程序中,由于兩個程序運行的時間是不一樣的,跑的資料卻相同,因此可能造成資料的不一致,因為某一潭訓幾條資料的更新,需要重新跑一遍整個離線分析的鏈路,資料更新成本很大,同時需要維護離線和實時分析兩套計算平臺,整個上下兩層的開發流程和運維成本其實都是非常高的,

為了解決 Lambda 架構帶來的各種問題,就誕生了 Kappa 架構,這個架構大家應該也非常的熟悉,

2) Kappa 架構的痛點

我們來講一下 Kappa 架構,如下圖,它中間其實用的是訊息佇列,通過用 Flink 將整個鏈路串聯起來,Kappa 架構解決了 Lambda 架構中離線處理層和實時處理層之間由于引擎不一樣,導致的運維成本和開發成本高昂的問題,但 Kappa 架構也有其痛點,

首先,在構建實時業務場景時,會用到 Kappa 去構建一個近實時的場景,但如果想對數倉中間層例如 ODS 層做一些簡單的 OLAP 分析或者進一步的資料處理時,如將資料寫到 DWD 層的 Kafka,則需要另外接入 Flink,同時,當需要從 DWD 層的 Kafka 把資料再匯入到 Clickhouse,Elasticsearch,MySQL 或者是 Hive 里面做進一步的分析時,顯然就增加了整個架構的復雜性,

其次,Kappa 架構是強烈依賴訊息佇列的,我們知道訊息佇列本身在整個鏈路上資料計算的準確性是嚴格依賴它上游資料的順序,訊息佇列接的越多,發生亂序的可能性就越大,ODS 層資料一般是絕對準確的,把 ODS 層的資料發送到下一個 kafka 的時候就有可能發生亂序,DWD 層再發到 DWS 的時候可能又亂序了,這樣資料不一致性就會變得很嚴重,

第三,Kafka 由于它是一個順序存盤的系統,順序存盤系統是沒有辦法直接在其上面利用 OLAP 分析的一些優化策略,例如謂詞下推這類的優化策略,在順序存盤的 Kafka 上來實作是比較困難的事情,

那么有沒有這樣一個架構,既能夠滿足實時性的需求,又能夠滿足離線計算的要求,而且還能夠減輕運維開發的成本,解決通過訊息佇列構建 Kappa 架構程序中遇到的一些痛點?答案是肯定的,后面的篇幅會詳細論述,

3) 痛點總結

4) Flink+Iceberg 構建實時數倉

① 近實時的資料接入

前面介紹了 Iceberg 既支持讀寫分離,又支持并發讀、增量讀、小檔案合并,還可以支持秒級到分鐘級的延遲,基于這些優勢我們嘗試采用 Iceberg 這些功能來構建基于 Flink 的實時全鏈路批流一體化的實時數倉架構,

如下圖所示,Iceberg 每次的 commit 操作,都是對資料的可見性的改變,比如說讓資料從不可見變成可見,在這個程序中,就可以實作近實時的資料記錄,

② 實時數倉 - 資料湖分析系統

此前需要先進行資料接入,比如用 Spark 的離線調度任務去跑一些資料,拉取,抽取最后再寫入到 Hive 表里面,這個程序的延時比較大,有了 Iceberg 的表結構,可以中間使用 Flink,或者 spark streaming,完成近實時的資料接入,

基于以上功能,我們再來回顧一下前面討論的 Kappa 架構,Kappa 架構的痛點上面已經描述過,Iceberg 既然能夠作為一個優秀的表格式,既支持 Streaming reader,又可以支持 Streaming sink,是否可以考慮將 Kafka 替換成 Iceberg?

Iceberg 底層依賴的存盤是像 HDFS 或 S3 這樣的廉價存盤,而且 Iceberg 是支持 parquet、orc、Avro 這樣的列式存盤,有列式存盤的支持,就可以對 OLAP 分析進行基本的優化,在中間層直接進行計算,例如謂詞下推最基本的 OLAP 優化策略,基于 Iceberg snapshot 的 Streaming reader 功能,可以把離線任務天級別到小時級別的延遲大大的降低,改造成一個近實時的資料湖分析系統,

在中間處理層,可以用 presto 進行一些簡單的查詢,因為 Iceberg 支持 Streaming read,所以在系統的中間層也可以直接接入 Flink,直接在中間層用 Flink 做一些批處理或者流式計算的任務,把中間結果做進一步計算后輸出到下游,

總的來說,Iceberg 替換 Kafka 的優勢主要包括:

優勢:

  • 實作存盤層的流批統一;
  • 中間層支持 OLAP 分析;
  • 完美支持高效回溯;
  • 存盤成本降低,

劣勢:

  • 資料延遲從實時變成近實時;
  • 對接其他資料系統需要額外開發作業,

秒級分析 - 資料湖加速:

由于 Iceberg 本身是將資料檔案全部存盤在 HDFS 上的,HDFS 讀寫這塊對于秒級分析的場景,還是不能夠完全滿足我們的需求,所以接下去我們會在 Iceberg 底層支持 Alluxio 這樣一個快取,借助于快取的能力可以實作資料湖的加速,這塊的架構也在我們未來的一個規劃和建設中,

作者丨五分鐘學大資料

一、實時數倉建設背景

1. 實時需求日趨迫切

目前各大公司的產品需求和內部決策對于資料實時性的要求越來越迫切,需要實時數倉的能力來賦能,傳統離線數倉的資料時效性是 T+1,調度頻率以天為單位,無法支撐實時場景的資料需求,即使能將調度頻率設定成小時,也只能解區域分時效性要求不高的場景,對于實效性要求很高的場景還是無法優雅的支撐,因此實時使用資料的問題必須得到有效解決,

2. 實時技術日趨成熟

實時計算框架已經經歷了三代發展,分別是:Storm、SparkStreaming、Flink,計算框架越來越成熟,一方面,實時任務的開發已經能通過撰寫 SQL 的方式來完成,在技術層面能很好地繼承離線數倉的架構設計思想;另一方面,在線資料開發平臺所提供的功能對實時任務開發、除錯、運維的支持也日漸趨于成熟,開發成本逐步降低,有助于去做這件事,

二、實時數倉建設目的

1. 解決傳統數倉的問題

從目前數倉建設的現狀來看,實時數倉是一個容易讓人產生混淆的概念,根據傳統經驗分析,數倉有一個重要的功能,即能夠記錄歷史,通常,數倉都是希望從業務上線的第一天開始有資料,然后一直記錄到現在,但實時流處理技術,又是強調當前處理狀態的一個技術,結合當前一線大廠的建設經驗和滴滴在該領域的建設現狀,我們嘗試把公司內實時數倉建設的目的定位為,以數倉建設理論和實時技術,解決由于當前離線數倉資料時效性低解決不了的問題,

現階段我們要建設實時數倉的主要原因是:

  • 公司業務對于資料的實時性越來越迫切,需要有實時資料來輔助完成決策;
  • 實時資料建設沒有規范,資料可用性較差,無法形成數倉體系,資源大量浪費;
  • 資料平臺工具對整體實時開發的支持也日漸趨于成熟,開發成本降低,

2. 實時數倉的應用場景

  • 實時 OLAP 分析;
  • 實時資料看板;
  • 實時業務監控;
  • 實時資料介面服務,

三、實時數倉建設方案

接下來我們分析下目前實時數倉建設比較好的幾個案例,希望這些案例能夠給大家帶來一些啟發,

1. 滴滴順風車實時數倉案例

滴滴資料團隊建設的實時數倉,基本滿足了順風車業務方在實時側的各類業務需求,初步建立起順風車實時數倉,完成了整體資料分層,包含明細資料和匯總資料,統一了 DWD 層,降低了大資料資源消耗,提高了資料復用性,可對外輸出豐富的資料服務,

數倉具體架構如下圖所示:

從資料架構圖來看,順風車實時數倉和對應的離線數倉有很多類似的地方,例如分層結構;比如 ODS 層,明細層,匯總層,乃至應用層,他們命名的模式可能都是一樣的,但仔細比較不難發現,兩者有很多區別:

  • 與離線數倉相比,實時數倉的層次更少一些:
  • 從目前建設離線數倉的經驗來看,數倉的資料明細層內容會非常豐富,處理明細資料外一般還會包含輕度匯總層的概念,另外離線數倉中應用層資料在數倉內部,但實時數倉中,app 應用層資料已經落入應用系統的存盤介質中,可以把該層與數倉的表分離;
  • 應用層少建設的好處:實時處理資料的時候,每建一個層次,資料必然會產生一定的延遲;
  • 匯總層少建的好處:在匯總統計的時候,往往為了容忍一部分資料的延遲,可能會人為的制造一些延遲來保證資料的準確,舉例,在統計跨天相關的訂單事件中的資料時,可能會等到 00:00:05 或者 00:00:10 再統計,確保 00:00 前的資料已經全部接受到位了,再進行統計,所以,匯總層的層次太多的話,就會更大的加重人為造成的資料延遲,
  • 與離線數倉相比,實時數倉的資料源存盤不同:
  • 在建設離線數倉的時候,目前滴滴內部整個離線數倉都是建立在 Hive 表之上,但是,在建設實時數倉的時候,同一份表,會使用不同的方式進行存盤,比如常見的情況下,明細資料或者匯總資料都會存在 Kafka 里面,但是像城市、渠道等維度資訊需要借助 Hbase,mysql 或者其他 KV 存盤等資料庫來進行存盤,

接下來,根據順風車實時數倉架構圖,對每一層建設做具體展開:

1) ODS 貼源層建設

根據順風車具體場景,目前順風車資料源主要包括訂單相關的 binlog 日志,冒泡和安全相關的 public 日志,流量相關的埋點日志等,這些資料部分已采集寫入 kafka 或 ddmq 等資料通道中,部分資料需要借助內部自研同步工具完成采集,最侄訓于順風車數倉 ods 層建設規范分主題統一寫入 kafka 存盤介質中,

命名規范:ODS 層實時資料源主要包括兩種,

一種是在離線采集時已經自動生產的 DDMQ 或者是 Kafka topic,這型別的資料命名方式為采集系統自動生成規范為:cn-binlog-資料庫名-資料庫名 eg:cn-binlog-ihap_fangyuan-ihap_fangyuan

一種是需要自己進行采集同步到 kafka topic 中,生產的 topic 命名規范同離線類似:ODS 層采用:realtime_ods_binlog_{源系統庫/表名}/ods_log_{日志名} eg: realtime_ods_binlog_ihap_fangyuan

2)DWD 明細層建設

根據順風車業務程序作為建模驅動,基于每個具體的業務程序特點,構建最細粒度的明細層事實表;結合順風車分析師在離線側的資料使用特點,將明細事實表的某些重要維度屬性欄位做適當冗余,完成寬表化處理,之后基于當前順風車業務方對實時資料的需求重點,重點建設交易、財務、體驗、安全、流量等幾大模塊;該層的資料來源于 ODS 層,通過大資料架構提供的 Stream SQL 完成 ETL 作業,對于 binlog 日志的處理主要進行簡單的資料清洗、處理資料漂移和資料亂序,以及可能對多個 ODS 表進行 Stream Join,對于流量日志主要是做通用的 ETL 處理和針對順風車場景的資料過濾,完成非結構化資料的結構化處理和資料的分流;該層的資料除了存盤在訊息佇列 Kafka 中,通常也會把資料實時寫入 Druid 資料庫中,供查詢明細資料和作為簡單匯總資料的加工資料源,

命名規范:DWD 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 40 個字符,并且應遵循下述規則:realtime_dwd_{業務/pub}_{資料域縮寫}_[{業務程序縮寫}]_[{自定義表命名標簽縮寫}]

  • {業務/pub}:參考業務命名
  • {資料域縮寫}:參考資料域劃分部分
  • {自定義表命名標簽縮寫}:物體名稱可以根據資料倉庫轉換整合后做一定的業務抽象的名稱,該名稱應該準確表述物體所代表的業務含義
  • 樣例:realtime_dwd_trip_trd_order_base

3) DIM 層

  • 公共維度層,基于維度建模理念思想,建立整個業務程序的一致性維度,降低資料計算口徑和演算法不統一風險;
  • DIM 層資料來源于兩部分:一部分是 Flink 程式實時處理 ODS 層資料得到,另外一部分是通過離線任務出倉得到;
  • DIM 層維度資料主要使用 MySQL、Hbase、fusion(滴滴自研 KV 存盤) 三種存盤引擎,對于維表資料比較少的情況可以使用 MySQL,對于單條資料大小比較小,查詢 QPS 比較高的情況,可以使用 fusion 存盤,降低機器記憶體資源占用,對于資料量比較大,對維表資料變化不是特別敏感的場景,可以使用 HBase 存盤,

命名規范:DIM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 30 個字符,并且應遵循下述規則:dim_{業務/pub}_{維度定義}[_{自定義命名標簽}]:

  • {業務/pub}:參考業務命名
  • {維度定義}:參考維度命名
  • {自定義表命名標簽縮寫}:物體名稱可以根據資料倉庫轉換整合后做一定的業務抽象的名稱,該名稱應該準確表述物體所代表的業務含義
  • 樣例:dim_trip_dri_base

4) DWM 匯總層建設

在建設順風車實時數倉的匯總層的時候,跟順風車離線數倉有很多一樣的地方,但其具體技術實作會存在很大不同,

第一:對于一些共性指標的加工,比如 pv,uv,訂單業務程序指標等,我們會在匯總層進行統一的運算,確保關于指標的口徑是統一在一個固定的模型中完成,對于一些個性指標,從指標復用性的角度出發,確定唯一的時間欄位,同時該欄位盡可能與其他指標在時間維度上完成拉齊,例如行中例外訂單數需要與交易域指標在事件時間上做到拉齊,

第二:在順風車匯總層建設中,需要進行多維的主題匯總,因為實時數倉本身是面向主題的,可能每個主題會關心的維度都不一樣,所以需要在不同的主題下,按照這個主題關心的維度對資料進行匯總,最后來算業務方需要的匯總指標,在具體操作中,對于 pv 類指標使用 Stream SQL 實作 1 分鐘匯總指標作為最小匯總單位指標,在此基礎上進行時間維度上的指標累加;對于 uv 類指標直接使用 druid 資料庫作為指標匯總容器,根據業務方對匯總指標的及時性和準確性的要求,實作相應的精確去重和非精確去重,

第三:匯總層建設程序中,還會涉及到衍生維度的加工,在順風車券相關的匯總指標加工中我們使用 Hbase 的版本機制來構建一個衍生維度的拉鏈表,通過事件流和 Hbase 維表關聯的方式得到實時資料當時的準確維度

命名規范:DWM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 40 個字符,并且應遵循下述規則:realtime_dwm_{業務/pub}_{資料域縮寫}_{資料主粒度縮寫}_[{自定義表命名標簽縮寫}]_{統計時間周期范圍縮寫}:

  • {業務/pub}:參考業務命名
  • {資料域縮寫}:參考資料域劃分部分
  • {資料主粒度縮寫}:指資料主要粒度或資料域的縮寫,也是聯合主鍵中的主要維度
  • {自定義表命名標簽縮寫}:物體名稱可以根據資料倉庫轉換整合后做一定的業務抽象的名稱,該名稱應該準確表述物體所代表的業務含義
  • {統計時間周期范圍縮寫}:1d:天增量;td:天累計(全量);1h:小時增量;th:小時累計(全量);1min:分鐘增量;tmin:分鐘累計(全量)
  • 樣例:realtime_dwm_trip_trd_pas_bus_accum_1min

(5)APP 應用層

該層主要的作業是把實時匯總資料寫入應用系統的資料庫中,包括用于大屏顯示和實時 OLAP 的 Druid 資料庫(該資料庫除了寫入應用資料,也可以寫入明細資料完成匯總指標的計算)中,用于實時資料介面服務的 Hbase 資料庫,用于實時資料產品的 mysql 或者 redis 資料庫中,

命名規范:基于實時數倉的特殊性不做硬性要求,

2. 快手實時數倉場景化案例

1) 目標及難點

① 目標

首先由于是做數倉,因此希望所有的實時指標都有離線指標去對應,要求實時指標和離線指標整體的資料差異在 1% 以內,這是最低標準,

其次是資料延遲,其 SLA 標準是活動期間所有核心報表場景的資料延遲不能超過 5 分鐘,這 5 分鐘包括作業掛掉之后和恢復時間,如果超過則意味著 SLA 不達標,

最后是穩定性,針對一些場景,比如作業重啟后,我們的曲線是正常的,不會因為作業重啟導致指標產出一些明顯的例外,

②難點

第一個難點是資料量大,每天整體的入口流量資料量級大概在萬億級,在活動如春晚的場景,QPS 峰值能達到億 / 秒,

第二個難點是組件依賴比較復雜,可能這條鏈路里有的依賴于 Kafka,有的依賴 Flink,還有一些依賴 KV 存盤、RPC 介面、OLAP 引擎等,我們需要思考在這條鏈路里如何分布,才能讓這些組件都能正常作業,

第三個難點是鏈路復雜,目前我們有 200+ 核心業務作業,50+ 核心資料源,整體作業超過 1000,

2) 實時數倉 - 分層模型

基于上面三個難點,來看一下數倉架構:

如上所示:

最下層有三個不同的資料源,分別是客戶端日志、服務端日志以及 Binlog 日志;在公共基礎層分為兩個不同的層次,一個是 DWD 層,做明細資料,另一個是 DWS 層,做公共聚合資料,DIM 是我們常說的維度,我們有一個基于離線數倉的主題預分層,這個主題預分層可能包括流量、用戶、設備、視頻的生產消費、風控、社交等,DWD 層的核心作業是標準化的清洗;DWS 層是把維度的資料和 DWD 層進行關聯,關聯之后生成一些通用粒度的聚合層次,再往上是應用層,包括一些大盤的資料,多維分析的模型以及業務專題資料;最上面是場景,整體程序可以分為三步:

第一步是做業務資料化,相當于把業務的資料接進來;第二步是資料資產化,意思是對資料做很多的清洗,然后形成一些規則有序的資料;第三步是資料業務化,可以理解資料在實時資料層面可以反哺業務,為業務資料價值建設提供一些賦能,

3) 實時數倉 - 保障措施

基于上面的分層模型,來看一下整體的保障措施:

保障層面分為三個不同的部分,分別是質量保障,時效保障以及穩定保障,

我們先看藍色部分的質量保障,針對質量保障,可以看到在資料源階段,做了如資料源的亂序監控,這是我們基于自己的 SDK 的采集做的,以及資料源和離線的一致性校準,研發階段的計算程序有三個階段,分別是研發階段、上線階段和服務階段,研發階段可能會提供一個標準化的模型,基于這個模型會有一些 Benchmark,并且做離線的比對驗證,保證質量是一致的;上線階段更多的是服務監控和指標監控;在服務階段,如果出現一些例外情況,先做 Flink 狀態拉起,如果出現了一些不符合預期的場景,我們會做離線的整體資料修復,

第二個是時效性保障,針對資料源,我們把資料源的延遲情況也納入監控,在研發階段其實還有兩個事情:首先是壓測,常規的任務會拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任務延遲的情況;通過壓測之后,會有一些任務上線和重啟性能評估,相當于按照 CP 恢復之后,重啟的性能是什么樣子,

最后一個是穩定保障,這在大型活動中會做得比較多,比如切換演練和分級保障,我們會基于之前的壓測結果做限流,目的是保障作業在超過極限的情況下,仍然是穩定的,不會出現很多的不穩定或者 CP 失敗的情況,之后我們會有兩種不同的標準,一種是冷備雙機房,另外一種是熱備雙機房,冷備雙機房是:當一個單機房掛掉,我們會從另一個機房去拉起;熱備雙機房:相當于同樣一份邏輯在兩個機房各部署一次,以上就是我們整體的保障措施,

4)快手場景問題及解決方案

① PV/UV 標準化

  •  場景

第一個問題是 PV/UV 標準化,這里有三個截圖:

第一張圖是春晚活動的預熱場景,相當于是一種玩法,第二和第三張圖是春晚當天的發紅包活動和直播間截圖,

在活動進行程序中,我們發現 60~70% 的需求是計算頁面里的資訊,如:

這個頁面來了多少人,或者有多少人點擊進入這個頁面;

活動一共來了多少人;

頁面里的某一個掛件,獲得了多少點擊、產生了多少曝光,

  • 方案

抽象一下這個場景就是下面這種 SQL:

簡單來說,就是從一張表做篩選條件,然后按照維度層面做聚合,接著產生一些 Count 或者 Sum 操作,

基于這種場景,我們最開始的解決方案如上圖右邊所示,

我們用到了 Flink SQL 的 Early Fire 機制,從 Source 資料源取資料,之后做了 DID 的分桶,比如最開始紫色的部分按這個做分桶,先做分桶的原因是防止某一個 DID 存在熱點的問題,分桶之后會有一個叫做 Local Window Agg 的東西,相當于資料分完桶之后把相同型別的資料相加,Local Window Agg 之后再按照維度進行 Global Window Agg 的合桶,合桶的概念相當于按照維度計算出最終的結果,Early Fire 機制相當于在 Local Window Agg 開一個天級的視窗,然后每分鐘去對外輸出一次,

這個程序中我們遇到了一些問題,如上圖左下角所示,

在代碼正常運行的情況下是沒有問題的,但如果整體資料存在延遲或者追溯歷史資料的情況,比如一分鐘 Early Fire 一次,因為追溯歷史的時候資料量會比較大,所以可能導致 14:00 追溯歷史,直接讀到了 14:02 的資料,而 14:01 的那個點就被丟掉了,丟掉了以后會發生什么?

在這種場景下,圖中上方的曲線為 Early Fire 回溯歷史資料的結果,橫坐標是分鐘,縱坐標是截止到當前時刻的頁面 UV,我們發現有些點是橫著的,意味著沒有資料結果,然后一個陡增,然后又橫著的,接著又一個陡增,而這個曲線的預期結果其實是圖中下方那種平滑的曲線,

為了解決這個問題,我們用到了 Cumulate Window 的解決方案,這個解決方案在 Flink 1.13 版本里也有涉及,其原理是一樣的,

資料開一個大的天級視窗,大視窗下又開了一個小的分鐘級視窗,資料按資料本身的 Row Time 落到分鐘級視窗,

Watermark 推進過了視窗的 event_time,它會進行一次下發的觸發,通過這種方式可以解決回溯的問題,資料本身落在真實的視窗, Watermark 推進,在視窗結束后觸發,此外,這種方式在一定程度上能夠解決亂序的問題,比如它的亂序資料本身是一個不丟棄的狀態,會記錄到最新的累計資料,最后是語意一致性,它會基于事件時間,在亂序不嚴重的情況下,和離線計算出來的結果一致性是相當高的,以上是 PV/UV 一個標準化的解決方案,

② DAU 計算

  • 背景介紹

下面介紹一下 DAU 計算:

我們對于整個大盤的活躍設備、新增設備和回流設備有比較多的監控,

活躍設備指的是當天來過的設備;新增設備指的是當天來過且歷史沒有來過的設備;回流設備指的是當天來過且 N 天內沒有來過的設備,但是我們計算程序之中可能需要 5~8 個這樣不同的 Topic 去計算這幾個指標,

我們看一下離執行緒序中,邏輯應該怎么算,

首先我們先算活躍設備,把這些合并到一起,然后做一個維度下的天級別去重,接著再去關聯維度表,這個維度表包括設備的首末次時間,就是截止到昨天設備首次訪問和末次訪問的時間,

得到這個資訊之后,我們就可以進行邏輯計算,然后我們會發現新增和回流的設備其實是活躍設備里打的一個子標簽,新增設備就是做了一個邏輯處理,回流設備是做了 30 天的邏輯處理,基于這樣的解決方案,我們能否簡單地寫一個 SQL 去解決這個問題?

其實我們最開始是這么做的,但遇到了一些問題:

第一個問題是:資料源是 6~8 個,而且我們大盤的口徑經常會做微調,如果是單作業的話,每次微調的程序之中都要改,單作業的穩定性會非常差;第二個問題是:資料量是萬億級,這會導致兩個情況,首先是這個量級的單作業穩定性非常差,其次是實時關聯維表的時候用的 KV 存盤,任何一個這樣的 RPC 服務介面,都不可能在萬億級資料量的場景下保證服務穩定性;第三個問題是:我們對于時延要求比較高,要求時延小于一分鐘,整個鏈路要避免批處理,如果出現了一些任務性能的單點問題,我們還要保證高性能和可擴容,

  • 技術方案

針對以上問題,介紹一下我們是怎么做的:

如上圖的例子,第一步是對 A B C 這三個資料源,先按照維度和 DID 做分鐘級別去重,分別去重之后得到三個分鐘級別去重的資料源,接著把它們 Union 到一起,然后再進行同樣的邏輯操作,

這相當于我們資料源的入口從萬億變到了百億的級別,分鐘級別去重之后再進行一個天級別的去重,產生的資料源就可以從百億變成了幾十億的級別,

在幾十億級別資料量的情況下,我們再去關聯資料服務化,這就是一種比較可行的狀態,相當于去關聯用戶畫像的 RPC 介面,得到 RPC 介面之后,最終寫入到了目標 Topic,這個目標 Topic 會匯入到 OLAP 引擎,供給多個不同的服務,包括移動版服務,大屏服務,指標看板服務等,

這個方案有三個方面的優勢,分別是穩定性、時效性和準確性,

首先是穩定性,松耦合可以簡單理解為當資料源 A 的邏輯和資料源 B 的邏輯需要修改時,可以單獨修改,第二是任務可擴容,因為我們把所有邏輯拆分得非常細粒度,當一些地方出現了如流量問題,不會影響后面的部分,所以它擴容比較簡單,除此之外還有服務化后置和狀態可控,其次是時效性,我們做到毫秒延遲,并且維度豐富,整體上有 20+ 的維度做多維聚合,最后是準確性,我們支持資料驗證、實時監控、模型出口統一等,此時我們遇到了另外一個問題 - 亂序,對于上方三個不同的作業,每一個作業重啟至少會有兩分鐘左右的延遲,延遲會導致下游的資料源 Union 到一起就會有亂序,

  • 延遲計算方案

遇到上面這種有亂序的情況下,我們要怎么處理?

我們總共有三種處理方案:

第一種解決方案是用 “did + 維度 + 分鐘” 進行去重,Value 設為 “是否來過”,比如同一個 did,04:01 來了一條,它會進行結果輸出,同樣的,04:02 和 04:04 也會進行結果輸出,但如果 04:01 再來,它就會丟棄,但如果 04:00 來,依舊會進行結果輸出,

這個解決方案存在一些問題,因為我們按分鐘存,存 20 分鐘的狀態大小是存 10 分鐘的兩倍,到后面這個狀態大小有點不太可控,因此我們又換了解決方案 2,

第二種解決方案,我們的做法會涉及到一個假設前提,就是假設不存在資料源亂序的情況,在這種情況下,key 存的是 “did + 維度”,Value 為 “時間戳”,它的更新方式如上圖所示,04:01 來了一條資料,進行結果輸出,04:02 來了一條資料,如果是同一個 did,那么它會更新時間戳,然后仍然做結果輸出,04:04 也是同樣的邏輯,然后將時間戳更新到 04:04,如果后面來了一條 04:01 的資料,它發現時間戳已經更新到 04:04,它會丟棄這條資料,這樣的做法大幅度減少了本身所需要的一些狀態,但是對亂序是零容忍,不允許發生任何亂序的情況,由于我們不好解決這個問題,因此我們又想出了解決方案 3,

方案 3 是在方案 2 時間戳的基礎之上,加了一個類似于環形緩沖區,在緩沖區之內允許亂序,

比如 04:01 來了一條資料,進行結果輸出;04:02 來了一條資料,它會把時間戳更新到 04:02,并且會記錄同一個設備在 04:01 也來過,如果 04:04 再來了一條資料,就按照相應的時間差做一個位移,最后通過這樣的邏輯去保障它能夠容忍一定的亂序,

綜合來看這三個方案:

方案 1 在容忍 16 分鐘亂序的情況下,單作業的狀態大小在 480G 左右,這種情況雖然保證了準確性,但是作業的恢復和穩定性是完全不可控的狀態,因此我們還是放棄了這個方案;

方案 2 是 30G 左右的狀態大小,對于亂序 0 容忍,但是資料不準確,由于我們對準確性的要求非常高,因此也放棄了這個方案;

方案 3 的狀態跟方案 1 相比,它的狀態雖然變化了但是增加的不多,而且整體能達到跟方案 1 同樣的效果,方案 3 容忍亂序的時間是 16 分鐘,我們正常更新一個作業的話,10 分鐘完全足夠重啟,因此最終選擇了方案 3,

③ 運營場景

  •  背景介紹

運營場景可分為四個部分:

第一個是資料大屏支持,包括單直播間的分析資料和大盤的分析資料,需要做到分鐘級延遲,更新要求比較高;

第二個是直播看板支持,直播看板的資料會有特定維度的分析,特定人群支持,對維度豐富性要求比較高;

第三個是資料策略榜單,這個榜單主要是預測熱門作品、爆款,要求的是小時級別的資料,更新要求比較低;

第四個是 C 端實時指標展示,查詢量比較大,但是查詢模式比較固定,

下面進行分析這 4 種不同的狀態產生的一些不同的場景,

前 3 種基本沒有什么差別,只是在查詢模式上,有的是特定業務場景,有的是通用業務場景,

針對第 3 種和第 4 種,它對于更新的要求比較低,對于吞吐的要求比較高,程序之中的曲線也不要求有一致性,第 4 種查詢模式更多的是單物體的一些查詢,比如去查詢內容,會有哪些指標,而且對 QPS 要求比較高,

  •  技術方案

針對上方 4 種不同的場景,我們是如何去做的?

首先看一下基礎明細層 (圖中左方),資料源有兩條鏈路,其中一條鏈路是消費的流,比如直播的消費資訊,還有觀看 / 點贊 / 評論,經過一輪基礎清洗,然后做維度管理,上游的這些維度資訊來源于 Kafka,Kafka 寫入了一些內容的維度,放到了 KV 存盤里邊,包括一些用戶的維度,

這些維度關聯了之后,最終寫入 Kafka 的 DWD 事實層,這里為了做性能的提升,我們做了二級快取的操作,

如圖中上方,我們讀取 DWD 層的資料然后做基礎匯總,核心是視窗維度聚合生成 4 種不同粒度的資料,分別是大盤多維匯總 topic、直播間多維匯總 topic、作者多維匯總 topic、用戶多維匯總 topic,這些都是通用維度的資料,

如圖中下方,基于這些通用維度資料,我們再去加工個性化維度的資料,也就是 ADS 層,拿到了這些資料之后會有維度擴展,包括內容擴展和運營維度的拓展,然后再去做聚合,比如會有電商實時 topic,機構服務實時 topic 和大 V 直播實時 topic,

分成這樣的兩個鏈路會有一個好處:一個地方處理的是通用維度,另一個地方處理的是個性化的維度,通用維度保障的要求會比較高一些,個性化維度則會做很多個性化的邏輯,如果這兩個耦合在一起的話,會發現任務經常出問題,并且分不清楚哪個任務的職責是什么,構建不出這樣的一個穩定層,

如圖中右方,最終我們用到了三種不同的引擎,簡單來說就是 Redis 查詢用到了 C 端的場景,OLAP 查詢用到了大屏、業務看板的場景,

3. 騰訊看點實時數倉案例

騰訊看點業務為什么要構建實時數倉,因為原始的上報資料量非常大,一天上報峰值就有上萬億條,而且上報格式混亂,缺乏內容維度資訊、用戶畫像資訊,下游沒辦法直接使用,而我們提供的實時數倉,是根據騰訊看點資訊流的業務場景,進行了內容維度的關聯,用戶畫像的關聯,各種粒度的聚合,下游可以非常方便的使用實時資料,

1) 方案選型

那就看下我們多維實時資料分析系統的方案選型,選型我們對比了行業內的領先方案,選擇了最符合我們業務場景的方案,

第一塊是實時數倉的選型,我們選擇的是業界比較成熟的 Lambda 架構,他的優點是靈活性高、容錯性高、成熟度高和遷移成本低;缺點是實時、離線資料用兩套代碼,可能會存在一個口徑修改了,另一個沒改的問題,我們每天都有做資料對賬的作業,如果有例外會進行告警,

第二塊是實時計算引擎選型,因為 Flink 設計之初就是為了流處理,SparkStreaming 嚴格來說還是微批處理,Strom 用的已經不多了,再看 Flink 具有 Exactly-once 的準確性、輕量級 Checkpoint 容錯機制、低延時高吞吐和易用性高的特點,我們選擇了 Flink 作為實時計算引擎,

第三塊是實時存盤引擎,我們的要求就是需要有維度索引、支持高并發、預聚合、高性能實時多維 OLAP 查詢,可以看到,Hbase、Tdsql 和 ES 都不能滿足要求,Druid 有一個缺陷,它是按照時序劃分 Segment,無法將同一個內容,存放在同一個 Segment 上,計算全域 TopN 只能是近似值,所以我們選擇了最近兩年大火的 MPP 資料庫引擎 ClickHouse,

2) 設計目標與設計難點

我們多維實時資料分析系統分為三大模塊:

  • 實時計算引擎
  • 實時存盤引擎
  • App 層

難點主要在前兩個模塊:實時計算引擎和實時存盤引擎,

  • 千萬級/s 的海量資料如何實時接入,并且進行極低延遲維表關聯,
  • 實時存盤引擎如何支持高并發寫入、高可用分布式和高性能索引查詢,是比較難的,

這幾個模塊的具體實作,看一下我們系統的架構設計,

3) 架構設計

前端采用的是開源組件 Ant Design,利用了 Nginx 服務器,部署靜態頁面,并反向代理了瀏覽器的請求到后臺服務器上,

后臺服務是基于騰訊自研的 RPC 后臺服務框架寫的,并且會進行一些二級快取,

實時數倉部分,分為了接入層、實時計算層和實時數倉存盤層,

  • 接入層主要是從千萬級/s 的原始訊息佇列中,拆分出不同行為資料的微佇列,拿看點的視頻來說,拆分過后,資料就只有百萬級/s 了;
  • 實時計算層主要負責,多行行為流水資料進行行轉列,實時關聯用戶畫像資料和內容維度資料;
  • 實時數倉存盤層主要是設計出符合看點業務的,下游好用的實時訊息佇列,我們暫時提供了兩個訊息佇列,作為實時數倉的兩層,一層 DWM 層是內容 ID-用戶 ID 粒度聚合的,就是一條資料包含內容 ID-用戶 ID 還有 B 側內容資料、C 側用戶資料和用戶畫像資料;另一層是 DWS 層,是內容 ID 粒度聚合的,一條資料包含內容 ID,B 側資料和 C 側資料,可以看到內容 ID-用戶 ID 粒度的訊息佇列流量進一步減小到十萬級/s,內容 ID 粒度的更是萬級/s,并且格式更加清晰,維度資訊更加豐富,

實時存盤部分分為實時寫入層、OLAP 存盤層和后臺介面層,

  • 實時寫入層主要是負責 Hash 路由將資料寫入;
  • OLAP 存盤層利用 MPP 存盤引擎,設計符合業務的索引和物化視圖,高效存盤海量資料;
  • 后臺介面層提供高效的多維實時查詢介面,

4) 實時計算

這個系統最復雜的兩塊,實時計算和實時存盤,

先介紹實時計算部分:分為實時關聯和實時數倉,

① 實時高性能維表關聯

實時維表關聯這一塊難度在于 百萬級/s 的實時資料流,如果直接去關聯 HBase,1 分鐘的資料,關聯完 HBase 耗時是小時級的,會導致資料延遲嚴重,

我們提出了幾個解決方案:

  • 第一,在 Flink 實時計算環節,先按照 1 分鐘進行了視窗聚合,將視窗內多行行為資料轉一行多列的資料格式,經過這一步操作,原本小時級的關聯耗時下降到了十幾分鐘,但是還是不夠的,
  • 第二,在訪問 HBase 內容之前設定一層 Redis 快取,因為 1000 條資料訪問 HBase 是秒級的,而訪問 Redis 是毫秒級的,訪問 Redis 的速度基本是訪問 HBase 的 1000 倍,為了防止過期的資料浪費快取,快取過期時間設定成 24 小時,同時通過監聽寫 HBase Proxy 來保證快取的一致性,這樣將訪問時間從十幾分鐘變成了秒級,
  • 第三,上報程序中會上報不少非常規內容 ID,這些內容 ID 在內容 HBase 中是不存盤的,會造成快取穿透的問題,所以在實時計算的時候,我們直接過濾掉這些內容 ID,防止快取穿透,又減少一些時間,
  • 第四,因為設定了定時快取,會引入一個快取雪崩的問題,為了防止雪崩,我們在實時計算中,進行了削峰填谷的操作,錯開設定快取的時間,

可以看到,優化前后,資料量從百億級減少到了十億級,耗時從小時級減少到了數十秒,減少 99%,

②下游提供服務

實時數倉的難度在于:它處于比較新的領域,并且各個公司各個業務差距比較大,怎么能設計出方便,好用,符合看點業務場景的實時數倉是有難度的,

先看一下實時數倉做了什么,實時數倉對外就是幾個訊息佇列,不同的訊息佇列里面存放的就是不同聚合粒度的實時資料,包括內容 ID、用戶 ID、C 側行為資料、B 側內容維度資料和用戶畫像資料等,

我們是怎么搭建實時數倉的,就是上面介紹的實時計算引擎的輸出,放到訊息佇列中保存,可以提供給下游多用戶復用,

我們可以看下,在我們建設實時資料倉庫前后,開發一個實時應用的區別,沒有數倉的時候,我們需要消費千萬級/s 的原始佇列,進行復雜的資料清洗,然后再進行用戶畫像關聯、內容維度關聯,才能拿到符合要求格式的實時資料,開發和擴展的成本都會比較高,如果想開發一個新的應用,又要走一遍這個流程,有了數倉之后,如果想開發內容 ID 粒度的實時應用,就直接申請 TPS 萬級/s 的 DWS 層的訊息佇列,開發成本變低很多,資源消耗小很多,可擴展性也強很多,

看個實際例子,開發我們系統的實時資料大屏,原本需要進行如上所有操作,才能拿到資料,現在只需要消費 DWS 層訊息佇列,寫一條 Flink SQL 即可,僅消耗 2 個 CPU 核心,1G 記憶體,

可以看到,以 50 個消費者為例,建立實時數倉前后,下游開發一個實時應用,可以減少 98%的資源消耗,包括計算資源,存盤資源,人力成本和開發人員學習接入成本等等,并且消費者越多,節省越多,就拿 Redis 存盤這一部分來說,一個月就能省下上百萬人民幣,

5) 實時存盤

介紹完實時計算,再來介紹實時存盤,

這塊分為四個部分來介紹:

  • 分布式-高可用
  • 海量資料-寫入
  • 高性能-查詢
  • 擴容

① 分布式-高可用

我們這里聽取的是 Clickhouse 官方的建議,借助 ZK 實作高可用的方案,資料寫入一個分片,僅寫入一個副本,然后再寫 ZK,通過 ZK 告訴同一個分片的其他副本,其他副本再過來拉取資料,保證資料一致性,

這里沒有選用訊息佇列進行資料同步,是因為 ZK 更加輕量級,而且寫的時候,任意寫一個副本,其它副本都能夠通過 ZK 獲得一致的資料,而且就算其它節點第一次來獲取資料失敗了,后面只要發現它跟 ZK 上記錄的資料不一致,就會再次嘗試獲取資料,保證一致性,

②海量資料-寫入

資料寫入遇到的第一個問題是,海量資料直接寫入 Clickhouse 的話,會導致 ZK 的 QPS 太高,解決方案是改用 Batch 方式寫入,Batch 設定多大呢,Batch 太小的話緩解不了 ZK 的壓力,Batch 也不能太大,不然上游記憶體壓力太大,通過實驗,最終我們選用了大小幾十萬的 Batch,

第二個問題是,隨著資料量的增長,單 QQ 看點的視頻內容每天可能寫入百億級的資料,默認方案是寫一張分布式表,這就會造成單臺機器出現磁盤的瓶頸,尤其是 Clickhouse 底層運用的是 Mergetree,原理類似于 HBase、RocketsDB 的底層 LSM-Tree,在合并的程序中會存在寫放大的問題,加重磁盤壓力,峰值每分鐘幾千萬條資料,寫完耗時幾十秒,如果正在做 Merge,就會阻塞寫入請求,查詢也會非常慢,我們做的兩個優化方案:一是對磁盤做 Raid,提升磁盤的 IO;二是在寫入之前進行分表,直接分開寫入到不同的分片上,磁盤壓力直接變為 1/N,

第三個問題是,雖然我們寫入按照分片進行了劃分,但是這里引入了一個分布式系統常見的問題,就是區域的 Top 并非全域 Top 的問題,比如同一個內容 ID 的資料落在了不同的分片上,計算全域 Top100 閱讀的內容 ID,有一個內容 ID 在分片 1 上是 Top100,但是在其它分片上不是 Top100,導致匯總的時候,會丟失一部分資料,影響最終結果,我們做的優化是在寫入之前加上一層路由,將同一個內容 ID 的記錄,全部路由到同一個分片上,解決了該問題,

介紹完寫入,下一步介紹 Clickhouse 的高性能存盤和查詢,

③ 高性能-存盤-查詢

Clickhouse 高性能查詢的一個關鍵點是稀疏索引,稀疏索引這個設計就很有講究,設計得好可以加速查詢,設計不好反而會影響查詢效率,我根據我們的業務場景,因為我們的查詢大部分都是時間和內容 ID 相關的,比如說,某個內容,過去 N 分鐘在各個人群表現如何?我按照日期,分鐘粒度時間和內容 ID 建立了稀疏索引,針對某個內容的查詢,建立稀疏索引之后,可以減少 99%的檔案掃描,

還有一個問題就是,我們現在資料量太大,維度太多,拿 QQ 看點的視頻內容來說,一天流水有上百億條,有些維度有幾百個類別,如果一次性把所有維度進行預聚合,資料量會指數膨脹,查詢反而變慢,并且會占用大量記憶體空間,我們的優化,針對不同的維度,建立對應的預聚合物化視圖,用空間換時間,這樣可以縮短查詢的時間,

分布式表查詢還會有一個問題,查詢單個內容 ID 的資訊,分布式表會將查詢下發到所有的分片上,然后再回傳查詢結果進行匯總,實際上,因為做過路由,一個內容 ID 只存在于一個分片上,剩下的分片都在空跑,針對這類查詢,我們的優化是后臺按照同樣的規則先進行路由,直接查詢目標分片,這樣減少了 N-1/N 的負載,可以大量縮短查詢時間,而且由于我們是提供的 OLAP 查詢,資料滿足最終一致性即可,通過主從副本讀寫分離,可以進一步提升性能,

我們在后臺還做了一個 1 分鐘的資料快取,針對相同條件查詢,后臺就直接回傳了,

④ 擴容

這里再介紹一下我們的擴容的方案,調研了業內的一些常見方案,

比如 HBase,原始資料都存放在 HDFS 上,擴容只是 Region Server 擴容,不涉及原始資料的遷移,但是 Clickhouse 的每個分片資料都是在本地,是一個比較底層存盤引擎,不能像 HBase 那樣方便擴容,

Redis 是哈希槽這種類似一致性哈希的方式,是比較經典分布式快取的方案,Redis slot 在 Rehash 的程序中雖然存在短暫的 ask 讀不可用,但是總體來說遷移是比較方便的,從原 h[0]遷移到 h[1],最后再洗掉 h[0],但是 Clickhouse 大部分都是 OLAP 批量查詢,不是點查,而且由于列式存盤,不支持洗掉的特性,一致性哈希的方案不是很適合,

目前擴容的方案是,另外消費一份資料,寫入新 Clickhouse 集群,兩個集群一起跑一段時間,因為實時資料就保存 3 天,等 3 天之后,后臺服務直接訪問新集群,

4. 有贊實時數倉案例

1) 分層設計

傳統離線數倉的分層設計大家都很熟悉,為了規范的組織和管理資料,層級劃分會比較多,在一些復雜邏輯處理場景還會引入臨時層落地中間結果以方便下游加工處理,實時數倉考慮到時效性問題,分層設計需要盡量精簡,降低中間流程出錯的可能性,不過總體而言,實時數倉還是會參考離線數倉的分層思想來設計,

實時數倉分層架構如下圖所示 :

① ODS(實時資料接入層)

ODS 層,即實時資料接入層,通過資料采集工具收集各個業務系統的實時資料,對非結構化的資料進行結構化處理,保存原始資料,幾乎不過濾資料;該層資料的主要來源有三個部分:第一部分是業務方創建的 NSQ 訊息,第二部分是業務資料庫的 Binlog 日志,第三部分是埋點日志和應用程式日志,以上三部分的實時資料最終統一寫入 Kafka 存盤介質中,

ODS 層表命名規范:部門名稱.應用名稱.數倉層級主題域前綴資料庫名/訊息名

例如:接入業務庫的 Binlog

實時數倉表命名:deptname.appname.ods_subjectname_tablename

例如:接入業務方的 NSQ 訊息

實時數倉表命名:deptname.appname.ods_subjectname_msgname

② DWS(實時明細中間層)

DWS 層,即實時明細中間層,該層以業務程序作為建模驅動,基于每個具體的業務程序事件來構建最細粒度的明細層事實表;比如交易程序,有下單事件、支付事件、發貨事件等,我們會基于這些獨立的事件來進行明細層的構建,在這層,事實明細資料同樣是按照離線數倉的主題域來進行劃分,也會采用維度建模的方式組織資料,對于一些重要的維度欄位,會做適當冗余,基于有贊實時需求的場景,重點建設交易、營銷、客戶、店鋪、商品等主題域的資料,該層的資料來源于 ODS 層,通過 FlinkSQL 進行 ETL 處理,主要作業有規范命名、資料清洗、維度補全、多流關聯,最終統一寫入 Kafka 存盤介質中,

DWS 層表命名規范:部門名稱.應用名稱.數倉層級_主題域前綴_數倉表命名

例如:實時事件 A 的中間層

實時數倉表命名:deptname.appname.dws_subjectname_tablename_eventnameA

例如:實時事件 B 的中間層

實時數倉表命名:deptname.appname.dws_subjectname_tablename_eventnameB

③ DIM(實時維表層)

DIM 層,即實時維表層,用來存放維度資料,主要用于實時明細中間層寬化處理時補全維度使用,目前該層的資料主要存盤于 HBase 中,后續會基于 QPS 和資料量大小提供更多合適型別的存盤介質,

DIM 層表命名規范:應用名稱_數倉層級_主題域前綴_數倉表命名

例如:HBase 存盤,實時維度表

實時數倉表命名:appname_dim_tablename

④ DWA(實時匯總層)

DWA 層,即實時匯總層,該層通過 DWS 層資料進行多維匯總,提供給下游業務方使用,在實際應用程序中,不同業務方使用維度匯總的方式不太一樣,根據不同的需求采用不同的技術方案去實作,第一種方式,采用 FlinkSQL 進行實時匯總,將結果指標存入 HBase、MySQL 等資料庫,該種方式是我們早期采用的方案,優點是實作業務邏輯比較靈活,缺點是聚合粒度固化,不易擴展;第二種方式,采用實時 OLAP 工具進行匯總,該種方式是我們目前常用的方案,優點是聚合粒度易擴展,缺點是業務邏輯需要在中間層預處理,

DWA 層表命名規范:應用名稱_數倉層級_主題域前綴_聚合粒度_資料范圍

例如:HBase 存盤,某域當日某粒度實時匯總表

實時數倉表命名:appname_dwa_subjectname_aggname_daily

⑤ APP(實時應用層)

APP 層,即實時應用層,該層資料已經寫入應用系統的存盤中,例如寫入 Druid 作為 BI 看板的實時資料集;寫入 HBase、MySQL 用于提供統一資料服務介面;寫入 ClickHouse 用于提供實時 OLAP 服務,因為該層非常貼近業務,在命名規范上實時數倉不做統一要求,

2) 實時 ETL

實時數倉 ETL 處理程序所涉及的組件比較多,接下來盤點構建實時數倉所需要的組件以及每個組件的應用場景,如下圖所示:

具體實時 ETL 處理流程如下圖所示:

① 維度補全

創建呼叫 Duboo 介面的 UDF 函式在實時流里補全維度是最便捷的使用方式,但如果請求量過大,對 Duboo 介面壓力會過大,在實際應用場景補全維度首選還是關聯維度表,但關聯也存在一定概率的丟失問題,為了彌補這種丟失,可以采用 Duboo 介面呼叫兜底的方式來補全,偽代碼如下:

create function call_dubbo as 'XXXXXXX';

create function get_json_object as 'XXXXXXX';

case

when cast( b.column as bigint) is not null

then cast( b.column as bigint)

else cast(coalesce(cast(get_json_object(call_dubbo('clusterUrl'

,'serviceName'

,'methodName'

,cast(concat('[',cast(a.column as varchar),']') as varchar)

,'key'

,'rootId')

as bigint)

,a.column)

as bigint) end

② 冪等處理

實時任務在運行程序中難免會遇到執行例外的情況,當任務例外重啟的時候會導致部分訊息重新發送和消費,從而引發下游實時統計資料不準確,為了有效避免這種情況,可以選擇對實時訊息流做冪等處理,當消費完一條訊息,將這條訊息的 Key 存入 KV,如果任務例外重啟導致訊息重新發送的時候,先從 KV 判斷該訊息是否已被消費,如果已消費就不再往下發送,偽代碼如下:

create function idempotenc as 'XXXXXXX';

insert into table

select

order_no

from

select

a.orderNo as order_no

, idempotenc('XXXXXXX', coalesce( order_no, '') ) as rid

from

table1

) t

where

t.rid = 0;

③ 資料驗證

由于實時數倉的資料是無邊界的流,相比于離線數倉固定不變的資料更難驗收,基于不同的場景,我們提供了 2 種驗證方式,分別是:抽樣驗證與全量驗證,如圖 3.3 所示

  • 抽樣驗證方案

該方案主要應用在資料準確性驗證上,實時匯總結果是基于存盤在 Kafka 的實時明細中間層計算而來,但 Kafka 本身不支持按照特定條件檢索,不支持寫查詢陳述句,再加上訊息的無邊界性,統計結果是在不斷變化的,很難尋找參照物進行比對,鑒于此,我們采用了持久化訊息的方法,將訊息落盤到 TiDB 存盤,基于 TiDB 的能力對落盤的訊息進行檢索、查詢、匯總,撰寫固定時間邊界的測驗用例與相同時間邊界的業務庫資料或者離線數倉資料進行比對,通過以上方式,抽樣核心店鋪的資料進行指標準確性驗證,確保測驗用例全部通過,

  • 全量驗證方案

該方案主要應用在資料完整性和一致性驗證上,在實時維度表驗證的場景使用最多,大體思路:將存盤實時維度表的在線 HBase 集群中的資料同步到離線 HBase 集群中,再將離線 HBase 集群中的資料匯入到 Hive 中,在限定實時維度表的時間邊界后,通過資料平臺提供的資料校驗功能,比對實時維度表與離線維度表是否存在差異,最終確保兩張表的資料完全一致,

④ 資料恢復

實時任務一旦上線就要求持續不斷的提供準確、穩定的服務,區別于離線任務按天調度,如果離線任務出現 Bug,會有充足的時間去修復,如果實時任務出現 Bug,必須按照提前制定好的流程,嚴格按照步驟執行,否則極易出現問題,造成 Bug 的情況有非常多,比如代碼 Bug、例外資料 Bug、實時集群 Bug,如下圖展示了修復實時任務 Bug 并恢復資料的流程,

5. 騰訊全場景實時數倉建設案例

在數倉體系中會有各種各樣的大資料組件,譬如 Hive/HBase/HDFS/S3,計算引擎如 MapReduce、Spark、Flink,根據不同的需求,用戶會構建大資料存盤和處理平臺,資料在平臺經過處理和分析,結果資料會保存到 MySQL、Elasticsearch 等支持快速查詢的關系型、非關系型資料庫中,接下來應用層就可以基于這些資料進行 BI 報表開發、用戶畫像,或基于 Presto 這種 OLAP 工具進行互動式查詢等,

1) Lambda 架構的痛點

在整個程序中我們常常會用一些離線的調度系統,定期的(T+1 或者每隔幾小時)去執行一些 Spark 分析任務,做一些資料的輸入、輸出或是 ETL 作業,離線資料處理的整個程序中必然存在資料延遲的現象,不管是資料接入還是中間的分析,資料的延遲都是比較大的,可能是小時級也有可能是天級別的,另外一些場景中我們也常常會為了一些實時性的需求去構建一個實時處理程序,比如借助 Flink+Kafka 去構建實時的流處理系統,

整體上,數倉架構中有非常多的組件,大大增加了整個架構的復雜性和運維的成本,

如下圖,這是很多公司之前或者現在正在采用的 Lambda 架構,Lambda 架構將數倉分為離線層和實時層,相應的就有批處理和流處理兩個相互獨立的資料處理流程,同一份資料會被處理兩次以上,同一套業務邏輯代碼需要適配性的開發兩次,Lambda 架構大家應該已經非常熟悉了,下面我就著重介紹一下我們采用 Lambda 架構在數倉建設程序中遇到的一些痛點問題,

例如在實時計算一些用戶相關指標的實時場景下,我們想看到當前 pv、uv 時,我們會將這些資料放到實時層去做一些計算,這些指標的值就會實時呈現出來,但同時想了解用戶的一個增長趨勢,需要把過去一天的資料計算出來,這樣就需要通過批處理的調度任務來實作,比如凌晨兩三點的時候在調度系統上起一個 Spark 調度任務把當天所有的資料重新跑一遍,

很顯然在這個程序中,由于兩個程序運行的時間是不一樣的,跑的資料卻相同,因此可能造成資料的不一致,因為某一潭訓幾條資料的更新,需要重新跑一遍整個離線分析的鏈路,資料更新成本很大,同時需要維護離線和實時分析兩套計算平臺,整個上下兩層的開發流程和運維成本其實都是非常高的,

為了解決 Lambda 架構帶來的各種問題,就誕生了 Kappa 架構,這個架構大家應該也非常的熟悉,

2) Kappa 架構的痛點

我們來講一下 Kappa 架構,如下圖,它中間其實用的是訊息佇列,通過用 Flink 將整個鏈路串聯起來,Kappa 架構解決了 Lambda 架構中離線處理層和實時處理層之間由于引擎不一樣,導致的運維成本和開發成本高昂的問題,但 Kappa 架構也有其痛點,

首先,在構建實時業務場景時,會用到 Kappa 去構建一個近實時的場景,但如果想對數倉中間層例如 ODS 層做一些簡單的 OLAP 分析或者進一步的資料處理時,如將資料寫到 DWD 層的 Kafka,則需要另外接入 Flink,同時,當需要從 DWD 層的 Kafka 把資料再匯入到 Clickhouse,Elasticsearch,MySQL 或者是 Hive 里面做進一步的分析時,顯然就增加了整個架構的復雜性,

其次,Kappa 架構是強烈依賴訊息佇列的,我們知道訊息佇列本身在整個鏈路上資料計算的準確性是嚴格依賴它上游資料的順序,訊息佇列接的越多,發生亂序的可能性就越大,ODS 層資料一般是絕對準確的,把 ODS 層的資料發送到下一個 kafka 的時候就有可能發生亂序,DWD 層再發到 DWS 的時候可能又亂序了,這樣資料不一致性就會變得很嚴重,

第三,Kafka 由于它是一個順序存盤的系統,順序存盤系統是沒有辦法直接在其上面利用 OLAP 分析的一些優化策略,例如謂詞下推這類的優化策略,在順序存盤的 Kafka 上來實作是比較困難的事情,

那么有沒有這樣一個架構,既能夠滿足實時性的需求,又能夠滿足離線計算的要求,而且還能夠減輕運維開發的成本,解決通過訊息佇列構建 Kappa 架構程序中遇到的一些痛點?答案是肯定的,后面的篇幅會詳細論述,

3) 痛點總結

4) Flink+Iceberg 構建實時數倉

① 近實時的資料接入

前面介紹了 Iceberg 既支持讀寫分離,又支持并發讀、增量讀、小檔案合并,還可以支持秒級到分鐘級的延遲,基于這些優勢我們嘗試采用 Iceberg 這些功能來構建基于 Flink 的實時全鏈路批流一體化的實時數倉架構,

如下圖所示,Iceberg 每次的 commit 操作,都是對資料的可見性的改變,比如說讓資料從不可見變成可見,在這個程序中,就可以實作近實時的資料記錄,

② 實時數倉 - 資料湖分析系統

此前需要先進行資料接入,比如用 Spark 的離線調度任務去跑一些資料,拉取,抽取最后再寫入到 Hive 表里面,這個程序的延時比較大,有了 Iceberg 的表結構,可以中間使用 Flink,或者 spark streaming,完成近實時的資料接入,

基于以上功能,我們再來回顧一下前面討論的 Kappa 架構,Kappa 架構的痛點上面已經描述過,Iceberg 既然能夠作為一個優秀的表格式,既支持 Streaming reader,又可以支持 Streaming sink,是否可以考慮將 Kafka 替換成 Iceberg?

Iceberg 底層依賴的存盤是像 HDFS 或 S3 這樣的廉價存盤,而且 Iceberg 是支持 parquet、orc、Avro 這樣的列式存盤,有列式存盤的支持,就可以對 OLAP 分析進行基本的優化,在中間層直接進行計算,例如謂詞下推最基本的 OLAP 優化策略,基于 Iceberg snapshot 的 Streaming reader 功能,可以把離線任務天級別到小時級別的延遲大大的降低,改造成一個近實時的資料湖分析系統,

在中間處理層,可以用 presto 進行一些簡單的查詢,因為 Iceberg 支持 Streaming read,所以在系統的中間層也可以直接接入 Flink,直接在中間層用 Flink 做一些批處理或者流式計算的任務,把中間結果做進一步計算后輸出到下游,

總的來說,Iceberg 替換 Kafka 的優勢主要包括:

優勢:

  • 實作存盤層的流批統一;
  • 中間層支持 OLAP 分析;
  • 完美支持高效回溯;
  • 存盤成本降低,

劣勢:

  • 資料延遲從實時變成近實時;
  • 對接其他資料系統需要額外開發作業,

秒級分析 - 資料湖加速:

由于 Iceberg 本身是將資料檔案全部存盤在 HDFS 上的,HDFS 讀寫這塊對于秒級分析的場景,還是不能夠完全滿足我們的需求,所以接下去我們會在 Iceberg 底層支持 Alluxio 這樣一個快取,借助于快取的能力可以實作資料湖的加速,這塊的架構也在我們未來的一個規劃和建設中,

作者丨五分鐘學大資料

本文來自博客園,作者:古道輕風,轉載請注明原文鏈接:https://www.cnblogs.com/88223100/p/5_examples_of_real-time_data_warehouse_construction_in_Internet_giants.html

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/445409.html

標籤:其他

上一篇:阿里慢SQL治理5大經典案例

下一篇:MySQL視圖

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more