前言
在canal同步資料到es一篇中,分析并實戰演示了如何利用canal完成資料從mysql到es的準實時同步程序,本篇將基于已經構建好的canal服務,演示在代碼中如何利用canal完成一些業務場景的使用
環境準備
- 已經搭建好的canal服務
- 兩個不同環境(IP)下的mysql服務
一、快速搭建canal服務
為方便后文的演示和學習,以便看到的同學能體驗到完整的操作流程,在正式撰寫代碼之前,先基于centos7環境快速搭建起一個canal服務
搭建步驟
1、服務器使用docker快速安裝一個mysql并開啟binlog日志
具體可參考:docker安裝mysql及開啟binlog日志一篇
2、上傳canal安裝包并解壓

tar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal
3、進入到第二步解壓后的檔案目錄,并修改組態檔
進入conf目錄,需要的修改的組態檔為:canal.properties

#################################################
######### common argument #############
#################################################
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
說明:這個檔案是 canal 的基本通用配置,canal 埠號默認就是 11111,修改 canal 的
輸出 model,默認 tcp,改為輸出到 kafka
重點關注上面的:canal.serverMode = tcp 這個配置,默認情況,如果是使用mysql,可以不做修改,如果需要將資料同步到kafka,或者rocketmq,可以分別修改即可,此處暫不做修改
進入example目錄,需要的修改的組態檔為:instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=20 #只要和mysql的master的不一樣即可
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
- canal.instance.mysql.slaveId=20 #只要和mysql的master的不一樣即可
- canal.instance.master.address=127.0.0.1:3306 ,監聽的mysql的master節點資訊
配置連接 MySQL 的用戶名和密碼,默認就是我們前面授權的 canal

4、啟動canal服務
回傳到bin目錄,直接: startup.sh


二、與springboot整合
搭建好了canal服務,如何在業務中使用呢?其實在很場景下,可以考慮借助canal實作一些諸如資料同步、災備、同城雙活等,比如來考慮這樣一種場景,一些比較大的電商頁面上,都有商品搜索服務,用戶輸入商品關鍵字可以快速檢索到商品
基本上搜索服務都是采用了諸如es這樣的搜索引擎,思考一下,網站的所有上架的商品資料開始肯定是存放在mysql這樣的關系型資料庫,但是搜索走mysql的話肯定不可能,所以需要定期或者準實時的將mysql的資料同步到es
在之前的某一篇中,我們可以直接基于canal做配置,將mysql的資料同步到es中
但是考慮到資料并非所有的都同步,比如說要對同步到es的資料進行分類、篩選、過濾等操作,純粹的配置就很難勝任了
于是,可以考慮在程式中,通過某種機制監聽到mysql中的商品上架資料的變化然后觸發程式,再通程序式將資料寫入到es,實作準實時同步
在上面的這種業務場景下,canal就是一種很好的選擇
1、Java中使用canal
匯入基本的依賴
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
2、撰寫一個demo
通過客戶端,連接canal的資訊,可以在程式中監聽到mysql的master節點資料變化
下面直接貼出核心代碼:
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalClient {
public static void main(String[] args) throws Exception{
//1.獲取 canal 連接物件
CanalConnector canalConnector =
CanalConnectors.newSingleConnector(new
InetSocketAddress("canal所在服務器IP", 11111), "example", "", "");
System.out.println("canal啟動并開始監聽資料 ...... ");
while (true){
canalConnector.connect();
//訂閱表
canalConnector.subscribe("shop001.*");
//獲取資料
Message message = canalConnector.get(100);
//決議message
List<CanalEntry.Entry> entries = message.getEntries();
if(entries.size() <=0){
System.out.println("未檢測到資料");
Thread.sleep(1000);
}
for(CanalEntry.Entry entry : entries){
//1、獲取表名
String tableName = entry.getHeader().getTableName();
//2、獲取型別
CanalEntry.EntryType entryType = entry.getEntryType();
//3、獲取序列化后的資料
ByteString storeValue = entry.getStoreValue();
//判斷是否rowdata型別資料
if(CanalEntry.EntryType.ROWDATA.equals(entryType)){
//對第三步中的資料進行決議
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
//獲取當前事件的操作型別
CanalEntry.EventType eventType = rowChange.getEventType();
//獲取資料集
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
//便利資料
for(CanalEntry.RowData rowData : rowDatasList){
//資料變更之前的內容
JSONObject beforeData = new JSONObject();
List<CanalEntry.Column> beforeColumnsList = rowData.getAfterColumnsList();
for(CanalEntry.Column column : beforeColumnsList){
beforeData.put(column.getName(),column.getValue());
}
//資料變更之后的內容
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
JSONObject afterData = new JSONObject();
for(CanalEntry.Column column : afterColumnsList){
afterData.put(column.getName(),column.getValue());
}
System.out.println("Table :" + tableName +
",eventType :" + eventType +
",beforeData :" + beforeData +
",afterData : " + afterData);
}
}else {
System.out.println("當前操作型別為:" + entryType);
}
}
}
}
}
關于API的使用,可以參考官方的demo示例代碼,核心的代碼處理步驟大概如下:
- 建立連接
- 訂閱指定資料庫(或者所有資料庫,或某個庫下的表)
- 檢測到資料變更
- 提取binlog中的元資料,決議變更資料型別,決議元資料中的資訊
- 基于變更資料做自身的業務邏輯或其他業務
下面運行上面的代碼,這時候我們去資料庫中修改一下本次訂閱的資料庫下的某個表的資料

接下來去 shop001資料庫中給 user_info表新增一條資料


執行sql,然后觀察控制臺日志輸出

我們再次修改其中一條資料,很快控制臺上輸出了資料更改前和修改后的資料資訊日志

由此我們得知,基于上面決議出來的資訊,可以檢測到資料庫中某些表的變化情況,從而將變化后的資料做同步或者接入其他的中間件進行訊息通知等
3、與springboot整合
接下來,我們仍然以一個具體的業務場景為例
需求描述:
將從canal中讀取到的資料同步變更到另一個資料庫下相同的表中
匯入下面依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.14</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-dbutils/commons-dbutils -->
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
<version>1.7</version>
</dependency>
<!--canal客戶端連接-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
2、application.yml 組態檔
注意,這里連接的資料庫地址是目標資料庫,即從canal中讀取并決議后的資料即將寫入的服務器地址
server:
port: 8083
logging:
config: classpath:logback-spring.xml #日志
spring:
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://IP:3306/bank1?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false
username: root
password: root
druid:
max-active: 100
initial-size: 10
max-wait: 60000
min-idle: 5
3、核心工具類
其實我們可以直接拿上面的演示代碼,在里面做業務邏輯的處理也可以,不過在實際專案中,這樣做不便于代碼的維護性和可閱讀性,因此需要根據功能封裝一些方法形成可復用的工具類
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Component
public class CanalClient {
private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
@Resource
private DataSource dataSource;
/**
* canal入庫方法
*/
public void handleMessages() {
CanalConnector connector = CanalConnectors.newSingleConnector(new
InetSocketAddress("canal所在的服務IP",
11111), "example", "", "");
int batchSize = 1000;
System.out.println("canal啟動并開始監聽資料 ...... ");
try {
connector.connect();
connector.subscribe("shop001.*");
connector.rollback();
try {
while (true) {
//嘗試從master那邊拉去資料batchSize條記錄,有多少取多少
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
System.out.println("未檢測到任何資料變化......");
Thread.sleep(2000);
} else {
dataHandle(message.getEntries());
}
connector.ack(batchId);
//當佇列里面堆積的sql大于一定數值的時候就模擬執行
if (SQL_QUEUE.size() >= 1) {
executeQueueSql();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
} finally {
connector.disconnect();
}
}
/**
* 模擬執行佇列里面的sql陳述句
*/
public void executeQueueSql() {
int size = SQL_QUEUE.size();
for (int i = 0; i < size; i++) {
String sql = SQL_QUEUE.poll();
System.out.println("[sql]----> " + sql);
this.execute(sql.toString());
}
}
/**
* 資料處理
* @param entrys
*/
private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
for (Entry entry : entrys) {
if (EntryType.ROWDATA == entry.getEntryType()) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
if (eventType == EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == EventType.INSERT) {
saveInsertSql(entry);
}
}
}
}
/**
* 保存更新陳述句
* @param entry
*/
private void saveUpdateSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> newColumnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
for (int i = 0; i < newColumnList.size(); i++) {
sql.append(" " + newColumnList.get(i).getName() + " = '" + newColumnList.get(i).getValue() + "'");
if (i != newColumnList.size() - 1) {
sql.append(",");
}
}
sql.append(" where ");
List<Column> oldColumnList = rowData.getBeforeColumnsList();
for (Column column : oldColumnList) {
if (column.getIsKey()) {
//暫時只支持單一主鍵
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 保存洗掉陳述句
*
* @param entry
*/
private void saveDeleteSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getBeforeColumnsList();
StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
for (Column column : columnList) {
if (column.getIsKey()) {
//暫時只支持單一主鍵
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 保存插入陳述句
*
* @param entry
*/
private void saveInsertSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("insert into "+ entry.getHeader().getTableName() + " (");
for (int i = 0; i < columnList.size(); i++) {
sql.append(columnList.get(i).getName());
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(") VALUES (");
for (int i = 0; i < columnList.size(); i++) {
sql.append("'" + columnList.get(i).getValue() + "'");
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(")");
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 入庫
* @param sql
*/
public void execute(String sql) {
Connection con = null;
try {
if(null == sql) return;
con = dataSource.getConnection();
QueryRunner qr = new QueryRunner();
int row = qr.execute(con, sql);
System.out.println("update: "+ row);
} catch (SQLException e) {
e.printStackTrace();
} finally {
DbUtils.closeQuietly(con);
}
}
}
4、提供一個配置類,在程式啟動后監聽資料變化
@Configuration
public class InitConfig implements CommandLineRunner {
@Resource
private CanalClient canalClient;
@Override
public void run(String... args) throws Exception {
canalClient.handleMessages();
}
}
5、啟動類
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class,args);
}
}
提前在目標寫入的資料庫中準備一張相同的表

啟動程式后,往canal服務監聽的mysql服務資料庫的user_info表中插入上面的一條新資料,然后觀察控制臺輸出日志資訊,

執行資料插入,這時候控制臺檢測到了資料變化

同時目標資料表中也新增了一條資料,

通過上面的操作,就完成了預期的需求,當然基于此業務邏輯,還可以衍生出更多的需求場景,比如只監聽變化的資料,然后通知下游的其他應用等
本篇到此結束,最后感謝觀看!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/421874.html
標籤:其他
上一篇:Flink學習之環境搭建
