主頁 >  其他 > 2021-10-24 基于Docker結合Canal實作MySQL實時增量資料傳輸功能

2021-10-24 基于Docker結合Canal實作MySQL實時增量資料傳輸功能

2021-10-26 07:46:42 其他

基于Docker結合Canal實作MySQL實時增量資料傳輸功能

主要介紹了基于Docker結合Canal實作MySQL實時增量資料傳輸功能,本文給圖文并茂給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下
Canal的介紹
Canal的歷史由來
在早期的時候,阿里巴巴公司因為杭州和美國兩個地方的機房都部署了資料庫實體,但因為跨機房同步資料的業務需求 ,便孕育而生出了Canal,主要是基于trigger(觸發器)的方式獲取增量變更,從
2010年開始,阿里巴巴公司開始逐步嘗試資料庫日志決議,獲取增量變更的資料進行同步,由此衍生出了增量訂閱和消費業務,
當前的Canal支持的資料源端MySQL版本包括:5.1.x 、5.5.x 、5.6.x、5.7.x、8.0.x,
Canal的應用場景
目前普遍基于日志增量訂閱和消費的業務,主要包括:

  • 基于資料庫增量日志決議,提供增量資料訂閱和消費
  • 資料庫鏡像 資料庫實時備份
  • 索引構建和實時維護(拆分異構索引、倒排索引等)
  • 業務Cache重繪
  • 帶業務邏輯的增量資料處理
  • Canal的作業原理
    在介紹Canal的原理之前,我們先來了解下MySQL主從復制的原理,
    MySQL主從復制原理
    在這里插入圖片描述
  • MySQL Master將資料變更的操作寫入二進制日志binary log中, 其中記錄的內容叫做二進制日志事件binary log
    events,可以通過show binlog events命令進行查看
  • MySQL Slave會將Master的binary log中的binary log events拷貝到它的中繼日志relay log
    MySQL Slave重讀并執行relay log中的事件,將資料變更映射到它自己的資料庫表中
  • 了解了MySQL的作業原理,我們可以大致猜想到Canal應該也是采用類似的邏輯去實作增量資料訂閱的功能,那么接下來我們看看實際上Canal的作業原理是怎樣的?

Canal作業原理
在這里插入圖片描述

  • Canal模擬MySQL Slave的互動協議,偽裝自己為MySQL Slave,向MySQL Master發送dump協議

  • MySQL Master收到dump請求,開始推送binary log給Slave(也就是Canal) Canal決議binary

  • log物件(資料為byte流)

基于這樣的原理與方式,便可以完成資料庫增量日志的獲取決議,提供增量資料訂閱和消費,實作MySQL實時增量資料傳輸的功能,
既然Canal是這樣的一個框架,又是純Java語言撰寫而成,那么我們接下來就開始學習怎么使用它并把它用到我們的實際作業中,

Canal的Docker環境準備
因為目前容器化技術的火熱,本文通過使用Docker來快速搭建開發環境,而傳統方式的環境搭建,在我們學會了Docker容器環境搭建后,也能自行依葫蘆畫瓢搭建成功,由于本篇主要講解Canal,所以
關于Docker的內容不會涉及太多,主要會介紹Docker的基本概念和命令使用, 如果你想和更多容器技術專家交流,可以加我微信liyingjiese,備注『加群』,群里每周都有全球各大公司的最佳實踐以及
行業最新動態 ,
什么是Docker
相信絕大多數人都使用過虛擬機VMware,在使用VMware進行環境搭建的時候,只需提供了一個普通的系統鏡像并成功安裝,剩下的軟體環境與應用配置還是如我們在本機操作一樣在虛擬機里也操作一
遍,而且VMware占用宿主機的資源較多,容易造成宿主機卡頓,而且系統鏡像本身也占用過多空間,
為了便于大家快速理解Docker,便與VMware做對比來做介紹,Docker提供了一個開始,打包,運行APP的平臺,把APP(應用)和底層infrastructure(基礎設施)隔離開來,Docker中最主要的兩個概
念就是鏡像(類似VMware的系統鏡像)與容器(類似VMware里安裝的系統),
什么是Image(鏡像)

  • 檔案和meta data的集合(root filesystem)
  • 分層的,并且每一層都可以添加改變洗掉檔案,成為一個新的image
  • 不同的image可以共享相同的layer
  • Image本身是read-only的
    在這里插入圖片描述
    什么是Container(容器)
  • 通過Image創建(copy)
  • 在Image layer之上建立一個container layer(可讀寫)
  • 類比面向物件:類和實體
  • Image負責APP的存盤和分發,Container負責運行APP
    在這里插入圖片描述
    Docker的網路介紹
    Docker的網路型別有三種:
  • Bridge:橋接網路,默認情況下啟動的Docker容器,都是使用Bridge,Docker安裝時創建的橋接網路,每次Docker容器重啟時,會按照順序獲取對應的IP地址,這個就導致重啟下,Docker的IP地
    址就變了,
  • None:無指定網路,使用 --network=none,Docker容器就不會分配局域網的IP,
  • Host:主機網路,使用–network=host,此時,Docker容器的網路會附屬在主機上,兩者是互通的,例如,在容器中運行一個Web服務,監聽8080埠,則主機的8080埠就會自動映射到容器
    中,

創建自定義網路:(設定固定IP)

docker network create --subnet=172.18.0.0/16 mynetwork

查看存在的網路型別docker network ls:
在這里插入圖片描述
搭建Canal環境
附上Docker的下載安裝地址==> Docker Download ,
下載Canal鏡像docker pull canal/canal-server:
在這里插入圖片描述
下載MySQL鏡像docker pull mysql,下載過的則如下圖:
在這里插入圖片描述
查看已經下載好的鏡像docker images:
在這里插入圖片描述
接下來通過鏡像生成MySQL容器與canal-server容器:

生成mysql容器

docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql

生成canal-server容器

docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server

命令介紹

–net mynetwork #使用自定義網路
–ip #指定分配ip
查看Docker中運行的容器docker ps:
在這里插入圖片描述
MySQL的配置修改
以上只是初步準備好了基礎的環境,但是怎么讓Canal偽裝成Salve并正確獲取MySQL中的binary log呢?
對于自建MySQL,需要先開啟Binlog寫入功能,配置binlog-format為ROW模式,通過修改MySQL組態檔來開啟bin_log,使用find / -name my.cnf查找my.cnf,修改檔案內容如下:
[mysqld]
log-bin=mysql-bin # 開啟binlog
binlog-format=ROW # 選擇ROW模式
server_id=1 # 配置MySQL replaction需要定義,不要和Canal的slaveId重復
進入MySQL容器docker exec -it mysql bash,
創建鏈接MySQL的賬號Canal并授予作為MySQL slave的權限,如果已有賬戶可直接GRANT:
mysql -uroot -proot

創建賬號

CREATE USER canal IDENTIFIED BY ‘canal’;

授予權限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;
– GRANT ALL PRIVILEGES ON . TO ‘canal’@’%’ ;

重繪并應用

FLUSH PRIVILEGES;
資料庫重啟后,簡單測驗 my.cnf 配置是否生效:
在這里插入圖片描述
show variables like ‘log_bin’;
show variables like ‘log_bin’;
show master status;
canal-server的配置修改
進入canal-server容器docker exec -it canal-server bash,
編輯canal-server的配置vi canal-server/conf/example/instance.properties:
在這里插入圖片描述
更多配置請參考==>Canal配置說明 ,
重啟canal-server容器docker restart canal-server 進入容器查看啟動日志:
docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log
在這里插入圖片描述
至此,我們的環境作業準備完成!
拉取資料并同步保存到ElasticSearch
本文的ElasticSearch也是基于Docker環境搭建,所以讀者可執行如下命令:

下載對鏡像

docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine

創建容器并運行

docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e “discovery.type=single-node” elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine
環境已經準備好了,現在就要開始我們的編碼實戰部分了,怎么通過應用程式去獲取Canal決議后的binlog資料,首先我們基于Spring Boot搭建一個canal demo應用,結構如下圖所示:
在這里插入圖片描述

Student.java
package com.example.canal.study.pojo;
import lombok.Data;
import java.io.Serializable;
// @Data 用戶生產getter、setter方法 @Data
public class Student implements Serializable {
private String id;
private String name;
private int age;
private String sex;
private String city;
}
CanalConfig.java
package com.example.canal.study.common;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
/**
* @author haha
*/@Configuration
public class CanalConfig {
// @Value 獲取 application.properties配置中端內容 @Value("${canal.server.ip}")
private String canalIp; @Value("${canal.server.port}")
private Integer canalPort; @Value("${canal.destination}")
private String destination; @Value("${elasticSearch.server.ip}")
private String elasticSearchIp; @Value("${elasticSearch.server.port}")
private Integer elasticSearchPort; @Value("${zookeeper.server.ip}")
private String zkServerIp;
// 獲取簡單canal-server連接 @Bean
public CanalConnector canalSimpleConnector() { CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp, canalPort), destination, "", "");
return canalConnector;
}
// 通過連接zookeeper獲取canal-server連接 @Bean
public CanalConnector canalHaConnector() { CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
return canalConnector;
}
// elasticsearch 7.x客戶端 @Bean
public RestHighLevelClient restHighLevelClient() { RestHighLevelClient client = new RestHighLevelClient( RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort))
);
return client;
}
}
CanalDataParser.java
由于這個類的代碼較多,文中則摘出其中比較重要的部分,其它部分代碼可從GitHub上獲取:
public static class TwoTuple<A, B> {
public final A eventType;
public final B columnMap;
public TwoTuple(A a, B b) {
eventType = a;
columnMap = b;
}
}
public static List<TwoTuple<EventType, Map>> printEntry(List<Entry> entrys) {
List<TwoTuple<EventType, Map>> rows = new ArrayList<>();
for (Entry entry : entrys) {
// binlog event的事件事件
long executeTime = entry.getHeader().getExecuteTime();
// 當前應用獲取到該binlog鎖延遲的時間
long delayTime = System.currentTimeMillis() - executeTime; Date date = new Date(entry.getHeader().getExecuteTime()); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 當前的entry(binary log event)的條目型別屬于事務
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
TransactionBegin begin = null;
try {
begin = TransactionBegin.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
// 列印事務頭資訊,執行的執行緒id,事務耗時
logger.info(transaction_format,
new Object[]{entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()),
simpleDateFormat.format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime)});
logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
printXAInfo(begin.getPropsList());
} else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
TransactionEnd end = null;
try {
end = TransactionEnd.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
// 列印事務提交資訊,事務id
logger.info("----------------\n");
logger.info(" END ----> transaction id: {}", end.getTransactionId());
printXAInfo(end.getPropsList());
logger.info(transaction_format,
new Object[]{entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime)});
}
continue;
}
// 當前entry(binary log event)的條目型別屬于原始資料
if (entry.getEntryType() == EntryType.ROWDATA) { RowChange rowChage = null;
try {
// 獲取儲存的內容
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
// 獲取當前內容的事件型別
EventType eventType = rowChage.getEventType();
logger.info(row_format,
new Object[]{entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime)});
// 事件型別是query或資料定義語言DDL直接列印sql陳述句,跳出繼續下一次回圈
if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
logger.info(" sql ----> " + rowChage.getSql() + SEP);
continue;
}
printXAInfo(rowChage.getPropsList());
// 回圈當前內容條目的具體資料
for (RowData rowData : rowChage.getRowDatasList()) {
List<CanalEntry.Column> columns;
// 事件型別是delete回傳洗掉前的列內容,否則回傳改變后列的內容
if (eventType == CanalEntry.EventType.DELETE) {
columns = rowData.getBeforeColumnsList();
} else {
columns = rowData.getAfterColumnsList();
}HashMap<String, Object> map = new HashMap<>(16);
// 回圈把列的name與value放入mapfor (Column column: columns){ map.put(column.getName(), column.getValue());
}
rows.add(new TwoTuple<>(eventType, map));
}
}
}
return rows;
}
ElasticUtils.java
package com.example.canal.study.common;
import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Map;
/**
* @author haha
*/@Slf4j @Component
public class ElasticUtils { @Autowired
private RestHighLevelClient restHighLevelClient;
/**
* 新增
* @param student
* @param index 索引
*/
public void saveEs(Student student, String index) {
IndexRequest indexRequest = new IndexRequest(index)
.id(student.getId())
.source(JSON.toJSONString(student), XContentType.JSON)
.opType(DocWriteRequest.OpType.CREATE);
try {
IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
log.info("保存資料至ElasticSearch成功:{}", response.getId());
} catch (IOException e) {
log.error("保存資料至elasticSearch失敗: {}", e);
}
}
/**
* 查看
* @param index 索引
* @param id _id
* @throws IOException
*/
public void getEs(String index, String id) throws IOException { GetRequest getRequest = new GetRequest(index, id); GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT); Map<String, Object> fields = response.getSource();
for (Map.Entry<String, Object> entry : fields.entrySet()) { System.out.println(entry.getKey() + ":" + entry.getValue());
}
}
/**
* 更新
* @param student
* @param index 索引
* @throws IOException
*/
public void updateEs(Student student, String index) throws IOException { UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
updateRequest.upsert(JSON.toJSONString(student), XContentType.JSON); UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
log.info("更新資料至ElasticSearch成功:{}", response.getId());
}
/**
* 根據id洗掉資料
* @param index 索引
* @param id _id
* @throws IOException
*/
public void DeleteEs(String index, String id) throws IOException { DeleteRequest deleteRequest = new DeleteRequest(index, id); DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
log.info("洗掉資料至ElasticSearch成功:{}", response.getId());
}
}
BinLogElasticSearch.java
package com.example.canal.study.action;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
import com.example.canal.study.common.ElasticUtils;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* @author haha
*/@Slf4j @Component
public class BinLogElasticSearch { @Autowired
private CanalConnector canalSimpleConnector; @Autowired
private ElasticUtils elasticUtils;
//@Qualifier("canalHaConnector")使用名為canalHaConnector的bean @Autowired @Qualifier("canalHaConnector")
private CanalConnector canalHaConnector;
public void binLogToElasticSearch() throws IOException {
openCanalConnector(canalHaConnector);
// 輪詢拉取資料
Integer batchSize = 5 * 1024; while (true) { Message message = canalHaConnector.getWithoutAck(batchSize);
// Message message = canalSimpleConnector.getWithoutAck(batchSize);
long id = message.getId();
int size = message.getEntries().size();
log.info("當前監控到binLog訊息數量{}", size);
if (id == -1 || size == 0) {
try {
// 等待2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//1. 決議message物件
List<CanalEntry.Entry> entries = message.getEntries();
List<CanalDataParser.TwoTuple<CanalEntry.EventType, Map>> rows = CanalDataParser.printEntry(entries);
for (CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple : rows) {
if(tuple.eventType == CanalEntry.EventType.INSERT) { Student student = createStudent(tuple);
// 2,將決議出的物件同步到elasticSearch中
elasticUtils.saveEs(student, "student_index");
// 3.訊息確認已處理
// canalSimpleConnector.ack(id);
canalHaConnector.ack(id);
}
if(tuple.eventType == CanalEntry.EventType.UPDATE){ Student student = createStudent(tuple);
elasticUtils.updateEs(student, "student_index");
// 3.訊息確認已處理
// canalSimpleConnector.ack(id);
canalHaConnector.ack(id);
}
if(tuple.eventType == CanalEntry.EventType.DELETE){
elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString());
canalHaConnector.ack(id);
}
}
}
}
}
/**
* 封裝資料至Student
* @param tuple
* @return
*/
private Student createStudent(CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple){ Student student = new Student();
student.setId(tuple.columnMap.get("id").toString());
student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString()));
student.setName(tuple.columnMap.get("name").toString());
student.setSex(tuple.columnMap.get("sex").toString());
student.setCity(tuple.columnMap.get("city").toString());
return student;
}
/**
* 打開canal連接
*
* @param canalConnector
*/
private void openCanalConnector(CanalConnector canalConnector) {
//連接CanalServer
canalConnector.connect();
// 訂閱destination
canalConnector.subscribe();
}
/**
* 關閉canal連接
*
* @param canalConnector
*/
private void closeCanalConnector(CanalConnector canalConnector) {
//關閉連接CanalServer
canalConnector.disconnect();
// 注銷訂閱destination
canalConnector.unsubscribe();
}
}
CanalDemoApplication.java(Spring Boot啟動類)
package com.example.canal.study;
import com.example.canal.study.action.BinLogElasticSearch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author haha
*/@SpringBootApplication
public class CanalDemoApplication implements ApplicationRunner { @Autowired
private BinLogElasticSearch binLogElasticSearch;
public static void main(String[] args) { SpringApplication.run(CanalDemoApplication.class, args);
}
// 程式啟動則執行run方法 @Override
public void run(ApplicationArguments args) throws Exception {
binLogElasticSearch.binLogToElasticSearch();
}
}
application.properties
server.port=8081
spring.application.name = canal-demo
canal.server.ip = 192.168.124.5
canal.server.port = 11111
canal.destination = example
zookeeper.server.ip = 192.168.124.5:2181
zookeeper.sasl.client = false
elasticSearch.server.ip = 192.168.124.5
elasticSearch.server.port = 9200

Canal集群高可用的搭建
通過上面的學習,我們知道了單機直連方式的Canala應用,在當今互聯網時代,單實體模式逐漸被集群高可用模式取代,那么Canala的多實體集群方式如何搭建呢!
基于ZooKeeper獲取Canal實體
準備ZooKeeper的Docker鏡像與容器:
docker pull zookeeper
docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server

1、機器準備:
運行Canal的容器IP: 172.18.0.4 , 172.18.0.8 ZooKeeper容器IP:172.18.0.3:2181 MySQL容器IP:172.18.0.6:3306
2、按照部署和配置,在單臺機器上各自完成配置,演示時instance name為example,
3、修改canal.properties,加上ZooKeeper配置并修改Canal埠:
canal.port=11113
canal.zkServers=172.18.0.3:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
4、創建example目錄,并修改instance.properties:
canal.instance.mysql.slaveId = 1235
#之前的canal slaveId是1234,保證slaveId不重復即可
canal.instance.master.address = 172.18.0.6:3306
注意: 兩臺機器上的instance目錄的名字需要保證完全一致,HA模式是依賴于instance name進行管理,同時必須都選擇default-instance.xml配置,
啟動兩個不同容器的Canal,啟動后,可以通過tail -100f logs/example/example.log查看啟動日志,只會看到一臺機器上出現了啟動成功的日志,
比如我這里啟動成功的是 172.18.0.4:
在這里插入圖片描述
查看一下ZooKeeper中的節點資訊,也可以知道當前作業的節點為172.18.0.4:11111:
[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running
{“active”:true,“address”:“172.18.0.4:11111”,“cid”:1}
客戶端鏈接, 消費資料
可以通過指定ZooKeeper地址和Canal的instance name,canal client會自動從ZooKeeper中的running節點獲取當前服務的作業節點,然后與其建立鏈接:
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{“active”:true,“address”:“172.18.0.4:11111”,“cid”:1}
對應的客戶端編碼可以使用如下形式,上文中的CanalConfig.java中的canalHaConnector就是一個HA連接
CanalConnector connector = CanalConnectors.newClusterConnector(“172.18.0.3:2181”, “example”, “”, “”);
鏈接成功后,canal server會記錄當前正在作業的canal client資訊,比如客戶端IP,鏈接的埠資訊等(聰明的你,應該也可以發現,canal client也可以支持HA功能):
[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running
{“active”:true,“address”:“192.168.124.5:59887”,“clientId”:1001}
資料消費成功后,canal server會在ZooKeeper中記錄下當前最后一次消費成功的binlog位點(下次你重啟client時,會從這最后一個位點繼續進行消費):
[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor
{"@type":“com.alibaba.otter.canal.protocol.position.LogPosition”,“identity”:{“slaveId”:-1,“sourceAddress”:{“address”:“mysql.mynetwork”,“port”:3306}},“postion”:{“included”:false,“journalName”:“binlog.000004”,“position”:2169,“timestamp”:1562672817000}}
停止正在作業的172.18.0.4的canal server:
docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh
這時172.18.0.8會立馬啟動example instance,提供新的資料服務:
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{“active”:true,“address”:“172.18.0.8:11111”,“cid”:1}
與此同時,客戶端也會隨著canal server的切換,通過獲取ZooKeeper中的最新地址,與新的canal server建立鏈接,繼續消費資料,整個程序自動完成,
例外與總結
elasticsearch-head無法訪問Elasticsearch
es與es-head是兩個獨立的行程,當es-head訪問es服務時,會存在一個跨域問題,所以我們需要修改es的組態檔,增加一些配置項來解決這個問題,如下:
[root@localhost /usr/local/elasticsearch-head-master]# cd …/elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml

檔案末尾加上如下配置

http.cors.enabled: true
http.cors.allow-origin: “*”
修改完組態檔后需重啟es服務,
elasticsearch-head查詢報406 Not Acceptable
在這里插入圖片描述
解決方法:
1、進入head安裝目錄;
2、cd _site/
3、編輯vendor.js 共有兩處
#6886行 contentType: "application/x-www-form-urlencoded 改成 contentType: “application/json;charset=UTF-8”
#7574行 var inspectData = s.contentType === “application/x-www-form-urlencoded” && 改成 var inspectData = s.contentType === “application/json;charset=UTF-8” &&
使用elasticsearch-rest-high-level-client報org.elasticsearch.action.index.IndexRequest.ifSeqNo
#pom中除了加入依賴

org.elasticsearch.client
elasticsearch-rest-high-level-client
7.1.1

#還需加入

org.elasticsearch
elasticsearch
7.1.1

相關參考: git hub issues ,
為什么ElasticSearch要在7.X版本不能使用type?
參考: 為什么ElasticSearch要在7.X版本去掉type?
使用spring-data-elasticsearch.jar報org.elasticsearch.client.transport.NoNodeAvailableException
由于本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底層采用es官方TransportClient,而es官方計劃放棄TransportClient,工具以es官方推薦的RestHighLevelClient進行呼叫請
求, 可參考 RestHighLevelClient API ,
設定Docker容器開啟啟動
如果創建時未指定 --restart=always ,可通過update 命令
docker update --restart=always [containerID]
Docker for Mac network host模式不生效
Host模式是為了性能,但是這卻對Docker的隔離性造成了破壞,導致安全性降低, 在性能場景下,可以用–netwokr host開啟Host模式,但需要注意的是,如果你用Windows或Mac本地啟動容器的話,
會遇到Host模式失效的問題,原因是Host模式只支持Linux宿主機,
參見官方檔案: https://docs.docker.com/network/host/ ,
客戶端連接ZooKeeper報authenticate using SASL(unknow error)
在這里插入圖片描述

  • zookeeper.jar與Dokcer中的ZooKeeper版本不一致
  • zookeeper.jar使用了3.4.6之前的版本

出現這個錯的意思是ZooKeeper作為外部應用需要向系統申請資源,申請資源的時候需要通過認證,而sasl是一種認證方式,我們想辦法來繞過sasl認證,避免等待,來提高效率,
在專案代碼中加入System.setProperty(“zookeeper.sasl.client”, “false”);,如果是Spring Boot專案可以在application.properties中加入zookeeper.sasl.client=false,
參考: Increased CPU usage by unnecessary SASL checks ,
如果更換canal.client.jar中依賴的zookeeper.jar的版本
把Canal的官方原始碼下載到本機git clone https://github.com/alibaba/canal.git ,然后修改client模塊下pom.xml檔案中關于ZooKeeper的內容,然后重新mvn install:
在這里插入圖片描述
把自己專案依賴的包替換為剛剛mvn install生產的包:
在這里插入圖片描述
關于選型的取舍
在這里插入圖片描述

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/336199.html

標籤:其他

上一篇:Cloudera系列(3)使用DataFrame的Queries分析資料

下一篇:hadoop學習之(MapReduce、Pig、hive)

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more