??昨天學習完一些比較基礎的Datastream的API后,讓我們繼續學習Datastream上一層的Table API和SQL,這兩個API都是處理關系型資料的,可以降低flink的開發門檻,對往期內容感興趣的同學可以參考如下內容👇:
- hadoop專題: hadoop系列文章.
- spark專題: spark系列文章.
- flink專題: Flink系列文章.
💦Table API和SQL可以通過一種更加直觀的方式對資料流進行多種處理,比如選擇、過濾、分組、求和以及多表連接,也支持視窗操作,讓我們開始今日份的學習吧!
目錄
- 1. 開發環境的構建
- 2. 創建TableEnvironment
- 3. 連接外部資料源
- 3.1 連接檔案系統的檔案
- 3.2 連接訊息佇列kafka
- 3.3 連接ElasticSearch(ES)
- 3.4 Table API根據SQL建表
- 4. Table的操作
- 4.1 from_path
- 4.2 from_elements
- 4.3 select
- 4.4 as
- 4.5 where和filter
- 4.6 add_columns
- 4.7 add_or_replace_columns
- 4.8 drop_column
- 4.9 rename_columns
- 4.10 group_by
- 4.11 Window group_by
- 4.12 over_window
- 4.13 distinct
- 4.14 Inner Join
- 4.15 Outer Join
- 5. 時間概念
- 5.1 Watermark生成策略
- 6. 總結
- 7. 參考資料
1. 開發環境的構建
我們在上一章DataStream API已經介紹過一遍,這里仔簡單介紹一下:
- 準備一臺帶java8或11、python(3.6,3.7,3.8)的機器
- 寫入命令👇
python3 -m pip install apache-flink
TableEnvironment的開發環境構建完成,后續就可以在IDEA中進行Table API和SQL的編碼作業了,
2. 創建TableEnvironment
開發Table API和SQL必須先宣告執行背景關系環境為TableEnvironment,它是一個介面,主要有如下功能:
- 創建 Table
- 將 Table 注冊成臨時表
- 執行 SQL 查詢,更多細節可查閱 SQL
- 注冊用戶自定義的 (標量,表值,或者聚合) 函式
- 配置作業
- 管理 Python 依賴
- 提交作業執行
例如:創建流和批的TableEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment
#創建流TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# 創建批TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
3. 連接外部資料源
Flink Table API和SQL可以通過Connector與外部檔案系統進行互動,用于流批資料的讀取操作,外部檔案系統包含傳統的關系型資料庫、K-V記憶體資料庫、訊息佇列以及分布式檔案系統等,
3.1 連接檔案系統的檔案
# 環境配置
t_env = TableEnvironment.create(
environment_settings=EnvironmentSettings.in_batch_mode())
#連接檔案系統檔案
create table test01(
id int,
name string
)WITH(
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/Users/liuxiaocong/data01.csv'
)
其中WITH(…)中的內容為外部連接器的配置,不同的外部連接器涉及的配置不同,且必選的和可選的配置都不同,
解釋:‘connector.type’ = ‘filesystem’說明外部連接器的型別為檔案系統,‘connector.path’ =’/Users/liuxiaocong/data01.csv’指定了外部檔案系統連接器的檔案路徑,‘format.type’ = 'csv’則說明檔案系統的資料格式為CSV,
3.2 連接訊息佇列kafka
create table test02(
id int,
name string
)WITH(
'connector. type' = 'kafka'
'connector.version'='0.11'
'connector. topic'='topic name'
'connector.properties.bootstrap. servers'='localhost:9092'
'connector.properties.group.id'='my group id'
'connector.startup-mode'='earliest-offset'
'format.type '='json')
解釋:‘connector.type’ = 'kafka’表示此外部連接器的型別為Kafka,‘connector.topic’ = 'topic_name’指定Kafka訊息主題,‘connector.properties.bootstrap.servers’ = 'localhost:9092’指定Kafka服務的地址,‘format.type’ = 'json’指定資料格式為JSON,
3.3 連接ElasticSearch(ES)
create table test02(
id int,
name string
)
WITH
(
'connector. type'='elasticsearch',
'connector.version'='7',
'connector. hosts'='http://hostl:9092;http://host2:9093',
'connector. index'='myusers',
'connector. document-type'='user',
'format. type'='json',
'update-mode'='append',
'connector. key-delimiter'='$',
'connector. key-null-literal'='n/a',
'connector. failure-handler'='fail',
'connector. flush-on-checkpoint'='true',
'connector. bulk-flush.max-actions'='42',
'connector.bulk-flush.max-size'='42 mb',
'connector.bulk-flush. interval'='60000',
'connector. bulk-flush.backoff.type'='disabled',
'connector.bulk-flush.backoff.max-retries'='3',
'connector, bulk-flush.backoff.delay'='30000',
'connector. connection-max-retry-timeout'='3'
解釋:connector.type’ = 'elasticsearch’表示此外部連接器的型別為Elasticsearch,‘connector.hosts’='http://host1:9092;http://host2:9093’指定Elasticsearch服務的地址,‘format.type’ ='json’指定資料格式為JSON,
3.4 Table API根據SQL建表
不知道大家看完上面有沒有產生疑惑,上面都是SQL和Table有啥關系,這是個好問題,execute_sql可來執行Create Table相關DDL陳述句,來獲取Table,
例如:
from pyflink.table import *
# 環境配置
t_env = TableEnvironment.create(
environment_settings=EnvironmentSettings.in_batch_mode())
# 在表環境中注冊 Orders 表
source_data_path = "/Users/liuxiaocong/test/"
source_ddl = f"""
create table Orders(
a VARCHAR,
b BIGINT,
c BIGINT,
rowtime TIMESTAMP(3),
WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '{source_data_path}'
)
"""
#execute_sql建表
t_env.execute_sql(source_ddl)
#獲取Orders表
orders = t_env.from_path("Orders")
4. Table的操作
下面將介紹一些比較常見的操作,以下所有操作都使用python撰寫,并都支持Streaming和Batch
4.1 from_path
和 SQL 查詢的 FROM 子句類似,執行一個注冊過的表的掃描,
orders = t_env.from_path("Orders")
4.2 from_elements
和 SQL 查詢中的 VALUES 子句類似, 基于提供的行生成一張行內表,
#1.不指定型別,自己猜
table = t_env.from_elements([(1, 'ABC'), (2, 'ABCDE')])
#2.指定型別
table = t_env.from_elements([(1, 'ABC'), (2, 'ABCDE')],
schema=DataTypes.Row([DataTypes.FIELD('id', DataTypes.DECIMAL(10, 2)),
DataTypes.FIELD('name', DataTypes.STRING())]))
4.3 select
和 SQL 的 SELECT 子句類似, 執行一個選擇操作,
#1.select基礎用法
orders = t_env.from_path("Orders")
result = orders.select(orders.a, orders.c.alias('d'))
#2.也可以select*
from pyflink.table.expressions import col
result = orders.select(col("*"))
4.4 as
重命名欄位
orders = t_env.from_path("Orders")
result = orders.alias("x, y, z, t")
4.5 where和filter
和 SQL 的 WHERE 子句類似, 過濾掉未驗證通過過濾謂詞的行,
#1.where
orders = t_env.from_path("Orders")
result = orders.where(orders.a == 'red')
#2.filter
orders = t_env.from_path("Orders")
result = orders.filter(orders.a == 'red')
4.6 add_columns
執行欄位添加操作, 如果所添加的欄位已經存在,將拋出例外,
from pyflink.table.expressions import concat
orders = t_env.from_path("Orders")
result = orders.add_columns(concat(orders.c, 'sunny'))
4.7 add_or_replace_columns
執行欄位添加操作, 如果添加的列名稱和已存在的列名稱相同,則已存在的欄位將被替換, 此外,如果添加的欄位里面有重復的欄位名,則會使用最后一個欄位,
from pyflink.table.expressions import concat
orders = t_env.from_path("Orders")
result = orders.add_or_replace_columns(concat(orders.c, 'sunny').alias('desc'))
4.8 drop_column
洗掉某列
orders = t_env.from_path("Orders")
result = orders.drop_columns(orders.b, orders.c)
4.9 rename_columns
執行欄位重命名操作, 欄位運算式應該是別名運算式,并且僅當欄位已存在時才能被重命名,
orders = t_env.from_path("Orders")
result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2'))
4.10 group_by
和 SQL 的 GROUP BY 子句類似, 使用分組鍵對行進行分組,使用伴隨的聚合算子來按照組進行聚合行,
orders = t_env.from_path("Orders")
result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d'))
4.11 Window group_by
使用分組視窗結合單個或者多個分組鍵對表進行分組和聚合,
from pyflink.table.window import Tumble
from pyflink.table.expressions import lit, col
orders = t_env.from_path("Orders")
result = orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \
.group_by(orders.a, col('w')) \
.select(orders.a, col('w').start, col('w').end, orders.b.sum.alias('d'))
4.12 over_window
和 SQL 的 OVER 子句類似,所有的聚合必須定義在同一個視窗上,比如同一個磁區、排序和范圍內,目前只支持 PRECEDING 到當前行范圍(無界或有界)的視窗,尚不支持 FOLLOWING 范圍的視窗,ORDER BY 操作必須指定一個單一的時間屬性,
from pyflink.table.window import Over
from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE
orders = t_env.from_path("Orders")
result = orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime)
.preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE)
.alias("w")) \
.select(orders.a, orders.b.avg.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w')))
4.13 distinct
和 SQL DISTINCT 聚合子句類似,例如 COUNT(DISTINCT a), Distinct 聚合宣告的聚合函式(內置或用戶定義的)僅應用于互不相同的輸入值.
#1.distinct和聚合函式一起用
from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE
orders = t_env.from_path("Orders")
# 按屬性分組后的的互異(互不相同、去重)聚合
group_by_distinct_result = orders.group_by(orders.a) \
.select(orders.a, orders.b.sum.distinct.alias('d'))
# 按屬性、時間視窗分組后的互異(互不相同、去重)聚合
group_by_window_distinct_result = orders.window(
Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")).group_by(orders.a, col('w')) \
.select(orders.a, orders.b.sum.distinct.alias('d'))
# over window 上的互異(互不相同、去重)聚合
result = orders.over_window(Over
.partition_by(orders.a)
.order_by(orders.rowtime)
.preceding(UNBOUNDED_RANGE)
.alias("w")) \
.select(orders.a, orders.b.avg.distinct.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w')))
#2.單獨去重
orders = t_env.from_path("Orders")
result = orders.distinct()
4.14 Inner Join
和 SQL 的 JOIN 子句類似,關聯兩張表,兩張表必須有不同的欄位名,并且必須通過 join 算子或者使用 where 或 filter 算子定義至少一個 join 等式連接謂詞,
from pyflink.table.expressions import col
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
result = left.join(right).where(left.a == right.d).select(left.a, left.b, right.e)
4.15 Outer Join
和 SQL LEFT/RIGHT/FULL OUTER JOIN 子句類似, 關聯兩張表, 兩張表必須有不同的欄位名,并且必須定義至少一個等式連接謂詞,
from pyflink.table.expressions import col
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
# 左連接
left_outer_result = left.left_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
#右鏈接
right_outer_result = left.right_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
#全連接
full_outer_result = left.full_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
5. 時間概念
在前面我們提到過Flink可以分為事件時間和處理時間,同樣地,在Flink Table API和SQL中,也有時間的概念,需要注意的點如下:
- 在Table中,時間概念需要通過Table的時間屬性(Time Attributes)來進行確定,它是Table Schema的一部分,
- Table的時間屬性可以用Create Table DDL語法或從DataStream創建表時進行定義,一旦定義了時間屬性,就可以參考該欄位,
為了對亂序事件資料進行處理,Table API和SQL程式需要知道每一行事件資料的時間戳,并且還需要定期來生成水位線Watermark,目前可以在CREATE TABLE DDL或DataStream到Table的轉換程序中定義事件時間屬性以及Watermark生成策略,
5.1 Watermark生成策略
- 嚴格的單調遞增水印
-- 以當前事件資料中最大的時間戳為水印進行發送,當事件資料中的時間戳比水位印大時,則視為非遲到資料,
WATERMARK FOR rowtime_column AS rowtime_column
- 單調遞增水印
-- 以當前事件資料中最大的時間戳減去1毫秒為水印進行發送,當事件資料中的時間戳比水印大或者相等時,則視為非遲到資料,
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
- 固定延遲水印
--當前事件資料中最大的時間戳減去一個固定的延遲時間長度為水印進行發送”
WATERMARK FOR addtime AS addtime - INTERVAL '5' SECOND
6. 總結
本章節本來是想將flink的Table API和SQL一起寫了,可是發現光是Table API的內容就夠多了,而且Table API這一部分中的流的轉換、視窗操作、與pandas的轉換等都還沒講述,后續將繼續補充,
7. 參考資料
《PyDocs》(pyflink官方檔案)
《Flink入門與實戰》
《Kafka權威指南》
《Apache Flink 必知必會》
《Apache Flink 零基礎入門》
《Flink 基礎教程》
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/428568.html
標籤:其他
下一篇:Flink CEP結合案例詳解
