當網路中兩個行程需要通信時,我們往往會使用 Socket 來實作,Socket 都不陌生,當三次握手成功后,客戶端與服務端就能通信,并且,彼此之間通信的資料包格式都是二進制,由 TCP/IP 協議負責傳輸,
當客戶端和服務端取得了二進制資料包后,我們往往需要『萃取』出想要的資料,這樣才能更好的執行業務邏輯,所以,我們需要定義好資料結構來描述這些二進制資料的格式,這就是通信網路協議,簡單講,就是需要約定好二進制資料包中每一段位元組的含義,比如從第 n 位元組開始的 m 長度是核心資料,有了這樣的約定后,我們就能解碼出想要的資料,執行業務邏輯,這樣我們就能暢通無阻的通信了,
網路協議的設計
概要劃分
一個最基本的網路協議必須包含
- 資料的長度
- 資料
了解 TCP 協議的同學一定聽說過粘包、拆包 這兩個術語,因為TCP協議是資料流協議,它的底層根據二進制緩沖區的實際情況進行包的劃分,所以,不可避免的會出現粘包,拆包 現象 ,為了解決它們,我們的網路協議往往會使用一個 4 位元組的 int 型別來表示資料的大小,比如,Netty 就為我們提供了 LengthFieldBasedFrameDecoder 解碼器,它可以有效的使用自定義長度幀來解決上述問題,
同時一個好的網路協議,還會將動作和業務資料分離,試想一下, HTTP 協議的分為請求頭,請求體——
- 請求頭:定義了介面地址、
Http Method、HTTP版本 - 請求體:定義了需要傳遞的資料
這就是一種分離關注點的思想,所以自定義的網路協議也可以包含:
- 動作指令:比如定義
code來分門別類的代表不同的業務邏輯 - 序列化演算法:描述了
JAVA物件和二進制之間轉換的形式,提供多種序列化/反序列化方式,比如json、protobuf等等,甚至是自定義演算法,比如:rocketmq等等,
同時,協議的開頭可以定義一個約定的魔數,這個固定值(4位元組),一般用來判斷當前的資料包是否合法,比如,當我們使用 telnet 發送錯誤的資料包時,很顯然,它不合法,會導致解碼失敗,所以,為了減輕服務器的壓力,我們可以取出資料包的前4個位元組與固定的魔數對比,如果是非法的格式,直接關閉連接,不繼續解碼,
網路協議結構如下所示:
+--------------+-----------+------------+-----------+----------+
| 魔數(4) | code(1) |序列化演算法(1) |資料長度(4) |資料(n) |
+--------------+-----------+------------+-----------+----------+
RocketMQ 通信網路協議的實作
RocketMQ 網路協議
這一小節,我們從RocketMQ 中,分析優秀通信網路協議的實作,RocketMQ 專案中,客戶端和服務端的通信是基于 Netty 之上構建的,同時,為了更加有效的通信,往往需要對發送的訊息自定義網路協議,
RocketMQ 的網路協議,從資料分類的角度上看,可分為兩大類
- 訊息頭資料(Header Data)
- 訊息體資料(Body Data)

從左到右
-
第一段:4 個位元組整數,等于2、3、4 長度總和
-
第二段:4 個位元組整數,等于3 的長度,特別的
byte[0]代表序列化演算法,byte[1~3]才是真正的長度 -
第三段:代表訊息頭資料,結構如下
{
"code":0,
"language":"JAVA",
"version":0,
"opaque":0,
"flag":1,
"remark":"hello, I am respponse /127.0.0.1:27603",
"extFields":{
"count":"0",
"messageTitle":"HelloMessageTitle"
}
}
- 第四段:代表訊息體資料
RocketMQ 訊息頭協議詳細如下:
| Header 欄位名 | 型別 | Request | Response |
|---|---|---|---|
| code | 整數 | 請求操作代碼,請求接收方根據不同的代碼做不同的操作 | 應答結果代碼,0表示成功,非0表示各種錯誤代碼 |
| language | 字串 | 請求發起方實作語言,默認JAVA | 應答接收方實作語言 |
| version | 整數 | 請求發起方程式版本 | 應答接收方程式版本 |
| opaque | 整數 | 請求發起方在同一連接上不同的請求標識代碼,多執行緒連接復用使用 | 應答方不做修改,直接回傳 |
| flag | 整數 | 通信層的標志位 | 通信層的標志位 |
| remark | 字串 | 傳輸自定義文本資訊 | 錯誤詳細描述資訊 |
| extFields | HashMap<String,String> | 請求自定義欄位 | 應答自定義欄位 |
編碼程序
RocketMQ 的通信模塊是基于 Netty的,通過定義 NettyEncoder 來實作對每一個 Channel的 出堆疊資料進行編碼,如下所示:
@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
...
}
}
}
其中,核心的編碼程序位于 RemotingCommand 物件中,encodeHeader 階段,需要統計出訊息總長度,即:
-
定義訊息頭長度,一個整數表示:占4個位元組
-
定義訊息頭資料,并計算其長度
-
定義訊息體資料,并計算其長度
-
額外再加 4是因為需要加入訊息總長度,一個整數表示:占4個位元組
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> 訊息頭長度,一個整數表示:占4個位元組
int length = 4;
// 2> 訊息頭資料
byte[] headerData;
headerData = https://www.cnblogs.com/OceanEyes/p/this.headerEncode();
// 再加訊息頭資料長度
length += headerData.length;
// 3> 再加訊息體資料長度
length += bodyLength;
// 4> 額外加 4是因為需要加入訊息總長度,一個整數表示:占4個位元組
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// 5> 將訊息總長度加入 ByteBuffer
result.putInt(length);
// 6> 將訊息的頭長度加入 ByteBuffer
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// 7> 將訊息頭資料加入 ByteBuffer
result.put(headerData);
result.flip();
return result;
}
其中,encode 階段會將 CommandCustomHeader 資料轉換 HashMap<String,String>,方便序列化
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = https://www.cnblogs.com/OceanEyes/p/null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
特別的,訊息頭序列化支持兩種演算法:
JSONRocketMQ
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
這兒需要值得注意的是,encode階段將當前 RPC 型別和 headerData長度編碼到一個 byte[4] 陣列中,byte[0] 位序列化型別,
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
其中,通過與運算 & 0xFF 取低八位資料,
所以, 最終 length 長度等于序列化型別 + header length + header data + body data 的位元組的長度,
解碼程序
RocketMQ 解碼通過NettyDecoder來實作,它繼承自 LengthFieldBasedFrameDecoder,其中呼叫了父類LengthFieldBasedFrameDecoder的建構式
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
這些引數設定4個位元組代表 length總長度,同時解碼時跳過最開始的4個位元組:
frame = (ByteBuf) super.decode(ctx, in);
所以,得到的 frame= 序列化型別 + header length + header data + body data ,解碼如下所示:
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
//總長度
int length = byteBuffer.limit();
//原始的 header length,4位
int oriHeaderLen = byteBuffer.getInt();
//真正的 header data 長度,忽略 byte[0]的 serializeType
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = https://www.cnblogs.com/OceanEyes/p/new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
}
其中,getProtocolType,右移 24位,拿到 serializeType:
public static SerializeType getProtocolType(int source) {
return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}
getHeaderLength 拿到 0-24 位代表的 headerData length:
public static int getHeaderLength(int length) {
return length & 0xFFFFFF;
}
小結
對于諸多中間件而言,底層的網路通信模塊往往會使用 Netty,Netty 提供了諸多的編解碼器,可以快速方便的上手,本文從如何設計一個網路協議入手,最終切入到 RocketMQ 底層網路協議的實作,可以看到,它并不復雜,仔細研讀幾遍變能理解其奧義,具體參考類NettyEncoder、NettyDecoder、RemotingCommand,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/5982.html
標籤:架構設計
上一篇:基于腳本引擎的運費架構分享
下一篇:實作介面冪等性的幾種方案
