背景
最近訊息中間件專案進行聯調,我負責Server端,使用Java的Netty框架,同事負責Client端,使用Go的net包,訊息使用Protobuf序列化,聯調時Client發送的訊息Server端決議出錯,經過分析發現是Server與Client粘包處理方式不一致導致,Server使用的是Protobuf提供的粘包處理方式,Client使用的是訊息頭定義長度的處理方式,探索一下Protobuf粘包處理方式有何不同,
編碼類
public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> {
@Override
protected void encode(
ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
int bodyLen = msg.readableBytes();
int headerLen = computeRawVarint32Size(bodyLen);
out.ensureWritable(headerLen + bodyLen);
writeRawVarint32(out, bodyLen);
out.writeBytes(msg, msg.readerIndex(), bodyLen);
}
/**
* Writes protobuf varint32 to (@link ByteBuf).
* @param out to be written to
* @param value to be written
*/
static void writeRawVarint32(ByteBuf out, int value) {
while (true) {
if ((value & ~0x7F) == 0) {
out.writeByte(value);
return;
} else {
out.writeByte((value & 0x7F) | 0x80);
value >>>= 7;
}
}
}
/**
* Computes size of protobuf varint32 after encoding.
* @param value which is to be encoded.
* @return size of value encoded as protobuf varint32.
*/
static int computeRawVarint32Size(final int value) {
if ((value & (0xffffffff << 7)) == 0) {
return 1;
}
if ((value & (0xffffffff << 14)) == 0) {
return 2;
}
if ((value & (0xffffffff << 21)) == 0) {
return 3;
}
if ((value & (0xffffffff << 28)) == 0) {
return 4;
}
return 5;
}
}
encode()方法
protected void encode(
ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
// 獲取訊息長度
int bodyLen = msg.readableBytes();
// 計算表示訊息體長度所需的位元組數量
int headerLen = computeRawVarint32Size(bodyLen);
// 拿到所有需要寫入的資料長度,對緩沖區進行擴容
out.ensureWritable(headerLen + bodyLen);
// 將表示訊息體長度的位元組寫入緩沖區
writeRawVarint32(out, bodyLen);
out.writeBytes(msg, msg.readerIndex(), bodyLen);
}
writeRawVarint32()方法
先看value & ~0x7F、(value & 0x7F) | 0x80、value >>>= 7這幾個看不懂的地方,&、|、~、>>>=這些符號為計算機的位運算子號,分別代表與、或、非、忽略符號位右移(a>>>=n 相當于 a = a>>>n)
計算value & ~0x7F
分別假設value值為100、200
100轉二進制為01100100,200轉二進制為11001000
計算100 & ~0x7F
| 十進制 | 十六進制 | 運算子 | 二 | 進 | 制 | |||||
|---|---|---|---|---|---|---|---|---|---|---|
| 100 | 0x64 | 0 | 1 | 1 | 0 | 0 | 1 | 0 | 0 | |
| -128 | ~0x7f | & | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 0 | 0x00 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
計算200 & ~0x7F
| 十進制 | 十六進制 | 運算子 | 二 | 進 | 制 | |||||
|---|---|---|---|---|---|---|---|---|---|---|
| 200 | 0xc8 | 1 | 1 | 0 | 0 | 1 | 0 | 0 | 0 | |
| -128 | ~0x7f | & | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 128 | 0x80 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
這里運算結果使用十進制表示二進制是不準確的,僅作參考,需要根據資料型別進行轉換,比如:
10000000轉換為byte型別是-128,轉換為int是128
通過以上計算可以看出:
可以使用小于7個位表示的數字即可滿足條件,7個位可以表示$2^7=128$個數字,取值范圍是0~127,也就是說0~127可以滿足條件,這一步的目的是保證寫入表示訊息體長度的最后一位位元組是正數,后面會說到,
value=https://www.cnblogs.com/ChaseWindWu/p/100滿足條件,所以向bytebuf中寫入位元組01100100,然后return方法結束,
value=https://www.cnblogs.com/ChaseWindWu/p/200不滿足條件,那么看(value & 0x7F) | 0x80這一步運算,
計算(value & 0x7F) | 0x80
| 十進制 | 十六進制 | 運算子 | 二 | 進 | 制 | |||||
|---|---|---|---|---|---|---|---|---|---|---|
| 200 | 0xc8 | 1 | 1 | 0 | 0 | 1 | 0 | 0 | 0 | |
| 127 | 0x7f | & | 0 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |
| - | - | - | - | - | - | - | - | - | - | - |
| 72 | 0x48 | 0 | 1 | 0 | 0 | 1 | 0 | 0 | 0 | |
| 128 | 0x80 | | | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| - | - | - | - | - | - | - | - | - | - | - |
| 200 | 0xc8 | 1 | 1 | 0 | 0 | 1 | 0 | 0 | 0 |
計算結果還是200,我們分析一下步驟:
value & 0x7f:取出最后七個位,|0x80:將首位轉為1
即取出最后7個位,高位補1,正好一個位元組的長度,將11001000寫入bytebuf,再看value >>>= 7,
計算value >>>= 7
| 十進制 | 運算子 | 二 | 進 | 制 | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 200 | 1 | 1 | 0 | 0 | 1 | 0 | 0 | 0 | ||||||||
| >>> | 1 | 1 | 0 | 0 | 1 | 0 | 0 | 0 | ||||||||
| 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 |
忽略符號右移其實就是將后7位擠出去,在前邊補7個0,
計算機中一般首位是符號位,0表示正數,1表示負數,
這里需要注意是
>>表示右移,不改變符號,最高位與原來符號保持一致,>>>是忽略符號位右移,最高位補0,
200 >>>= 7的結果為00000001,繼續走if判斷,滿足條件,將00000001寫入bytebuf,
最終value=https://www.cnblogs.com/ChaseWindWu/p/200寫入bytebuf的位元組是11001000、00000001,
至此,三個看不懂的位運算都理解了,那么我們連起來看一下:
如果value可以用7個位元組表示(或者說是value在0~127范圍內),將value轉換為位元組寫入bytebuf,跳出回圈,方法結束,
如果value不能用7個位元組表示(或者說是value不在0~127范圍內),取最后7個位,高位補1,寫入bytebuf中,右移7位(將剛才取出的7位刪掉),再次判斷是否滿足if條件,不滿足就繼續上面的操作,直到滿足條件為止,
總結一下writeRawVarint32方法,其實是把一個整數拆分成多個位元組,倒序寫入bytebuf中,如果將每個位元組轉換為byte型別,最后一個位元組總是正數,前面的位元組都是負數,我們可以猜測,接收訊息時以第一個正數為分割,將表示訊息體長度的位元組與訊息體位元組拆分開,再通過位運算將前者組合起來就得到了訊息體的長度,
computeRawVarint32Size()方法
我們在writeRawVarint32方法分析中了解了位運算,再看computeRawVarint32Size方法就很簡單了,
計算機表示負數
0xffffffff轉換為二進制是11111111 1111111 11111111 11111111轉換為有有符號int型別是-1,為什么是-1?
因為計算機使用二進制可以做加法運算,但是沒辦法做減法運算,加上一個負數就相當于做了減法運算,現在問題是如何表示負數?
曾經有原碼表示法、反碼表示法,這里不做贅述,現在使用的是補碼表示法,
補碼表示法是將正數的二進制取反,然后在最后一位+1,
通過例子看一下:
有符號int型別的1用二進制可以表示為00000000 0000000 00000000 0000001取反得到11111111 11111111 11111111 11111110+1得到11111111 11111111 11111111 11111111轉換為十六進制是0xffffffff,
計算value & (0xffffffff << 7)
<<表示左移,從左邊擠出去7個位,在右邊補7個0,
這里仍然假設value分為為100,200,
// 計算(100 & (0xffffffff << 7))
00000000 0000000 00000000 01100100 // 100
& 11111111 1111111 11111111 10000000 // 0xffffffff << 7
00000000 0000000 00000000 00000000 // 結果:0
// 計算(200 & (0xffffffff << 7))
00000000 0000000 00000000 11001000 // 200
& 11111111 1111111 11111111 10000000 // 0xffffffff << 7
00000000 0000000 00000000 10000000 // 結果:128
// 計算(200 & (0xffffffff << 14))
00000000 0000000 00000000 11001000 // 200
& 11111111 1111111 11000000 00000000 // 0xffffffff << 14
00000000 0000000 00000000 00000000 // 結果:0
從以上計算可以看出,如果value可以用小于7個位來表示,則左移7個位可以滿足,如果value可以用8~14個位來表示,左移14個位可以滿足,
100、200計算結果分別為1、2,與writeRawVarint32方法寫入的位元組數量一致,
writeRawVarint32是方法7個7個的取出位,這里按7個位來計算所需位元組數量,最侄訓傳表示訊息體長度的位元組數量,
解碼類
public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {
// TODO maxFrameLength + safe skip + fail-fast option
// (just like LengthFieldBasedFrameDecoder)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
in.markReaderIndex();
int preIndex = in.readerIndex();
int length = readRawVarint32(in);
if (preIndex == in.readerIndex()) {
return;
}
if (length < 0) {
throw new CorruptedFrameException("negative length: " + length);
}
if (in.readableBytes() < length) {
in.resetReaderIndex();
} else {
out.add(in.readRetainedSlice(length));
}
}
/**
* Reads variable length 32bit int from buffer
*
* @return decoded int if buffers readerIndex has been forwarded else nonsense value
*/
private static int readRawVarint32(ByteBuf buffer) {
if (!buffer.isReadable()) {
return 0;
}
buffer.markReaderIndex();
byte tmp = buffer.readByte();
if (tmp >= 0) {
return tmp;
} else {
int result = tmp & 127;
if (!buffer.isReadable()) {
buffer.resetReaderIndex();
return 0;
}
if ((tmp = buffer.readByte()) >= 0) {
result |= tmp << 7;
} else {
result |= (tmp & 127) << 7;
if (!buffer.isReadable()) {
buffer.resetReaderIndex();
return 0;
}
if ((tmp = buffer.readByte()) >= 0) {
result |= tmp << 14;
} else {
result |= (tmp & 127) << 14;
if (!buffer.isReadable()) {
buffer.resetReaderIndex();
return 0;
}
if ((tmp = buffer.readByte()) >= 0) {
result |= tmp << 21;
} else {
result |= (tmp & 127) << 21;
if (!buffer.isReadable()) {
buffer.resetReaderIndex();
return 0;
}
result |= (tmp = buffer.readByte()) << 28;
if (tmp < 0) {
throw new CorruptedFrameException("malformed varint.");
}
}
}
}
return result;
}
}
}
readRawVarint32()方法
方法就不細看了,驗證一下我們之前的猜測,還是使用value=https://www.cnblogs.com/ChaseWindWu/p/200寫入的11001000、00000001位元組舉例看一下,
private static int readRawVarint32(ByteBuf buffer) {
if (!buffer.isReadable()) {
return 0;
}
buffer.markReaderIndex();
// 讀取第一個位元組
byte tmp = buffer.readByte(); // tmp = 11001000 首位是1,是個負數,小于0
// 判斷是否大于等于0,
// 大于等于0說明是最后一個表示訊息體長度的位元組,直接return
if (tmp >= 0) {
return tmp;
} else {
// 小于0 tmp & 127 取出后七位
int result = tmp & 127; // result = 11001000 & 01111111 = 01001000
if (!buffer.isReadable()) {
buffer.resetReaderIndex();
return 0;
}
// 再取第二個位元組,判斷是否大于等于0
if ((tmp = buffer.readByte()) >= 0) { //tmp = 00000001
// 這一步操作相當于是把上一步取出的7個位元組拿出來拼在tmp<<7的后面
result |= tmp << 7;
// result = 01001000 | tmp << 7
// tmp << 7 = 10000000
// 01001000 | 10000000 = 11001000 轉換為int型別是200
}
// 后面的代碼與以上步驟大同小異,不再贅述了
return result;
}
}
總結
涉及到的基礎知識:計算機表示整數、位運算、進制轉換
一般處理粘包的方式有三種:
-
定長訊息:每次發送訊息的長度固定,比如,總是發送100個位元組,
-
特殊符號分割:以特殊字符作為分隔符,讀到特殊字符時,認為上一條訊息結束,
-
訊息頭定義長度:在訊息體前增加訊息體的長度,一般使用四個位元組,讀取訊息時先讀取四個位元組,得到訊息體長度,再根據長度讀取訊息,
Netty Protobuf提供的處理粘包處理方式是在訊息體前加正負數,并且以第一個正數作為分割,可以說是訊息頭定義長度方式+特殊符號分割方式的結合版,巧妙利用二進制的位運算和計算機表示整數的特點實作動態訊息長度,發送較短訊息時可以比訊息頭定義長度的方式節省1-3個位元組,
博客小白的第一篇檔案,如有錯誤,還望指正,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/543374.html
標籤:Java
下一篇:第7章:例外處理
