我需要將 MySQL 資料庫復制到 PostgreSQL 資料庫。我選擇了:
- Debezium 連接
- Avro 格式
- 融合模式注冊表
- 卡夫卡
正在復制資料,但是,我丟失了一些架構資訊。例如,datetime在 mysql 中具有格式的列bigint在 Postgres 中被復制,沒有創建外鍵,也沒有保留列的順序(這很好),等等。
PostgreSQL 接收器連接器:
{
"name": "jdbc-sink-dbt",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "2",
"topics.regex": "test_(.*)",
"connection.url": "jdbc:postgresql://dbt-postgres:5432/test?user=postgres&password=postgres",
"transforms": "unwrap,removePrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.removePrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.removePrefix.regex": "test_([^.] )",
"transforms.removePrefix.replacement": "$1",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "id",
"pk.mode": "record_key",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
MySQL 連接器:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "172.17.0.1",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.allowPublicKeyRetrieval": "true",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "test",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.test",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.] )\\.([^.] )\\.([^.] )",
"transforms.route.replacement": "$2_$3",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Debezium 連接配置:
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
uj5u.com熱心網友回復:
例如 mysql 中的 datetime 格式的列被復制為 bigint
這是由于time.precision.modeDebezium 連接器在源端使用的默認設定。如果您查看檔案,您會注意到默認精度將datetime列作為 INT64 發出,這解釋了為什么接收器連接器將內容寫入為bigint.
您現在可以在源端設定time.precision.mode為connect,以便 JDBC 接收器連接器可以正確解釋這些值。
未創建外鍵
這是意料之中的,請參閱這個Confluent GitHub 問題。目前,JDBC 接收器不具備支持在 JDBC 級別實作外鍵關系的能力。
不保留列的順序
這也是意料之中的。沒有預期的保證 Debezium 應該以與資料庫中完全相同的順序存盤關系列(盡管我們這樣做),并且 JDBC 接收器連接器不能保證在讀取欄位時保留欄位的順序發出的事件。如果接收器連接器使用諸如 HashMap 之類的容器來存盤列名,那么順序可能與源資料庫有很大不同。
如果有必要在目標系統中保留更高級別的關系元資料(例如外鍵和列順序)以反映源系統,您可能需要查看一個單獨的工具鏈來通過一些復制初始模式和關系型別的模式轉儲、轉換和匯入到目標資料庫,然后依賴 CDC 管道進行資料復制方面。
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/411520.html
標籤:
