關注 “Java藝術” 我們一起成長!
訂閱Binlog的目的在于,實作實時的快取更新、處理復雜邏輯資料實時同步到Elasticsearch/其它庫-表等業務場景,
本篇內容包括:
一種在應用層實作監聽
SQL的方式預備知識:關于
Mysql事務的兩階段提交與Binlog預備知識:關于
Kafka阿里云資料傳輸服務
DTS-資料訂閱官方
DEMO的消費模型:生產->消費模型官方
DEMO提供的MetaStore與Checkpoint特性使用官方
DEMO需要注意的地方關于
avro序列化與反序列化
一種在應用層實作監聽SQL的方式
筆者之前寫過關于在應用中利用Mybatis插件和SQL決議工具實作監聽SQL從而更新資料的文章,并且將這一功能整合到了個人的開源專案(easymulti-datasource-spring-boot-starter)中,支持監聽事務,支持實時消費監聽到的SQL,也可以通過給事務監聽器注冊回呼介面方式,在事務提交時才開始消費監聽到的SQL,
使用easymulti-datasource-spring-boot-starter也能在應用層面輕松實作實時SQL訂閱,但這種方式也存在弊端,雖然監聽到SQL后也是異步消費,但攔截SQL、分析SQL,本身也會有點性能損耗,而如果是有多個應用修改同一個表的情況,那么就需要每個應用都寫一遍消費的代碼,
如果可以在更底層,直接訂閱Mysql的Binlog,效率會比在應用層實作高得多,
預備知識:關于Mysql事務的兩階段提交與Binlog
Binlog用于記錄資料庫執行的寫入操作資訊,以二進制的形式保存在磁盤中,
Binlog是Mysql的邏輯日志,并且由Server層進行記錄,無論使用何種存盤引擎,Mysql資料庫都會記錄Binlog日志,
Mysql只有在事務提交時才會記錄Biglog,并且事務在提交時,Biglog還只是記錄在記憶體中,然后才通過配置的刷盤策略寫入到檔案中,
Mysql通過sync_binlog引數控制Biglog的刷盤時機,取值范圍是0-N:
0:由系統自行判斷何時寫入磁盤;1:每次commit都將Binlog寫入磁盤;N:每N個事務commit才將Binlog寫入磁盤;
毫無疑問,sync_binlog最安全的是設定是1,這也是MySQL 5.7.7之后版本的默認值,
通常我們提到的Mysql事務的兩階段提交都與InnoDB存盤引擎有關,
Mysql事務分兩個階段提交,第一階段由存盤引擎預寫記錄,如InnoDB存盤引擎寫Redolog,此階段Binlog不作任何操作;第二階段首先是寫Binlog,然后再由存盤引擎完成事務的提交作業,如寫入commit日記、釋放鎖等,
當第二階段的寫Binlog成功后,MySQL就會認為事務已經提交并且持久化了,所以在這一步Binlog就已經可以發送給訂閱者了,如果在寫完Binlog后,存盤引擎還沒有完成提交的事務,剛好在這個時刻資料庫崩潰,那么重啟后依然能根據Binlog正確恢復該事務,如果在寫Binlog這一步完成之前,任何操作的失敗都會引起事務回滾,
所以,如果是直接訂閱Binlog,我們并不需要關心事務最終是提交了還是回滾了,在事務未提交之前,我們都訂閱不到該事務中執行的任何SQL的日記,
想要了解更多,推薦閱讀文章:《MySQL · 原理介紹 · 再議MySQL的故障恢復》http://mysql.taobao.org/monthly/2018/12/04/
預備知識:關于Kafka
資料存盤問題
Kafka集群保留所有發布的記錄,無論它們是否已被消費,可通過配置保留期限引數來控制訊息的保留時長,如果保留策略設定為2天,一條記錄發布后兩天內,可以隨時被消費,兩天過后這條記錄會被拋棄并釋放磁盤空間,
offset消費偏移量
偏移量由消費者所控制,由消費者在消費記錄后commit一個新的偏移量,kafka會為消費者存盤這個偏移量,以便于后續繼續消費,kafka會按group + topic + partition存盤偏移量,當然,也可以自行存盤,關于自行存盤偏移量需要注意的問題后續會提到,
由于kafka按group + topic + partition存盤偏移量,這同時也對應另一個問題:"同一個分組內,一個topic的每個partition都只能有一個消費者消費,但一個消費者可以同時消費多個partition,"
由于offset由消費者控制,所以消費者可以采用任何順序來消費記錄,也就是說,一個topic的任一消費者都可以重置到一個舊的偏移量,從而重新處理過去的資料,也可以跳過最近的記錄,從當前位置開始消費,
消費者
一個KafkaConsumer實體并不一定就等于一個消費者,
在subscribe模式下,一個KafkaConsumer實體等于一個消費者,假設只有一個磁區,開啟多個KafkaConsumer,那么將會有一個消費者處于空閑狀態,即這個執行緒每次呼叫該KafkaConsumer實體的poll方法都會一直回傳空,拉不到任何訊息,直到當前正在消費的KafkaConsumer長連接掉線后,重平衡后空閑的消費者才會拉取到記錄,
這也證實了這句話:Kafka實作消費的方式是將日志中的磁區劃分到每一個消費者實體上,以便在任何時間,每個消費者都是某個磁區的唯一消費者,
在subscribe模式下,與其說一個KafkaConsumer等于一個消費者,不如說,一個連接(Socket)等于一個消費者,
但在assign模式下,如果多個KafkaConsumer訂閱的都是指定的topic和磁區(并且同組),那么這些KafkaConsumer拉取的都會是同一個磁區的記錄,這里只是舉例說明,不要這樣用,否則會重復消費記錄,兩個執行緒交叉提交(commit)偏移量(offset)也會出問題,
消費者組
通常情況下,每個topic都會有一些消費組,一個消費組就是一個邏輯訂閱者,
例如:
topic:用戶注冊
group 1:短信推送服務訂閱者
group 2:郵件推送服務訂閱者
group1和group2是邏輯訂閱者,但每個邏輯訂閱者下面都可以有多個消費者,
同一個組內的消費者數量不要超過topic的partition數量,因為超出partition數量的消費者不會被分配到partition,也就是會處于空閑狀態(見"消費者"下的描述);
維護消費者組中的消費關系由Kafka協議動態處理,當有新的消費者加入組時,新加入的消費者將從組中其他成員處接管一些partition磁區,當一個消費者消失時,該消費者擁有的磁區將被重新分配給其它剩余的消費者,
還有一點,在同一個分組下,如果一個topic的每個磁區當前都有一個消費者正在消費,新加入的消費者將會替代一個正在消費的消費者,接管被替代的消費者消費的磁區,
阿里云資料傳輸服務DTS-資料訂閱
阿里云資料傳輸服務DTS支持MySQL及DRDS的Binlog實時訂閱,
我們可以不必使用官方提供的SDK訂閱Binlog,而只需要使用Kafka客戶端,使用Kafka的API實作Binlog訂閱,
官方檔案:使用Kafka客戶端消費訂閱資料https://help.aliyun.com/document_detail/121239.html?spm=a2c4g.11186623.6.785.6d4d6d2aIOqQQm
官方提供的DEMO:[subscribe_example]https://github.com/LioRoger/subscribe_example,該DEMO由龍玄大佬提供,
我們選擇基于官方DEMO[subscribe_example/javaimpl]構建Mysql Binlog實時訂閱服務(試用階段),而不是重復造輪子,但我們對原始碼做了部分修改,保留訊息反序列化、MetaStore與Checkpoint特性,其中MetaStore與Checkpoint是這個DEMO最值得學習的地方,
官方DEMO的消費模型:生產->消費模型
DEMO只開啟一個消費者,這個消費者負責訂閱訊息,并將訂閱到的訊息放入一個阻塞佇列(LinkedBlockingQueue)中,這個阻塞佇列的默認大小設定為512,
另外開啟一個真正消費訊息的執行緒,從該阻塞佇列中讀取訊息并呼叫RecordListener的consume方法消費,在 RecordListener消費完訊息后,將該訊息的offset包裝成一個檢查點(Checkpoint),將該檢查點設定為最新的檢查點,另外會有一個定時任務每5秒提交一次最新的檢查點,即提交offset,
kafka消費者每次都有可能拉取到一批訊息,并且這些訊息是按發布順序排好序的,因為topic的一個磁區只能被一個消費者消費,而訊息在磁區中本就按訊息的發布順序排好序的,
在DEMO中,消費者將訂閱到的訊息放入阻塞佇列也是按順序放入,當佇列滿時會阻塞等待,因此只需要確保按順序消費阻塞佇列中的訊息并提交offset,
如果不按順序消費阻塞佇列中的訊息會怎樣?
假設多個執行緒并行無順序的消費拉取到的訊息,那么就無法確保offset被正確提交,可能會導致部分訊息重復消費,
在不嚴格要求每條訊息都必須正確無例外地被消費的情況下,我們可以使用多執行緒消費,提升訊息的消費速度,
比如,消費阻塞佇列中訊息的執行緒只負責從阻塞佇列獲取訊息,并負責決議,其它例如更新快取等行為放到異步執行緒池中去執行,只要成功放入異步執行緒池,就更新Checkpoint(offset),繼續消費后面的訊息,
官方DEMO提供的MetaStore與Checkpoint特性
Checkpoint用于記錄分組內的一個topic的某個磁區當前實際消費到的位置(偏移量:offset),
/**
* 安全檢查點(即:記錄消費位置)
*/
public class Checkpoint {
// 磁區資訊
private final TopicPartition topicPartition;
private final long timeStamp;
private final long offset;
public Checkpoint(TopicPartition topicPartition, long timeStamp, long offset) {
this.topicPartition = topicPartition;
this.timeStamp = timeStamp;
this.offset = offset;
}
}
MetaStore則用于存盤Checkpoint,或者說是提交偏移量,
public interface MetaStore<V> {
Future<V> serializeTo(TopicPartition topicPartition, String group, V value);
V deserializeFrom(TopicPartition topicPartition, String group);
}
DEMO提供了兩個實作類:KafkaMetaStore、LocalFileMetaStore,其中LocalFileMetaStore實作的就是使用本地檔案存盤消費的磁區的偏移量,KafkaMetaStore則是呼叫KafkaConsumer的commitAsync方法異步提交偏移量,也就是說讓kafka存盤偏移量,
需要注意的是,在subscribe模式下,不要使用LocalFileMetaStore,
當消費者以集群方式部署時,節點重啟后由于kafka的再平衡,該節點消費的磁區可能與重啟之前的磁區不同,那么本地檔案存盤的消費偏移量就使用不上,會導致從頭(配置的初始化消費位置)開始消費記錄,
而如果只是部署一個消費者服務,或者多個消費者是在一個行程內的,又或是使用assign模式,那么可以使用LocalFileMetaStore,但需要確保每次服務重啟都存在偏移量檔案,如果切換服務器部署,則需要將偏移量檔案同步到新的服務器上,
為了省去不必要的麻煩,我們直接棄用LocalFileMetaStore,而使用KafkaMetaStore,
public class KafkaMetaStore implements MetaStore<Checkpoint> {
private volatile KafkaConsumer kafkaConsumer;
//.....
// 異步提交offset
@Override
public Future<Checkpoint> serializeTo(TopicPartition topicPartition, String group, Checkpoint value) {
KafkaFutureImpl ret = new KafkaFutureImpl();
if (null != kafkaConsumer) {
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(value.getOffset(), String.valueOf(value.getTimeStamp()));
// 異步提交(不能同步提交,否則影響RecordGenerator#run())
// Notice: commitAsync is only put commit offset request to sending queue, the future result will be driven by KafkaConsumer.poll() function
// So if you only call this method but not poll, you may not wait offset commit call back
kafkaConsumer.commitAsync(Collections.singletonMap(topicPartition, offsetAndMetadata), new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (null != exception) {
log.warn("KafkaMetaStore: Commit offset for group[" + group + "] topicPartition[" + topicPartition.toString() + "] " +
value.toString() + " failed cause " + exception.getMessage(), exception);
ret.completeExceptionally(exception);
} else {
log.debug("KafkaMetaStore:Commit offset success for group[{}] topicPartition [{}] {}", group, topicPartition, value);
ret.complete(value);
}
}
});
} else {
log.warn("KafkaMetaStore: kafka consumer not set, ignore report");
ret.complete(value);
}
return ret;
}
// 從kafka獲取當前磁區的offset和時間戳
@Override
public Checkpoint deserializeFrom(TopicPartition topicPartition, String group) {
if (null != kafkaConsumer) {
OffsetAndMetadata offsetAndMetadata = kafkaConsumer.committed(topicPartition);
if (null != offsetAndMetadata) {
return new Checkpoint(topicPartition, Long.valueOf(offsetAndMetadata.metadata()), offsetAndMetadata.offset(), offsetAndMetadata.metadata());
} else {
return null;
}
} else {
log.warn("KafkaMetaStore: kafka consumer not set, ignore fetch offset");
throw new KafkaException("KafkaMetaStore: kafka consumer not set, ignore fetch offset for group[" + group + "] and tp [" + topicPartition + "]");
}
}
}
使用官方DEMO需要注意的地方
如果topic的某個磁區從未被消費過,那么在首次啟動消費者時,需要配置初始化消費位置,可以使用時間戳,也可以使用offset定位到想要消費的位置,
如果磁區被消費過,那么就可以在消費者啟動/重啟時,先獲取最后消費的位置,然后再從最后消費的位置開始消費,但由于offset是定時每5秒才提交一次,所以獲取到的offset并不能代表實際消費的偏移量,所以每次重起都會有小部分記錄被重新消費,這需要我們自行確保訊息的冪等性消費,
官方DEMO默認使用LocalFileMetaStore,替換MetaStore只需要修改RecordGenerator#getConsumerWrap方法,代碼如下:
public class RecordGenerator{
private ConsumerWrap getConsumerWrap(String message) {
// KafkaConsumer包裝器
ConsumerWrap kafkaConsumerWrap = getConsumerWrap();
// 不建議使用LocalFileMetaStore存盤(特別是部署到k8s上),否則將消費者部署到其它服務器后,需要將localCheckpointStore檔案也要同步過去才可以
// metaStoreCenter.registerStore(LOCAL_FILE_STORE_NAME, new LocalFileMetaStore(LOCAL_FILE_STORE_NAME));
// 使用KafkaMetaStore
metaStoreCenter.registerStore(KAFKA_STORE_NAME, new KafkaMetaStore(kafkaConsumerWrap.getRawConsumer()));
// 從檢查點存盤器獲取檢查點(由于是每5秒提交一次,所以每次重起都會有小部分記錄被重新消費)
Checkpoint checkpoint = getCheckpoint();
// 沒有找到檢查點,則使用配置的初始化檢查點
if (null == checkpoint || Checkpoint.INVALID_STREAM_CHECKPOINT == checkpoint) {
checkpoint = initialCheckpoint; // 在組態檔中配置
log.info("RecordGenerator: use initial checkpoint [{}] to start", checkpoint);
} else {
log.info("RecordGenerator: load checkpoint from checkpoint store success, current checkpoint [{}]", checkpoint);
}
//.......
}
}
最后,由于阿里云資料傳輸服務DTS-資料訂閱只會將日記提交到一個磁區,即一個topic只有一個磁區,這是為了確保能夠按正確的順序消費每一條sql日記,所以,我們沒有必要使用subscribe模式,應該使用assign模式,并且沒有必要部署集群,這也是官方DEMO所推薦的,
DefaultConsumerWrap封裝了KafkaConsumer,使用assign模式在該類的assignTopic方法表現,代碼如下,
public class DefaultConsumerWrap extends ConsumerWrap {
private KafkaConsumer<byte[], byte[]> consumer;
@Override
public void assignTopic(TopicPartition topicPartition, Checkpoint checkpoint) {
// KafkaConsumer
consumer.assign(Collections.singletonList(topicPartition));
log.info("RecordGenerator: assigned for {} with checkpoint {}", topicPartition, checkpoint);
// 設定消費位置
setFetchOffsetByTimestamp(topicPartition, checkpoint);
}
}
其中,assignTopic方法的第二個引數(Checkpoint)從MetaStore獲取而來,或者是使用配置的初始化位置,在呼叫KafkaConsumer#assign方法之后,呼叫setFetchOffsetByTimestamp方法設定消費位置,后續就可以呼叫KafkaConsumer#poll方法拉取訊息了,
setFetchOffsetByTimestamp方法實作如下,相比DEMO原始碼,我們做了點修改,
public class DefaultConsumerWrap extends ConsumerWrap {
@Override
public void setFetchOffsetByOffset(TopicPartition topicPartition, Checkpoint checkpoint) {
// 移動到指定位置繼續消費
consumer.seek(topicPartition, checkpoint.getOffset());
}
// recommended
@Override
public void setFetchOffsetByTimestamp(TopicPartition topicPartition, Checkpoint checkpoint) {
// 優先使用偏移量
if (checkpoint.getOffset() > 0) {
setFetchOffsetByOffset(topicPartition, checkpoint);
return;
}
long timeStamp = checkpoint.getTimeStamp();
// 根據時間戳獲取偏移量
Map<TopicPartition, OffsetAndTimestamp> remoteOffset = consumer.offsetsForTimes(Collections.singletonMap(topicPartition, timeStamp));
OffsetAndTimestamp toSet = remoteOffset.get(topicPartition);
if (null == toSet) {
throw new RuntimeException("RecordGenerator:seek timestamp for topic [" + topicPartition + "] with timestamp [" + timeStamp + "] failed");
}
// 移動到指定位置繼續消費
consumer.seek(topicPartition, toSet.offset());
}
}
關于avro序列化與反序列化
官方提供的demo,其中com.alibaba.dts.formats.avro這個package是由avro的shcema編譯而來的,我們也可以自行編譯,具體實作如下:
1、執行命令編譯avsc檔案生成java代碼
java -jar avro/avro-tools-1.8.2.jar compile -string schema avro/Record.avsc .
2、將生成的com.alibaba.dts.formats.avro這個package拷貝到當前工程根目錄下面,當然,也可以封裝到一個模塊,在主模塊中引入,
[Java藝術] 微信號:javaskill
一個只推送原創文章的技術公眾號,分享Java后端相關技術,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/221110.html
標籤:其他
