前言:
前兩篇文章分別分析了Leader處理客戶端非事務請求、事務請求的處理程序,最后我們來分析下Follower節點處理客戶端請求的不同之處,
程序與Leader處理的程序基本差不多,所以相似的地方筆者就簡略帶過,重點分析不同之處,
1.FollowerZookeeperServer請求處理鏈
同樣的,我們先從其構造上來分析下其處理鏈
public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true,
getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}
}
所以,其處理鏈為FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
同時還有一個SyncRequestProcessor回應leader的proposal,后續我們詳細分析
2.FollowerRequestProcessor
public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
// 從客戶端獲取的請求全部存入queuedRequests,后續通過run()方法呼叫執行
public void processRequest(Request request) {
if (!finished) {
queuedRequests.add(request);
}
}
public void run() {
try {
while (!finished) {
Request request = queuedRequests.take();
...
// 請求交由下一個processor(commitProcessor)處理
nextProcessor.processRequest(request);
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request);
zks.getFollower().request(request);
break;
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
case OpCode.multi:
// 有關于事務型別請求,直接交由leader處理,具體見2.1
zks.getFollower().request(request);
break;
}
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("FollowerRequestProcessor exited loop!");
}
}
2.1 follower轉發事務請求到leader
public class Learner {
void request(Request request) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream oa = new DataOutputStream(baos);
oa.writeLong(request.sessionId);
oa.writeInt(request.cxid);
oa.writeInt(request.type);
if (request.request != null) {
request.request.rewind();
int len = request.request.remaining();
byte b[] = new byte[len];
request.request.get(b);
request.request.rewind();
oa.write(b);
}
oa.close();
QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
.toByteArray(), request.authInfo);
writePacket(qp, true);
}
}
FollowerRequestProcessor將事務請求交由leader處理;同時繼續將請求交由commitProcessor來處理;
我們來回憶下前文中Leader節點處理事務請求的程序:針對事務請求,leader生成proposal資訊到各follower,follower處理完成后回傳ack,leader接收到足夠的ack后再次向各follower發送commit資訊,
那么這個程序中,我們的follower相關處理在哪里呢?這時候又要回到FollowerServer啟動的時候了,如下,
2.2 Follower.followLeader() follower啟動
public class Follower extends Learner{
void followLeader() throws InterruptedException {
...
try {
QuorumServer leaderServer = findLeader();
try {
// 創建與leader連接
connectToLeader(leaderServer.addr, leaderServer.hostname);
// 將當前節點資訊注冊到leader上
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
// 與leader進行資料同步
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
// 接收leader請求包,并進行處理
readPacket(qp);
processPacket(qp);
}
} ...
}
protected void processPacket(QuorumPacket qp) throws IOException{
switch (qp.getType()) {
// 接收到leader發送過來的proposal
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();
// 進行事務日志處理,具體見2.2.1
fzk.logRequest(hdr, txn);
break;
case Leader.COMMIT:
// 當leader收集到足夠的ack后,向各follower發送commit,具體見2.2.2
fzk.commit(qp.getZxid());
break;
...
}
}
}
2.2.1 FollowerZooKeeperServer.logRequest() 創建事務請求日志
public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
public void logRequest(TxnHeader hdr, Record txn) {
Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
hdr.getType(), null, null);
request.hdr = hdr;
request.txn = txn;
request.zxid = hdr.getZxid();
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request);
}
// 直接交由syncProcessor處理
syncProcessor.processRequest(request);
}
}
SyncProcessor的作用我們都知道,就是將當前請求進行事務日志保存,
而事務日志保存完成后,則直接交由SendAckRequestProcessor來處理
2.2.2 SendAckRequestProcessor 回傳leader ack回應
public class SendAckRequestProcessor implements RequestProcessor, Flushable {
public void processRequest(Request si) {
if(si.type != OpCode.sync){
// 直接回傳leader ack回應包
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
null);
try {
learner.writePacket(qp, false);
} catch (IOException e) {
...
}
}
}
}
2.2.3 FollowerZooKeeperServer.commit() 提交事務proposal
public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
public void commit(long zxid) {
if (pendingTxns.size() == 0) {
LOG.warn("Committing " + Long.toHexString(zxid)
+ " without seeing txn");
return;
}
long firstElementZxid = pendingTxns.element().zxid;
if (firstElementZxid != zxid) {
LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
+ " but next pending txn 0x"
+ Long.toHexString(firstElementZxid));
System.exit(12);
}
Request request = pendingTxns.remove();
// 最終交由commitProcessor處理,詳見3.1
commitProcessor.commit(request);
}
}
總結:關于這塊的處理,讀者可以對照著Leader處理事務請求的程序來比對著看,
Follower關于事務請求還是分為兩部分:
接收leader proposal請求,記錄事務日志后,回傳ack回應;
接收leader commit請求,將請求交由CommitProcessor處理;
3.CommitProcessor
3.1 CommitProcessor.commit() 提交leader事務proposal
public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
synchronized public void commit(Request request) {
if (!finished) {
if (request == null) {
LOG.warn("Committed a null!",
new Exception("committing a null! "));
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Committing request:: " + request);
}
// 很簡單,直接將請求放入committedRequests
committedRequests.add(request);
notifyAll();
}
}
}
follower提交事務proposal的方式很簡單,就是將請求放入committedRequests集合中,依據我們之前Leader節點對CommitProcessor的分析,在如下
3.2 CommitProcessor.run() 處理請求
public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
// leader獲取的請求集合
LinkedList<Request> queuedRequests = new LinkedList<Request>();
// 已經被follower 提交的請求集合
LinkedList<Request> committedRequests = new LinkedList<Request>();
public void run() {
try {
Request nextPending = null;
while (!finished) {
int len = toProcess.size();
for (int i = 0; i < len; i++) {
// 5.請求proposal已完成,交由下個processor處理即可
nextProcessor.processRequest(toProcess.get(i));
}
toProcess.clear();
synchronized (this) {
// 2.若沒有收到請求且沒有收到leader的commit請求,則等待
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() == 0) {
wait();
continue;
}
// 3.committedRequests不為空,說明當前follower已經接受到leader的commit請求
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() > 0) {
Request r = committedRequests.remove();
if (nextPending != null
&& nextPending.sessionId == r.sessionId
&& nextPending.cxid == r.cxid) {
nextPending.hdr = r.hdr;
nextPending.txn = r.txn;
nextPending.zxid = r.zxid;
// 4.本次請求可以提交給下個processor處理
toProcess.add(nextPending);
nextPending = null;
} else {
// this request came from someone else so just
// send the commit packet
toProcess.add(r);
}
}
}
// We haven't matched the pending requests, so go back to
// waiting
if (nextPending != null) {
continue;
}
// 1.請求達到時,nextPending被設定為當前request,下次回圈時會使用到
synchronized (this) {
// Process the next requests in the queuedRequests
while (nextPending == null && queuedRequests.size() > 0) {
Request request = queuedRequests.remove();
switch (request.type) {
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.multi:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
nextPending = request;
break;
case OpCode.sync:
if (matchSyncs) {
nextPending = request;
} else {
toProcess.add(request);
}
break;
default:
toProcess.add(request);
}
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting", e);
} catch (Throwable e) {
LOG.error("Unexpected exception causing CommitProcessor to exit", e);
}
LOG.info("CommitProcessor exited loop!");
}
}
與leader中CommitProcessor的處理類似,讀者可以按照上面數字排序來分析整個程序
4.FinalRequestProcessor
最終都是交由FinalRequestProcessor來處理,這塊我們已經分析過很多次了,不再贅述,
總結:
這里通過分析Follower節點處理請求(事務請求)的程序,可以了解到:Follower本身并不處理事務請求,而是直接轉發給leader來處理;
但是follower會配合leader進行proposal的處理,最終將節點資訊添加到當前ZKDatabase,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/352164.html
標籤:其他
上一篇:Centos7(linux)下hbase的偽分布式搭建以及eclipse遠程連接
下一篇:大資料熱點圖制作(微重點)
