前言:
總體而言,Zookeeper服務端的日志分為三種:事務日志、快照日志、log4j日志,
log4j日志無需多言,我們在%ZOOKEEPER_DIR%/conf/log4j.properties中配置了日志的詳細資訊,
本文主要介紹下事務日志的內容和Zookeeper如何生成事務日志以及其作用,快照日志的話,下一篇會著重介紹,
1.什么是事務日志?
我們在%ZOOKEEPER_DIR%/conf/zoo.cfg中配置的dataDir引數,是專門用于存盤事務日志和快照日志的檔案夾路徑,當然,我們也可以將兩個日志分開(事務讀寫比較頻繁時事務日志會比較大,將兩者分開可以提高系統性能),這時可以在zoo.cfg中配置dataLogDir路徑,
那么什么是事務日志呢?
就是Zookeeper服務端針對客戶端的所有事務請求(create、update、delete)等操作,在回傳成功之前,都會將本次操作的內容持久化到磁盤上,完成之后,才回傳客戶端成功標志,
2.查看事務日志資訊
在筆者的機器上,我們在%ZOOKEEPER_DIR%/data/version-2目錄下,看到以下幾個檔案

這個就是事務日志,直接打開的話是二進制內容,不利于查看,那么我們可以通過Zookeeper原始碼中提供的org.apache.zookeeper.server.LogFormatter來查看,
通過在main()方法中指定需要查看的事務日志檔案路徑即可以查看,筆者在查看log.1檔案時,生成以下輸出:
...
// 創建節點 /hello20040
21-10-5 下午05時13分46秒 session 0x10000d4a6d50002 cxid 0x29 zxid 0x3ac8 create '/hello20040,#776f726c643230303430,v{s{31,s{'world,'anyone}}},F,15041
// 創建一個Session會話
21-10-7 下午01時21分41秒 session 0x10000d4a6d50005 cxid 0x0 zxid 0x12505 createSession 40000
// 設定/hello20040 值
21-10-7 下午01時21分41秒 session 0x10000d4a6d50005 cxid 0x1 zxid 0x12506 setData '/hello20040,#3137,1
// 洗掉/hello20040節點
21-10-7 下午01時22分33秒 session 0x10000d4a6d50006 cxid 0x1 zxid 0x1250a delete '/hello20040
// 關倍訓話
21-10-7 下午01時22分41秒 session 0x10000d4a6d50007 cxid 0x0 zxid 0x1250b createSession 40000
通過以上日志可以很清楚的看到每一次事務操作時的具體資訊,這樣方便我們進行問題排查,
當然,不僅可以直接通過debug代碼的方式來查看,我們同樣可以通過Zookeeper.jar的方式來查看,大家可以參考這篇博文: https://blog.csdn.net/qq_34291777/article/details/86644347
3.事務日志請求執行程序
有了前面對Zookeeper server端處理請求的分析,我們知道事務日志的添加呼叫入口是通過SyncRequestProcessor來完成的,下來就一起來分析下其是如何將事務日志落入磁盤的,
我們就以create()方法為示例,來看下整個程序,前面server處理會話創建請求的文章中,我們知道,最終交由三個requestProcessor來處理,處理順序為 PrepRequestProcessor --> SyncRequestProcessor --> FinalRequestProcessor
3.1 PrepRequestProcessor.pRequest() 創建事務請求物件
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
protected void pRequest(Request request) throws RequestProcessorException {
// 事務請求request,分為hdr請求頭和txn請求體
request.hdr = null;
request.txn = null;
try {
switch (request.type) {
case OpCode.create:
// 這里的CreateRequest就是請求體
CreateRequest createRequest = new CreateRequest();
// 交由pRequest2Txn()方法處理
pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
break;
}
...
}
// 交由下一個processor執行
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException
{
// 創建請求頭
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type);
switch (type) {
case OpCode.create:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
CreateRequest createRequest = (CreateRequest)record;
if(deserialize)
ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
// 檢查path合法性及ACL權限控制
String path = createRequest.getPath();
int lastSlash = path.lastIndexOf('/');
if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
LOG.info("Invalid path " + path + " with session 0x" +
Long.toHexString(request.sessionId));
throw new KeeperException.BadArgumentsException(path);
}
List<ACL> listACL = removeDuplicates(createRequest.getAcl());
if (!fixupACL(request.authInfo, listACL)) {
throw new KeeperException.InvalidACLException(path);
}
// 檢查pathACL
String parentPath = path.substring(0, lastSlash);
ChangeRecord parentRecord = getRecordForPath(parentPath);
checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
request.authInfo);
int parentCVersion = parentRecord.stat.getCversion();
// 根據節點是否持久化和順序化進行不同的驗證
CreateMode createMode =
CreateMode.fromFlag(createRequest.getFlags());
if (createMode.isSequential()) {
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
}
validatePath(path, request.sessionId);
try {
if (getRecordForPath(path) != null) {
throw new KeeperException.NodeExistsException(path);
}
} catch (KeeperException.NoNodeException e) {
// ignore this one
}
boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
if (ephemeralParent) {
throw new KeeperException.NoChildrenForEphemeralsException(path);
}
int newCversion = parentRecord.stat.getCversion()+1;
// 生成事務請求體
request.txn = new CreateTxn(path, createRequest.getData(),
listACL,
createMode.isEphemeral(), newCversion);
StatPersisted s = new StatPersisted();
if (createMode.isEphemeral()) {
s.setEphemeralOwner(request.sessionId);
}
// 將父節點的變更資訊和當前節點的變更資訊推送到ZooKeeperServer.outstandingChanges中
parentRecord = parentRecord.duplicate(request.hdr.getZxid());
parentRecord.childCount++;
parentRecord.stat.setCversion(newCversion);
addChangeRecord(parentRecord);
addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
0, listACL));
break;
}
...
}
}
總結:事務請求物件Request,包含請求頭TxnHeader hdr和請求體Record txn,所以PrepRequestProcessor的主要作業就是堆hdr和txn的封裝
3.2 SyncRequestProcessor 事務日志添加
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
// 事務請求Request被添加到queuedRequests中
public void processRequest(Request request) {
queuedRequests.add(request);
}
public void run() {
try {
int logCount = 0;
setRandRoll(r.nextInt(snapCount/2));
while (true) {
Request si = null;
// 不斷從queuedRequests獲取事務請求資訊
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// 在這里將Request添加到事務日志中
if (zks.getZKDatabase().append(si)) {
logCount++;
// 如果需要重繪到磁盤則執行flush操作
if (logCount > (snapCount / 2 + randRoll)) {
setRandRoll(r.nextInt(snapCount/2));
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
// 快照日志單獨啟動一個執行緒來執行,避免阻塞主執行緒執行,后續專門分析
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
// 執行flush操作
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
}
事務日志的磁盤寫入,默認分為兩步:寫入(append)、重繪(rollLog/commit)
寫入動作并不是真正的寫入磁盤(而是暫時快取下來),重繪操作才是真正將快取的內容寫入到磁盤中,
有了以上的分析,我們后面直接去分析append方法和flush方法的執行程序
4.事務日志的生成
主要就是對ZKDatabase.append()方法和ZKDatabase.rollLog()方法的呼叫
4.1 ZKDatabase相關方法
public class ZKDatabase {
protected FileTxnSnapLog snapLog;
public boolean append(Request si) throws IOException {
return this.snapLog.append(si);
}
public void rollLog() throws IOException {
this.snapLog.rollLog();
}
public void commit() throws IOException {
this.snapLog.commit();
}
}
本質上都交由snapLog來操作
4.2 FileTxnSnapLog相關方法
public class FileTxnSnapLog {
// 事務日志操作類
private final File dataDir;
private TxnLog txnLog;
// 快照日志操作類
private final File snapDir;
private SnapShot snapLog;
public boolean append(Request si) throws IOException {
return txnLog.append(si.hdr, si.txn);
}
/**
* commit the transaction of logs
* @throws IOException
*/
public void commit() throws IOException {
txnLog.commit();
}
/**
* roll the transaction logs
* @throws IOException
*/
public void rollLog() throws IOException {
txnLog.rollLog();
}
}
FileTxnSnapLog本質上只是一個包裝類,統一提供對事務日志和快照日志的操作API,
4.3 FileTxnLog 事務日志操作
在分析代碼之前,我們先看下FileTxnLog類的注釋,可以幫助我們很好的理解事務日志檔案的組成,如下圖所示:

事務日志檔案主要由三部分組成:檔案頭(FileHead)、事務內容(Txn組成的list,每一個Txn包含了checksum Txnlen TxnHeader Record 0x42等屬性)、填充數字
事務內容的組成,如下圖所示:

public class FileTxnLog implements TxnLog {
// 最新的zxid
long lastZxidSeen;
// 事務日志流
volatile BufferedOutputStream logStream = null;
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
if (hdr == null) {
return false;
}
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
}
if (logStream==null) {
// 若檔案為空,則默認以當前事務的zxid結尾來創建log檔案
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
// 先寫入fileheader
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
logStream.flush();
filePadding.setCurrentSize(fos.getChannel().position());
streamsToFlush.add(fos);
}
filePadding.padFile(fos.getChannel());
// 將事務請求轉換為byte[]
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
// 計算checksum,并寫入
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");
// 將事務請求體寫入,并添加EOR標志位
Util.writeTxnBytes(oa, buf);
return true;
}
}
總結:append()的程序本質上就是將事務請求體不斷寫入的程序,按照標準的流操作執行即可,
而關于commit()等方法,就更簡單了,就是執行流的flush操作,筆者不再贅述,
public class FileTxnLog implements TxnLog {
public synchronized void rollLog() throws IOException {
if (logStream != null) {
this.logStream.flush();
this.logStream = null;
oa = null;
}
}
public synchronized void commit() throws IOException {
if (logStream != null) {
logStream.flush();
}
for (FileOutputStream log : streamsToFlush) {
log.flush();
...
}
}
}
總結:
本文分析了Zookeeper事務日志的相關知識點,從如何查看到原始碼分析其寫入程序,代碼并不算復雜,了解了該日志的基本資訊后,我們在日常的問題排查中就可以考慮查看事務日志來還原客戶端操作程序,
后續會繼續對快照日志進行分析,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/316444.html
標籤:其他
