寫在開始
- 本篇內容主要是分享一下如何通過Flink Sql 流式的讀寫Hudi表,也會說說我對Hudi的理解
- IDE選擇的是Zeppelin,環境配置可以參考一下我這篇博客
環境準備
-
代碼編譯
這里我們使用的是Master分支的Hudi進行編譯git clone https://github.com/apache/hudi.git # 我使用的是scala版本為2.11的Flink,如果是2.12的scala,請在下面陳述句的最后加上 -Pscala-2.12 -Dscala-2.12 mvn clean install -s $MAVEN_HOME/conf/settings.xml -DskipTests # 編譯完成之后,packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar 就是要放到我們環境里面的Jar包 -
Jar包配置
在使用Zeppelin進行Flink X Hudi的開發時,如果你使用flink.execution.jars的方式加載Hudi的包,那么會有一些問題,不推薦這樣使用,如圖所示

這里我是建完Hudi表,然后向表中插入資料的時候拋出的例外
Caused by: java.lang.ClassNotFoundException: org.apache.hudi.common.metrics.LocalRegistry按理說這根本不可能,但是如果加載這個類的加載器,和執行這段代碼的類加載器不一樣,那么就會有這樣的問題,所以我建議還是簡單粗暴的將hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar這個包放在${FLINK_HOME}/lib下面 -
繼續踩坑
在環境配置完成之后,我們自然需要去簡單的驗證一下功能,于是我們建了個表

又出問題了,不過這次這個問題很好解決,在
packaging/hudi-flink-bundle/pom.xml的第164行插入
<relocation> <pattern>org.apache.hadoop.hive.ql.</pattern> <shadedPattern>${flink.bundle.hive.shade.prefix}org.apache.hadoop.hive.ql.</shadedPattern> </relocation>
然后重新打包就行,這個BUG我已經提PR了,當我這篇文章發出去的時候,應該已經合并到Master分支了,到時候大家就不會有這個問題了
這里簡單的和大家說一下如何處理這種包沖突的問題- 在
${FLINK_HOME}/lib下執行grep -w 'com.example.class' *看看是否有Jar有重復的類,如果有多個結果直接Vim每個Jar包,然后再重新搜索你的類,看看類名是否完全一致,如果確實存在多個Jar中有多個同名類,那么有兩種解決方式- 自己手動進行排包,利用
mvn dependency:tree或者Idea中的插件Maven Helper進行排包 - 通過打shade包,來對同名類進行重命名處理,也就是我上面的處理方案,至于什么是shade包,可以看這里
- 自己手動進行排包,利用
- 在
解決完了環境問題之后,我們接下來開始正式的學習之旅
Flink X Hudi
建表
create database if not exists hudi_db;
use hudi_db;
drop table if exists hudi_test_dijie;
CREATE TABLE hudi_test_dijie(
a varchar,
dt string
)
PARTITIONED BY (`dt`)
WITH (
'connector'= 'hudi',
'path'= 'hdfs:///hudi/hudi_test_dijie',
'table.type'= 'MERGE_ON_READ',
'read.streaming.enabled'= 'true',
-- 'read.streaming.start-commit'= '20210401134557',
'read.streaming.check-interval'= '4'
);
我們來看一下這個陳述句,有幾個特殊的地方來和大家說一下
首先我們先說一下Hudi的一些基本概念
Apache Hudi (pronounced Hoodie) stands for Hadoop Upserts Deletes and Incrementals.
Hudi的4個字母分別代表的是Hadoop Upserts Delete Incrementals
那既然有更新和洗掉,那么主鍵就是必不可少的,不然也沒法知道更新哪條資料
在Hudi中,每條資料都有HoodieKey唯一標識,HoodieKey由資料的key和資料所屬的磁區組成
在Flink中我們可以通過指定hoodie.datasource.write.recordkey.field來指定我們的Key是資料中的哪個欄位,也可以通過PRIMARY KEY(a) NOT ENFORCED來指定,前者會被后者覆寫
表資料的更新和洗掉程序在不同型別的Hudi表中有不同的體現,Hudi中目前有兩種表型別,分別是
- Copy On Write
- 在資料寫入的時候,通過復制舊檔案資料并且與新寫入的資料進行合并
- Merge On Read
- 在資料寫入的時候通過記錄Log檔案,來標記某條資料到底是Insert還是Update;在進行資料讀取的時候,將本批次讀取到的資料進行Merge
這里為了方便理解,我概括的比較簡單,更多的內容可以參考官方檔案
read.streaming.start-commit指的是從哪一個提交位點進行消費
Hudi維護了一個timeline的概念,它記錄了在不同時刻對于表提交的操作,這有助于提供表在不同時刻的狀態,同時還可以高效的按照時間進行資料查詢
read.streaming.check-interval比較簡單,代表我們每個多久去檢驗一次有沒有新的操作在timeline上產生
寫表
簡單的灌幾條重復資料進去,順帶驗證能否Upsert
INSERT INTO hudi_test_dijie VALUES ('id','2020-04-25',TIMESTAMP '1970-01-01 00:00:01'), ('id','2020-04-25',TIMESTAMP '1970-01-01 00:00:01'), ('id','2020-04-25',TIMESTAMP '1970-01-01 00:00:01'), ('id','2020-04-25',TIMESTAMP '1970-01-01 00:00:01'), ('id','2020-04-25',TIMESTAMP '1970-01-01 00:00:01'), ('id','2020-04-25',TIMESTAMP '1970-01-01 00:00:01'), ('id','2020-04-25',TIMESTAMP '1970-01-01 00:00:01')
然后我們在Flink的Web Ui上觀察一下任務

報錯了,可這次報錯看不懂了,哪來的uuid這個欄位?我們并沒有在DDL中指定這個,莫慌,我們用IDE打開Hudi的原始碼一探究竟
先看看例外的堆疊資訊,從這條日志資訊at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)可以看出來,是Flink在處理每條資料的時候拋出的例外,于是我們到原始碼中的Hudi-Flink模塊,搜索uuid

從描述中能看出來,是為了組裝我們前面說的HoodieKey,從資料中取uuid這個欄位,而我們在進行表定義的時候,并沒有這個欄位,難怪會拋出例外,
再看一下這個欄位在哪被參考過

可以看出來,會先取出表定義的主鍵,如果主鍵不為空,則用主鍵組成的聯合欄位去替換uuid,如果為空就還是uuid
所以接下來,我們重新建一下有主鍵的表
create database if not exists hudi_db;
use hudi_db;
drop table if exists hudi_test_dijie;
CREATE TABLE hudi_test_dijie(
a varchar,
dt string,
PRIMARY KEY(a) NOT ENFORCED
)
PARTITIONED BY (`dt`)
WITH (
'connector'= 'hudi',
'path'= 'hdfs:///hudi/hudi_test_dijie',
'table.type'= 'MERGE_ON_READ',
'read.streaming.enabled'= 'true',
-- 'read.streaming.start-commit'= '20210401134557',
'read.streaming.check-interval'= '4',
'write.insert.drop.duplicates' = 'true'
);
然后再執行一下插入陳述句,依舊報錯

有了上一次的經驗,我們很容易定位到問題

當多條資料Key相同需要做預合并時,我們需要根據一個欄位來確定哪條資料是新資料哪條是舊資料,
再重新建一次表,這次加上ts欄位
create database if not exists hudi_db;
use hudi_db;
drop table if exists hudi_test_dijie;
CREATE TABLE hudi_test_dijie(
a varchar,
dt string,
ts TIMESTAMP(3),
PRIMARY KEY(a) NOT ENFORCED
)
PARTITIONED BY (`dt`)
WITH (
'connector'= 'hudi',
'path'= 'hdfs:///hudi/hudi_test_dijie',
'table.type'= 'MERGE_ON_READ',
'read.streaming.enabled'= 'true',
-- 'read.streaming.start-commit'= '20210401134557',
'read.streaming.check-interval'= '4',
'write.insert.drop.duplicates' = 'true'
);
這次終于不報錯了,那么我們進行下一步
查表
查表的陳述句很簡單

可以看到資料能夠正常的查出來,而且資料不存在重復的情況
那,如果我這個時候,將上面的插入陳述句再執行一次,查出的資料會多嗎?
執行一下插入陳述句,同時再觀察一下查詢陳述句的輸出結果

既然Hudi支持Upsert,那為何還是會有重復資料呢?
因為MOR表會在讀取最新資料的時候,只會將這一次提交的資料給去重,以前的已經下發的資料,沒有辦法再去處理;如果是從歷史提交位點進行讀取資料,那么會將這個位點到最新位點中所有的資料做一次去重
那么COW表呢?很遺憾,目前Hudi Streaming Read只支持MOR表,COW表暫未支持
好了,寫到這里,本文的內容已經全部講完,更多的內容可以多多看看官網,或者等我更新~
寫在最后
- 目前Hudi也是支持了Streaming Read&Write,但是美中不足的地方是,Streaming Read只支持對一個批次的資料進行Merge,涉及歷史資料無法改變,等待社區更新吧,看看有沒有什么新的方案,我個人感覺有兩種做法
- 引入類似于
canal-json的Foramt,將資料以canal-json的格式存入Hudi表中,讀取的時候再以同樣的格式讀取,可以利用Flink的能力來進行資料回撤,這種方式比較推薦 - 在HudiSource中引入一個MapState,Key為HoodieKey,Value為該Key最新的一條資料,利用Flink的狀態來進行資料回撤;這種的話對Source端壓力比較大,畢竟需要引入新的State,同時,何時清理State也比較難定義
- 引入類似于
- 下一篇會寫一下Iceberg和Hudi的深度對比,敬請期待!
- 記得點贊!
參考
[1] 官方檔案
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/281625.html
標籤:其他
上一篇:小程式云開發--CMS內容管理
下一篇:初識docker
