CDC
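CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows or tables), record those changes completely in the order they occur, and write them to a message broker so that other services can subscribe to and consume them.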
Types of CDC
CDC comes in two main flavors, query-based and binlog-based. The differences between the two are:
| | Query-based CDC | Binlog-based CDC |
|---|---|---|
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | No | Yes |
| Latency | High | Low |
| Adds load to the database | Yes | No |
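The "captures every data change" row is the one that matters most in practice. The sketch below is a toy simulation in plain Java and is not tied to any of the products in the table; the class and method names are hypothetical. A row is inserted, updated, and deleted between two polls. A query-based reader can only compare snapshots, while a log-based reader replays every recorded change in order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: a "table" plus an append-only change log standing in for the binlog.
class CdcModesSketch {
    private final Map<Integer, String> table = new HashMap<>();
    private final List<String> changeLog = new ArrayList<>();

    void insert(int id, String value) {
        table.put(id, value);
        changeLog.add("insert " + id + "=" + value);
    }

    void update(int id, String value) {
        table.put(id, value);
        changeLog.add("update " + id + "=" + value);
    }

    void delete(int id) {
        table.remove(id);
        changeLog.add("delete " + id);
    }

    // Query-based CDC: copy the current table state, as a periodic SELECT would.
    Map<Integer, String> snapshot() {
        return new HashMap<>(table);
    }

    // Log-based CDC: read every change recorded at or after the given log position.
    List<String> readLogFrom(int position) {
        return new ArrayList<>(changeLog.subList(position, changeLog.size()));
    }

    // Compare two snapshots the way a polling job would.
    static List<String> diff(Map<Integer, String> before, Map<Integer, String> after) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<Integer, String> e : after.entrySet()) {
            String old = before.get(e.getKey());
            if (old == null) {
                changes.add("insert " + e.getKey() + "=" + e.getValue());
            } else if (!old.equals(e.getValue())) {
                changes.add("update " + e.getKey() + "=" + e.getValue());
            }
        }
        for (Integer id : before.keySet()) {
            if (!after.containsKey(id)) {
                changes.add("delete " + id);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        CdcModesSketch db = new CdcModesSketch();
        Map<Integer, String> firstPoll = db.snapshot();
        db.insert(1, "a");
        db.update(1, "b");
        db.delete(1);
        Map<Integer, String> secondPoll = db.snapshot();

        System.out.println("query-based sees: " + diff(firstPoll, secondPoll));
        System.out.println("log-based sees:   " + db.readLogFrom(0));
    }
}
```

In this run the two snapshots are identical, so the polling diff is empty, while the log still holds all three changes.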
FlinkCDC
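The Flink community developed the flink-cdc-connectors component, a source connector that reads both full snapshot data and incremental change data directly from databases such as MySQL and PostgreSQL, with no need for middleware such as Kafka to relay the data.
It is open source: https://github.com/ververica/flink-cdc-connectors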


| Connector | Database | Driver |
|---|---|---|
| mongodb-cdc | MongoDB: 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 |
| mysql-cdc | MySQL: 5.6, 5.7, 8.0.x; RDS MySQL: 5.6, 5.7, 8.0.x; PolarDB MySQL: 5.6, 5.7, 8.0.x; Aurora MySQL: 5.6, 5.7, 8.0.x; MariaDB: 10.x; PolarDB X: 2.0.1 | JDBC Driver: 8.0.27 |
| oceanbase-cdc | OceanBase CE: 3.1.x; OceanBase EE (MySQL mode): 2.x, 3.x | JDBC Driver: 5.1.4x |
| oracle-cdc | Oracle: 11, 12, 19 | Oracle Driver: 19.3.0.0 |
| postgres-cdc | PostgreSQL: 9.6, 10, 11, 12 | JDBC Driver: 42.2.12 |
| sqlserver-cdc | Sqlserver: 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 |
| tidb-cdc | TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 | JDBC Driver: 8.0.27 |
| db2-cdc | Db2: 11.5 | DB2 Driver: 11.5.0.0 |
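DataStream:
- Pros: supports multiple databases and multiple tables
- Cons: requires a custom deserializer (which is also what makes it flexible)
FlinkSQL:
- Pros: no custom deserializer needed
- Cons: single table only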
Demo
Make sure binlog_format=ROW is enabled.
my.ini
log-bin=mysql-bin
#binlog_format="STATEMENT"
binlog_format="ROW"
#binlog_format="MIXED"
#server-id=1
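Commented-out lines in my.ini are easy to misread, so it can help to check mechanically which settings are actually active. The following is a minimal sketch in plain Java, assuming the file uses simple key=value lines like the fragment above. The class BinlogConfigCheck and its methods are hypothetical helpers and are not part of MySQL or Flink CDC.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

class BinlogConfigCheck {

    // Collect active settings, skipping blank lines, [section] headers, and '#' or ';' comments.
    static Map<String, String> parse(String iniText) {
        Map<String, String> settings = new HashMap<>();
        for (String raw : iniText.split("\\r?\\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";") || line.startsWith("[")) {
                continue;
            }
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim();
            String value = eq < 0 ? "" : line.substring(eq + 1).trim().replace("\"", "");
            // MySQL option files accept '-' and '_' interchangeably in option names.
            settings.put(key.replace('_', '-').toLowerCase(Locale.ROOT), value);
        }
        return settings;
    }

    // True when binary logging is switched on and the row format is selected.
    static boolean isRowBinlogEnabled(String iniText) {
        Map<String, String> settings = parse(iniText);
        return settings.containsKey("log-bin")
                && "ROW".equalsIgnoreCase(settings.get("binlog-format"));
    }

    public static void main(String[] args) {
        String ini = "log-bin=mysql-bin\n"
                + "#binlog_format=\"STATEMENT\"\n"
                + "binlog_format=\"ROW\"\n"
                + "#binlog_format=\"MIXED\"\n";
        System.out.println("row binlog enabled: " + isRowBinlogEnabled(ini));
    }
}
```

This only inspects the file text. The value the running server actually uses still has to be confirmed on the server itself after a restart.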

POM
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
</dependencies>
DataStream-based
CustomerDeserialization.java
package com.vipsoft;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    /**
     * Output format:
     * {
     *   "database":"",
     *   "tableName":"",
     *   "before":{"id":"","tm_name":""....},
     *   "after":{"id":"","tm_name":""....},
     *   "type":"insert | update | delete | read",
     *   //"ts":156456135615
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //1. Create the JSON object that holds the final result
        JSONObject result = new JSONObject();

        //2. Extract the database and table names from the topic
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        Struct value = (Struct) sourceRecord.value();

        //3. Read the "before" image (null for inserts)
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        //4. Read the "after" image (null for deletes)
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        //5. Get the operation type (CREATE, UPDATE, DELETE) and rename "create" to "insert"
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        //6. Write the fields into the JSON object
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        //7. Emit the record
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
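Step 2 of the deserializer relies on the topic name having three dot-separated parts, and step 5 relies on the operation's enum name. The sketch below isolates both pieces of logic in plain Java so they can be tried without a running job. The class TopicParsingSketch is hypothetical, and the sample topic `mysql_binlog_source.springboot.sys_user` is an assumption about what the connector produces for the demo table, not something taken from the original output.

```java
import java.util.Locale;

class TopicParsingSketch {

    // Split "<serverName>.<database>.<table>" and return {database, table}.
    static String[] databaseAndTable(String topic) {
        String[] parts = topic.split("\\.");
        if (parts.length != 3) {
            // A database or table name containing a dot would end up here.
            throw new IllegalArgumentException("Unexpected topic format: " + topic);
        }
        return new String[] {parts[1], parts[2]};
    }

    // Lower-case the operation name and rename "create" to "insert", as the deserializer does.
    static String normalizeType(String operationName) {
        String type = operationName.toLowerCase(Locale.ROOT);
        return "create".equals(type) ? "insert" : type;
    }

    public static void main(String[] args) {
        String[] dbAndTable = databaseAndTable("mysql_binlog_source.springboot.sys_user");
        System.out.println(dbAndTable[0] + " / " + dbAndTable[1]);
        System.out.println(normalizeType("CREATE"));
        System.out.println(normalizeType("READ"));
    }
}
```

Unlike the original `fields[1]` and `fields[2]` lookup, this version fails with a clear message when the topic does not have exactly three parts.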
FlinkCDC.java
package com.vipsoft;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Build the SourceFunction with FlinkCDC and read the data
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .serverTimeZone("GMT+8") // add this setting if a time zone error is reported
                .port(3306)
                .username("root")
                .password("110")
                .databaseList("springboot")
                .tableList("springboot.sys_user") // if omitted, all tables in the listed databases are consumed; if set, use the form db.table
                //.deserializer(new StringDebeziumDeserializationSchema())
                .deserializer(new CustomerDeserialization()) // use the custom deserializer
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        //3. Print the data
        streamSource.print();

        //4. Start the job
        env.execute("FlinkCDC");
    }
}
Output
- Default StringDebeziumDeserializationSchema

- Custom deserializer

FlinkSQL
package com.vipsoft;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkCDCWithSQL {
    public static void main(String[] args) throws Exception {
        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. Create the table with DDL
        tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
                " id STRING NOT NULL, " +
                " username STRING, " +
                " nick_name STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'localhost', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '110', " +
                " 'database-name' = 'springboot', " +
                " 'table-name' = 'sys_user' " +
                ")");

        //3. Query the data
        Table table = tableEnv.sqlQuery("select * from mysql_binlog");

        //4. Convert the dynamic table into a retract stream
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        //5. Start the job
        env.execute("FlinkCDCWithSQL");
    }
}
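The `Tuple2<Boolean, Row>` elements printed by the retract stream can be puzzling at first. The flag is true for an "add" message and false for a "retract" message, and an update arrives as a retract of the old row followed by an add of the new one. The sketch below replays such messages into a map in plain Java, without Flink, to show how a consumer could rebuild the current table state. The Message class and the sample rows are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RetractStreamSketch {

    // Stand-in for Tuple2<Boolean, Row>, reduced to two columns.
    static final class Message {
        final boolean add;
        final String id;
        final String username;

        Message(boolean add, String id, String username) {
            this.add = add;
            this.id = id;
            this.username = username;
        }
    }

    // Apply add/retract messages in order and return the resulting rows keyed by id.
    static Map<String, String> materialize(List<Message> messages) {
        Map<String, String> current = new LinkedHashMap<>();
        for (Message m : messages) {
            if (m.add) {
                current.put(m.id, m.username);
            } else {
                // Remove only if the stored row matches the row being retracted.
                current.remove(m.id, m.username);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<Message> messages = Arrays.asList(
                new Message(true, "1", "zhang3"),   // insert id 1
                new Message(false, "1", "zhang3"),  // update id 1: retract the old row
                new Message(true, "1", "wang5"),    // update id 1: add the new row
                new Message(true, "2", "li4"),      // insert id 2
                new Message(false, "2", "li4"));    // delete id 2
        System.out.println(materialize(messages));
    }
}
```

After replaying the five messages, only id 1 with the updated name remains in the map.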
Output

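Comparison
After comparing the three, FlinkCDC is the most comfortable to work with.
| | FlinkCDC | Maxwell | Canal |
|---|---|---|---|
| Resume after interruption (where the position is stored) | Flink checkpoint | MySQL | Local disk |
| SQL statement to record mapping | None | None | One-to-one (records must be exploded) |
| Initial snapshot | Yes (multiple databases and tables) | Yes (single table) | No (historical data must be queried separately) |
| Output format | Custom | JSON | JSON (customizable in c/s mode) |
| High availability | Provided by the HA cluster it runs on | None | Cluster (ZooKeeper) |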
Insert comparison
Insert two rows:
INSERT INTO z_user_info VALUES(30,'zhang3','13800000000'),(31,'li4','13999999999')

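FlinkCDC: each changed row produces one JSON record.

Maxwell: each changed row produces one JSON record.

Canal: one SQL statement produces a single JSON record with the two rows combined, which is inconvenient because the record has to be exploded when parsing.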
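"Exploding" a Canal record means turning one message that carries several rows into one message per row. A minimal sketch in plain Java follows, using maps instead of a JSON library. The message shape (`database`, `table`, `type`, and a `data` array of rows) and the third column name `tel` are assumptions made for illustration, and the class CanalExplodeSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CanalExplodeSketch {

    // Turn one multi-row message into one message per row, copying the shared metadata.
    @SuppressWarnings("unchecked")
    static List<Map<String, Object>> explode(Map<String, Object> message) {
        List<Map<String, Object>> exploded = new ArrayList<>();
        List<Map<String, Object>> rows = (List<Map<String, Object>>) message.get("data");
        if (rows == null) {
            return exploded;
        }
        for (Map<String, Object> row : rows) {
            Map<String, Object> single = new LinkedHashMap<>();
            single.put("database", message.get("database"));
            single.put("table", message.get("table"));
            single.put("type", message.get("type"));
            single.put("data", row);
            exploded.add(single);
        }
        return exploded;
    }

    // Build one row of z_user_info; the column name "tel" is hypothetical.
    static Map<String, Object> row(int id, String userName, String tel) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("id", id);
        r.put("user_name", userName);
        r.put("tel", tel);
        return r;
    }

    public static void main(String[] args) {
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("database", "springboot");
        message.put("table", "z_user_info");
        message.put("type", "INSERT");
        message.put("data", Arrays.asList(
                row(30, "zhang3", "13800000000"),
                row(31, "li4", "13999999999")));

        for (Map<String, Object> single : explode(message)) {
            System.out.println(single);
        }
    }
}
```

After this step each record describes a single row, which matches what FlinkCDC and Maxwell emit directly.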

Update comparison
UPDATE z_user_info SET user_name='wang5' WHERE id IN(30,31)
FlinkCDC: the record includes the pre-update `before` data.

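Maxwell: the record does not include the full pre-update row; its `old` field carries only the previous values of the columns that changed.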

Canal: still a single JSON record.

Delete comparison
DELETE FROM z_user_info WHERE id IN(30,31)
FlinkCDC: two JSON records, one per deleted row.

Maxwell

Canal

Reference: [Atguigu (尚硅谷)] Flink data warehouse video course
