DWM 建表,需要看 DWS 需求,
DWS 來自維度(訪客、商品、地區、關鍵詞),為了出最終的指標
ADS 需求指標
DWT 為什么實時數倉沒有DWT,因為它是歷史的聚集,累積結果,實時數倉中不需要
DWD 不需要加工
DWM 需要加工的資料
| 統計主題 | 需求指標【ADS】 | 輸出方式 | 計算來源 | 來源層級 |
|---|---|---|---|---|
| 訪客【DWS】 | pv | 可視化大屏 | page_log 直接可求 | dwd |
| UV(DAU) | 可視化大屏 | 需要用 page_log 過濾去重 | dwm | |
| 跳出率 | 可視化大屏 | 需要通過 page_log 行為判斷 | dwm | |
| 進入頁面數 | 可視化大屏 | 需要識別開始訪問標識 | dwd | |
| 連續訪問時長 | 可視化大屏 | page_log 直接可求 | dwd | |
| 商品 | 點擊 | 多維分析 | page_log 直接可求 | dwd |
| 收藏 | 多維分析 | 收藏表 | dwd | |
| 加入購物車 | 多維分析 | 購物車表 | dwd | |
| 下單 | 可視化大屏 | 訂單寬表 | dwm | |
| 支付 | 多維分析 | 支付寬表 | dwm | |
| 退款 | 多維分析 | 退款表 | dwd | |
| 評論 | 多維分析 | 評論表 | dwd | |
| 地區 | PV | 多維分析 | page_log 直接可求 | dwd |
| UV | 多維分析 | 需要用 page_log 過濾去重 | dwm | |
| 下單 | 可視化大屏 | 訂單寬表 | dwm | |
| 關鍵詞 | 搜索關鍵詞 | 可視化大屏 | 頁面訪問日志 直接可求 | dwd |
| 點擊商品關鍵詞 | 可視化大屏 | 商品主題下單再次聚合 | dws | |
| 下單商品關鍵詞 | 可視化大屏 | 商品主題下單再次聚合 | dws |
獨立訪客UV
UV,全稱是 Unique Visitor,即獨立訪客,對于實時計算中,也可以稱為 DAU(Daily Active User),即每榷訓躍用戶,因為實時計算中的 UV 通常是指當日的訪客數,
那么如何從用戶行為日志中識別出當日的訪客,那么有兩點:
- 是識別出該訪客打開的第一個頁面,表示這個訪客開始進入我們的應用
- 由于訪客可以在一天中多次進入應用,所以我們要在一天的范圍內進行去重(狀態去重)
KeyState min -> state (存日期)
- 獲取執行環境
- 讀取Kafka dwd_page_log 主題的資料
- 將每行資料轉換為JSON物件
- 過濾資料,狀態編程 只保留每個 mid 每天第一次登錄的資料
- 將資料寫入kafka
- 啟動任務
過濾思路
- 首先用 keyby 按照 mid 進行分組,每組表示當前設備的訪問情況
- 分組后使用 keystate 狀態,記錄用戶進入時間,實作 RichFilterFunction 完成過濾
- 重寫 open 方法用來初始化狀態
- 重寫 filter 方法進行過濾
? 可以直接篩掉 last_page_id 不為空的欄位,因為只要有上一頁,說明這條不是這個用戶進入的首個頁面,
? 狀態用來記錄用戶的進入時間,只要這個 lastVisitDate 是今天,就說明用戶今天已經訪問過了所以篩除掉,如果為慷訓者不是今天,說明今天還沒訪問過,則保留,
? 因為狀態值主要用于篩選是否今天來過,所以這個記錄過了今天基本上沒有用了,這里 enableTimeToLive 設定了 1 天的過期時間,避免狀態過大,
跳出明細
跳出就是用戶成功訪問了網站的一個頁面后就退出,不在繼續訪問網站的其它頁面,
跳出率就是用跳出次數除以訪問次數,
關注跳出率,可以看出引流過來的訪客是否能很快的被吸引,渠道引流過來的用戶之間的質量對比,對于應用優化前后跳出率的對比也能看出優化改進的成果,
跳出率高不是好事、留存率高是好事
計算跳出行為的思路
首先要識別哪些是跳出行為,要把這些跳出的訪客最后一個訪問的頁面識別出來,那么要抓住幾個特征:
- 該頁面是用戶近期訪問的第一個頁面
這個可以通過該頁面是否有上一個頁面(last_page_id)來判斷,如果這個表示為空,就說明這是這個訪客這次訪問的第一個頁面, - 首次訪問之后很長一段時間(自己設定),用戶沒繼續再有其他頁面的訪問,
這第一個特征的識別很簡單,保留 last_page_id 為空的就可以了,但是第二個訪問的判斷,其實有點麻煩,首先這不是用一條資料就能得出結論的,需要組合判斷,要用一條存在的資料和不存在的資料進行組合判斷,而且要通過一個不存在的資料求得一條存在的資料,更麻煩的他并不是永遠不存在,而是在一定時間范圍內不存在,那么如何識別有一定失效的組合行為呢?
最簡單的辦法就是 Flink 自帶的 CEP 技術,這個 CEP 非常適合通過多條資料組合來識別某個事件,
用戶跳出事件,本質上就是一個條件事件加一個超時事件的組合,
- 獲取執行環境
- 讀取 Kafka dwd_page_log 主題的資料
- 將每行資料轉換為JSON物件,并提取時間戳生成 Watermark
- 定義模式序列
- 將模式序列作用到流上 CEP
- 提取匹配上的和超時事件
- UNION 兩種事件
- 將資料寫入kafka
- 啟動任務
訂單寬表
需求分析與思路
訂單是統計分析的重要的物件,圍繞訂單有很多的維度統計需求,比如用戶、地區、商品、品類、品牌等等,
為了之后統計計算更加方便,減少大表之間的關聯,所以在實時計算程序中將圍繞訂單的相關資料整合成為一張訂單的寬表,
那究竟哪些資料需要和訂單整合在一起?

如上圖,由于在之前的操作我們已經把資料分拆成了事實資料和維度資料,事實資料(綠色)進入 kafka 資料流(DWD 層)中,維度資料(藍色)進入 hbase 中長期保存,那么我們在 DWM 層中要把實時和維度資料進行整合關聯在一起,形成寬表,那么這里就要處理有兩種關聯,事實資料和事實資料關聯、事實資料和維度資料關聯,
- 事實資料和事實資料關聯,其實就是流與流之間的關聯,
- 事實資料與維度資料關聯,其實就是流計算中查詢外部資料源,
訂單和訂單明細關聯(雙流 join)
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html
在 flink 中的流 join 大體分為兩種,一種是基于時間視窗的 join(Time Windowed Join),比如 join、coGroup 等,另一種是基于狀態快取的 join(Temporal Table Join),比如 Interval Join,
這里選用 Interval Join,因為相比較視窗 join,Interval Join 使用更簡單,而且避免了應匹配的資料處于不同視窗的問題,Interval Join 目前只有一個問題,就是還不支持 left join,
但是我們這里是訂單主表與訂單從表之間的關聯不需要 left join,所以 intervalJoin 是較好的選擇,
- 設定事件時間水位線
- 創建合并后的寬表物體類
- 訂單和訂單明細關聯 intervalJoin
- 獲取執行環境
- 讀取兩個埠資料創建流,并提取時間戳生成 Watermark
- 雙流join
- 列印
- 啟動任務
維表關聯代碼實作
維度關聯實際上就是在流中查詢存盤在 HBase 中的資料表,但是即使通過主鍵的方式查詢,HBase 速度的查詢也是不及流之間的 join,外部資料源的查詢常常是流式計算的性能瓶頸,所以咱們再這個基礎上還有進行一定的優化,
- 獲取執行環境
- 讀取 Kafka dwd_page_log 主題的資料
- 將每行資料轉換為JavaBean物件,并提取時間戳生成 Watermark
- 雙流join
- 關聯維度資訊
- 將資料寫入kafka
- 啟動任務
優化-加入旁路快取模式 (cache-aside-pattern)
我們在上面實作的功能中,直接查詢的 HBase,外部資料源的查詢常常是流式計算的性能瓶頸,所以我們需要在上面實作的基礎上進行一定的優化,我們這里使用旁路快取,
旁路快取模式是一種非常常見的按需分配快取的模式,如下圖,任何請求優先訪問快取,快取命中,直接獲得資料回傳請求,如果未命中則,查詢資料庫,同時把結果寫入快取以備后續請求使用,


這種快取策略有幾個注意點
快取要設過期時間,不然冷資料會常駐快取浪費資源,
要考慮維度資料是否會發生變化,如果發生變化要主動清除快取,
快取的選型
一般兩種:堆快取或者獨立快取服務(redis,memcache),
堆快取,從性能角度看更好,畢竟訪問資料路徑更短,減少程序消耗,但是管理性差,其他行程無法維護快取中的資料,
獨立快取服務(redis,memcache)本事性能也不錯,不過會有創建連接、網路 IO 等消耗,但是考慮到資料如果會發生變化,那還是獨立快取服務管理性更強,而且如果資料量特別大,獨立快取更容易擴展,
因為咱們的維度資料都是可變資料,所以這里還是采用 Redis 管理快取,
優化-異步查詢
在 Flink 流處理程序中,經常需要和外部系統進行互動,用維度表補全事實表中的欄位,例如:在電商場景中,需要一個商品的 skuid 去關聯商品的一些屬性,例如商品所屬行業、商品的生產廠家、生產廠家的一些情況;在物流場景中,知道包裹 id,需要去關聯包裹的行業屬性、發貨資訊、識訓資訊等等,
默認情況下,在 Flink 的 MapFunction 中,單個并行只能用同步方式去互動: 將請求發送到外部存盤,IO 阻塞,等待請求回傳,然后繼續發送下一個請求,這種同步互動的方式往往在網路等待上就耗費了大量時間,為了提高處理效率,可以增加 MapFunction 的并行度,但增加并行度就意味著更多的資源,并不是一種非常好的解決方式,
Flink 在 1.2 中引入了 Async I/O,在異步模式下,將 IO 操作異步化,單個并行可以連續發送多個請求,哪個請求先回傳就先處理,從而在連續的請求間不需要阻塞式等待,大大提高了流處理效率,
Async I/O 是阿里巴巴貢獻給社區的一個呼聲非常高的特性,解決與外部系統互動時網路延遲成為了系統瓶頸的問題,

異步查詢實際上是把維表的查詢操作托管給單獨的執行緒池完成,這樣不會因為某一個查詢造成阻塞,單個并行可以連續發送多個請求,提高并發效率,
這種方式特別針對涉及網路 IO 的操作,減少因為請求等待帶來的消耗,
支付寬表
支付寬表的目的,最主要的原因是支付表沒有到訂單明細,支付金額沒有細分到商品上,沒有辦法統計商品級的支付狀況,
所以本次寬表的核心就是要把支付表的資訊與訂單寬表關聯上,
解決方案有兩個
- 一個是把訂單寬表輸出到 HBase 上,在支付寬表計算時查詢 HBase,這相當于把訂單寬表作為一種維度進行管理,
- 一個是用流的方式接收訂單寬表,然后用雙流 join 方式進行合并,因為訂單與支付產生有一定的時差,所以必須用 Interval Join 來管理流的狀態時間,保證當支付到達時訂單寬表還保存在狀態中,
訂單寬表不需要永久保存,資料本身要寫Kafka所以沒必要再寫一份到 HBase,還要從里面查,綜合考慮,采用第2種方案,

https://www.bilibili.com/video/BV1Ju411o7f8/?p=73
尚硅谷 源代碼
大資料 - 資料倉庫-實時數倉架構分析
大資料 - 業務資料采集-FlinkCDC
大資料 - DWD&DIM 行為資料
大資料 - DWD&DIM 業務資料
大資料 - DWM層 業務實作
大資料 - DWS層 業務實作
大資料 - ADS 資料可視化實作
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/540962.html
標籤:其他
下一篇:大資料 - DWS層 業務實作
