前文:
前兩篇文章主要介紹了集群模式下Zookeeper服務端的啟動流程,以及Leader選舉的程序,在leader選舉完成后,集群中的各節點分別有了對應的角色:Leader、Follower、Observer,那么按照對應的模式,會分別啟動不同的服務,也就是前文提到的幾個服務類,如下所示:
本文就主要先介紹下其基礎類ZookeeperServer的知識點,后續再分別介紹其子類,
1.ZookeeperServer的屬性及構造方法
/**
* This class implements a simple standalone ZooKeeperServer. It sets up the
* following chain of RequestProcessors to process requests:
* PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
*/
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// 心跳時間
public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;
// session超時時間設定
protected int minSessionTimeout = -1;
protected int maxSessionTimeout = -1;
// session管理器,之前有專門分析過
protected SessionTracker sessionTracker;
//事務日志、快照日志處理器
private FileTxnSnapLog txnLogFactory = null;
// 記憶體資料庫
private ZKDatabase zkDb;
private final AtomicLong hzxid = new AtomicLong(0);
public final static Exception ok = new Exception("No prob");
// 構建Processor處理器
protected RequestProcessor firstProcessor;
// server狀態
protected volatile State state = State.INITIAL;
protected enum State {
INITIAL, RUNNING, SHUTDOWN, ERROR;
}
// 服務埠連接處理器
private ServerCnxnFactory serverCnxnFactory;
// 服務端狀態資訊
private final ServerStats serverStats;
// shutdown鉤子函式處理
private ZooKeeperServerShutdownHandler zkShutdownHandler;
public ZooKeeperServer() {
serverStats = new ServerStats(this);
listener = new ZooKeeperServerListenerImpl(this);
}
// 默認構造以下引數資訊
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
int minSessionTimeout, int maxSessionTimeout,
DataTreeBuilder treeBuilder, ZKDatabase zkDb) {
serverStats = new ServerStats(this);
this.txnLogFactory = txnLogFactory;
this.txnLogFactory.setServerStats(this.serverStats);
this.zkDb = zkDb;
this.tickTime = tickTime;
this.minSessionTimeout = minSessionTimeout;
this.maxSessionTimeout = maxSessionTimeout;
listener = new ZooKeeperServerListenerImpl(this);
LOG.info("Created server with tickTime " + tickTime
+ " minSessionTimeout " + getMinSessionTimeout()
+ " maxSessionTimeout " + getMaxSessionTimeout()
+ " datadir " + txnLogFactory.getDataDir()
+ " snapdir " + txnLogFactory.getSnapDir());
}
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
DataTreeBuilder treeBuilder) throws IOException {
this(txnLogFactory, tickTime, -1, -1, treeBuilder,
new ZKDatabase(txnLogFactory));
}
}
注釋中,我們能發現很多資訊,作為一個ZookeeperServer在處理請求時的流程鏈為:PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor,
2.ZookeeperServer主要方法分析
2.1 loadData() 啟動加載節點資訊
public void loadData() throws IOException, InterruptedException {
if(zkDb.isInitialized()){
setZxid(zkDb.getDataTreeLastProcessedZxid());
}
else {
// 之前有分析過加載資料的這塊邏輯,主要在zkDb.loadDataBase()中實作
setZxid(zkDb.loadDataBase());
}
// 有關于過期的會話,則會直接洗掉
LinkedList<Long> deadSessions = new LinkedList<Long>();
for (Long session : zkDb.getSessions()) {
if (zkDb.getSessionWithTimeOuts().get(session) == null) {
deadSessions.add(session);
}
}
zkDb.setDataTreeInit(true);
for (long session : deadSessions) {
// XXX: Is lastProcessedZxid really the best thing to use?
killSession(session, zkDb.getDataTreeLastProcessedZxid());
}
}
這塊邏輯之前有分析過,通過ZKDatabase.loadDataBase()方法來加載快照日志資訊和事務日志資訊,最侄訓傳最新的那條事務操作的zxid資訊,
2.2 firstProcessor的構造
有關于RequestProcessor的構造實際上是使用了一個裝飾器的模式,從firstProcess開始執行,會不斷的呼叫nextProcessor,一直執行到最后一個為止,
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
PrepRequestProcessor和SyncRequestProcessor都有一個nextProcessor的屬性,整個呼叫鏈如上所示:PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor,
2.3 處理客戶端連接請求
在還未對客戶端創建連接時,則首先會處理客戶端的連接請求,在之前server處理會話創建請求的文章中我們有詳細分析過,這里大致過一下即可
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request from client "
+ cnxn.getRemoteSocketAddress()
+ " client's lastZxid is 0x"
+ Long.toHexString(connReq.getLastZxidSeen()));
}
...
// 如果請求的lastZXID 大于 server端的最新的ZXID,說明客戶端請求例外
if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
+ Long.toHexString(connReq.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
LOG.info(msg);
throw new CloseRequestException(msg);
}
// 與服務端協商session超時時間,需要介于minSessionTimeout 和 maxSessionTimeout之間
int sessionTimeout = connReq.getTimeOut();
byte passwd[] = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
// We don't want to receive any packets until we are sure that the
// session is setup
cnxn.disableRecv();
long sessionId = connReq.getSessionId();
// 如果客戶端是首次連接,那么sessionId未分配過,則默認為0,如果不是0,說明之前已經分配過
// 但由于某種原因,又斷開重連了,所以服務端針對這種連接會重新打開對應的session
if (sessionId != 0) {
long clientSessionId = connReq.getSessionId();
LOG.info("Client attempting to renew session 0x"
+ Long.toHexString(clientSessionId)
+ " at " + cnxn.getRemoteSocketAddress());
serverCnxnFactory.closeSession(sessionId);
cnxn.setSessionId(sessionId);
// 重新打開對應的session
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
} else {
LOG.info("Client attempting to establish new session at "
+ cnxn.getRemoteSocketAddress());
// 首次連接,需要創建Session
createSession(cnxn, passwd, sessionTimeout);
}
}
2.4 處理客戶端其他請求
有關于處理客戶端請求,之前的博客中也有過說明,這里再簡單介紹下
private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
int xid, ByteBuffer bb, List<Id> authInfo) {
// 拼裝Request物件
Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
submitRequest(si);
}
public void submitRequest(Request si) {
if (firstProcessor == null) {
// 若還未初始化,則針對已經到達的請求,先休息1秒,再判斷processor是否已創建完成,未完成直接報錯
synchronized (this) {
try {
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
// session處理
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
// 直接交由PrepRequestProcessor進行處理
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
}
}
請求的處理主要是交由RequestProcessor來處理,
總結:
本文沒有什么比較特殊的點,其中的知識點基本在之前的博客中都有涉獵,主要是對ZookeeperServer做一個總結,方便后續對其子類進行展開介紹,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/345684.html
標籤:其他
