背景:我正在嘗試使用從 CSV 檔案讀取的兩個“大(r)”資料集/表(左表中有 16K 行,右表中略少)進行事件時間時間連接。這兩個表都是 append-only 表,即它們的資料源當前是 CSV 檔案,但將成為 Debezium 通過 Pulsar 發出的 CDC 更改日志。我正在使用相當新的SYSTEM_TIME AS OF語法。
問題:連接結果只是部分正確,即在查詢執行的開始(前 20% 左右),左側的行與右側的行不匹配,而理論上,它們應該. 幾秒鐘后,有更多匹配項,到查詢結束時,左側的行與右側的行正確匹配/連接。每次我運行查詢時,它都會根據哪些行(不)匹配顯示其他結果。
兩個資料集都沒有按各自的事件時間排序。它們按主鍵排序。所以這真的是這種情況,只有更多的資料。
本質上,右側是一個隨時間變化的查找表,我們確信對于每個左記錄都有一個匹配的右記錄,因為它們都是在 /- 同一時刻在原始資料庫中創建的。最終,我們的目標是一個動態物化視圖,其中包含與我們在啟用 CDC 的源資料庫 (SQL Server) 中連接 2 個表時相同的資料。
顯然,我想按照 Flink 檔案中的說明實作對完整資料集的正確連接
與簡單示例和 Flink 測驗代碼不同,只有幾行的小資料集(如這里),較大資料集的連接不會產生正確的結果。
我懷疑,當探測/左表開始流動時,構建/右表尚未“在記憶體中”,這意味著左行找不到匹配的右行,而它們應該 - 如果右表有稍早開始流動。這就是為什么left join右表的列回傳空值的原因。
我已經包含了我的代碼:
@Slf4j(topic = "TO_FILE")
public class CsvTemporalJoinTest {
private final String emr01Ddl =
"CREATE TABLE EMR01\n"
"(\n"
" SRC_NO STRING,\n"
" JRD_ETT_NO STRING,\n"
" STT_DT DATE,\n"
" MGT_SLT_DT DATE,\n"
" ATM_CRT_DT DATE,\n"
" LTD_MDT_IC STRING,\n"
" CPN_ORG_NO STRING,\n"
" PTY_NO STRING,\n"
" REG_USER_CD STRING,\n"
" REG_TS TIMESTAMP,\n"
" MUT_USER_CD STRING,\n"
" MUT_TS TIMESTAMP(3),\n"
" WATERMARK FOR MUT_TS AS MUT_TS,\n"
" PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n"
") WITH (\n"
" 'connector' = 'filesystem',\n"
" 'path' = '" getCsv1() "',\n"
" 'format' = 'csv'\n"
")";
private final String emr02Ddl =
"CREATE TABLE EMR02\n"
"(\n"
" CPN_ORG_NO STRING,\n"
" DSB_TX STRING,\n"
" REG_USER_CD STRING,\n"
" REG_TS TIMESTAMP,\n"
" MUT_USER_CD STRING,\n"
" MUT_TS TIMESTAMP(3),\n"
" WATERMARK FOR MUT_TS AS MUT_TS,\n"
" PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n"
") WITH (\n"
" 'connector' = 'filesystem',\n"
" 'path' = '" getCsv2() "',\n"
" 'format' = 'csv'\n"
")";
@Test
public void testEventTimeTemporalJoin() throws Exception {
var env = StreamExecutionEnvironment.getExecutionEnvironment();
var tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(emr01Ddl);
tableEnv.executeSql(emr02Ddl);
Table result = tableEnv.sqlQuery(""
"SELECT *"
" FROM EMR01"
" LEFT JOIN EMR02 FOR SYSTEM_TIME AS OF EMR01.MUT_TS"
" ON EMR01.CPN_ORG_NO = EMR02.CPN_ORG_NO");
tableEnv.toChangelogStream(result).addSink(new TestSink());
env.execute();
System.out.println("[Count]" TestSink.values.size());
//System.out.println("[Row 1]" TestSink.values.get(0));
//System.out.println("[Row 2]" TestSink.values.get(1));
AtomicInteger i = new AtomicInteger();
TestSink.values.listIterator().forEachRemaining(value -> log.info("[Row " i.incrementAndGet() " ]=" value));
}
private static class TestSink implements SinkFunction<Row> {
// must be static
public static final List<Row> values = Collections.synchronizedList(new ArrayList<>());
@Override
public void invoke(Row value, SinkFunction.Context context) {
values.add(value);
}
}
String getCsv1() {
try {
return new ClassPathResource("/GBTEMR01.csv").getFile().getAbsolutePath();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
String getCsv2() {
try {
return new ClassPathResource("/GBTEMR02.csv").getFile().getAbsolutePath();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
有沒有辦法解決這個問題?例如,有沒有辦法首先將右側加載到 Flink 狀態,然后開始加載/流式傳輸左側?這會是一個好方法嗎,因為這個問題引出了:多晚?左側可以開始流動的時間是什么時候?
我們使用的是 Flink 1.13.3。
uj5u.com熱心網友回復:
這種時間/版本化連接取決于具有準確的水印。Flink 依靠水印來知道哪些行可以安全地從正在維護的狀態中洗掉(因為它們不再影響結果)。
您使用的水印表明行按 排序MUT_TS。由于這不是真的,連接無法產生完整的結果。
為了解決這個問題,水印應該像這樣定義
WATERMARK FOR MUT_TS AS MUT_TS - INTERVAL '2' MINUTE
其中區間表示需要容納多少無序。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/382369.html
標籤:加入 apache-flink 颞
上一篇:全聯盟候補
