Flink CDC SQL development template
Development template: reading the MySQL binlog with Flink CDC SQL and synchronizing it to MySQL in real time
Prerequisite for using Flink CDC: the MySQL user that reads the source database must have binlog privileges.
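Besides the user's privileges, the connector (which uses Debezium internally) expects the source server to write a row-based binlog. Below is a minimal sketch, assuming plain JDBC access to the source MySQL server, that reads the relevant server variables and reports problems before the job is started. The class and method names are hypothetical and not part of Flink CDC.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: checks the binlog settings a MySQL CDC source relies on. */
final class BinlogPrerequisites {

    /** Reads the relevant server variables over an existing JDBC connection. */
    static Map<String, String> fetch(Connection conn) throws SQLException {
        Map<String, String> vars = new HashMap<>();
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SHOW VARIABLES WHERE Variable_name IN "
                     + "('log_bin', 'binlog_format', 'binlog_row_image')")) {
            while (rs.next()) {
                vars.put(rs.getString(1), rs.getString(2)); // Variable_name, Value
            }
        }
        return vars;
    }

    /** Returns a list of problems; an empty list means the settings look usable. */
    static List<String> check(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.getOrDefault("log_bin", ""))) {
            problems.add("log_bin must be ON");
        }
        if (!"ROW".equalsIgnoreCase(vars.getOrDefault("binlog_format", ""))) {
            problems.add("binlog_format must be ROW");
        }
        // binlog_row_image only exists on MySQL 5.6+; treat a missing value as FULL
        if (!"FULL".equalsIgnoreCase(vars.getOrDefault("binlog_row_image", "FULL"))) {
            problems.add("binlog_row_image should be FULL");
        }
        return problems;
    }
}
```

Typical use is `BinlogPrerequisites.check(BinlogPrerequisites.fetch(conn))` with a connection opened as the CDC user.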
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>ysservice-flink</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>ysservice-flink-batch</module>
<module>ysservice-flink-streaming</module>
<module>ysservice-flink-warehouse</module>
<module>ysservice-flink-datapush</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<encoding>UTF-8</encoding>
<flink.version>1.13.2</flink.version>
<scala.tools.version>2.11</scala.tools.version>
<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.4.0-cdh6.3.1</spark.version>
<hadoop.version>3.0.0-cdh6.3.1</hadoop.version>
<!--<hbase.version>1.2.0-cdh5.16.2</hbase.version>-->
<mysql.version>5.1.47</mysql.version>
<druid.version>1.2.3</druid.version>
<!--<redis.version>2.9.0</redis.version>-->
<!--<ipaddress.version>5.3.3</ipaddress.version>-->
<junit.version>4.12</junit.version>
<fastjson.version>1.2.73</fastjson.version>
<httpclient.version>4.5.13</httpclient.version>
<logback.version>1.2.3</logback.version>
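<log4j-over-slf4j.version>1.7.30</log4j-over-slf4j.version>
<!-- scope used by flink-scala below; override with -Dscope.level=provided when packaging for the cluster -->
<scope.level>compile</scope.level>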
</properties>
<repositories>
<!-- Aliyun repository -->
<repository>
<id>aliyun</id>
<url>https://maven.aliyun.com/nexus/content/groups/public</url>
</repository>
<!-- CDH repository -->
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<!-- <!– https://mvnrepository.com/artifact/org.apache.flink/flink-core –>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-core</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- <!– https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java –>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-streaming-java_2.11</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Web UI dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- <!– https://mvnrepository.com/artifact/org.apache.flink/flink-scala –>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-scala_2.11</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<!-- <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.11.1</version>
</dependency>-->
<!-- <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>${flink.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-client-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope.level}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-table</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- <type>pom</type>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->
<!-- <!– Either... –>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-table-api-java-bridge_2.11</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->
<!-- <!– or... –>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-table-api-scala-bridge_2.11</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-cep_2.11</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-json</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-3-uber</artifactId>
<version>3.1.1.7.2.9.0-173-9.0</version>
<scope>provided</scope>
</dependency>
<!-- <!– <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-jdbc_2.12</artifactId>-->
<!-- <version>1.10.2</version>-->
<!-- </dependency>–>-->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.5</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.github.housepower</groupId>-->
<!-- <artifactId>clickhouse-native-jdbc</artifactId>-->
<!-- <version>1.6-stable</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.kudu</groupId>-->
<!-- <artifactId>kudu-client</artifactId>-->
<!-- <version>1.5.0</version>-->
<!-- </dependency>-->
<!--<!– <dependency>–>-->
<!--<!– <groupId>com.aliyun</groupId>–>-->
<!--<!– <artifactId>flink-connector-clickhouse</artifactId>–>-->
<!--<!– <version>1.12.0</version>–>-->
<!--<!– </dependency>–>-->
<!-- <dependency>-->
<!-- <groupId>ru.yandex.clickhouse</groupId>-->
<!-- <artifactId>clickhouse-jdbc</artifactId>-->
<!-- <version>0.2.6</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
<!-- <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>-->
</dependencies>
<build>
<plugins>
<!-- Plugin that builds the fat jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
<exclude>ch.qos.logback:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
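</filters>
<transformers>
<!-- merge META-INF/services files so Flink can discover every table connector factory (mysql-cdc, jdbc, ...) in the fat jar -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>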
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
log4j.properties
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# This affects logging for both user code and Flink
log4j.rootLogger=INFO, console
# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=WARN
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=WARN
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.zookeeper=WARN
# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
package com.ysservice;
import com.ysservice.utils.SystemConstants;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.api.common.time.Time.seconds;
/**
* @Description: synchronize MySQL data with Flink CDC
* @author: WuBo
* @date:2021/10/19 15:21
*/
public class TestDemo {
public static void main(String[] args) throws Exception {
//Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Create the table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//Enable checkpointing
env.enableCheckpointing(60*1000);//take a checkpoint (snapshot of the intermediate state) every 60 seconds
env.getCheckpointConfig().setCheckpointTimeout(60*1000);//a checkpoint times out after 60 seconds
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//tolerate up to 10 failed checkpoints, because the Flink CDC connector blocks checkpoints during the initial full snapshot phase
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//exactly-once checkpointing
env.setRestartStrategy(RestartStrategies.failureRateRestart(5, seconds(60), seconds(2)));//fail the job if it fails 5 times within 60 seconds; wait 2 seconds between restarts
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//retain the checkpoint when the job is cancelled
//Create the Flink CDC source table. Declare MySQL DATETIME columns as timestamp, otherwise you will hit time zone problems
tableEnv.executeSql("CREATE TABLE Data_Input (" +
" ID bigint," + //Flink column types for the MySQL columns
" PROJECT_ID bigint," +
" PROJECT_CODE STRING," +
" PROJECT_NAME STRING," +
" AMOUNT decimal(20,2)," +
" ACTUAL_TYPE STRING," +
" TYPE_NAME STRING," +
" CREATED_AT timestamp," +
" CREATED_MAN STRING," +
" UPDATED_AT timestamp," +
" UPDATED_MAN STRING," +
" PRIMARY KEY (`ID`) NOT ENFORCED " + //primary key of the MySQL table; it must be set, otherwise the source cannot split the table into chunks and read it in parallel without locks
") WITH (" +
" 'connector' = 'mysql-cdc'," + //connector type: mysql-cdc
" 'hostname' = '"+ SystemConstants.dataInput_hostname_test +"'," + //MySQL hostname, read from the configuration file here
" 'port' = '3306'," +
" 'username' = '"+ SystemConstants.dataInput_username_test +"'," + //MySQL username, read from the configuration file here
" 'password' = '"+ SystemConstants.dataInput_password_test +"'," + //MySQL password, read from the configuration file here
" 'database-name' = 'test'," + //database to read
" 'table-name' = 'OUT_NORM_RULE_LIBRARY'," + //table to read
//" 'scan.startup.mode' = 'latest-offset'," +
" 'scan.incremental.snapshot.enabled' = 'true'" + //incremental snapshot reading: allows lock-free parallel reading; enabled by default
")");
//Create the output (sink) table
tableEnv.executeSql("CREATE TABLE Data_Output (" +
" ID bigint," +
" PROJECT_ID bigint," +
" PROJECT_CODE STRING," +
" PROJECT_NAME STRING," +
" AMOUNT decimal(20,2)," +
" ACTUAL_TYPE STRING," +
" TYPE_NAME STRING," +
" CREATED_AT timestamp," +
" CREATED_MAN STRING," +
" UPDATED_AT timestamp," +
" UPDATED_MAN STRING," +
" PRIMARY KEY (`ID`) NOT ENFORCED " +
") WITH (" +
" 'connector' = 'jdbc'," + //the sink uses the JDBC connector to write to MySQL
" 'url' = '"+ SystemConstants.dataOutput_url_datapush_out +"'," +
" 'username' = '"+ SystemConstants.dataOutput_username_datapush_out +"'," +
" 'password' = '"+ SystemConstants.dataOutput_password_datapush_out +"'," +
" 'table-name' = 'OUT_NORM_RULE_LIBRARY2'" +
")");
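//Run the INSERT. Flink derives from each incoming change whether it is an insert or a delete (an update arrives as two records: the old row is retracted, then the new row is inserted), and because the sink declares a primary key it writes in upsert mode: existing keys are updated, new keys are inserted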
tableEnv.executeSql("INSERT INTO Data_Output (SELECT * FROM Data_Input)");
}
}
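The comment on the final INSERT describes changelog semantics. Here is a toy JDK-only model of how a sink keyed by primary key ends up applying such a stream. The class `UpsertSinkModel` is hypothetical, and its change kinds are only named after Flink's `RowKind` (+I, -U, +U, -D); it is an illustration, not Flink's JDBC sink.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical toy model of a primary-key (upsert) sink consuming a CDC changelog. */
final class UpsertSinkModel {

    /** Change kinds named after Flink's RowKind (+I, -U, +U, -D), redefined here for the sketch. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private final Map<Long, String> rowsById = new LinkedHashMap<>();

    void apply(Kind kind, long id, String row) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                rowsById.put(id, row); // upsert: insert a new key or overwrite an existing one
                break;
            case UPDATE_BEFORE:
            case DELETE:
                rowsById.remove(id);   // retract the old version of the row
                break;
            default:
                throw new IllegalArgumentException("unknown kind: " + kind);
        }
    }

    Map<Long, String> snapshot() {
        return new LinkedHashMap<>(rowsById);
    }
}
```

The model retracts on UPDATE_BEFORE only to mirror the "delete, then insert" description. For an update that keeps the same key, an upsert sink reaches the same end state even if it ignores UPDATE_BEFORE, because UPDATE_AFTER overwrites that key.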
Flink CDC pitfalls:
All notes below are based on Flink 1.13.2 with its matching Flink CDC 2.0.
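1. Flink CDC offers two APIs, the DataStream API and the SQL API, and they differ considerably. Their pros and cons:
DataStream API pros: can read multiple databases and tables; the code is flexible.
Cons: reads the table with a parallelism of 1 only; MySQL DATETIME and TIMESTAMP values come out with time zone problems; at startup it needs the RELOAD privilege to lock tables for the full snapshot, so tables are briefly locked; and checkpoints cannot complete during the full snapshot.
SQL API pros: reads the table in parallel without locking; declaring DATETIME columns as TIMESTAMP avoids the time zone problem; supports checkpoints.
Cons: can read only a single table.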
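2. DataStream API: while the job scans the full MySQL table, checkpoints time out and the job fails over
Cause: during the full-table scan there is no binlog offset to record, so no checkpoint can be taken. Flink, however, triggers checkpoints at a fixed interval regardless. The mysql-cdc source works around this by keeping any in-flight checkpoint waiting for the whole scan, often until it times out. A timed-out checkpoint is counted as a failed checkpoint; under the default configuration this triggers Flink's failover, and the default failover behavior is not to restart, which produces the symptom above.
Solution: configure the number of tolerable failed checkpoints (setTolerableCheckpointFailureNumber) and a failure restart strategy.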
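To make the semantics of the `failureRateRestart(5, seconds(60), seconds(2))` used in the template concrete, here is a small JDK-only model written for this note. The class `FailureRateModel` is hypothetical; it mirrors the documented behavior of a failure-rate restart strategy (restart after each failure until too many failures fall inside the window), not Flink's source, and it does not model the 2-second restart delay.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of a failure-rate restart strategy. */
final class FailureRateModel {
    private final int maxFailuresPerInterval;
    private final long intervalMs;
    // timestamps of the most recent failures, oldest first
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    FailureRateModel(int maxFailuresPerInterval, long intervalMs) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMs = intervalMs;
    }

    /** Records a failure at nowMs and returns true if the job may be restarted. */
    boolean onFailure(long nowMs) {
        if (failureTimestamps.size() == maxFailuresPerInterval) {
            failureTimestamps.removeFirst(); // keep only the latest N failures
        }
        failureTimestamps.addLast(nowMs);
        if (failureTimestamps.size() < maxFailuresPerInterval) {
            return true; // fewer than N failures recorded so far
        }
        // N failures recorded: restart only if the oldest one is outside the window
        return nowMs - failureTimestamps.peekFirst() > intervalMs;
    }
}
```

With `new FailureRateModel(5, 60_000L)`, four failures in quick succession still restart the job, and the fifth failure within 60 seconds makes it fail for good, which matches the comment in the template.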
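3. DataStream API: the job reports lock-privilege errors
Cause: the MySQL user has not been granted the RELOAD privilege, so it cannot acquire the global read lock (FLUSH TABLES WITH READ LOCK). The CDC source then falls back to table-level read locks, which are only released after the full table scan finishes. The lock is therefore held for a long time and blocks other applications from writing to the table.
Solution: grant the RELOAD privilege to the MySQL user.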
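A minimal sketch of the GRANT statement an administrator would run, built in Java so it can be logged or executed over JDBC. The privilege list follows the one documented for the Debezium MySQL connector that Flink CDC uses internally; the class name and the example user `flink_cdc` are hypothetical. On MySQL 8 the user must already exist (CREATE USER), because GRANT no longer creates users.

```java
/** Hypothetical helper that builds the GRANT statement for the CDC user. */
final class CdcUserGrant {

    // Privilege list as documented for the Debezium MySQL connector used by Flink CDC.
    private static final String PRIVILEGES =
            "SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT";

    static String grantStatement(String user, String host) {
        return "GRANT " + PRIVILEGES + " ON *.* TO '" + quote(user) + "'@'" + quote(host) + "'";
    }

    // Doubles single quotes so the value stays inside the SQL string literal.
    private static String quote(String value) {
        return value.replace("'", "''");
    }
}
```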
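4. SQL API: after the job is submitted it reads only the full snapshot and never the incremental data
Cause: after the parallel full snapshot finishes, the SQL API needs one completed checkpoint before it switches to reading the binlog; with checkpointing disabled it never moves on to the incremental phase.
Solution: enable checkpointing, and grant binlog privileges for both the input and output databases.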
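5. Time zone problems with the MySQL DATETIME and TIMESTAMP types
With the DataStream API, DATETIME values are read as timestamps (epoch values) instead of date-time values, because the connector emits DATETIME as an epoch value that treats the wall-clock time as UTC, so it is 8 hours off from local (UTC+8) time. TIMESTAMP values are not read as epoch values, but they still show the same 8-hour difference. With the DataStream API you therefore have to convert time zones manually (no other solution for the DataStream API has been found so far).
With the SQL API, declaring a DATETIME column as TIMESTAMP is enough to avoid both the time zone problem and the epoch-value problem, and TIMESTAMP columns can be read as-is. When the SQL API writes to MySQL, however, the time zone of the output database must be set to +8:00 to avoid time zone problems on write; otherwise the times will be off by 12-13 hours.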
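A minimal sketch of the manual conversion needed with the DataStream API, using only `java.time`. It assumes Debezium's default temporal mapping: DATETIME with precision 0 to 3 arrives as epoch milliseconds that encode the wall-clock value as if it were UTC, and TIMESTAMP arrives as an ISO-8601 string in UTC. It also assumes the business time zone is Asia/Shanghai (UTC+8). The class name is hypothetical. DATETIME(4-6) values arrive as microseconds and would need dividing by 1000 first.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

/** Hypothetical helpers for turning CDC temporal values back into local wall-clock time. */
final class CdcTimeConversion {
    private static final ZoneId BUSINESS_ZONE = ZoneId.of("Asia/Shanghai");

    /** DATETIME: epoch millis encode the wall-clock value as UTC, so read them back as UTC. */
    static LocalDateTime fromDatetimeMillis(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    /** TIMESTAMP: a real instant in UTC (e.g. "2021-10-19T07:21:00Z"), shifted to UTC+8. */
    static LocalDateTime fromTimestampString(String isoUtc) {
        return ZonedDateTime.parse(isoUtc).withZoneSameInstant(BUSINESS_ZONE).toLocalDateTime();
    }
}
```

For example, `fromTimestampString("2021-10-19T07:21:00Z")` yields 2021-10-19T15:21, the local time in UTC+8.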
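6. When a Flink job runs, its log output is empty
Cause: log4j jar conflict.
Solution: exclude all log4j dependencies from the project. Flink ships its own log4j jars, so uploading another log4j jar easily causes a conflict.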
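Flink's own lib/ directory already provides the logging jars on the cluster, so a quick way to confirm a conflict is to ask the class loader how many locations provide the same logging class. Below is a minimal JDK-only sketch (the class name is hypothetical) that can be called from the job's main method. Useful resource paths to check are `org/apache/log4j/Logger.class` (log4j 1.x or its bridge), `org/apache/logging/log4j/Logger.class` (Log4j 2 API) and `org/slf4j/impl/StaticLoggerBinder.class` (SLF4J 1.x binding).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical diagnostic: reports resources that more than one jar on the classpath provides. */
final class ClasspathDuplicates {

    /** Returns every classpath location that contains the given resource path. */
    static List<URL> providers(String resourcePath) {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = ClassLoader.getSystemClassLoader();
        }
        try {
            return Collections.list(cl.getResources(resourcePath));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns one report line per resource found in more than one location. */
    static List<String> conflicts(String... resourcePaths) {
        List<String> report = new ArrayList<>();
        for (String path : resourcePaths) {
            List<URL> found = providers(path);
            if (found.size() > 1) {
                report.add(path + " -> " + found);
            }
        }
        return report;
    }
}
```

Printing `ClasspathDuplicates.conflicts(...)` for the paths above shows which jars clash; an empty list means each class comes from a single place.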
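7. The flink-table-planner-blink dependency in the local IDEA project conflicts with the Table API jars on the Flink cluster
For local runs in IDEA the dependency has to be enabled (default compile scope); when packaging for the cluster it has to be marked provided: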
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Please credit the source when reprinting. Article link: https://www.uj5u.com/qita/342046.html
