我們知道 Flink 有Table(表)、View(視圖)、Function(函式/算子)、Database(資料庫)的概念,相對于這些耳熟能詳的概念,Flink 里還有一個 Catalog(目錄) 的概念,
本文將為大家帶來 Flink Catalog 的介紹以及 Flink Catalog 在 ChunJun 中的實踐之路,
Flink Catalog 簡介
Catalog 提供元資料,如資料庫、表、磁區、視圖,以及訪問存盤在資料庫或其他外部系統中的資料所需的函式和資訊,
Flink Catalog 作用
資料處理中最關鍵的一個方面是管理元資料:
· 可能是暫時性的元資料,如臨時表,或針對表環境注冊的 UDFs;
· 或者是永久性的元資料,比如 Hive 元存盤中的元資料,
Catalog 提供了一個統一的 API 來管理元資料,并使其可以從表 API 和 SQL 查詢陳述句中來訪問,
Catalog 使用戶能夠參考他們資料系統中的現有元資料,并自動將它們映射到 Flink 的相應元資料,例如,Flink 可以將 JDBC 表自動映射到 Flink 表,用戶不必在 Flink 中手動重寫 DDL,Catalog 大大簡化了用戶現有系統開始使用 Flink 所需的步驟,并增強了用戶體驗,
Flink Catalog 的結構
● Flink Catalog 原生結構
? GenericInMemoryCatalog:基于記憶體實作的 Catalog
? Jdbc Catalog:可以將 Flink 通過 JDBC 協議連接到關系資料庫,目前 Flink 在1.12和1.13中有不同的實作,包括 MySql Catalog 和 Postgres Catalog
? Hive Catalog:作為原生 Flink 元資料的持久化存盤,以及作為讀寫現有 Hive 元資料的介面
● Flink Iceberg Catalog
● Flink Hudi Catalog
HoodieCatalog、HoodieHiveCatalog


Flink Catalog 詳解
GenericInMemoryCatalog
final CatalogManager catalogManager =
CatalogManager.newBuilder()
.classLoader(userClassLoader)
.config(tableConfig)
.defaultCatalog(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()))
.build();
defaultCatalog =
new GenericInMemoryCatalog(
defaultCatalogName, settings.getBuiltInDatabaseName());
CatalogManager catalogManager =
builder.defaultCatalog(defaultCatalogName, defaultCatalog).build();
GenericInMemoryCatalog 所有的資料都保存在 HashMap 里面,無法持久化,
JDBC Catalog
CREATE CATALOG my_catalog WITH(
'type' = 'jdbc',
'default-database' = '...',
'username' = '...',
'password' = '...',
'base-url' = '...'
);
USE CATALOG my_catalog;
如果創建并使用 Postgres Catalog 或 MySQL Catalog,請配置 JDBC 連接器和相應的驅動,
JDBC Catalog 支持以下引數:
? name:必填,Catalog 的名稱
? default-database:必填,默認要連接的資料庫
? username:必填,Postgres/MySQL 賬戶的用戶名
? password:必填,賬戶的密碼
? base-url: 必填,(不應該包含資料庫名)
對于 Postgres Catalog base-url 應為 "jdbc:postgresql://:" 的格式
對于 MySQL Catalog base-url 應為 "jdbc:mysql://:" 的格式
Hive Catalog
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'mydatabase',
'hive-conf-dir' = '/opt/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;

Iceberg Catalog
● Hive Catalog 管理 Iceberg 表
(Flink) default_database.flink_table ->
(Iceberg) default_database.flink_table
CREATE TABLE flink_table (
id BIGINT,
data STRING
) WITH (
'connector'='iceberg',
'catalog-name'='hive_prod',
'uri'='thrift://localhost:9083',
'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
(Flink)default_database.flink_table ->
(Iceberg) hive_db.hive_iceberg_table
CREATE TABLE flink_table (
id BIGINT,
data STRING
) WITH (
'connector'='iceberg',
'catalog-name'='hive_prod',
'catalog-database'='hive_db',
'catalog-table'='hive_iceberg_table',
'uri'='thrift://localhost:9083',
'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
● Hadoop Catalog 管理 Iceberg 表
CREATE TABLE flink_table (
id BIGINT,
data STRING
) WITH (
'connector'='iceberg',
'catalog-name'='hadoop_prod',
'catalog-type'='hadoop',
'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
● 自定義 Catalog 管理 Iceberg 表
CREATE TABLE flink_table (
id BIGINT,
data STRING
) WITH (
'connector'='iceberg',
'catalog-name'='custom_prod',
'catalog-impl'='com.my.custom.CatalogImpl',
-- More table properties for the customized catalog
'my-additional-catalog-config'='my-value',
...
);
? connector:iceberg
? catalog-name:用戶指定的目錄名稱,這是必須的,因為連接器沒有任何默認值
? catalog-type:內置目錄的 hive 或 hadoop(默認為hive),或者對于使用 catalog-impl 的自定義目錄實作,不做設定
? catalog-impl:自定義目錄實作的全限定類名,如果 catalog-type 沒有被設定,則必須被設定,更多細節請參見自定義目錄
? catalog-database: 后臺目錄中的 iceberg 資料庫名稱,默認使用當前的 Flink 資料庫名稱
? catalog-table: 后臺目錄中的冰山表名,默認使用 Flink CREATE TABLE 句子中的表名
Hudi Catalog
create catalog hudi with(
'type' = 'hudi',
'mode' = 'hms',
'hive.conf.dir'='/etc/hive/conf'
);
--- 創建資料庫供hudi使用
create database hudi.hudidb;
--- order表
CREATE TABLE hudi.hudidb.orders_hudi(
uuid INT,
ts INT,
num INT,
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ'
);
select * from hudi.hudidb.orders_hudi;


Flink Catalog 在 ChunJun 中的實踐
下面將為大家介紹本文的重頭戲,Flink Catalog 在 ChunJun 中的實踐之路,
直接引入開源 Catalog
ChunJun 目前的所有 Catalog 為以下四種:

● Hive Catalog 需要的依賴

● Iceberg Catalog 需要的依賴

● JDBC Catalog
JDBC 因為 Flink 1.12 和 1.13 API 有變化,因此需要涉及原始碼的改動,改動一些 API 后,從原始碼引入,
● DT Catalog
結合內部業務,自定義的一種 Catalog ,下文將會進行詳細介紹,
DT Catalog -存盤元資料表設計
● 創建 mysql 元資料表 database_info
-- 創建表的 sql
create table database_info
(
`id` bigint PRIMARY KEY NOT NULL AUTO_INCREMENT COMMENT '專案ID',-- database id
`catalog_name` varchar(255) COMMENT 'catalog 名字',
`database_name` varchar(255) COMMENT 'database 名字',
`catalog_type` varchar(30) COMMENT 'catalog 型別, eg: mysql,oracle...',
`project_id` int(11) NOT NULL COMMENT '專案ID',
`tenant_id` int(11) NOT NULL COMMENT '租戶ID'
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- 創建索引
CREATE INDEX idx_catalog_name_database_name_project_id_tenant_id ON database_info (`catalog_name`, `database_name`, `project_id`, `tenant_id`);
● 創建 mysql 元資料表 table_info
-- 創建表的 sql
create table table_info
(
`id` bigint PRIMARY KEY NOT NULL AUTO_INCREMENT,
`database_id` bigint COMMENT 'database_info 表的 id',
`table_name` varchar(255) COMMENT '表名',
`project_id` int(11) NOT NULL COMMENT '專案ID',
`tenant_id` int(11) NOT NULL COMMENT '租戶ID'
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- 創建索引
CREATE INDEX idx_catalog_id_project_id_tenant_id ON table_info (`database_id`, `project_id`, `tenant_id`);
CREATE INDEX idx_database_id_table_name_project_id_tenant_id ON table_info (`database_id`, `table_name`, `project_id`, `tenant_id`);
● 創建 mysql 元資料表 properties_info
create table properties_info
(
`id` bigint PRIMARY KEY NOT NULL AUTO_INCREMENT ,
`table_id` bigint(20) COMMENT 'table_info 表的 id',
`key` varchar(255) COMMENT '表的屬性 key',
`value` varchar(255) COMMENT '表的屬性 value'
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
CREATE INDEX idx_table_id ON properties_info (table_id);
● properties_info 里面存了什么?
schema.0.name=id,
schema.0.data-type=INT NOT NULL,
schema.1.name=name,
schema.1.data-type=VARCHAR(2147483647)
schema.2.name=age,
schema.2.data-type=BIGINT,
schema.primary-key.name=PK_3386,
schema.primary-key.columns=id,
connector=jdbc,
url=jdbc:mysql: //172.16.83.218:3306/wujuan?useSSL=false,
username=drpeco,
password=DT@Stack#123,
comment=,
scan.auto-commit=true,
lookup.cache.max-rows=20000,
scan.fetch-size=10,
lookup.cache.ttl=700000
table-name=t2,
使用 DT Catalog
● 創建 DT Catalog
CREATE CATALOG catalog1
WITH (
'type' = 'dt',
'default-database' = 'default_database',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://xxx:3306/catalog_default',
'username' = 'drpeco',
'password' = 'DT@Stack#123',
'project-id' = '1',
'tenant-id' = '1'
);

● 創建 Database
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
Drop a database with the given database name. If the database to drop does not exist, an exception is thrown.
IF EXISTS
If the database does not exist, nothing happens.
RESTRICT
Dropping a non-empty database triggers an exception. Enabled by default.
CASCADE
Dropping a non-empty database also drops all associated tables and functions.
create database if not exists catalog1.database1
drop database if exists catalog1.database1
-- 洗掉非空資料庫,連通資料庫中的所有表也一起洗掉
drop database if exists catalog1.database1 CASCADE
● 創建 Table
1)Rename Table
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
Rename the given table name to another new table name
2)Set or Alter Table Properties
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
Set one or more properties in the specified table. If a particular property is already set in the table, override the old value with the new one.
-- 創建表
CREATE TABLE if not exists catalog1.default_database.table1
(
id int,
name string,
age bigint,
primary key ( id) not enforced
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',
'table-name' = 't2',
'username' = 'drpeco',
'password' = 'DT@Stack#123'
);
-- 洗掉表
drop table if exists mysql_catalog2.wujuan_database2.wujuan_table
-- 重命名表名
ALTER TABLE catalog1.default_database.table1 RENAME TO table2;
-- 設定表屬性
ALTER TABLE catalog1.default_database.table1
SET (
'tablename'='t2',
'url'='dbc:mysql://172.16.83.218:3306/wujuan?useSSL=false'
)
使用 DTCatalog 的具體場景和實作原理
● 全部是 DDL,只有 Catalog 的創建
CREATE CATALOG catalog1
WITH (
'type' = 'DT',
'default-database' = 'default_database',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default?autoReconnect=true&failOverReadOnly=false',
'username' = 'drpeco',
'password' = 'DT@Stack#123',
'project-id' = '1',
'tenant-id' = '1'
);
```
· 可以執行,但是沒有意義,ChunJun 不會存盤 Catalog 資訊,只有平臺存盤;
· 不支持語法校驗,
● 全部是 DDL,包含 Catalog、Database、Table 的創建
-- 初始化 Catalog
CREATE CATALOG catalog1
WITH (
'type' = 'dt',
'default-database' = 'default_database',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default',
'username' = 'drpeco',
'password' = 'DT@Stack#123',
'project-id' = '1',
'tenant-id' = '1'
);
-- 創建資料庫
create database if not exists database1
-- 創建表
CREATE TABLE if not exists catalog1.default_database.table1
(
id int,
name string,
age bigint,
primary key ( id) not enforced
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',
'table-name' = 't2',
'username' = 'drpeco',
'password' = 'DT@Stack#123'
);
· 無論創建資料庫、表,洗掉資料庫、表,必須包含 create catalog 陳述句;
· 可以執行,可以創建資料庫和表;
· 不支持語法校驗,
// 拋出例外的邏輯
StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tEnv);
TableResult execute = statementSet.execute(); -->
tableEnvironment.executeInternal(operations); -->
Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName); -->
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(getExecutionEnvironment(), transformations); -->
// 拋出例外的方法
public static StreamGraph generateStreamGraph(StreamExecutionEnvironment execEnv, List<Transformation<?>> transformations){
if (transformations.size() <= 0) {
throw new IllegalStateException(
"No operators defined in streaming topology. Cannot generate StreamGraph.");
}
...
return generator.generate();
}
// 如果沒有 insert 陳述句的時候,無法生成 JobGraph,但是 DDL 是執行成功的,
// 因此捕獲 FlinkX 拋出的特殊例外,此陳述句的例外 Message 是 FlinkX 里面處理的,
try {
PackagedProgramUtils.createJobGraph(program, flinkConfig, 1, false);
} catch (ProgramInvocationException e) {
// 僅執行 DDL FlinkX 拋出的例外
if (!e.getMessage().contains("OnlyExecuteDDL")) {
throw e;
}
}

● DDL + DML,包含 create + insert 陳述句
1)初始化 Catalog
CREATE CATALOG catalog1
WITH (
'type' = 'dt',
'default-database' = 'default_database',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default',
'username' = 'drpeco',
'password' = 'DT@Stack#123',
'project-id' = '1',
'tenant-id' = '1'
);
2.1)創建資料庫
create database if not exists database1
2.2)創建源表
CREATE TABLE if not exists catalog1.default_database.table1
(
id int,
name string,
age bigint,
primary key ( id) not enforced
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',
'table-name' = 't2',
'username' = 'drpeco',
'password' = 'DT@Stack#123'
);
3.1)創建資料庫
create database if not exists catalog1.database2;
3.2)創建結果表
CREATE TABLE if not exists catalog1.database2.table2
(
id int,
name string,
age bigint,
primary key ( id) not enforced
) with (
'connector' = 'print'
);
4)執行任務
insert into catalog1.database2.table2 select * from catalog1.database1.table1
· 不可以執行,可以提交;
· 支持語法校驗,
● DML,只有 Insert 陳述句
-- 初始化 Catalog
CREATE CATALOG catalog1
WITH (
'type' = 'dt',
'default-database' = 'default_database',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default',
'username' = 'drpeco',
'password' = 'DT@Stack#123',
'project-id' = '1',
'tenant-id' = '1'
);
-- 執行任務
insert into catalog1.database2.table2 select * from catalog1.database1.table1
· 如果 Catalog 的 資料庫和表都已經創建好了,那么直接寫 insert 就可以提交任務;
· 不可以執行,可以提交;
· 支持語法校驗,
《資料治理行業實踐白皮書》下載地址:https://fs80.cn/380a4b
想了解或咨詢更多有關袋鼠云大資料產品、行業解決方案、客戶案例的朋友,瀏覽袋鼠云官網:https://www.dtstack.com/?src=https://www.cnblogs.com/DTinsight/p/szbky
同時,歡迎對大資料開源專案有興趣的同學加入「袋鼠云開源框架釘釘技術qun」,交流最新開源技術資訊,qun號碼:30537511,專案地址:https://github.com/DTStack
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/549464.html
標籤:大數據
上一篇:hadoop學習記錄
