目錄
- 一、Netty服務端的構建
- 1. 父類建構式
- ① 查找快取
- a) 匹配器
- ② 相關日志
- ① 查找快取
- 2. 服務端構造
- ① 配置讀取
- a) 私鑰讀取
- b) IP埠號
- c) 配置域資訊
- ② 服務端配置
- a) Netty的必要性
- b) Netty的實作原理
- c) 結合原始碼
- ① 配置讀取
- 3. 服務端功能
- ① 通用介面功能
- ② Channel處理器
- a) Channel生命周期
- b) 讀取資料
- 4. 節點通信層已完成
- 1. 父類建構式
- 二、CounterClient入口
- 1. 構建服務代理
- ① 視圖控制器
- a) 配置整合
- b) 視圖存盤
- c) 服務端視圖控制器
- d) 客戶端視圖控制器
- ② 啟動Netty客戶端
- a) 連接到指定節點
- b) Channel處理器
- ① 視圖控制器
- 2. Netty通信原理
- 3. 客戶端功能
- ① 通用介面功能
- a) 發送訊息準備
- b) 訊息簽名
- c) 發送訊息
- d) 關閉通道
- e) 更新連接(視圖)
- ② channel處理器
- a) 定時重連
- ③ 已完成內容
- ① 通用介面功能
- 4. 呼叫排序訊息
- 1. 構建服務代理
- 三、后記
關鍵字:Netty BFT-SMaRt Channel findCache KeyLoader Bootstrap NioEventLoopGroup ChannelFuture 視圖
Netty是目前最高效便捷的NIO框架,Netty可提供更加高可用、更好健壯性的穩定大規模連接的IO通道,任何一款區塊鏈早期的技術產品,都是從聯盟鏈開始演進,因為聯盟鏈降低了很多原教旨的難度,回到BFT-SMaRt,它的網路連接分為節點之間的連接,節點與客戶端之間的連接,節點之間的連接,我們在BFT-SMaRt:用Java位元組點間的可靠信道一文中詳細分析了在共識邏輯之前節點之間能夠做到的連接準備,那么,本文將繼續探索在BFT-SMaRt專案中,節點與客戶端之間的連接是如何實作的,
作為原始碼研究的起點,有兩個現成的入口:
- 服務端:ServerCommunicationSystem建構式的最后一個步驟,即clientsConn的創建,
- 客戶端:CounterClient類的入口命令,將本地作為客戶端對節點發起訪問請求,
一、Netty服務端的構建
首先構建服務端,轉到ServerCommunicationSystem建構式的最后一行,
clientsConn = CommunicationSystemServerSideFactory.getCommunicationSystemServerSide(controller);
這里采用了工廠模式的設計:構建一個controller基類,業務方可有多個實作類,在工廠get方法中傳入實作類物件,通過不同的實作類,回傳不同的處理物件,BFT-SMaRt并未有多個實作類,這里可以在上層業務方進行豐富,
public class CommunicationSystemServerSideFactory {
public static CommunicationSystemServerSide getCommunicationSystemServerSide(ServerViewController controller) {
return new NettyClientServerCommunicationSystemServerSide(controller);
} // 直接回傳NettyClientServerCommunicationSystemServerSide物件
}
直接回傳NettyClientServerCommunicationSystemServerSide物件,以下稱NettyClientServerCommunicationSystemServerSide類為Netty服務端類,
1. 父類建構式
直接進入NettyClientServerCommunicationSystemServerSide類的建構式,函式體內無super指定父類建構式,因此隱式呼叫父類SimpleChannelInboundHandler的無參建構式,
對于不熟悉繼承關系下建構式的執行順序的朋友,請自行補充上,
protected SimpleChannelInboundHandler() {
this(true);
}
父類的無參建構式指定了本地的有參構造,設定了本地屬性autoRelease為true,
protected SimpleChannelInboundHandler(boolean autoRelease) {
this.matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");
this.autoRelease = autoRelease;
}
接下來執行TypeParameterMatcher的find方法,find方法主要維護一個查找快取,包括構建和使用,
① 查找快取
該方法首先獲得并配置查找快取findCache:
Map<Class<?>, Map<String, TypeParameterMatcher>> findCache = InternalThreadLocalMap.get().typeParameterMatcherFindCache(); // InternalThreadLocalMap容器
Class<?> thisClass = object.getClass();
Map<String, TypeParameterMatcher> map = (Map)findCache.get(thisClass); // 型別引數匹配器
if (map == null) {
map = new HashMap();
findCache.put(thisClass, map);
}
查找快取會將熱度較高的內容優先快取,以增進查詢速度,
查找快取的容器結構是通過InternalThreadLocalMap來構建,注意從SimpleChannelInboundHandler開始,始終帶著泛型<I>進入,而本例中的泛型類為TOMMessage,該類是共識排序訊息類,將會在BFT-SMaRt共識部分展開介紹,那么,find方法會將泛型類放置到查找快取findCache中,
a) 匹配器
接下來,獲得并配置型別引數匹配器,也是用于增強查找,
TypeParameterMatcher matcher = (TypeParameterMatcher)((Map)map).get(typeParamName);
if (matcher == null) {
matcher = get(find0(object, parametrizedSuperclass, typeParamName));
((Map)map).put(typeParamName, matcher);
}
return matcher;
匹配器使用到Java的反射機制來查找類,
首先通過本地map查找型別引數匹配器,如果沒有查到,則初始構建,使用呼叫find時傳入的型別引數名,呼叫find0方法通過反射機制得到泛型類,然后呼叫get方法通過反射機制獲得對應匹配器,最后填充進匹配器map,共同構成查找快取findCache的內容,最后回顧一下findCache容器的結構,
Map<Class<?>, Map<String, TypeParameterMatcher>>
因此,一個類可以有多個對應不同型別引數名的匹配器,
② 相關日志
該快取的容器結構是InternalThreadLocalMap,類加載進入記憶體,首先執行static靜態方法,
static {
logger.debug("-Dio.netty.threadLocalMap.stringBuilder.initialSize: {}", STRING_BUILDER_INITIAL_SIZE);
STRING_BUILDER_MAX_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalMap.stringBuilder.maxSize", 4096);
logger.debug("-Dio.netty.threadLocalMap.stringBuilder.maxSize: {}", STRING_BUILDER_MAX_SIZE);
}
列印出日志,StringBuilder的初始化長度以及最大長度,日志輸出如下:
11:18:32.645 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
11:18:32.645 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
2. 服務端構造
回到NettyClientServerCommunicationSystemServerSide的建構式,首先是配置讀取及分析,通過組態檔獲得私鑰、IP、埠號、節點總數、節點id等資訊,
① 配置讀取
配置讀取可分為三方面:
- 私鑰讀取
- IP加埠號讀取處理
- 配置域資訊讀取
a) 私鑰讀取
Netty服務端類有一個私有屬性欄位privKey,用于存盤私鑰,以備后續簽名使用,
private PrivateKey privKey;
該欄位通過服務端類的建構式賦值,
privKey = controller.getStaticConf().getPrivateKey();
跳轉到Configuration類,呼叫getPrivateKey方法,私鑰內容是從配置域controller中獲取,
return keyLoader.loadPrivateKey();
keyLoader物件是在Configuration類構造時傳入,而Configuration類的構造要追蹤到其子類TOMConfiguration的建構式,繼續TOMConfiguration是在ViewController構造時呼叫,這部分內容將在CounterClient入口時展開,回到keyLoader,它是KeyLoader的實體,而KeyLoader有三個子類,
- RSAKeyLoader,適用于RSA類非對稱加密演算法簇的秘鑰加載,
- ECDSAKeyLoader,適用于ECDSA類非對稱加密演算法簇的秘鑰加載,全稱橢圓曲線數字簽名演算法,是ECC與DSA的結合,Java原生類別庫中在jdk1.7以后已經加入支持,
- SunECKeyLoader,適用于jdk自帶的sunEC加密秘鑰的加載,位于sun.security.ec.SunEC,
下面是他們的類圖關系,

b) IP埠號
接下來是從配置域中讀取節點服務器端的IP埠號,
// 獲取IP、埠號
String myAddress;
String confAddress = controller.getStaticConf().getRemoteAddress(controller.getStaticConf().getProcessId())
.getAddress().getHostAddress();
if (InetAddress.getLoopbackAddress().getHostAddress().equals(confAddress)) {
myAddress = InetAddress.getLoopbackAddress().getHostAddress();
}
else if (controller.getStaticConf().getBindAddress().equals("")) {
myAddress = InetAddress.getLocalHost().getHostAddress();
// 如果Netty系結到環回地址,客戶端將無法連接節點,為了解決這個問題,我們系結到config/hosts.config中提供的地址,
if (InetAddress.getLoopbackAddress().getHostAddress().equals(myAddress)
&& !myAddress.equals(confAddress)) {
myAddress = confAddress;
}
} else {
myAddress = controller.getStaticConf().getBindAddress();
}
int myPort = controller.getStaticConf().getPort(controller.getStaticConf().getProcessId());
這段讀取代碼與上一篇節點間通信如出一轍,但值得注意的是,配置域埠號是由兩項組成,我們再次查看配置域內容,
#server id, address and port (the ids from 0 to n-1 are the service replicas)
0 127.0.0.1 11000 11001
1 127.0.0.1 11010 11011
2 127.0.0.1 11020 11021
3 127.0.0.1 11030 11031
IP后面有兩個埠號,第一列為客戶端通信埠,第二列為節點間通信埠,就拿節點id為0的第一行舉例,本地作為節點服務,其他節點要通過(server <-> server)11001埠進行訪問,而其他客戶端需要通過(client <-> server)11000埠進行訪問,這一段在下面日志輸出代碼中也有體現,
logger.info("Port (client <-> server) = "
+ controller.getStaticConf().getPort(controller.getStaticConf().getProcessId()));
logger.info("Port (server <-> server) = "
日志列印:
14:36:19.223 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - Port (client <-> server) = 11000
14:38:02.617 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - Port (server <-> server) = 11001
c) 配置域資訊
最后就是配置中的其他資訊了,首先看代碼,然后看日志輸出,
logger.info("ID = " + controller.getStaticConf().getProcessId()); // 節點id
logger.info("N = " + controller.getCurrentViewN()); // 節點總數
logger.info("F = " + controller.getCurrentViewF()); // 節點最大容錯數
logger.info("requestTimeout = " + controller.getStaticConf().getRequestTimeout());
logger.info("maxBatch = " + controller.getStaticConf().getMaxBatchSize());
// 根據配置中是否使用簽名,列印不同的提示資訊
if(controller.getStaticConf().getUseSignatures() == 1) logger.info("Using Signatures");
else if (controller.getStaticConf().getUseSignatures() == 2) logger.info("Using benchmark signature verification");
logger.info("Binded replica to IP address " + myAddress);
// SSL/TLS 協議版本
logger.info("SSL/TLS enabled, protocol version: {}", controller.getStaticConf().getSSLTLSProtocolVersion());
系統配置中關于是否使用簽名的配置項,用于定義客戶端是否應該對訊息認證碼使用簽名,
system.communication.useSignatures
接下來相關日志輸出內容,
14:36:16.545 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - ID = 0
14:36:16.989 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - N = 4
14:36:17.230 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - F = 1
14:43:47.637 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - requestTimeout = 2000
14:43:47.637 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - maxBatch = 1024
14:43:47.637 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - Binded replica to IP address 127.0.0.1
14:43:47.637 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - SSL/TLS enabled, protocol version: TLSv1.2
② 服務端配置
講到這里,想延伸討論一下Netty的必要性和實作原理,Netty的必要性可以從Socket(Socket之前的網路傳輸技術發展就不贅述了)說起,
a) Netty的必要性
一個Socket通常是由一個執行緒來管理,它實作了最初安全可靠的點到點IO通信,但當客戶端較多時,可能會耗盡服務端的執行緒資源,這是一種阻塞IO的模型,實踐程序中,大量的由執行緒維護的網路連接始終在監聽狀態而沒有資料傳輸,這是對于執行緒資源的浪費,
我們在上一篇關于節點間通信的研究中,使用的就是這種模型,然而那是基于節點數量不多的聯盟鏈場景,4節點的網路只需要6條執行緒即可承擔,不用實作復雜邏輯來處理大流量的資源維護,簡單穩定的阻塞IO模型顯然是更加適用的,但是,本文的研究重點轉向了客戶端通信,這就需要一個能夠處理大流量的新模型,
設想一種任務加執行緒池的方式,執行緒不再死盯著一條兩方參與的連接,而是被執行緒池統一管理起來,網路傳輸的作業會被放到任務中去,執行緒池通過調度機制領取任務并執行網路傳輸作業,在任務多的時候,調度邏輯會將任務排到一個佇列中去,然后根據調度機制,啟動對應規模的執行緒數量來控制處理任務的速度,每條執行緒執行完任務就會自動回歸到執行緒池可用資源庫,等待執行新的通信任務,這就是一種非阻塞IO的模型,
我們繼續延伸,所謂的“調度邏輯”是如何接收任務的,這里參考到linux的多路復用IO模型,即一個select可以通過順序掃描(輪詢)的方式監測多個通道是否有通信就緒的狀態,一旦有,就會啟動一個回呼函式將該通信內容封裝到任務容器,并排到佇列中去,回到函式會啟動執行緒池資源的實體來處理IO的作業,而select將該通信實體交出去以后,就可以釋放資源繼續監聽,
Netty就是對以上內容的封裝框架,更易于使用,
b) Netty的實作原理
Netty是基于事件驅動模式的、Reactor執行緒模型的,事件驅動是相對于主動輪詢提出的,主動輪詢是說主執行緒在不斷檢查是否有事件發生,如果有則呼叫事件處理函式,事件驅動仍舊是主執行緒去檢查事件的發生,當有事件時將事件放到一個佇列,同時還有一條執行緒在不斷的消費這個佇列,消費時呼叫事件的處理函式,事件驅動方式基于主動輪詢,又提出了一個執行緒專門作為事件消費物件,分擔了原主執行緒的作業內容,這取自觀察者模式,也更加符合單一職責原則,Reactor執行緒模型是一種分發機制,首先它會不斷的執行selector.select()方法,用來檢測并產生新的事件,然后分發事件給到適當的執行緒來處理,Reactor模型良好地實作了事件驅動理念,Netty應用Reactor執行緒模型,分為主從關系,主執行緒用于發現事件,從執行緒用于消費事件,
- bossGroup,執行緒池在bind一個埠以后回傳一條執行緒作為主執行緒,接收產生新事件,
- workerGroup,執行緒池用來消費事件,
Netty有一個Bootstrap概念,BoostStrap是引導程式的含義,通過引導程式,可以快速配置,串聯搭建起來一個Netty專案,其中ServerBootstrap是針對服務端的,Bootstrap是針對客戶端的,所以,包括但不限于以上兩個執行緒池的內容,全部被包含在Bootstrap的實體中,Netty同時也是一個異步框架,所有的操作包括系結、IO通信等都會回傳一個ChannelFuture物件,該物件可以判斷isDone,isSuccess,getCause,isCanceled,以及通過addListener加入監聽回呼,Channel是具體的Netty中用于處理通信的組件,針對不同的通信環境,都會有不同的Channel子類來處理,處理內容包括維持通道、連接配置引數、異步IO處理ChannelHandler、回傳ChannelFuture,Selector是Channel的管理器,輪詢器,可以管理Channel,另外,所有Group均為執行緒池的意思,而NioEventLoop的含義是一個維護佇列的執行緒,
c) 結合原始碼
接下來回到Netty服務端的原始碼配置,
ServerBootstrap b = new ServerBootstrap(); // 綜上所述,先構建一個服務端啟動程式,
EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreads); // 構建主執行緒池,初始容量為8條,用于監聽Accept事件,
EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors()); // 構建從執行緒池,初始容量為當前系統中所有的可用執行緒數量,
我們按流程構建了ServerBootstrap物件,然后構建了主執行緒池bossGroup,初始設定為8,最后構建了從執行緒池workerGroup,初始設定為系統當前所有可用執行緒數量,這也不難理解,因為實踐程序中,我們總會注意到事件的發現相較于事件的處理是更快速的,因此8條主執行緒可以覆寫事件發現的作業,而為了更高效使用機器性能,剩余的執行緒資源都用來事件的消費了,下面是BFT-SMaRt自定義的編解碼工具工廠,
sessionReplicaToClient = new ConcurrentHashMap<>(); // 并發主流容器,執行緒安全且高效的HashMap
rl = new ReentrantReadWriteLock(); // 可重入讀寫鎖
serverPipelineFactory = new NettyServerPipelineFactory(this, sessionReplicaToClient, controller, rl); // 本地開發的工具工廠,用于編解碼處理,
接下來將以上準備好的資源設定配置到ServerBootstrap,
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_SNDBUF, tcpSendBufferSize)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMsec)
.option(ChannelOption.SO_BACKLOG, connectionBacklog)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverPipelineFactory.getDecoder());
ch.pipeline().addLast(serverPipelineFactory.getEncoder());
ch.pipeline().addLast(serverPipelineFactory.getHandler());
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true);
首先通過ServerBootstrap的group方法配置主從執行緒池,然后配置channel類,接著配置一系列引數option,最后為事件消費執行緒配置引數和channel初始化initChannel,編輯碼部分不多展開(編解碼也是Netty的功能特色),主要是消費事件的處理類,設定為this,即當前服務端類,接下來給ServerBootstrap系結IP埠 ,
ChannelFuture f = b.bind(new InetSocketAddress(myAddress, myPort)).sync();
ServerBootstrap系結IP埠會回傳一個ChannelFuture物件,通過sync()阻塞等待系結完成回傳狀態給新建ChannelFuture物件f,最后,將f的channel賦值給服務端類的私有屬性Channel物件mainChannel,
mainChannel = f.channel();
3. 服務端功能
NettyClientServerCommunicationSystemServerSide類繼承自SimpleChannelInboundHandler<TOMMessage>,實作了CommunicationSystemServerSide介面,其中CommunicationSystemServerSide介面是BFT-SMaRt自定義的,主要用于描述客戶端通信中服務端的常用功能,
① 通用介面功能
下面進入CommunicationSystemServerSide介面,查看介面函式,
public interface CommunicationSystemServerSide {
public void send(int[] targets, TOMMessage sm, boolean serializeClassHeaders);
public int[] getClients();
public void setRequestReceiver(RequestReceiver requestReceiver);
public void shutdown();
}
其中send方法是網路通信中,節點給客戶端發送訊息的具體方法,訊息型別為TOMMessage,getClient方法會遍歷sessionReplicaToClient資料集合,將已建立的節點-客戶端會話連接中的客戶端統計出來并回傳一個整形陣列,目前該方法未被使用,setRequestReceiver是設定本地屬性requestReceiver,shutdown方法是關閉當前Netty系統,
② Channel處理器
前面介紹了,Bootstrap構建的Netty系統中的Channel處理器就是服務端類本身,我們回到該類的宣告部分,也看到了它是繼承自SimpleChannelInboundHandler<TOMMessage>,繼續追本溯源,它是ChannelHandler的子類,這符合處理器的宣告,
public ServerBootstrap childHandler(ChannelHandler childHandler)
下面,我們查看在服務端類中關于Channel處理器的重寫方法,也是包含4個方法:
- channelActive
- channelInactive
- exceptionCaught
- channelRead0
前三個方法都是來自于祖父類ChannelInboundHandlerAdapter,這三個方法是捕捉了channel的三個生命周期,方法體就是在不同的生命周期需要補充做的事,他們的實作更多像是一種標準,第四個方法來自于父類SimpleChannelInboundHandler,是核心的channel讀取資料的方法,
a) Channel生命周期
進入ChannelInboundHandlerAdapter類,該類完整地展示了Channel生命周期中的所有狀態,

一個Channel從最初注冊到Selector上面,變為活躍狀態,便可以讀取資料,期間可以更改可寫入能力,捕獲例外,觸發用戶事件,接著Channel可能變為不活躍狀態,Channel也可以隨時選擇從Selector上解除注冊,
服務端類對于Channel生命周期的3個實作,是一些常規處理和日志提醒,
b) 讀取資料
channelRead0方法是Channel讀取信道中資料的核心方法,
protected void channelRead0(ChannelHandlerContext ctx, TOMMessage sm) throws Exception {
if (this.closed) {
closeChannelAndEventLoop(ctx.channel());
return;
}
// 交付訊息到TOM層
if (requestReceiver == null)
logger.warn("Request receiver is still null!");
else
requestReceiver.requestReceived(sm);
}
closeChannelAndEventLoop是前面生命周期方法中常用的方法,用于清空資料、解除注冊、關閉channel,關閉相關執行緒,下面核心方法是呼叫了requestReceiver的requestReceived方法,requestReceiver是前面介紹的通用介面setRequestReceiver設定的,RequestReceiver介面目前只有一個實作類是TOMLayer,這也放在共識階段再研究,
4. 節點通信層已完成
到目前為止,結合前一篇文章,本地節點的服務端通信系統ServerCommunicationSystem就全部構建完成了,
public ServerCommunicationSystem(ServerViewController controller, ServiceReplica replica) throws Exception {
super("Server Comm. System");
this.controller = controller;
messageHandler = new MessageHandler();
inQueue = new LinkedBlockingQueue<SystemMessage>(controller.getStaticConf().getInQueueSize());
serversConn = new ServersCommunicationLayer(controller, inQueue, replica);
clientsConn = CommunicationSystemServerSideFactory.getCommunicationSystemServerSide(controller);
}
目前,節點服務部分重新回到bftsmart.tom.ServiceReplica#init方法,
cs = new ServerCommunicationSystem(this.SVController, this);
這一行代碼的內容通過這兩篇文章已經全部介紹完畢,從下一篇開始,繼續向下分析,將initTOMLayer()作為入口介紹節點服務中共識部分的實作原理,
二、CounterClient入口
前文通過一個章節的敘述,分析了BFT-SMaRt在節點客戶端通信程序中Netty服務端的構建,本章介紹另一方,即Netty客戶端的構建,計劃從CounterClient的main函式作為入口開始研究,
public static void main(String[] args) throws IOException {
if (args.length < 2) { // 引數個數校驗,最少2個
System.out.println("CounterClient <process id> <increment> [<operations>]");
System.out.println("if <increment> equals 0, read-only");
System.out.println("default <number of operations> equals 1000");
System.exit(-1);
}
// (1)通過節點id,建立客戶端服務代理
ServiceProxy counterProxy = new ServiceProxy(Integer.parseInt(args[0]));
try {
int inc = Integer.parseInt(args[1]); // 影響值
// 操作次數,默認1000次
int numberOfOps = (args.length > 2) ? Integer.parseInt(args[2]) : 1000;
for (int i = 0; i < numberOfOps; i++) { // 按照操作次數回圈
// 將影響值放入輸出流out
ByteArrayOutputStream out = new ByteArrayOutputStream(4);
new DataOutputStream(out).writeInt(inc);
System.out.print("Invocation " + i);
// (2)呼叫實操方法,通過輸出流傳入影響值
byte[] reply = (inc == 0) ?
counterProxy.invokeUnordered(out.toByteArray()) :
counterProxy.invokeOrdered(out.toByteArray()); //magic happens here
if (reply != null) {
// 通過輸入流讀取回傳值
int newValue = https://www.cnblogs.com/Evsward/p/new DataInputStream(new ByteArrayInputStream(reply)).readInt();
System.out.println(", returned value: " + newValue);
} else {
System.out.println(", ERROR! Exiting.");
break;
}
}
} catch (IOException | NumberFormatException e) {
counterProxy.close(); // 關閉代理
}
}
該方法中比較重要的兩個步驟,
- 其一是為客戶端構建服務代理
- 其二是呼叫實操方法,消費影響值,
1. 構建服務代理
進入ServiceProxy類的建構式,
public ServiceProxy(int processId) {
this(processId, null, null, null, null);
}
跳轉到本地的五引數的建構式,
public ServiceProxy(int processId, String configHome,
Comparator<byte[]> replyComparator, Extractor replyExtractor, KeyLoader loader) {
// 代理服務初始化,包括網路通信、共識視圖、系統配置
if (configHome == null) {
init(processId, loader);
} else {
init(processId, configHome, loader);
}
// 構建一個TOMMessage陣列,大小為節點總數,
replies = new TOMMessage[getViewManager().getCurrentViewN()];
// 比較器,繼承自jdk工具Comparator,重寫方法實作可比較兩個位元組陣列是否相等的功能,
comparator = (replyComparator != null) ? replyComparator : new Comparator<byte[]>() {
@Override
public int compare(byte[] o1, byte[] o2) {
return Arrays.equals(o1, o2) ? 0 : -1;
}
};
// 匯出器,繼承自工具Extractor,可構建自定義的回應訊息提取器,
extractor = (replyExtractor != null) ? replyExtractor : new Extractor() {
@Override
public TOMMessage extractResponse(TOMMessage[] replies, int sameContent, int lastReceived) {
return replies[lastReceived];
}
};
}
主要代碼為初始化,由于當前configHome傳入為null,呼叫的兩個引數的init方法,由于ServiceProxy類是TOMSender的子類,因此呼叫的init方法是父類的方法,
public void init(int processId, KeyLoader loader) {
// 構建視圖控制器
this.viewController = new ClientViewController(processId, loader);
// 啟動Netty通信
startsCS(processId);
}
① 視圖控制器
ViewController類是系統的背景關系環境,是由系統配置項構建,以下是BFT-SMaRt關于視圖的類圖關系,

視圖層級的根節點是View類,它實作了Serializable介面,所以視圖都是可序列化的,視圖最基本的屬性就是id,容錯數,節點id陣列以及連接地址集合,在視圖控制器ViewController中,最終可以得到所有網路配置屬性及方法,
a) 配置整合
ViewController類是View子類,包含了更全面的屬性欄位,這些屬性的值來自于兩個渠道:
- 組態檔包括host.config以及system.config
- 配置類Configuration,
而TOMConfiguration類通過一個map容器Map<String, String>物件configs,有機地將以上兩個渠道的所有配置全部提取出來,在記憶體中構建了靜態配置物件staticConf,
b) 視圖存盤
繼續ViewController類的研究,視圖除了在記憶體中使用,也可以被持久化存盤在檔案中,這個能力來自于介面ViewStorage,該介面提供了兩個功能,
public interface ViewStorage {
public boolean storeView(View view); // 是否存盤成功
public View readView(); // 讀取視圖
}
目前該介面的實作只有DefaultViewStorage類,它可以將視圖物件通過物件輸出流寫入檔案保存在磁盤上,同時還可以從磁盤上通過物件輸入流將檔案資料恢復成記憶體中的View物件,
c) 服務端視圖控制器
根據上面的類圖,ServerViewController是ViewController的一個子類,作為共識節點服務端,它主要提供了共識方面的屬性功能,核心屬性如下:
private int quorumBFT; // ((n + f) / 2) replicas
private int quorumCFT; // (n / 2) replicas
private int[] otherProcesses;
private int[] lastJoinStet;
private List<TOMMessage> updates = new LinkedList<TOMMessage>();
private TOMLayer tomLayer;
quorumBFT是指在BFT網路中的有效確認數,同理quorumCFT則是在CFT網路中的有效確認數,lastJoinStet是用來記錄最后加入加點的,是指那些配置域以外的節點,可以是TTP,或者是陌生節點,需要重新配置reconfigure視圖引數,updates是共識訊息TOMMessage的容器,tomLayer是共識層的物件,ServerViewController物件是構建節點通信系統的引數,這是前面所遺漏的部分,在此補充上,
首先在節點服務類的構建中包含:
this.SVController = new ServerViewController(id, configHome, loader);
接著建構式內繼續執行init方法,會構建節點通信系統,
cs = new ServerCommunicationSystem(this.SVController, this);
第一個引數傳入的就是服務端視圖物件,該物件在構建節點通信系統時發揮了重要作用,例如讀取系統配置,判斷節點來源等等,那么后續的內容在上一篇博文中就已經非常詳細了,這里就到此為止,
d) 客戶端視圖控制器
我們回到TOMSender的init方法,構建客戶端視圖控制器,相對來講,ClientViewController的內容就很少了,它只有兩個建構式和兩個自有方法,
public ClientViewController(int procId, KeyLoader loader) {
super(procId, loader); // 初始化系統配置
View cv = getViewStore().readView(); // 從磁盤讀取視圖物件
// 呼叫reconfigureTo將視圖內容配置View屬性,
if(cv == null){
// 若未讀取成功,則通過配置引數構建新視圖
reconfigureTo(new View(0, getStaticConf().getInitialView(),
getStaticConf().getF(), getInitAdddresses()));
}else{
reconfigureTo(cv);
}
}
初始化配置,然后讀取視圖,通過ViewController的reconfigureTo方法(注意區分ServiceProxy也有一個reconfigureTo方法,要比這個方法復雜)替換視圖,
public void reconfigureTo(View newView) {
this.lastView = this.currentView; // 將當前視圖變為上一個視圖
this.currentView = newView; // 傳入的新視圖變為當前視圖
}
到此,客戶端的視圖控制器就構建完成了,
② 啟動Netty客戶端
前面第一章節已經詳細介紹了節點Netty服務端的構建,下面就開始啟動相對應的Netty客戶端,通過TOMSender類的startCS方法,可以注意到引數clientId在前面的名字是processId,該引數是用來標識客戶端的,而不是用來指定請求節點的,
private void startsCS(int clientId) {
this.cs = CommunicationSystemClientSideFactory.getCommunicationSystemClientSide(clientId, this.viewController);
this.cs.setReplyReceiver(this); // This object itself shall be a reply receiver
this.me = this.viewController.getStaticConf().getProcessId();
this.useSignatures = this.viewController.getStaticConf().getUseSignatures() == 1;
this.session = new Random().nextInt();
}
我們注意到,Netty服務端的構建只傳入了一個服務端視圖控制器ServerViewController物件,
public NettyClientServerCommunicationSystemServerSide(ServerViewController controller)
而Netty客戶端的構建傳入了客戶端id和客戶端視圖控制器ClientViewController物件兩個引數,
public NettyClientServerCommunicationSystemClientSide(int clientId, ClientViewController controller)
- NettyClientServerCommunicationSystemClientSide,后面簡稱為Netty客戶端類,
- NettyClientServerCommunicationSystemServerSide,前面介紹過了,簡稱為Netty服務端類,
下面進入到客戶端類的建構式,首先執行的父類建構式super(),由于Netty客戶端類和服務端類都是繼承自同一個父類SimpleChannelInboundHandler,因此可參照(一-1)父類建構式的內容,接下來,客戶端類并沒有主執行緒池而只有從執行緒池workerGroup,即bossGroup是Netty服務端類特有的,
this.workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
構建從執行緒池,初始容量為當前系統中所有的可用執行緒數量,接下來,私鑰的讀取也可參考(一-2-①-a),然后通過視圖物件獲取所有的節點id,
int[] currV = controller.getCurrentViewProcesses();
對節點id陣列進行遍歷,向每個節點發起連接請求,secretKeyFactory是加密工具組件,
ChannelFuture future = connectToReplica(replicaId, secretKeyFactory);
客戶端指定自身id,然后向共識網路發起請求,而不是指定節點,進入共識網路以后,會遍歷節點分別建立連接,這與理論部分中的邏輯圖是吻合的,

a) 連接到指定節點
進入connectToReplica連接到指定節點方法,
public synchronized ChannelFuture connectToReplica(int replicaId, SecretKeyFactory fac)
throws NoSuchAlgorithmException, InvalidKeySpecException, InvalidKeyException {
// 2端參與的連接認證密碼,暫未使用
String str = this.clientId + ":" + replicaId;
PBEKeySpec spec = TOMUtil.generateKeySpec(str.toCharArray());
SecretKey authKey = fac.generateSecret(spec);
// 配置啟動程式
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.SO_SNDBUF, tcpSendBufferSize);
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMsec);
b.handler(getChannelInitializer()); // 添加channel處理器
// 啟動連接到指定的節點replicaId,回傳ChannelFuture
ChannelFuture channelFuture = b.connect(controller.getRemoteAddress(replicaId));
// 快取連接會話到sessionClientToReplica,這是一個ConcurrentHashMap容器
NettyClientServerSession ncss = new NettyClientServerSession(
channelFuture.channel(), replicaId); // 構建Netty客戶端請求服務端的會話物件
sessionClientToReplica.put(replicaId, ncss);
return channelFuture;
}
相對Netty服務端來講,連接程序差不多但簡單很多,仍舊通過啟動程式Bootstrap完成快速構建,首先為Bootstrap添加了執行緒池workerGroup,然后指定了Channel型別為NioSocketChannel(注意區分服務端的channel型別為NioServerSocketChannel),接著配置引數,添加Channel處理器,然后,通過連接方法建立與指定節點的連接通信,
ChannelFuture f = b.connect(controller.getRemoteAddress(replicaId));
這對應的是Netty服務端的,
ChannelFuture f = b.bind(new InetSocketAddress(myAddress, myPort)).sync();
客戶端的connect也可以加一個阻塞等待sync(),即:
ChannelFuture f = b.connect(controller.getRemoteAddress(replicaId)).sync();
服務端成功系結了IP埠,就開始監聽該地址了,此時客戶端通過connect方法連接指定地址的節點,connect方法的引數是通過controller.getRemoteAddress(replicaId)回傳的SocketAddress型別物件,內容就是IP埠號,連接成功以后,客戶端ChannelFuture可以通過isSuccess()方法回傳true來得到結果,回到客戶端類的建構式,
future.awaitUninterruptibly();
將執行緒阻塞在這一行代碼,保持執行緒的通信可用性,直到Channel關閉,此處是通過捕捉connect方法拋出的例外而完成執行緒關閉,
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
} else {
this.validate();
return this.doResolveAndConnect(remoteAddress, this.config.localAddress());
}
}
connect方法中往下呼叫,會根據通道的不同情況拋出例外,
b) Channel處理器
注意觀察以上代碼,作為重要的組成部分,channel處理器是通過getChannelInitializer()構建的,
private ChannelInitializer getChannelInitializer() throws NoSuchAlgorithmException {
final NettyClientPipelineFactory nettyClientPipelineFactory = new NettyClientPipelineFactory(this,
sessionClientToReplica, controller, rl);
ChannelInitializer channelInitializer = new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(nettyClientPipelineFactory.getDecoder());
ch.pipeline().addLast(nettyClientPipelineFactory.getEncoder());
ch.pipeline().addLast(nettyClientPipelineFactory.getHandler());
}
};
return channelInitializer;
}
這部分代碼與Netty服務端也很相似,最終Channel處理器也指向了客戶端類本身,
2. Netty通信原理
到此為止,我們構建了Netty服務端,也建立了客戶端并對服務端發起了連接請求,那么通過以上Netty相關的內容,下面通過一張圖來介紹Netty的通信原理,

首先由服務端啟動程式初始化channel,然后發起系結將channel注冊到主執行緒池的按順序的某一個執行緒的selector,Selector會輪詢channel的accept事件,這時候如果有客戶端啟動程式發起的connect連接觸發accept事件,該事件會執行一個任務并被放到任務佇列中去,等待消費,當runAllTasks消費到該任務,則建立起另一條channel并注冊到從執行緒池的按順序的某一個執行緒的selector,Selector會輪詢讀寫事件,Channel中當資料被接收完成,表示讀就緒就是讀事件;同樣的,Channel中當可以寫資料時,標識寫就緒就是寫事件,讀寫事件發生都會單獨執行一個任務并被放到任務佇列中去,等待任務消費,當runAllTasks消費到該任務,則會處理具體讀寫事件,
3. 客戶端功能
客戶端與節點之間通過Netty建立了通信,客戶端類實作了CommunicationSystemClientSide介面,與服務端同樣繼承了SimpleChannelInboundHandler<TOMMessage>,因此也可分為通用客戶端介面類和Channel生命周期方法,
① 通用介面功能
客戶端類實作了CommunicationSystemClientSide介面,該介面定義了客戶端節點通信中,節點應該具備的功能,
public interface CommunicationSystemClientSide {
public void send(boolean sign, int[] targets, TOMMessage sm);
public void setReplyReceiver(ReplyReceiver trr); // 設定ReplyReceiver欄位
public void sign(TOMMessage sm); // 未使用
public void close();
public void updateConnections();
}
a) 發送訊息準備
首先看send介面,在客戶端類的實作體中,send首先計算出基于BFT或CFT的最少確認數quorum,
int quorum;
Integer[] targetArray = Arrays.stream(targets).boxed().toArray(Integer[]::new);
Collections.shuffle(Arrays.asList(targetArray), new Random());
if (controller.getStaticConf().isBFT()) {
quorum = (int) Math.ceil((controller.getCurrentViewN() + controller.getCurrentViewF()) / 2) + 1;
} else {
quorum = (int) Math.ceil((controller.getCurrentViewN()) / 2) + 1;
}
listener.waitForChannels(quorum); // 等待前面的傳輸完成,收集足夠的訊息確認數
當共識要求的最少確認數達成以后,客戶端才可以發起請求,請求的型別是TOMMessage,被發送前要通過位元組陣列輸出流序列化得到訊息物件sm的序列化訊息serializedMessage,(這個在EOS的合約請求中也是常見的,data物件中除明文引數以外,還會有hex作為請求的序列化訊息,便于傳輸,)
if (sm.serializedMessage == null) {
DataOutputStream dos = null;
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
dos = new DataOutputStream(baos);
sm.wExternal(dos);
dos.flush();
sm.serializedMessage = baos.toByteArray();
} catch (IOException ex) {
logger.debug("Impossible to serialize message: " + sm);
}
}
b) 訊息簽名
接下來,要通過本地私鑰給已序列化的訊息進行簽名,
sm.serializedMessageSignature = signMessage(privKey, sm.serializedMessage);
呼叫本地signMessage方法進行簽名,
public byte[] signMessage(PrivateKey key, byte[] message) {
try {
// 簽名引擎,用于簽名的工具,
if (signatureEngine == null) {
signatureEngine = TOMUtil.getSigEngine();
}
byte[] result = null;
// 載入私鑰 java.security.Signature.initSign(java.security.PrivateKey)
signatureEngine.initSign(key);
// 載入待簽名訊息 java.security.Signature.update(byte[])
signatureEngine.update(message);
// 執行簽名 java.security.Signature.sign()
result = signatureEngine.sign();
// 回傳簽名結果
return result;
} catch (Exception e) {
logger.error("Failed to sign message", e);
return null;
}
}
簽名的底層代碼是由jdk中java.security.Signature包所完成的,感興趣的小伙伴可深入研究,簽名后得到了訊息物件sm的serializedMessageSignature欄位的值,
c) 發送訊息
接下來,會遍歷發送終點,即所有的節點,每拿到一個節點,則將該節點作為目的地存入訊息物件sm的destination欄位,
sm.destination = targets[target];
接下來上鎖并獲得channel,
rl.readLock().lock();
Channel channel = ((NettyClientServerSession) sessionClientToReplica.get(targets[target])).getChannel();
rl.readLock().unlock();
sessionClientToReplica容器在前面建立Netty客戶端時談到過,是一個連接會話的容器,當再次獲取連接時不必重新構建,在這個容器中按照節點查找得到指定節點的連接channel,判斷如果該channel是可用的,則發送,
if (channel.isActive()) {
sm.signed = sign; // 簽名成功后會將該標示位sign置為true,
ChannelFuture f = channel.writeAndFlush(sm);
f.addListener(listener);
sent++; // 發送計數器,用于共識確認數,
}
將訊息寫入channel,然后為該處理添加監聽器,以便能捕捉處理狀態事件,該監聽器在Netty客戶端類構建時被賦值,
this.listener = new SyncListener();
SyncListener類是Netty客戶端類的內部類,它重寫了operationComplete事件,同時,增加了方法waitForChannels,用于共識機制中,收集回復確認數的相關通道的等待,
d) 關閉通道
close方法是用來將通道關閉的,
public void close() {
this.closed = true; // 設定關閉標志位
rl.readLock().lock(); // 上鎖
ArrayList<NettyClientServerSession> sessions = new ArrayList<>(sessionClientToReplica.values()); // 讀取連接會話容器中的所有連接
rl.readLock().unlock();
for (NettyClientServerSession ncss : sessions) { // 遍歷關閉
Channel c = ncss.getChannel(); // 從物件中獲取channel
closeChannelAndEventLoop(c); // 安全關閉channel以及EventLoop執行緒
}
}
e) 更新連接(視圖)
當有新的節點加入時,需要更新視圖,以便于客戶端遍歷節點發送訊息,因此會涉及修改Netty客戶端的連接,我們知道,Netty客戶端與各個節點的連接被放在了連接會話容器sessionClientToReplica,更新連接時,說明視圖已經更新完畢,那么要對視圖中所有節點進行遍歷,
int[] currV = controller.getCurrentViewProcesses();
for (int i = 0; i < currV.length; i++) {
...
}
遍歷每一個節點,然后判斷是否在sessionClientToReplica存在連接,如果存在說明是老節點,如果不存在說明是新節點,那么針對新節點,要建立新的連接并放到sessionClientToReplica,建立連接的方式與新建是相同的,
ChannelFuture future = connectToReplica(replicaId, secretKeyFactory);
future.awaitUninterruptibly();
參考前面(二-1-②-a),
② channel處理器
這部分主要是處理channel的生命周期,與Netty服務端的內容基本一致,也是四個方法:
- channelActive,標識
- channelInactive,呼叫scheduleReconnect
- channelUnregistered,呼叫scheduleReconnect
- exceptionCaught,輸出錯誤日志
這四個實作與Netty服務端完全一致,這里補充一下scheduleReconnect方法的內容,
a) 定時重連
scheduleReconnect顧名思義,是定時重連的含義,
private void scheduleReconnect(final ChannelHandlerContext ctx, int time) {
if (closed) { // 如果是已關閉狀態,則走關閉流程,
closeChannelAndEventLoop(ctx.channel());
return;
}
// 未關閉狀態,則定時重連,
final EventLoop loop = ctx.channel().eventLoop(); // 首先獲得eventLoop執行緒
loop.schedule(new Runnable() { // 為執行緒增加定時任務
@Override
public void run() {
reconnect(ctx); // 任務執行reconnect方法,
}
}, time, TimeUnit.SECONDS);
}
下面進入重連方法reconnect,這個方法在Netty服務端也有,與新建連接差不多,但是重連一般都會在sessionClientToReplica容器已存在,
public void reconnect(final ChannelHandlerContext ctx) {
rl.writeLock().lock();
ArrayList<NettyClientServerSession> sessions = new ArrayList<NettyClientServerSession>(
sessionClientToReplica.values());
for (NettyClientServerSession ncss : sessions) { // 遍歷連接
if (ncss.getChannel() == ctx.channel()) {
int replicaId = ncss.getReplicaId();
try {
if (controller.getRemoteAddress(replicaId) != null) {
ChannelFuture future;
try {
// 建立連接
future = connectToReplica(replicaId, secretKeyFactory);
} catch (InvalidKeyException | InvalidKeySpecException e) {
logger.error("Error in key.",e);
}
logger.info("ClientID {}, re-connection to replica {}, at address: {}", clientId, replicaId,
controller.getRemoteAddress(replicaId));
} else {
// 說明該節點已經洗掉,則從sessionClientToReplica洗掉連接,
removeClient(replicaId);
}
} catch (NoSuchAlgorithmException ex) {
logger.error("Failed to reconnect to replica", ex);
}
}
}
rl.writeLock().unlock();
}
建立連接時仍舊呼叫connectToReplica方法,參考前面(二-1-②-a),
③ 已完成內容
致此,CounterClient中通過節點id,建立客戶端服務代理的作業已完成,
4. 呼叫排序訊息
BFT-SMaRt中經常出現的TOM前綴的內容,一般我都會歸并到共識中去,這幾篇文章也未展開,那么TOM的含義是什么?其實就是Total ordered multicast的含義,也就是全排序多點廣播,這就是共識的一種體現,
byte[] reply = (inc == 0) ?
counterProxy.invokeUnordered(out.toByteArray()) :
counterProxy.invokeOrdered(out.toByteArray());
回到CounterClient,當影響值為0時,呼叫無序方法invokeUnordered,對應型別為TOMMessageType.UNORDERED_REQUEST,當影響值為其他值時,呼叫共識方法invokeOrdered,后續我的猜測是通過Netty服務端拿到這個值,然后通過ServerViewController的lastView的值加上影響值,然后變為currentView即可,回傳currentView的最新的值,這部分內容可以在下一篇詳細展開,
三、后記
經過本文以及前面幾篇BFT-SMaRt相關的文章,可靠信道的部分就全部介紹完了,后續會展開節點對于訊息的共識邏輯,以及視圖更換后狀態同步的邏輯的研究,
更多文章請轉到一面千人的博客園,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/17769.html
標籤:其他
