一、TICDC
在上篇文章中,我們介紹了使用TICDC 將資料同步至 Mysql 中,從上個任務就可以看出,TiCDC相比于Tidb binlog 在配制上就簡化了很多,而且我們也知道TICDC的性能也是優于 tidb binlog的,今天我們學習下使用TiCDC怎么將資料同步至下游Kafka中,以實作TIDB 到 ES、MongoDB、Redis等 NoSql 資料庫的同步,
上篇博客地址:
https://blog.csdn.net/qq_43692950/article/details/121731278
注意:使用TiCDC ,需將TIDB版本上級至 v4.0.6 以上,
二、TICDC 配制資料同步Kafka
本篇文章接著上篇文章繼續講解,先看下現在的集群狀況:

還是上篇文章中我們擴容出的CDC-server,
在上篇文章中,我們已經創建了TIDB 到 mysql 資料同步的任務,現在我們再創建一個到Kafka的同步任務:
./cdc cli changefeed create --pd=http://192.168.40.160:2379 --sink-uri='kafka://192.168.40.1:9092/tidb-cdc?kafka-version=2.6.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&protocol=canal-json' --changefeed-id="replication-task-2"
tidb-cdc:表示topic
kafka-version:下游 Kafka 版本號(可選,默認值 2.4.0,目前支持的最低版本為 0.11.0.2
kafka-client-id:指定同步任務的 Kafka 客戶端的 ID(可選,默認值為 TiCDC_sarama_producer_同步任務的 ID
partition-num:下游 Kafka partition 數量(可選,不能大于實際 partition 數量,如果不填會自動獲取 partition 數量,
protocol:表示輸出到 kafka 訊息協議,可選值有 default、canal、avro、maxwell、canal-json(默認值為 default
max-message-bytes:每次向 Kafka broker 發送訊息的最大資料量(可選,默認值 64MB
replication-factor:kafka 訊息保存副本數(可選,默認值 1
ca:連接下游 Kafka 實體所需的 CA 證書檔案路徑(可選)
cert:連接下游 Kafka 實體所需的證書檔案路徑(可選)
key:連接下游 Kafka 實體所需的證書密鑰檔案路徑(可選)

已經創建成功,
使用下面命令就可以看到,所有的任務:
./cdc cli changefeed list --pd=http://192.168.40.160:2379
或者查看我們任務的詳細情況:
./cdc cli changefeed query --pd=http://192.168.40.160:2379 --changefeed-id=replication-task-2

三、SpringBoot Kafka監聽
添加POM依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application
server:
port: 8081
spring:
kafka:
# kafka服務器地址(可以多個)
# bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092
bootstrap-servers: 192.168.40.1:9092
consumer:
# 指定一個默認的組名
group-id: kafkaGroup
# earliest:當各磁區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
# latest:當各磁區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該磁區下的資料
# none:topic各磁區都存在已提交的offset時,從offset后開始消費;只要有一個磁區不存在已提交的offset,則拋出例外
auto-offset-reset: earliest
# key/value的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# key/value的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 批量抓取
batch-size: 65536
# 快取容量
buffer-memory: 524288
#失敗重試次數
retries: 3
# 服務器地址
# bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092
消費者監聽事件
@Slf4j
@Component
public class Jms_Consumer {
@KafkaListener(topics = "tidb-cdc")
public void receive4(ConsumerRecord<?, ?> consumer) throws Exception {
System.out.println("tidb tidb-cdc Listener >> ");
JSONObject jsonObject = JSONObject.parseObject(new String(consumer.value()));
String type = jsonObject.getString("type");
String db = jsonObject.getString("database");
String table = jsonObject.getString("table");
String data = jsonObject.getString("data");
log.info("操作型別:{}",type);
log.info("資料庫:{}",db);
log.info("資料表:{}",table);
log.info("更新后資料:{}",data);
}
}
四、測驗資料同步
向TIDB中插入資料:
insert into user(name,age) value('bxc','25');
kafka接受JSON
{
"id": 0,
"database": "testdb",
"table": "user",
"pkNames": ["id"],
"isDdl": false,
"type": "INSERT",
"es": 1638698748819,
"ts": 0,
"sql": "",
"sqlType": {
"age": -5,
"id": -5,
"name": 12
},
"mysqlType": {
"age": "int",
"id": "int",
"name": "varchar"
},
"data": [{
"age": "25",
"id": "242219",
"name": "bxc"
}],
"old": [null]
}

更新資料:
update user set age=24 where name = 'bxc';
Kafka接受JSON
{
"id": 0,
"database": "testdb",
"table": "user",
"pkNames": ["id"],
"isDdl": false,
"type": "UPDATE",
"es": 1638699660093,
"ts": 0,
"sql": "",
"sqlType": {
"age": -5,
"id": -5,
"name": 12
},
"mysqlType": {
"age": "int",
"id": "int",
"name": "varchar"
},
"data": [{
"age": "24",
"id": "242216",
"name": "bxc"
}],
"old": [{
"age": "23",
"id": "242216",
"name": "bxc"
}]
}

洗掉資料:
delete from user where name = 'bxc';
Kafka接受JSON
{
"id": 0,
"database": "testdb",
"table": "user",
"pkNames": ["id"],
"isDdl": false,
"type": "DELETE",
"es": 1638699773943,
"ts": 0,
"sql": "",
"sqlType": {
"age": -5,
"id": -5,
"name": 12
},
"mysqlType": {
"age": "int",
"id": "int",
"name": "varchar"
},
"data": [{
"age": "25",
"id": "242218",
"name": "bxc"
}],
"old": [{
"age": "25",
"id": "242218",
"name": "bxc"
}]
}

五、擴展
停止同步任務:
./cdc cli changefeed pause --pd=http://192.168.40.160:2379 --changefeed-id replication-task-2
洗掉同步任務
./cdc cli changefeed remove --pd=http://192.168.40.160:2379 --changefeed-id replication-task-2

喜歡的小伙伴可以關注我的個人微信公眾號,獲取更多學習資料!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/374653.html
標籤:其他
