引言
Hyperledger Fabric是當前比較流行的一種聯盟鏈系統,它隸屬于Linux基金會在2015年創建的超級賬本專案且是這個專案最重要的一個子專案,目前,與Hyperledger的另外幾個子專案Hyperledger Iroha,Hyperledger Indy和Hyperledger Sawtooth一樣,Hyperledger Fabric正處于生命周期中的活躍(active)階段,它的架構設計正在不斷地完善并持續為開發者和用戶提供更強大,更便捷的區塊鏈服務,
與主流的區塊鏈系統一樣,Hyperledger Fabric實際上也是個去中心化的分布式賬本,其總賬上的資料和交易記錄由網路中的多方節點共同維護,而且賬本上的記錄一旦被寫入將永遠無法被篡改,同時支持基于時間戳的交易追溯,然而,與當前成熟的位元幣,以太坊等公有鏈系統不同的是,Hyperledger Fabric是一種聯盟鏈系統,它的去中心化程度是受限的,即它只允許被授權的節點加入到區塊鏈網路中,更多地,Fabric還提供了創建通道的功能來進一步滿足不同聯盟方的實際需求,這進一步提高了區塊鏈系統的安全性和私密性,
區塊鏈系統中的交易處理和共識模塊是一個核心功能,它們為實作區塊鏈的主體功能發揮了核心作用,在接下來的內容中,為了讓讀者更好地了解Fabric中的共識模塊,我們首先簡要介紹了Hyperledger Fabric的架構設計,接著詳細分析了Fabric中的交易流程,最后結合Hyperledger Fabric的原始碼來深入了解Raft共識演算法及其在區塊鏈系統中的具體實作,
Hyperledger Fabric架構簡介
在Hyperledger Fabric系統的架構中,它引以為豪的一項設計就是采用了模塊化的設計思想,并且支持可插拔的組件開發,
具體地,Fabric的架構主要包括三個重要的組成模塊,即成員服務,區塊鏈服務以及合約代碼服務,以下將詳細介紹每個模塊包含的功能以及設計原理,
a) 成員服務模塊:成員服務之所以會單獨劃分為一個模塊主要是考慮到聯盟鏈的特殊性,即每個節點在進入區塊鏈系統之前都需要經過身份驗證,只有通過驗證的節點才能參與到系統中,成員服務提供成員的注冊,身份管理以及認證功能,保證了系統的安全性,便利了節點的權限管理,
b) 區塊鏈服務模塊:無論是在公有鏈系統還是在聯盟鏈系統,區塊鏈服務始終作為區塊鏈系統的核心組成部分,為區塊鏈主體功能提供底層的服務支撐,具體地,該模塊主要承擔了節點間的共識管理,賬本的分布式存盤,去中心化網路協議的具體實作等任務,
c) 合約代碼服務模塊:該模塊也不是Fabric系統獨有的,很多系統如以太坊等都具備部署智能合約的功能,在Hyperledger Fabric系統中,智能合約在Docker容器中運行,所以該模塊提供了一個智能合約的執行引擎為合約代碼程式提供了一個強大,便捷的部署運行環境,
根據以上的模塊劃分,Fabric的詳細架構如下圖1.1所示,

圖1.1 Hyperledger Fabric架構圖
Hyperledger Fabric中的交易流程介紹
在Hyperledger Fabric系統中,所謂的交易就是一次合約代碼的呼叫程序,包含兩種型別的交易,即部署交易和呼叫交易,
部署交易主要用來在Hyperledger Fabric區塊鏈中安裝合約代碼,具體地,它使用一個程式作為引數創建新的合約代碼,然后執行部署以完成合約的安裝,而呼叫交易簡單來說就是執行合約代碼,當成功地執行呼叫交易后,可以相應地修改賬本的狀態,并且為客戶端回傳輸出結果,不管是部署交易還是呼叫交易,只要在區塊鏈系統中執行后都會被打包成區塊,區塊鏈接在一起就組成了分布式賬本的區塊鏈,
Hyperledger Fabric區塊鏈系統中,交易主要可以分為三個階段,分別是提議階段,排序打包階段以及驗證提交階段,這里每個階段參與的節點的型別略有不同,設計到的技術原理也不同,具體地,下圖2.1詳細地描述了Hyperledger Fabric中的交易流程,

圖2.1 Hyperledger Fabric中的交易流程
2.1 提議階段
在Fabric的第一階段中,主要的作業流程是客戶端節點提交交易提案、背書節點模擬執行鏈碼、背書節點為交易提案進行背書、背書節點回傳背書結果給客戶端,
具體地,提議階段主要可以細分為以下幾個步驟:
-
客戶端首先構建交易的提案,提案的作用是呼叫通道中的鏈碼來讀取或者將資料寫入分布式賬本,客戶端打包交易提案,并使用用戶的私鑰對提案進行簽名,
-
應用端打包完交易提案后,便開始把提案提交給通道中的背書節點,通道的背書策略定義了哪些節點背書后交易才能有效,應用端根據背書策略選擇相應的背書節點,并向它們提交交易提案,
-
背書節點收到交易提案后,首先校驗交易的簽名是否合法,然后根據簽名者的身份,確認其是否具有權限進行相關交易,此外,背書節點還需要檢查交易提案的格式是否正確以及是否之前提交過,這樣做的目的是防止重放攻擊,
-
在所有合法性校驗通過后,背書節點按照交易提案,模擬呼叫鏈碼,鏈碼模擬執行時,讀取的鍵值對資料是節點中本地的狀態資料庫,需要注意的是,鏈碼在背書節點中是模擬執行,即對資料庫的寫操作并不會對賬本作改變,
-
在鏈碼模擬執行完成之后,將回傳模擬執行的回傳值、鏈碼讀取過的資料集和鏈碼寫入的資料集,讀操作集合和寫操作集合將在確認節點中用于確定交易是否最終寫入賬本,
-
背書節點把鏈碼模擬執行后得到的讀寫集等資訊使用其私鑰進行簽名(背書簽名)后發回給提案提交方即客戶端,

圖2.2 交易流程之提議階段
2.2 排序和打包交易階段
一般地,等客戶端收集到足夠多的背書節點回傳的回應提案的背書回應后,客戶端便會將交易提案、讀寫集和背書簽名等發送給排序節點,排序節點將會對自己接收到的交易資訊按照通道分類進行排序,且打包成區塊,
排序和打包交易階段可以細分為以下幾個子階段:
-
客戶端收到背書回應之后,檢查背書節點的簽名和比較不同節點背書的結果是否一致,如果提案是查詢賬本的請求,則客戶端無需提交交易給排序節點,如果是更新賬本的請求,客戶端在收集到滿足背書策略的背書回應數量之后,把背書提案中得到的讀寫集、所有背書節點的簽名和通道號發給排序節點,
-
排序節點在收到各個節點發來的交易后,并不檢查交易的全部內容,而是按照交易中的通道號對交易分類排序,然后把相同通道的一批交易打包成區塊,
-
排序節點把打包好的區塊廣播給通道中的所有成員,區塊的廣播有兩種觸發條件,一種是當通道的交易數量達到某個預設的閾值,另一種是在交易數量沒有超過閾值但距離上次廣播的時間超過某個特定閾值,也可觸發廣播資料塊,兩種方式相結合,使得排序過的交易可以及時廣播出去,

圖2.3 交易流程之排序和打包區塊階段
2.3 驗證和提交階段
最后,對于驗證和提交階段來說,擔負的職責就是驗證其收到的區塊,即驗證區塊中的背書簽名以及驗證交易的有效性,驗證成功后,Peer節點將更新賬本和世界狀態,
驗證和提交階段的詳細作業流如下:
-
節點收到排序節點發來的交易資料塊后,逐筆檢查區塊中的交易,先檢查交易的合法性以及該交易是否曾經出現過,然后呼叫 VSCC(Validation System Chaincode)的系統鏈碼檢驗交易的背書簽名是否合法,以及背書的數量是否滿足背書策略的要求,
-
接下來進行多版本并發控制 MVCC 的檢查,即校驗交易的讀集是否和當前賬本中的版本一致,如果沒有改變,說明交易寫集中對資料的修改有效,把該交易標注為有效,交易的寫集更新到狀態資料庫中,
-
如果當前賬本的資料和讀集版本不一致,則該交易被標注為無效,不更新狀態資料庫,資料塊中的交易資料在標注成"有效"或"無效"后封裝成區塊寫入賬本的區塊鏈中,
-
最后,節點會通過事件機制通知客戶端交易是否已經被加入區塊鏈以及交易是否有效,

圖2.4 交易流程之驗證和提交階段
Hyperledger Fabric中的共識演算法及其原始碼分析
對于Hyperledger Fabric系統來說,前一節分析的整個交易流程就是共識,通過這個交易處理流程,所有的Peer節點在由排序節點提供的流程中對交易的排序和根據交易打包成的區塊達成了一致,因此,我們可以知道排序服務是共識機制中最重要的一環,所有的交易都需要通過排序服務后才能達成全網節點的共識,
Hyperledger Fabric 的網路節點本質上是互相復制的狀態機,節點之間需要保持相同的賬本狀態,為了實作這個目的,各個節點需要通過共識程序,對賬本狀態的變化達成一致性的認同,
如何實作所有節點的共識可以說是去中心化的區塊鏈系統所面臨的最重要的問題之一,而共識機制又被稱為"區塊鏈的靈魂",所以,針對不同的區塊鏈系統選擇合適的共識演算法對于分布式系統保持一致性是至關重要的,
在區塊鏈領域,使用的比較多的共識演算法有大名鼎鼎的PoW共識演算法,PoS和DPoS等權益證明演算法,以及PBFT,RAFT等共識演算法,對于Hyperledger Fabric這類聯盟鏈系統,PoW和PoS等演算法并不適用,因為這類演算法實作共識的本質都是挖礦,雖然這類演算法具備完全去中心化和節點自由進出的優點,但是由于挖礦需要耗費大量的電力和CPU資源以及達成共識的周期很長,并不適用于商業的區塊鏈應用,因此,與現在大部分的聯盟鏈系統一樣,Hyperledger Fabric也將目光聚集在PBFT和RAFT等共識演算法上,
Fabric的共識服務設計成了可插拔的模塊,以此滿足了根據不同應用場景切換不同共識選項的需求,在Hyperledger Fabric最新版本中,Fabric系統的共識模塊中實作了三種共識演算法,其中包括Solo,Kafka以及Raft演算法,官方推薦的是使用Raft共識演算法,但是為了更好地理解Fabric中的共識模塊,我們也簡單介紹一下Solo和Kafka這兩種共識演算法,
-
solo共識:假設網路環境中只有一個排序節點,從Peer節點發送來的訊息由一個排序節點進行排序和產生區塊,由于排序服務只有一個排序節點為所有的peer節點服務,雖然可以肯定保證順序一致性,但是沒有高可用性和可擴展性,所以不適合用于生產環境,只能用于開發和測驗環境,
-
Kafka共識:Kafka是一個分布式的流式資訊處理平臺,目標是為實時資料提供統一的、高吞吐、低延遲的性能,Hyperledger Fabric之前版本的核心共識演算法通過Kafka集群實作,簡單來說,就是通過Kafka對所有交易資訊進行排序(如果系統存在多個通道,則對每個通道分別排序),
-
Raft共識:Raft是Hyperledger Fabric在1.4.1版本中引入的,它是一種基于 etcd 的崩潰容錯(CFT)排序服務,Raft 遵循 "領導者和追隨者" 模型,其中領導者在通道中的排序節點之間動態選出(這個節點集合稱為"consenter set"),該領導者將訊息復制到跟隨者節點,Raft保證即使在小部分(≤ (N-1)/2)節點故障的情況下,系統仍然能正常對外提供服務,所以Raft被稱為"崩潰容錯",
其實,Hyperledger Fabric在1.4.1版本以前,它的核心共識演算法通過Kafka集群實作,但是在1.4.1版本之后,Fabric推薦使用Raft演算法實作節點的共識,其實從提供服務的視角來看,基于Raft和Kafka的排序服務是類似的,他們都是基于CFT(crash fault tolerant)模型的排序服務,并且都使用了主從節點的設定,但是為什么Hyperledger Fabric選擇Raft演算法呢?我們列舉了Raft相較于Kafka所展現出的優勢來回答這個問題,
a. 第一點,Raft 更容易設定,雖然 Kafka 有很多崇拜者,但即使是那些崇拜者也(通常)會承認部署 Kafka 集群及其所必須的 ZooKeeper 集群會很棘手,需要在 Kafka 基礎設施和設定方面擁有高水平的專業知識,此外,使用 Kafka 管理的組件比使用 Raft 管理的組件多,Kafka 有自己的版本,必須與排序節點協調,而使用 Raft,所有內容都會嵌入到排序節點中,
b. 第二點,Kafka和zookeeper的設計不適用于大型網路,它們的設計是CFT模型,但局限于運行的比較緊密的主機上,也就是說,需要有一個組織專門運行Kafka集群,鑒于此,當有多個組織使用基于Kafka排序服務的時候,其實沒有實作去中心化,因為所有的節點連接的都是由一個組織單獨控制的Kafka集群,如果使用Raft演算法,每個組織可以貢獻排序節點,共同組成排序服務,可以更好的去中心化,
c. 第三點,Raft是原生支持的,而Kafka需要經過復雜的步驟部署,并且需要單獨學習成本,而且Kafka和Zookeeper的支持相關的issue要通過apache來處理,而不是Hyperledger Fabric,Raft的實作是包含在Fabric社區的,開發支持更加便利,
d. 第四點,Raft 是向開發拜占庭容錯(BFT)排序服務邁出的第一步,正如我們將看到的,Fabric 開發中的一些決策是由這個驅動的,Fabric使用Raft共識演算法是向BFT類演算法過渡的步驟,
鑒于以上對Kafka和Raft的分析比較,再考慮到目前最新版的Fabric中推薦采用的共識演算法,我們以下將詳細分析Raft共識演算法,而不是之前版本使用的Kafka,具體地,在3.1節中我將從理論角度闡述Raft共識演算法及共識流程,而在3.2節我則會從Fabric原始碼角度來進一步深入分析Raft演算法的實作及其與Fabric交易流程的整合,
3.1 Raft演算法理論
在分布式系統中,為了消除單點故障提高系統可用性,通常會使用副本來進行容錯,但這會帶來另一個問題,即如何保證多個副本之間的一致性?而所謂的一致性并不是指集群中所有節點在任一時刻的狀態必須完全一致,而是指一個目標,即讓一個分布式系統看起來只有一個資料副本,并且讀寫操作都是原子的,這樣應用層就可以忽略系統底層多個資料副本間的同步問題,也就是說,我們可以將一個強一致性分布式系統當成一個整體,一旦某個客戶端成功的執行了寫操作,那么所有客戶端都一定能讀出剛剛寫入的值,即使發生網路磁區故障,或者少部分節點發生例外,整個集群依然能夠像單機一樣提供服務,
為了實作一致性的目的,共識演算法基于狀態復制機模型來建模,所有的節點從一個相同的狀態(state)出發,經過相同的操作日志,最終達到一致的狀態,在眾多的共識演算法中,Paxos演算法可以說是一個最經典的共識演算法,也是公認的可以實作有效共識的演算法,然而,由于Paxos卻很少在實際架構中應用,因為它難以理解,更加難以實作,作者為了讓讀者更好地理解Paxos演算法,甚至為此專門發表了論文進行進一步解釋,其次,Paxos沒有提供一個足夠好的用來構建一個現實系統的基礎,而且它也并不是十分易于構建實踐性的系統,因此,現在的分布式系統通常使用Paxos的一種變種共識演算法,即Raft演算法,它的優點是容易理解,容易實作,這使得Raft得到更廣泛的普及和應用,
3.1.1 基本概念
Raft 使用 Quorum 機制(一種集群一致性和可用性之間的權衡機制)來實作共識和容錯,我們將對 Raft 集群的操作稱為提案(提案可以簡單理解為對集群的讀寫操作),每當發起一個提案,必須得到大多數(> N/2)節點的同意才能提交,
接下來,我們詳細介紹下Raft中涉及到的一些關鍵概念以及術語,
-
Leader:Leader負責提取新的日志條目,將它們復制到跟隨者訂購節點,以及管理何時認為條目已提交,在Hyperledger Fabric中,其中一個排序節點將擔任Leader,
-
Follower:Follower從Leader那里接收日志并確定性地復制它們,確保日志保持一致,Follower也會收到來自Leader的"心跳"資訊,如果Leader停止在可配置的時間內發送這些訊息,Follower將轉換為候選狀態,
-
候選狀態(candidate):處于候選狀態的節點會發起選舉,如果它收到集群中大多數成員的投票認可,就轉換為Leader,
-
日志條目:Raft排序服務中的主要作業單元是"日志條目",這些條目的完整序列稱為"日志",如果成員的多數(法定人數,換言之)成員到條目及其順序達成一致,我們認為日志是一致的,
-
有限狀態機(FSM):Raft中的每個排序節點都有一個FSM,它們共同用于確保各個排序節點中的日志序列是確定性的(以相同的順序撰寫),
-
Consenter設定:排序節點主動參與給定信道的共識機制并接收信道的復制日志,這可以是所有可用節點(在單個群集中或在對系統通道有貢獻的多個群集中),或者是這些節點的子集,
-
法定人數:描述需要確認提案的最少數量的同意者,以便可以提交交易,對于每個consenter集,這是大多數節點,在具有五個節點的群集中,必須有三個節點才能存在仲裁,如果由于任何原因導致法定數量的節點不可用,則排序節點將無法用于通道上的讀取和寫入操作,并且不能提交新日志,
-
任期:每開始一次新的選舉,稱為一個任期(term),每個 term 都有一個嚴格遞增的整數與之關聯,每當 candidate 觸發領匯入選舉時都會增加 term,如果一個 candidate 贏得選舉,他將在本任期中擔任 Leader 的角色,但并不是每個任期都一定對應一個Leader,有時候某個任期內會由于選舉超時導致選不出 Leader,這時candidate會遞增任期號并開始新一輪選舉,
在了解了Raft中的基本概念后,我們再來簡單了解一下Raft演算法的運行程序,
首先,Raft 集群必須存在一個主節點(Leader),沒有主節點集群就無法作業,我們作為客戶端向集群發起的所有操作都必須經由主節點處理,所以 Raft 核心演算法中的第一部分就是領匯入選舉,先票選出一個主節點,再考慮其它事情,其次,主節點負責接收客戶端發過來的操作請求,將操作包裝為日志同步給其它節點,在保證大部分節點都同步了本次操作后,就可以安全地給客戶端回應回應了,這在 Raft中被叫做日志復制,然后,因為主節點的責任是如此之大,所以節點們在領匯入選舉的時候一定要謹慎,只有符合條件的節點才可以當選主節點,此外主節點在處理操作日志的時候也一定要謹慎,為了保證集群對外展現的一致性,不可以覆寫或洗掉前任主節點已經處理成功的操作日志,所謂的"謹慎處理",其實就是在選主和提交日志的時候進行一些限制,這一部分在 Raft 共識演算法中叫安全性保證,
Raft 核心演算法其實就是由這三個子問題組成的:領匯入選舉、日志復制、安全性,這三部分共同實作了 Raft 核心的共識和容錯機制,
3.1.2 領匯入選舉
Raft集群中每個節點都處于Leader,Follower和Candidate三種角色之一,在領匯入選舉的程序中,節點的這些狀態將隨著選舉場景的不同而發生切換,接下來,我們將詳細剖析領匯入選舉的流程,
Raft 的領匯入選舉基于一種心跳機制,集群中每個節點剛啟動時都是 Follower 身份,Leader 會周期性的向所有節點發送心跳包來維持自己的權威,那么首個 Leader 是如何被選舉出來的呢?方法是如果一個 Follower 在一段時間內沒有收到任何心跳,也就是選舉超時,那么它就會主觀認為系統中沒有可用的 Leader,并發起新的選舉,
這里有一個問題,即這個"選舉超時時間"該如何制定?如果所有節點在同一時刻啟動,經過同樣的超時時間后同時發起選舉,整個集群會變得低效不堪,極端情況下甚至會一直選不出一個主節點,Raft 巧妙的使用了一個隨機化的定時器,讓每個節點的"超時時間"在一定范圍內隨機生成,這樣就大大的降低了多個節點同時發起選舉的可能性,
若Follower想發起一次選舉,Follower需要先增加自己的當前任期,并將身份切換為candidate,然后它會向集群其它節點發送"請給自己投票"的訊息,在此之后,系統中會出現三種可能的結果,
第一種,當前candidate節點選舉成功,當candidate從整個集群的大多數(N/2+1)節點獲得了針對同一任期的選票時,它就贏得了這次選舉,立刻將自己的身份轉變為Leader 并開始向其它節點發送心跳來維持自己的權威,每個節點針對每個任期只能投出一張票,并且按照先到先得的原則,這個規則確保只有一個 candidate會成為Leader,
第二種,當前candidate節點選舉失敗,candidate 在等待投票回復的時候,可能會突然收到其它自稱是Leader 的節點發送的心跳包,如果這個心跳包里攜帶的任期號不小于 candidate 當前的任期號,那么candidate 會承認這個Leader,并將身份切回 Follower,這說明其它節點已經成功贏得了選舉,我們只需立刻跟隨即可,但如果心跳包中的任期號比自己小,candidate會拒絕這次請求并保持選舉狀態,
第三種,選舉超時,如果有多個Follower 同時成為 candidate,選票是可能被瓜分的,如果沒有任何一個candidate 能得到大多數節點的支持,那么每一個 candidate都會超時,此時candidate 需要增加自己的任期號,然后發起新一輪選舉,如果這里不做一些特殊處理,選票可能會一直被瓜分,導致選不出Leader來,這里的"特殊處理"指的就是前文所述的隨機化選舉超時時間,
3.1.3 日志復制
前面我們也提到了,Raft共識演算法是基于狀態復制機(RPM)模型實作的,也就是說Raft需要保證集群中所有節點的日志log一致,在Raft模型中,Leader節點承擔了領導集群的任務,所有的日志都需要先交給Leader節點處理,并由Leader節點復制給其他節點(Follower),這個處理程序被稱為日志復制,
一旦Leader被集群中的節點選擇出來,它就開始接收客戶端請求,并將操作包裝成日志,并復制到其它節點上去,日志復制的整體流程如下:
-
Leader為客戶端提供服務,客戶端的每個請求都包含一條即將被RPM執行的指令,
-
Leader把該指令作為一條新的日志附加到自身的日志集合,然后向其它節點發起附加條目請求,來要求它們將這條日志附加到各自本地的日志集合,
-
當這條日志已經確保被安全的復制,即大多數(N/2+1)節點都已經復制后,Leader 會將該日志追加到它本地的狀態機中,然后把操作成功的結果回傳給客戶端,
各節點的每條日志除了存盤狀態機的操作指令外,還會擁有一個index值被用來表明它在日志集合中的位置,此外,每條日志還會存盤一個任期號,該任期號表示Leader收到這條指令時的當前任期,任期號相同的日志條目是由同一個Leader在其任期內發送的,當一條日志被Leader節點認為可以安全的應用到狀態機時,稱這條日志是committed,那么什么樣的日志可以被 commit 呢?只有當Leader 得知這條日志被集群過半的節點復制成功時,這條日志才可以被commit,Raft 保證所有 committed 日志都已經被持久化,且"最終"一定會被狀態機apply,
當集群中各節點都正常作業的時候,Raft演算法的這種日志復制機制可以保證一致性,那么當節點可能出現宕機等特殊情況下,Raft又是如何保持集群日志一致的呢?
Raft對于當節點出現意外情況宕機后出現的不一致問題也是有解決方法,但是這需要遵循一些規則,其中,最重要的一條就是,Raft 強制要求Follower必須復制Leader的日志集合來解決不一致問題,換句話說,Follower節點上任何與Leader不一致的日志,都會被Leader節點上的日志所強制覆寫,這并不會產生什么問題,因為某些選舉上的限制,如果Follower上的日志與Leader不一致,那么該日志在Follower上一定是未提交的,而未提交的日志并不會應用到狀態機,當然也不會被外部的客戶端感知到,
要使得Follower的日志集合跟自己保持完全一致,Leader 必須先找到二者間最后一次達成一致的地方,因為一旦這條日志達成一致,在這之前的日志一定也都一致,這個確認操作是在一致性檢查步驟完成的,Leader 針對每個Follower 都維護一個next index,表示下一條需要發送給該Follower的日志索引,當一個 Leader 剛剛上任時,它初始化所有next index值為自己最后一條日志的index+1,但凡某個Follower的日志跟Leader不一致,那么下次日志復制時的一致性檢查就會失敗,在被Follower 拒絕這次日志復制請求后,Leader會減少next index的值并進行重試,最終一定會存在一個next index使得Leader和Follower在這之前的日志都保持一致,
針對每個Follower,一旦確定了next index的值,Leader便開始從該 index 同步日志,follower會洗掉掉現存的不一致的日志,保留Leader 最新同步過來的,整個集群的日志會在這個簡單的機制下自動趨于一致,此外要注意,Leader 從來不會覆寫或者洗掉自己的日志,而是強制Follower與它保持一致,
3.1.4 安全性保證
前一節也提到了,為了保證集群的日志一致性,Raft 強制要求Follower必須復制Leader的日志集合來解決不一致問題,這樣做的前提是需要保證每一輪選舉出來的Leader具備"日志的正確性",這也就是前面著重強調的"選舉上的限制",
我們假設有以下的場景:
-
Leader 將一些日志復制到了大多數節點上,進行commit后發生了宕機,
-
某個Follower 并沒有被復制到這些日志,但它參與選舉并當選了下一任 leader,
-
新的Leader又同步并commit了一些日志,這些日志覆寫掉了其它節點上的上一任committed日志,
-
各個節點的狀態機可能apply了不同的日志序列,出現了不一致的情況,
僅僅依靠前面兩個小節提到的領匯入選舉和日志復制機制并不能保證在這種情況下的節點一致性,為了解決這類問題,Raft加上了一些額外的限制來保證狀態機的安全性和共識演算法的準確性,
Raft首先采取的一個措施就是增加了對選舉的限制,我們再來分析下前文所述的場景,根本問題其實發生在第2步,candidate 必須有足夠的資格才能當選Leader,否則它就會給集群帶來不可預料的錯誤,這需要增加一個判斷,即每個 candidate 必須在競選投票請求中攜帶自己本地日志的最新 (term, index),如果 Follower發現這個candidate 的日志還沒有自己的新,則拒絕投票給該candidate,candidate想要贏得選舉成為Leader,必須得到集群大多數節點的投票,那么它的日志就一定至少不落后于大多數節點,又因為一條日志只有復制到了大多數節點才能被commit,因此能贏得選舉的candidate一定擁有所有committed日志,而比較兩個 (term, index) 的邏輯非常簡單:如果任期號不同則任期號更大的日志更新,否則index大的日志更新,
其次,Raft規定了Leader只允許commit包含當前任期號的日志,所謂 commit 其實就是對日志簡單進行一個標記,表明其可以被 apply 到狀態機,并針對相應的客戶端請求進行回應,之所以有這個限制,Raft主要考慮到以下的場景:

圖3.1 Leader對不含當前任期號的日志進行commit引發的例外情況
a) S1是Leader,收到請求后將 (term2, index2) 只復制給了S2,尚未復制給S3 ~ S5,
b) S1宕機,S5當選term3的Leader(S3、S4、S5 三票),收到請求后保存了 (term3, index2),尚未復制給任何節點,
c) S5 宕機,S1恢復,S1重新當選term4的Leader,繼續將 (term2, index2) 復制給了 S3,已經滿足大多數節點,我們將其 commit,
d) S1又宕機,S5 恢復,S5 重新當選Leader(S2、S3、S4 三票),將 (term3, inde2) 復制給了所有節點并 commit,注意,此時發生了致命錯誤,已經 committed 的 (term2, index2) 被 (term3, index2) 覆寫了,
在上述場景中,問題的根源發生在階段c,即使作為term4 leader 的 S1 將 (term2, index2) 復制給了大多數節點,它也不能直接將其commit,而是必須等待term4的日志到來并成功復制后,一并進行commit,
為了解決這個問題,Raft規定了Leader只允許commit包含當前任期號的日志,在增加了這條限制后,我們再來看階段e,
e) 在添加了這個限制后,要么 (term2, index2) 始終沒有被 commit,這樣S5 在階段d將其覆寫就是安全的;要么 (term2, index2) 同 (term4, index3) 一起被 commit,這樣 S5 根本就無法當選 leader,因為大多數節點的日志都比它新,也就不存在上圖中出現的問題了,
在對Raft共識演算法增加了這兩個限制后,狀態機的安全性得到了極大的保證,更有效地實作了集群日志的一致性,
3.2 Fabric中Raft演算法的原始碼分析
其實,采用 Raft 的系統最著名的當屬etcd(一個高可用的分布式鍵值資料庫),一般認為etcd的核心就是 Raft 演算法的實作,作為一個分布式kv系統,etcd 使用Raft在多節點間進行資料同步,每個節點都擁有全量的狀態機資料,值得說明的是,Hyperledger Fabric對于Raft共識演算法的實作也是參考或者說基于etcd中已經實作的Raft演算法,這一點在Fabric的原始碼中也可以得到充分的體現,更重要的是,Fabric在原始碼中便將Raft模塊的實作命名為etcdraft,這進一步體現出Hyperledger Fabric中的Raft只是對etcd中的Raft做了一層封裝來實作聯盟鏈中的節點共識,
接下來,我將詳細介紹Hyperledger Fabric中對Raft共識演算法的實作與封裝細節,從這里我們也可以進一步了解到Raft演算法的細節,
3.2.1 Fabric中Raft原始碼的核心資料結構
從Fabric中的原始碼可以窺見,其底層呼叫了etcd已經實作的成熟的Raft演算法作為Fabric中共識演算法的核心,etcd中的Raft實作了領導者選舉,日志復制等核心操作,而把應用層相關的操作如節點間的通信以及存盤等交給上層應用層也就是這里的Hyperledger Fabric,
我們首先來看etcd/raft中涉及到的核心資料結構:Node介面和node結構體,
Node介面主要定義了一些Raft演算法實作所必須的方法,這也是根據Raft的理論模型來定義的,主要包括時鐘,選舉等操作,
// Node represents a node in a raft cluster.
type Node interface {
Tick() //時鐘的實作,選舉超時和心跳超時基于此實作
Campaign(ctx context.Context) error //參與Leader競爭
Propose(ctx context.Context, data []byte) error //在日志中追加資料,需要實作方保證資料追加的成功
ProposeConfChange(ctx context.Context, cc pb.ConfChange) error // 集群配置變更
Step(ctx context.Context, msg pb.Message) error //根據訊息變更狀態機的狀態
//標志某一狀態的完成,收到狀態變化的節點必須提交變更,Raft底層的任何變動都會通知到這里
Ready() <-chan Ready
//進行狀態的提交,收到完成標志后,必須提交過后節點才會實際進行狀態機的更新,在包含快照的場景,為了避免快照落地帶來的長時間阻塞,允許繼續接受和提交其他狀態,即使之前的快照狀態變更并沒有完成,
Advance()
//進行集群配置變更
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
//變更leader
TransferLeadership(ctx context.Context, lead, transferee uint64)
//保證線性一致性讀,
ReadIndex(ctx context.Context, rctx []byte) error
//狀態機當前的配置
Status() Status
// ReportUnreachable reports the given node is not reachable for the last send.
//上報節點的不可達
ReportUnreachable(id uint64)
//上報快照狀態
ReportSnapshot(id uint64, status SnapshotStatus)
//停止節點
Stop()
}
此外,node結構體也是etcd/raft中的一個核心資料結構,其主要定義了一系列的通道(go語言中的一種資料型別)來實作Raft中的資訊傳遞,而且節點的不同狀態之間的切換也是通過這些通道實作的,
// node is the canonical implementation of the Node interface
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChange
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
logger Logger
}
以上的兩個資料結構是etcd/raft實作的,并不是Fabric自己實作的,Fabric只是單純地呼叫了它們來完成節點間的共識,但是要怎么在Fabric中實作與etcd/raft的有機整合就是Fabric需要考慮的事了,
Hyperledger Fabric對Raft演算法的核心實作代碼都是放在fabric/orderer/consensus/etcdraft包下的,這里主要包含幾個核心的資料結構,即Chain介面,Chain結構體和node結構體,
首先,Chain介面的定義在fabric/orderer/consensus/etcdraft/consensus.go檔案下,它主要定義了排序節點對接收到的客戶端發送來的訊息的處理操作,它的詳細定義如下:
// Chain defines a way to inject messages for ordering.
type Chain interface {
// 負責對普通交易訊息進行處理排序
Order(env *cb.Envelope, configSeq uint64) error
// 負責對配置交易訊息進行處理和排序,當排序服務在 BroadCast 介面收到訊息進行校驗和過濾之后,就交由對應 Chain 實體進行處理,
Configure(config *cb.Envelope, configSeq uint64) error
WaitReady() error
Errored() <-chan struct{}
// Start()負責啟動此 Chain 服務,
Start()
Halt()
}
其次,Chain結構體實作了Chain介面,它里面主要定義了一些通道(channel)用于節點間的通信,以便根據通信訊息做相應的操作,
// Chain implements consensus.Chain interface.
type Chain struct {
configurator Configurator
rpc RPC // 節點與外部節點進行通信的物件,RPC 是一個介面,包含兩個方法SendConsensus 和 SendSubmit,前面這種用于節點間 raft 資訊的通訊,后者用于轉發交易請求給 leader 節點,
raftID uint64
channelID string
lastKnownLeader uint64
ActiveNodes atomic.Value
submitC chan *submit // 接收 Orderer 客戶端提交的共識請求訊息的通道
applyC chan apply // 接收 raft 節點間應用訊息的通道
observeC chan<- raft.SoftState
haltC chan struct{}
doneC chan struct{}
startC chan struct{}
snapC chan *raftpb.Snapshot //接收 raft 節點快照資料的通道
gcC chan *gc
…
Node *node // 封裝了底層 raft 庫的節點實體
…
}
最后,node結構體主要用于將Fabric自己實作的Raft上層應用和etcd的底層Raft實作連接起來,可以說node結構體是它們之間通信的橋梁,正是它的存在屏蔽了Raft實作的細節,
type node struct {
chainID string
logger *flogging.FabricLogger
metrics *Metrics
unreachableLock sync.RWMutex
unreachable map[uint64]struct{}
tracker *Tracker
storage *RaftStorage
config *raft.Config
rpc RPC
chain *Chain // 前面定義的Fabric自己實作的Chain結構體
tickInterval time.Duration
clock clock.Clock
metadata *etcdraft.BlockMetadata
subscriberC chan chan uint64
raft.Node // etcd底層的Raft中的節點介面
}
3.2.2 Fabric Raft機制的啟動程序原始碼分析
Raft的啟動入口位于fabric/orderer/consensus/etcdraft/chain.go檔案中,在Chain的Start()方法中會啟動etcdraft/node.go中的node.start(),而node.start()方法中進而啟動etcd已經封裝好的raft.StartNode()方法,
Chain中的Start()方法定義如下:
// Start instructs the orderer to begin serving the chain and keep it current.
func (c *Chain) Start() {
…
// 這里又啟動了etcdraft/node中的start方法
c.Node.start(c.fresh, isJoin)
close(c.startC)
close(c.errorC)
go c.gc()
go c.run()
es := c.newEvictionSuspector()
interval := DefaultLeaderlessCheckInterval
if c.opts.LeaderCheckInterval != 0 {
interval = c.opts.LeaderCheckInterval
}
c.periodicChecker = &PeriodicCheck{
Logger: c.logger,
Report: es.confirmSuspicion,
ReportCleared: es.clearSuspicion,
CheckInterval: interval,
Condition: c.suspectEviction,
}
c.periodicChecker.Run()
}
Chain中的Start方法主要完成了啟動etcdraft.Node端的回圈來初始化Raft集群節點,而且Chain里面通過呼叫c.run()實作了通過回圈處理客戶端和Raft底層發送的訊息,
我們再來看etcdraft.Node端的Start方法,它作為Chain端和raft/node端的橋梁,會根據Chain中傳遞的元資料配置資訊獲取啟動Raft節點的ID資訊,并且呼叫底層的Raft.StartNode方法啟動節點,并且像Chain端中一樣會啟動n.run()來回圈處理訊息,
func (n *node) start(fresh, join bool) {
…
var campaign bool
if fresh {// 是否是新節點標記位
if join {// 是否是新加入節點標記位
raftPeers = nil
n.logger.Info("Starting raft node to join an existing channel")
} else {
n.logger.Info("Starting raft node as part of a new channel")
sha := sha256.Sum256([]byte(n.chainID))
number, _ := proto.DecodeVarint(sha[24:])
if n.config.ID == number%uint64(len(raftPeers))+1 {
campaign = true
}
}
// 呼叫raft/node中的啟動節點函式,初始化raft
n.Node = raft.StartNode(n.config, raftPeers)
} else {
n.logger.Info("Restarting raft node")
n.Node = raft.RestartNode(n.config)
}
n.subscriberC = make(chan chan uint64)
// run方法中會啟動一個回圈用來接收raft節點發來的訊息,在這里經過進一步處理后,轉發給Chain層進行處理,訊息的轉發機制都是通過通道來完成的,
go n.run(campaign)
}
最后,在etcdraft/node中啟動的raft.StartNode()表示進一步啟動了Raft底層的Node節點,在這里會進行Raft的初始化,讀取配置啟動各個節點以及初始化logindex等,與前面的啟動流程一樣,它同樣會開啟一個run方法以回圈的方法不斷監聽各通道的資訊來實作狀態的切換和做出相應的動作,
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
func StartNode(c *Config, peers []Peer) Node {
r := newRaft(c)
r.becomeFollower(1, None)
for _, peer := range peers {
// 將配置中給定的所有節點加入集群
…
}
//初始化logindex
r.raftLog.committed = r.raftLog.lastIndex()
for _, peer := range peers {
r.addNode(peer.ID)
}
n := newNode()
n.logger = c.Logger
go n.run(r)
return &n
}
結合上述的原始碼分析,圖3.2更加詳細地描述了Raft的啟動流程,

圖3.2 Raft啟動流程圖
3.2.3 Fabric Raft機制的交易處理流程原始碼分析
我們在上一節已經根據原始碼仔細分析了Raft的啟動流程,接下來Fabric中的排序節點便可以開始接收交易并開始排序和打包成區塊了,這個交易處理流程可以說是Fabric中交易的核心,下面我們也跟著原始碼來詳細分析這部分的實作細節,
1. 提案的提交
首先,客戶端將會把已經背書的交易提案以broadcast請求的形式轉發給Raft集群的Leader進行處理,我們在第二節中也提到了,Fabric中的交易可以分為兩類,一類是普通交易,另一類是部署交易(也叫做配置交易),這兩類請求將分別呼叫不同的函式,即Order和Configure函式來完成交易提案的提交,
// Order submits normal type transactions for ordering.
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
c.Metrics.NormalProposalsReceived.Add(1)
return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}
// Configure submits config type transactions for ordering.
func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error {
c.Metrics.ConfigProposalsReceived.Add(1)
return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}
2. 轉發交易提案到Leader
我們從上面的源代碼中可以注意到,不論是何種交易型別,里面都會呼叫Submit方法來提交交易提案,在Submit方法中,主要做的事就是將請求訊息封裝為結構體并且寫入指定的一個通道中(submitC)以便傳遞給Chain進行處理,此外,它還會判斷當前節點是否是Leader,如果不是,還會將訊息重定向給Leader節點,
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
…
leadC := make(chan uint64, 1)
select {
case c.submitC <- &submit{req, leadC}: // 將訊息封裝并且寫入submitC通道
lead := <-leadC
if lead == raft.None {
c.Metrics.ProposalFailures.Add(1)
return errors.Errorf("no Raft leader")
}
if lead != c.raftID { // 當前節點不是Leader,則轉發訊息給Leader
if err := c.forwardToLeader(lead, req); err != nil {
return err
}
}
…
return nil
}
3. 對交易排序
前面也提到了,提案將被轉發給Leader,并且訊息被封裝為訊息結構體后寫入了submitC通道中傳遞到了Chain端,Chain端將不斷接收交易并將它們進行排序處理,
在ordered方法中,將根據不同型別的訊息執行不同的排序操作,對于接收到是通道配置訊息,比如通道創建、通道配置更新等,先呼叫ConsensusSupport對配置訊息進行檢查和應用,然后直接呼叫 BlockCutter.Cut() 對報文進行切塊,這是因為配置資訊都是單獨成塊;而對于普通交易訊息,則直接校驗之后,呼叫 BlockCutter.Ordered() 進入快取排序,并根據出塊規則決定是否出塊,
func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]* common.Envelope, pending bool, err error) {
if c.isConfig(msg.Payload) {
// 配置訊息
…
batch := c.support.BlockCutter().Cut()
batches = [][]*common.Envelope{}
if len(batch) != 0 {
batches = append(batches, batch)
}
batches = append(batches, []*common.Envelope{msg.Payload})
return batches, false, nil
}
// 普通交易資訊
if msg.LastValidationSeq < seq {
…
}
batches, pending = c.support.BlockCutter().Ordered(msg.Payload)
return batches, pending, nil
}
4. 打包區塊
交易訊息經c.ordered處理之后,會得到由BlockCutter回傳的資料包bathches(可打包成塊的資料)和快取是否還有資料的資訊,如果快取還有余留資料未出塊,則啟動計時器,否則重置計時器,這里的計時器由case timer.C處理,
接下來,將會呼叫propose方法來打包交易為區塊,propose會根據batches資料包呼叫createNextBlock打包出block ,并將block傳遞給c.ch通道(只有Leader具有propose的權限),而如果當前交易是配置資訊,還需要標記處當前正在進行配置更新的狀態,
func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
for _, batch := range batches {
b := bc.createNextBlock(batch) // 根據當前批次創建一個區塊
c.logger.Infof("Created block [%d], there are %d blocks in flight", b.Header.Number, c.blockInflight)
select {
case ch <- b: // 將block傳遞給c.ch通道,Leader可以通過這個通道收到這個區塊
default:
c.logger.Panic("Programming error: limit of in-flight blocks does not properly take effect or block is proposed by follower")
}
// if it is config block, then we should wait for the commit of the block
if protoutil.IsConfigBlock(b) {
c.configInflight = true
}
c.blockInflight++
}
}
5. Raft對區塊的共識
Leader將會前面說的區塊通過呼叫c.Node.Propose將資料傳遞給底層Raft狀態機,這里的Propose就是提議將資料寫入到各節點的日志中,這里也是實作節點間共識的入口方法,
Propose就是將日志廣播出去,要所有節點都盡量保存起來,但還沒有提交,等到Leader收到半數以上的節點都回應說已經保存完了,Leader這時就可以提交了,下一次Ready的時候就會帶上committedindex,
func (n *node) Propose(ctx context.Context, data []byte) error {
return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
這里涉及到了Raft演算法的具體共識步驟,這里就不詳細深入了,這部分的內容將在3.2.4節深入剖析,
6. 保存區塊
經過Raft共識后,節點需要將區塊寫入到本地,這里Raft底層會通過通道的方式傳遞保存區塊到本地的訊息(即CommittedEntries不為空的訊息),在這里,Fabric通過實作apply方法完成了保存區塊的功能,
func (c *Chain) apply(ents []raftpb.Entry) {
…
for i := range ents {
switch ents[i].Type {
case raftpb.EntryNormal:// 如果是普通entry訊息
…
block := protoutil.UnmarshalBlockOrPanic(ents[i].Data)
c.writeBlock(block, ents[i].Index) // 寫入區塊到本地
c.Metrics.CommittedBlockNumber.Set(float64(block.Header.Number))
case raftpb.EntryConfChange:// 如果是配置entry訊息
var cc raftpb.ConfChange
if err := cc.Unmarshal(ents[i].Data); err != nil {
c.logger.Warnf("Failed to unmarshal ConfChange data: %s", err)
continue
}
c.confState = *c.Node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
c.logger.Infof("Applied config change to add node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
case raftpb.ConfChangeRemoveNode:
c.logger.Infof("Applied config change to remove node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
default:
c.logger.Panic("Programming error, encountered unsupported raft config change")
}
…
if ents[i].Index > c.appliedIndex {
c.appliedIndex = ents[i].Index
}
}
}
在apply方法中,如果是普通entry,則會呼叫writeblock寫入區塊到本地,如果這個 block 是配置塊,則將配置塊寫入到 orderer 的賬本中,同時需要決議出其中的配置資訊,看看是否存在 raft 配置項和 raft 節點變動,如果存在變動,則呼叫 raft 狀態機的 ProposeConfChange 應用此變更,應用層也進行相關的資訊更新;如果是配置entry,決議出其中的配置更新資訊,先呼叫底層raft 狀態機的ApplyConfChange 應用此配置更新,
結合上述的原始碼分析,圖3.3以流程圖的形式更加詳細地描述了Raft的交易流程,

圖3.3 Raft交易原始碼分析流程圖
至此,Fabric中關于Raft機制的交易處理流程已經大致分析完成了,限于篇幅,我們僅著重分析了從交易提案提交到保存區塊到各節點的程序,而忽略了背書和驗證等流程的細節,這部分的內容與Fabric中Raft共識演算法的實作關系較小,就不在本文中詳細介紹相關的原始碼實作了,
3.2.4 Fabric Raft底層核心演算法實作細節原始碼分析
在前一節即3.2.3節中我已經從原始碼的角度詳細描述了Fabric中交易提案的提交,交易的打包和區塊的保存等核心內容,然而,前一節中對于Raft實作共識的細節并沒有涉及太多,這部分的內容Fabric本來就沒有自己去實作,而是呼叫的第三方(etcd)中已經實作好了的Raft演算法,Fabric做的只是實作了將發送提案到Leader以及保存共識后的區塊這些應用層的功能以及實作了與Raft集群底層的訊息互動,為了更好地理解Raft的精髓,我們還是不得不進入到etcd的Raft原始碼中一探究竟,
3.2.4.1 領導者選舉
當Follower節點發現Leader的心跳超時,會觸發etcd/raft/node.go檔案中的run函式中的tickc信道,通過呼叫tickElection函式實作了超時選舉的功能,
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
超時選舉函式中呼叫Step函式,發送MsgHup訊息,并呼叫campaign函式發布競選訊息,在campaign函式中,節點會將自己的Follower狀態設定為candidate狀態,與此同時遞增任期號,最后candidate節點將會向其他節點發送競選訊息,
func (r *raft) campaign(t CampaignType) {
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
term = r.Term + 1 // 當前任期號自增一
} else {
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
// 如果集群是單節點,那節點將投票給自己,則獲取的票數一定超過了一半,當選為Leader
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
// 向其他節點發送競選領導者的訊息
for id := range r.prs {
if id == r.id {
continue
}
…
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
其他節點通過Step函式實作對競選訊息的判斷,并依據相應的判斷決定是否給candidate節點投票,其中投票的判斷邏輯主要分兩步,第一步,如果投票資訊中的任期號小于自身的任期號,則直接回傳nil,不予投票回應,第二步,通過和本地已存在的最新日志做比較來判斷,首先看訊息中的任期號是否大于本地最大任期號,如果是則投票,否則如果任期號相同則要求競選訊息中有最大的日志索引,
func (r *raft) Step(m pb.Message) error {
switch {
case m.Term > r.Term:// 節點只會投票給任期號大于自己任期號的candidate
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
force := bytes.Equal(m.Context, []byte(campaignTransfer))
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
….
return nil
}
}
case m.Term < r.Term:
return nil
}
switch m.Type {
case pb.MsgVote, pb.MsgPreVote: // 如果candidate擁有最新的日志則發送投票給該candidate
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
…
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
}
} else {// 否則當前節點會拒絕給此次參與領導者選舉的candidate投票
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}
}
return nil
}
candidate節點收到其他節點的回復后,判斷獲取的票數是否超過半數,如果是則設定自身為Leader,否則還是設定為follower,說明本輪競選領導者失敗,
func stepCandidate(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
…
case myVoteRespType:// 統計投票結果
gr := r.poll(m.From, m.Type, !m.Reject)
switch r.quorum() {
case gr:// 判斷票數是否超過半數
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()// 如果票數超過一般則當選為Leader
r.bcastAppend()
}
case len(r.votes) - gr:
r.becomeFollower(r.Term, None)
}
case pb.MsgTimeoutNow:
…
}
return nil
}
最后,結合上述原始碼分析,圖3.4更加詳細地描述了Raft的領導者選舉流程,

圖3.4 Raft領導者選舉流程圖
3.2.4.2 日志復制
在3.2.3節中我們也分析了,對于Leader中生成的塊,Leader會呼叫etcd的Node介面中的Propose方法來提交寫日志請求,Propose 內部具體呼叫stepWithWaitOption實作日志訊息的傳遞,并阻塞/非阻塞地等待結果的回傳,
Leader節點呼叫appendEntry將訊息追到Leader的日志之中,但不進行資料的commit,之后呼叫bcastAppend 將訊息廣播至其他follower節點,
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgBeat:
r.bcastHeartbeat()
return nil
case pb.MsgCheckQuorum:
…
case pb.MsgProp:
for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
if r.pendingConfIndex > r.raftLog.applied {
r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
e.String(), r.pendingConfIndex, r.raftLog.applied)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
}
}
}
if !r.appendEntry(m.Entries...) {// appendEntry將訊息追到Leader的日志之中
return ErrProposalDropped
}
r.bcastAppend()/ /bcastAppend 將訊息廣播至其他Follower節點,
return nil
}
}
Follower節點接收到請求后,會呼叫handleAppendEntries函式來判斷是否接受Leader提交的日志,判斷邏輯如下:如果Leader提交的日志index小于本地已經提交的日志index則將本地的index回復給Leader,查找追加的日志和本地log的沖突,如果有沖突,則先找到沖突的位置,用Leader的日志從沖突位置開始進行覆寫,日志追加成功后,回傳最新的日志index至Leader,如何任期資訊不一致,則直接拒絕Leader的追加請求,
func (r *raft) handleAppendEntries(m pb.Message) {
if m.Index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {// 當前Follower追加日志,可能存在沖突的情況,需要找到沖突的位置用Leader的日志進行覆寫
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {// 如果兩者的任期資訊不一致,當親節點拒絕此次追加日志請求,并把最新的日志index回復給Leader,便于進行追加
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
}
當Leader接收到Follower的回應后,針對拒絕和接收的兩個場景有不同的處理邏輯,這也是保證follower一致性的關鍵環節,
-
當Leader 確認Follower已經接收了日志的append請求后,則呼叫maybeCommit進行提交,在提交程序中確認各個節點回傳的matchindex,排序后取中間值比較,如果中間值比本地的commitindex大,就認為超過半數已經認可此次提交,可以進行commit,之后呼叫sendAppend向所有節點廣播訊息,follower接收到請求后呼叫maybeAppend進行日志的提交,
-
如果Follower拒絕Leader的日志append請求,Leader接收到拒絕請求后會進入探測狀態,探測follower最新匹配的位置,
1) func stepLeader(r *raft, m pb.Message) error {
case pb.MsgAppResp:
pr.RecentActive = true
if m.Reject {// Follower發送的是拒絕append的回應
if pr.maybeDecrTo(m.Index, m.RejectHint) {
if pr.State == ProgressStateReplicate {
pr.becomeProbe() // 進入試探append階段, 繼續探測follower最新匹配的位置,
}
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
if pr.maybeUpdate(m.Index) {
switch {
case pr.State == ProgressStateProbe:
pr.becomeReplicate()// 日志追加成功,狀態轉換為復制狀態
case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
pr.becomeProbe()
pr.becomeReplicate()
case pr.State == ProgressStateReplicate:
pr.ins.freeTo(m.Index)
}
if r.maybeCommit() {// 如果超過半數已經認可此次提交,Leader可以進行commit
r.bcastAppend()// 廣播通知所有Follower進行日志的提交
} else if oldPaused {
r.sendAppend(m.From)
}
}
}
}
最后,結合上述原始碼分析,圖3.5以流程圖的形式更加詳細地描述了Raft的日志復制流程,

圖3.5 Raft日志復制原始碼分析流程圖
至此,我們已經基本通過原始碼來進一步對Raft共識演算法進行分析和理解,特別是該演算法的一些實作細節,需要說明的是,在Raft原始碼分析這一節,我并沒有將如何保證安全性這一特性單獨拿出來分析,這是與我們前面的理論部分不同的一個地方,因為Raft共識的安全性主要是通過給演算法添加一些限制條件來保證的,這些特性最終都能在領導者選舉和日志復制這兩部分的原始碼內容中得到體現,所以在原始碼分析階段沒有必要單獨成節,
總結
本次原始碼與結構分析我選擇的目標區塊鏈系統是Hyperledger Fabric,而我選擇Fabric的原因主要是因為在當前已有的較為成熟的聯盟鏈中,Fabric可以說是最受歡迎的也是應用最廣泛的一個區塊鏈系統,而且它還是現有其他聯盟鏈實作的基礎,很多其他聯盟鏈中都能看到Fabric的設計原理,
本文首先介紹了Fabric中的交易的基本流程,其主要分成提議、排序打包以及驗證提交這三個階段,交易可以說是區塊鏈系統的核心功能,而與其他區塊鏈系統的交易有很大的不同的是,Fabric更加注重交易的隱私性和安全性,通過引入背書機制來加強這些特性,其次,說到交易流程就不得不涉及到共識,因為Fabric也是一個去中心化的分布式賬本,需要完成去中心化系統中各節點對交易的共識才能保證系統的一致性,Fabric提供了可插拔的共識組件,允許用戶選擇不同的適用于不同場景的共識演算法,Fabric官方推薦的是Raft共識演算法,該演算法目前也是比較成熟的一個共識演算法,它是Paxos演算法的一種延伸實作,可以容忍少部分的節點崩潰,本文基于Fabric開源的源代碼對它的交易流程和共識流程都做了詳細的理論分析,特別是針對Fabric中Raft共識模塊,本文花費了大量的篇幅從源代碼出發來詳細剖析它的實作原理,
Either Excellent or Rusty轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/458012.html
標籤:區塊鏈
上一篇:c 空模板函式回傳非空函式指標
