一、TiDB Binlog
在上篇文章中我們介紹了使用TiDB Binlog將資料同步至下游的Mysql 中,本篇我們學習下使用TiDB Binlog工具將資料同步至Kafka中自定義業務邏輯,比如可以做TIDB和ES、MongoDB 或 Redis的資料同步,這功能就和Canal決議Mysql的binlog功能相差不大,如果還不了解TiDB Binlog工具的也可以參考我的上篇博客:
https://blog.csdn.net/qq_43692950/article/details/121597230
注意:在做實驗前,請確保已經配置好Kafka環境:不了解的可以參考下面一篇我的博客:
訊息中間件KafKa集群搭建與使用: https://blog.csdn.net/qq_43692950/article/details/110648852
二、TiDB Binlog 配置
在上篇文章中,我們使用tiup 擴容出了一個pump 和 一個 drainer,我們先看下現在的集群架構:

但上篇文章我們講解的是TIDB 到 Mysql之間的同步,如果換成Kafka只需修改下組態檔即可,但考慮到有些小伙伴可能沒有看過前面我們的系列教程,這里我們還是通過擴容的方式擴容出pump 和 drainer,如果已經安裝過pump 和 drainer,直接修改配置即可:
tiup cluster edit-config tidb-test
drainer_servers:
- host: 192.168.40.162
ssh_port: 22
port: 8249
deploy_dir: /tidb-deploy/drainer-8249
data_dir: /tidb-data/drainer-8249
log_dir: /tidb-deploy/drainer-8249/log
config:
syncer.db-type: kafka
syncer.to.kafka-addrs: 192.168.40.1:9092
syncer.to.kafka-version: 2.6.0
syncer.to.topic-name: tidb-test
arch: amd64
os: linux

修改上述kafka的指向即可,如果是kafka集群,用英文逗號隔開即可,
下面我們再講下擴容的方式,沒有安裝pump 和 drainer的就用看下面的方式:
撰寫擴容配置
vi scale-out-binlog.yaml
寫入以下內容:
pump_servers:
- host: 192.168.40.160
ssh_port: 22
port: 8250
deploy_dir: /tidb-deploy/pump-8250
data_dir: /tidb-data/pump-8250
log_dir: /tidb-deploy/pump-8250/log
config:
gc: 7
storage.stop-write-at-available-space: 200MB
arch: amd64
os: linux
drainer_servers:
- host: 192.168.40.162
ssh_port: 22
port: 8249
deploy_dir: /tidb-deploy/drainer-8249
data_dir: /tidb-data/drainer-8249
log_dir: /tidb-deploy/drainer-8249/log
config:
syncer.db-type: kafka
syncer.to.kafka-addrs: 192.168.40.1:9092
syncer.to.kafka-version: 2.6.0
syncer.to.topic-name: tidb-test
arch: amd64
os: linux
注意 storage.stop-write-at-available-space 這個引數表示存盤空間低于指定值時不再接收 binlog 寫入請求,默認為10G ,如果硬碟沒這么大,就調小一點,
開始擴容:
tiup cluster scale-out tidb-test scale-out-binlog.yaml -u root -p
等待一會就可以看到集群中已經有pump 和 drainer了:

下一步還要開啟TIDB的binglog配制:
tiup cluster edit-config tidb-test
修改 server_configs 的配制:
server_configs:
tidb:
binlog.enable: true
binlog.ignore-error: true

重新加載集群:
tiup cluster reload tidb-test
使用mysql 客戶端連接tidb,查看bnlog是否已經開啟:
show variables like "log_bin";

ON即為開啟狀態,
看下pump和drainer的狀態:
show pump status;

show drainer status;

狀態都為online在線狀態,
三、SpringBoot 訊息監聽客戶端
下載官方demo
https://github.com/pingcap/tidb-tools/tree/master/tidb-binlog/driver/example/kafkaReader
官方demo是直接用的Java Kafka Api,本篇我們使用SpringBoot 的 spring-kafka ,
下載之后需要將三個檔案復制到自己的SpringBoot專案中:

需要用這三個工具進行決議資料,不然決議出來的是亂碼,這點可以去Tidb的社區看下:
POM檔案引入的主要依賴:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java-util -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.9.1</version>
</dependency>
application配制資訊:
server:
port: 8080
spring:
kafka:
# kafka服務器地址(可以多個)
# bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092
bootstrap-servers: 192.168.40.1:9092
consumer:
# 指定一個默認的組名
group-id: kafkaGroup
# earliest:當各磁區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
# latest:當各磁區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該磁區下的資料
# none:topic各磁區都存在已提交的offset時,從offset后開始消費;只要有一個磁區不存在已提交的offset,則拋出例外
auto-offset-reset: earliest
# key/value的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
producer:
# key/value的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 批量抓取
batch-size: 65536
# 快取容量
buffer-memory: 524288
#失敗重試次數
retries: 3
# 服務器地址
# bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092
注意consumer.value-deserializer這個要使用ByteArrayDeserializer,主要發送端就是byte[],我們只能配合:
日志監聽:
@Slf4j
@Component
public class TidbConsumer {
@KafkaListener(topics = "tidb-test")
public void receive3(ConsumerRecord<String, byte[]> consumer) throws Exception {
System.out.println("tidb bing-log Listener >> ");
//binglog物件
BinLogInfo.Binlog binlog = BinLogInfo.Binlog.parseFrom(consumer.value());
//操作型別 0 DML 1 DDL
BinLogInfo.BinlogType type = binlog.getType();
log.info(binlog.toString());
log.info("操作型別:{} ", type);
//決議內容
if (BinLogInfo.BinlogType.DML == type) {
BinLogInfo.DMLData dmlData = binlog.getDmlData();
if (dmlData.getTablesCount() == 0) {
return;
}
dmlData.getTablesList().forEach(table -> {
String db = table.getSchemaName();
log.info("更新資料庫:{}", db);
String tableName = table.getTableName();
log.info("更新資料表:{}", tableName);
List<BinLogInfo.ColumnInfo> columnInfoList = table.getColumnInfoList();
List<BinLogInfo.TableMutation> MutationsList = table.getMutationsList();
MutationsList.forEach(mutation -> {
BinLogInfo.MutationType mutationType = mutation.getType();
log.info("操作型別:{}", mutationType);
List<BinLogInfo.Column> columnsList = mutation.getRow().getColumnsList();
//決議更新后的資料
for (int i = 0; i < columnInfoList.size(); i++) {
String filedName = columnInfoList.get(i).getName();
log.info("欄位:{} ,更新后的值:{} ", filedName, columnsList.get(i));
}
});
});
} else if (BinLogInfo.BinlogType.DDL == type) {
BinLogInfo.DDLData ddlData = binlog.getDdlData();
String db = ddlData.getSchemaName();
String tableName = ddlData.getTableName();
String ddlSql = ddlData.getDdlQuery().toStringUtf8();
log.info("更新資料庫:{}",db);
log.info("更新資料表:{}", tableName);
log.info("DDL :{}", ddlSql);
} else {
throw new Exception("analysis binglog err!");
}
}
}
四、測驗
測驗表結構:

添加資料:
insert into user(name,age) values('bxc',10);
BinLogInfo.Binlog toString資訊:
type: DML
commit_ts: 429572910085570562
dml_data {
tables {
schema_name: "testdb"
table_name: "user"
column_info {
name: "id"
mysql_type: "int"
is_primary_key: true
}
column_info {
name: "name"
mysql_type: "varchar"
is_primary_key: false
}
column_info {
name: "age"
mysql_type: "int"
is_primary_key: false
}
mutations {
type: Insert
row {
columns {
int64_value: 212247
}
columns {
string_value: "bxc"
}
columns {
int64_value: 10
}
}
}
5: {
1: "PRIMARY"
2: "id"
}
}
}
決議列印資訊:

更新資料:
update user set age = 20 where name = 'bxc';

洗掉資料:
delete from user where name = 'bxc';

創建一個新的表:
CREATE TABLE `user_copy` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int(10) DEFAULT NULL,
PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=242214;
BinLogInfo.Binlog toString資訊:
type: DDL
commit_ts: 429573182230102017
ddl_data {
schema_name: "testdb"
table_name: "user_copy"
ddl_query: "CREATE TABLE `user_copy` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `age` int(10) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=242214"
}


喜歡的小伙伴可以關注我的個人微信公眾號,獲取更多學習資料!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/374652.html
標籤:其他
