I am trying to implement an event-time temporal join in Flink. This is the first table in the join:
tEnv.executeSql("CREATE TABLE AggregatedTrafficData_Kafka ("
    + "`timestamp` TIMESTAMP_LTZ(3),"
    + "`area` STRING,"
    + "`networkEdge` STRING,"
    + "`vehiclesNumber` BIGINT,"
    + "`averageSpeed` INTEGER,"
    + "WATERMARK FOR `timestamp` AS `timestamp`"
    + ") WITH ("
    + "'connector' = 'kafka',"
    + "'topic' = 'seneca.trafficdata.aggregated',"
    + "'properties.bootstrap.servers' = 'localhost:9092',"
    + "'properties.group.id' = 'traffic-data-aggregation-job',"
    + "'format' = 'json',"
    + "'json.timestamp-format.standard' = 'ISO-8601'"
    + ")");
This table is used as the sink for the following query:
Table aggregatedTrafficData = trafficData
    .window(Slide.over(lit(30).seconds())
        .every(lit(15).seconds())
        .on($("timestamp"))
        .as("w"))
    .groupBy($("w"), $("networkEdge"), $("area"))
    .select(
        $("w").end().as("timestamp"),
        $("area"),
        $("networkEdge"),
        $("plate").count().as("vehiclesNumber"),
        $("speed").avg().as("averageSpeed")
    );
This is the other table in the join. I use Debezium to stream a Postgres table into Kafka:
tEnv.executeSql("CREATE TABLE TransportNetworkEdge_Kafka ("
    + "`timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL,"
    + "`urn` STRING,"
    + "`flow_rate` INTEGER,"
    + "PRIMARY KEY(`urn`) NOT ENFORCED,"
    + "WATERMARK FOR `timestamp` AS `timestamp`"
    + ") WITH ("
    + "'connector' = 'kafka',"
    + "'topic' = 'seneca.network.transport_network_edge',"
    + "'scan.startup.mode' = 'latest-offset',"
    + "'properties.bootstrap.servers' = 'localhost:9092',"
    + "'properties.group.id' = 'traffic-data-aggregation-job',"
    + "'format' = 'debezium-json',"
    + "'debezium-json.schema-include' = 'true'"
    + ")");
Finally, here is the temporal join:
Table transportNetworkCongestion = tEnv.sqlQuery("SELECT AggregatedTrafficData_Kafka.`timestamp`, `networkEdge`, "
    + "congestion(`vehiclesNumber`, `flow_rate`) AS `congestion` FROM AggregatedTrafficData_Kafka "
    + "JOIN TransportNetworkEdge_Kafka FOR SYSTEM_TIME AS OF AggregatedTrafficData_Kafka.`timestamp` "
    + "ON AggregatedTrafficData_Kafka.`networkEdge` = TransportNetworkEdge_Kafka.`urn`");
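The problem I am having is that the join only works for the first few seconds after an update in the Postgres table, but I need to continuously join the first table with the Debezium one. Am I doing something wrong? Thanks, euks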
Answer:
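A temporal join using the AS OF syntax you are using requires:
- an append-only table with a valid event-time attribute
- an updating table with a primary key and a valid event-time attribute
- an equality predicate on the primary key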
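When Flink SQL's temporal operators are applied to event-time streams, watermarks play a critical role in determining when results are produced and when state is cleared.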
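When performing a temporal join:
- rows from the append-only table are buffered in Flink state until the join operator's current watermark reaches their timestamps
- for the versioned table, for each key, the latest version whose timestamp precedes the join operator's current watermark is kept in state, plus any versions after the current watermark
- whenever the join operator's watermark advances, new results are produced and state that is no longer relevant is cleared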
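The buffering and cleanup behaviour described in this list can be sketched with a toy model in plain Java. This is only an illustration under simplifying assumptions, not Flink's actual operator: the class `TemporalJoinSketch` and its methods are hypothetical names, timestamps are plain epoch milliseconds, and rows without a matching version are simply dropped (inner join).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of an event-time temporal join; hypothetical, not Flink code.
final class TemporalJoinSketch {
    // One buffered row from the append-only side.
    static final class Probe {
        final long ts;
        final String key;
        Probe(long ts, String key) { this.ts = ts; this.key = key; }
    }

    private final List<Probe> buffered = new ArrayList<>();
    // key -> (version timestamp -> flow_rate)
    private final Map<String, TreeMap<Long, Integer>> versions = new HashMap<>();

    void onProbe(long ts, String key) {
        buffered.add(new Probe(ts, key));
    }

    void onVersion(long ts, String key, int flowRate) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(ts, flowRate);
    }

    int bufferedRows() {
        return buffered.size();
    }

    /** Advances the operator watermark and emits every buffered row it has reached. */
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        Iterator<Probe> it = buffered.iterator();
        while (it.hasNext()) {
            Probe p = it.next();
            if (p.ts > watermark) {
                continue; // watermark has not reached this row yet, keep buffering
            }
            TreeMap<Long, Integer> byTime = versions.get(p.key);
            // Latest version at or before the row's own timestamp
            Map.Entry<Long, Integer> v = byTime == null ? null : byTime.floorEntry(p.ts);
            if (v != null) {
                out.add(p.key + "@" + p.ts + " flow_rate=" + v.getValue());
            }
            it.remove();
        }
        // Cleanup: per key, keep the latest version at or before the watermark and anything newer
        for (TreeMap<Long, Integer> byTime : versions.values()) {
            Long keep = byTime.floorKey(watermark);
            if (keep != null) {
                byTime.headMap(keep, false).clear();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        TemporalJoinSketch join = new TemporalJoinSketch();
        join.onVersion(1_000L, "edge-1", 10);
        join.onVersion(20_000L, "edge-1", 20);
        join.onProbe(15_000L, "edge-1");
        join.onProbe(30_000L, "edge-1");
        System.out.println(join.onWatermark(15_000L)); // only the row at 15000 is released
        System.out.println(join.onWatermark(30_000L)); // the row at 30000 follows
    }
}
```

Note that nothing is emitted from `onProbe` or `onVersion`; in this model, output only ever appears when `onWatermark` is called with a larger value.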
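The join operator keeps track of the watermarks it receives from its input channels, and its current watermark is always the minimum of these two watermarks. This is why your join stalls and only makes progress when flow_rate is updated.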
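To make the stall concrete, here is a minimal sketch of the "minimum of the two input watermarks" rule in plain Java. The class `TwoInputWatermark` is a hypothetical stand-in rather than a Flink class, and the timestamps are made-up millisecond values chosen to mirror the 15-second slide of the traffic query.

```java
// Toy model of how a two-input operator combines watermarks; hypothetical, not Flink code.
final class TwoInputWatermark {
    // Index 0: traffic (append-only) input, index 1: Debezium (versioned) input
    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};

    /** Records a watermark from one input and returns the operator's combined watermark. */
    long onWatermark(int input, long watermark) {
        // A channel's watermark never moves backwards
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        return current();
    }

    long current() {
        return Math.min(inputWatermarks[0], inputWatermarks[1]);
    }

    public static void main(String[] args) {
        TwoInputWatermark join = new TwoInputWatermark();
        join.onWatermark(1, 1_000L); // a single flow_rate update at t=1000
        for (long t = 15_000L; t <= 60_000L; t += 15_000L) {
            long wm = join.onWatermark(0, t); // traffic windows keep closing
            System.out.println("traffic=" + t + " join=" + wm);
        }
        // Only a new update on the versioned side lets the join watermark move
        System.out.println("after update: join=" + join.onWatermark(1, 50_000L));
    }
}
```

In this model the combined watermark stays at 1000 while the traffic side advances to 60000, and only moves (to 50000) once the versioned side receives another update.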
One way to fix this is to set the watermark for the TransportNetworkEdge_Kafka table like this:
"WATERMARK FOR `timestamp` AS " + Watermark.MAX_WATERMARK
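This sets the watermark for this table/stream to the largest possible value, which makes the watermarks from this stream irrelevant: this stream's watermark will never be the smallest.
However, this has the drawback of making the join results non-deterministic.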
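A rough way to see where the non-determinism comes from is sketched below in plain Java. It assumes, as a simplification, that once the versioned side's watermark is pinned to `Long.MAX_VALUE`, a traffic row is joined as soon as the traffic watermark reaches it, against whichever versions happen to have arrived by then. The class `MaxWatermarkRace` and the timestamps are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy illustration of arrival-order dependence; hypothetical, not Flink code.
final class MaxWatermarkRace {
    /** Returns the flow_rate joined to the traffic row at t=30000, or null if none. */
    static Integer run(boolean updateArrivesFirst) {
        TreeMap<Long, Integer> arrivedVersions = new TreeMap<>();
        arrivedVersions.put(1_000L, 10); // initial flow_rate

        long versionedWatermark = Long.MAX_VALUE; // stand-in for the pinned maximum watermark
        long trafficWatermark = 30_000L;
        long joinWatermark = Math.min(trafficWatermark, versionedWatermark);

        // The update with event time 20000 may or may not have been read from Kafka yet
        if (updateArrivesFirst) {
            arrivedVersions.put(20_000L, 20);
        }

        // The row is released without waiting for the versioned side to catch up
        if (joinWatermark < 30_000L) {
            return null;
        }
        Map.Entry<Long, Integer> version = arrivedVersions.floorEntry(30_000L);
        return version == null ? null : version.getValue();
    }

    public static void main(String[] args) {
        System.out.println("update read before the row: flow_rate=" + run(true));
        System.out.println("update read after the row:  flow_rate=" + run(false));
    }
}
```

The same inputs yield a different flow_rate depending purely on which record is read first, which is the kind of result a correctly advancing watermark on the versioned side would otherwise rule out.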
