Seata 包括 Server端和Client端,Seata中有三種角色:TC、TM、RM,其中,Server端就是TC,TM和RM屬Client端,Client端的原始碼學習上一篇已講過,詳見 《Seata 1.5.2原始碼學習》,今天來學習Server端的原始碼,
原始碼下載地址:https://github.com/seata/seata
啟動類 ServerApplication 沒什么好說的,重點是ServerRunner

ServerRunner 是一個 CommandLineRunner 實體,因此在Spring Boot啟動完成后會回呼其run()方法,而在ServerRunner的run()方法中呼叫了Server.start()方法,

在Server#start()方法中,初始化了包括id生成器在內的很多組件,我們先不管這些,重點關注以下幾行代碼:
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);
DefaultCoordinator是一個單例Bean,在整個應用中只有一個DefaultCoordinator實體



DefaultCoordinator 實作了 TransactionMessageHandler
NettyRemotingServer#setHandler()設定的正是TransactionMessageHandler

DefaultCoordinator#onRequest()

重點是這三行:
AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
transactionRequest.setTCInboundHandler(this);
transactionRequest.handle(context);
DefaultCoordinator實作了TCInboundHandler介面,所以它不僅是一個TransactionMessageHandler,還是一個TCInboundHandler
這里transactionRequest.setTCInboundHandler(this),就是指定AbstractTransactionRequestToTC中的TCInboundHandler設定為DefaultCoordinator


AbstractTransactionRequest#handle()

不同的請求分發給對應的處理器去處理

現在請求和對應的處理器都有了,下面具體看一下每種請求都是如何被處理的
1. 開啟全域事務

開啟事務直接呼叫子類DefaultCoordinator#doGlobalBegin(),同時放在一個處理模板中執行

在doGlobalBean()中呼叫DefaultCore#begin()并回傳全域事務ID(xid)

new GlobalSession()

添加一個SessionManager作為Session的監聽器

Core

總結一下,開啟事務:
- 創建一個GlobalSession
- 給GlobalSession添加一個監聽器SessionManager
- session.begin()

開啟事務,創建一個全域事務,如果是seata.store.mode=db的話,向global_table表插入一條記錄
2. 分支事務注冊


DefaultCore#branchRegister()



如果是AT模式,這里呼叫的就是ATCore#branchSessionLock()
ATCore#branchSessionLock()檢查是否拿到鎖了






具體每種鎖的實作就不往下看了,挑其中一個看下,就RedisLocker吧

總之,分支注冊的時候需要檢查鎖,拿到本次事務中所涉及的所有需要加鎖的行的鎖才能注冊成功
所有行都加鎖成功,分支注冊才算成功,才會回傳true
再回到AbstractCore#branchRegister(),整個方法是放在SessionHolder#lockAndExecute()中執行的


總結一下,分支注冊:
- 創建一個BranchSession
- 加鎖,獲取所有行的鎖
- 將BranchSession加到GlobalSession中
- 回傳branchId

分支注冊,創建BranchSession,獲取全域鎖成功后將branchSession加入globalSession
3. 提交全域事務


首先判斷全域事務狀態是否為begin,如果不是則不應該提交,如果是,則將事務active置為false,釋放全域鎖,判斷是否可以異步提交,分支型別是AT的都可以異步提交,因此AT模式,默認是異步提交,如果不能異步提交,則采取同步提交,
3.1. 同步提交



遍歷所有已注冊的分支事務,向分支發送同步請求,告訴它全域事務開始提交了,不出意外的情況下回傳分支狀態是二階段提交成功,當所有分支都提交成功,則回傳true,于是全域事務提交成功,回傳全域事務狀態為已提交,如果有分支提交失敗,則回傳false,全域事務提交失敗,回傳全域狀態為提交失敗,如果拋例外了,則會有定時任務稍后重試提交,
3.2. 異步提交

異步提交只是將全域狀態置為異步提交中,剩下的事情交給定時任務去執行
啟動的時候呼叫了DefaultCoordinator#init()方法,啟動定時任務


每次,定時任務執行前,要先獲取一把分布式鎖,這個鎖是io.seata.core.store.DistributedLocker,不是分支注冊時的那把鎖io.seata.core.lock.Locker


異步提交首先將全域狀態設定為AsyncCommitting,回傳回傳全域狀態Committed,后臺有定時任務掃描,找到所有狀態為AsyncCommitting的全域事務,回圈遍歷,對于每個全域事務提交,呼叫DefaultCore#doGlobalCommit(),遍歷所有已注冊的分支事務,向分支事務發請求,通知其提交事務,分支事務回傳二階段提交成功,表示該分支事務提交成功,當所有分支事務都二階段提交成功,則全域事務提交成功,

4. 回滾全域事務






回滾,首先檢查全域事務狀態是否為Begin,不是的話直接結束,遍歷當前全域事務中已注冊的分支事務,依次給每個分支發請求,告訴分支事務需要回滾,如果所有分支回傳回滾成功,則全域回滾成功,如果有分支回滾失敗且不重試,則直接直接結束,如果失敗且可重試,或者執行程序中拋例外,則稍后會有定時任務重試回滾操作,

5. 分支上報
RM向TC報告分支事務狀態


只是更新一下分支狀態及相關資料
6. 查詢全域事務狀態

7. 查詢全域鎖



挑RedisLocker看一下吧

關于Seata Server 的原始碼學習就先到這里,歡迎交流,多謝點贊 (^_^)
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/531816.html
標籤:其他
下一篇:day15-Servlet04
