概述
網路資料的基本單位總是位元組,Java NIO 提供了 ByteBuffer 作為它的位元組容器,但這個類的使用過于復雜,Netty 的 ByteBuf 具有卓越的功能性和靈活性,可以作為 ByteBuffer 的替代品
Netty 的資料處理 API 通過兩個組件暴露 —— abstract class ByteBuf 和 interface ByteBufHolder,下面是 ByteBuf API 的優點:
- 可以被用戶自定義的緩沖區型別擴展
- 通過內置的復合緩沖區型別實作透明的零拷貝
- 容量可以按需增長
- 在讀和寫這兩種模式之間切換不需要呼叫 ByteBuffer 的 flip() 方法
- 在讀和寫使用了不同的索引
- 支持方法的鏈式呼叫
- 支持參考計數
- 支持池化
ByteBuf
1. 作業原理
ByteBuf 維護了兩個不同的索引:一個用于讀取,一個用于寫入,當你從 ByteBuf 讀取時,readIndex 會遞增已經被讀取的位元組數,同樣的,當你寫入 ByteBuf 時,它的 writeIndex 也會遞增,readIndex 和 writeIndex 的起始位置都為 0
如果 readIndex 和 writeIndex 的值相等,也即此時已經到了可讀取資料的末尾,就如同達到陣列末尾一樣,試圖讀取超出該點的資料將觸發一個 IndexOutOfBoundsException
名稱以 read 或 write 開頭的 ByteBuf 方法,將會推進其對應的索引,而名稱以 set 或 get 開頭的操作則不會
2. ByteBuf 的使用模式
2.1 堆緩沖區
最常用的 ByteBuf 模式是將資料存盤在 JVM 的堆空間中,這種模式被稱為支撐陣列(backing array)它能在沒有使用池化的情況下提供快速的分配和釋放,適合于有遺留的資料需要處理的情況
ByteBuf heapBuf = ...;
// 檢查 ByteBuf 是否有一個支撐陣列
if(heapBuf.hasArray()) {
// 獲取對該陣列的參考
byte[] array = heapBuf.array();
// 計算第一個位元組的偏移量
int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
// 獲得可讀位元組數
int length = heapBuf.readableBytes();
// 使用陣列、偏移量和長度作為引數呼叫你的方法
handleArray(array, offset, length);
}
2.2 直接緩沖區
直接緩沖區使用本地記憶體存盤資料,更適合用于網路傳輸,但相對于堆緩沖區,其分配和釋放都較為昂貴,另外,如果你正在處理遺留代碼,處理直接緩沖區內容時,你必須將其內容進行一次復制
ByteBuf directBuf = ...;
// 不是支撐陣列就是直接緩沖區
if(!directBuf.hasArray()) {
// 獲取可讀位元組數
int length = directBuf.readableBytes();
// 分配一個新的陣列來保存具有該長度的位元組陣列
byte[] array = new byte[length];
// 將位元組復制到該陣列
directBuf.getBytes(directBuf.readerIndex(), array);
// 使用陣列、偏移量和長度作為引數呼叫你的方法
handleArray(array, 0, length);
}
2.3 復合緩沖區
復合緩沖區為多個 ByteBuf 提供了一個聚合視圖,可以根據需要添加或洗掉 ByteBuf 實體,Netty 通過一個 ByteBuf 子類 —— CompositeByteBuf 實作這個模式,它提供了一個將多個緩沖區表示為單個合并緩沖區的虛擬表示
CompositeByteBuf 中的 ByteBuf 實體可能同時包含直接記憶體和非直接記憶體分配,如果其中只有一個實體,那么對 CompositeByteBuf 上的 hasArray() 方法的呼叫將回傳該陣列上的 hasArray() 方法的值,否則回傳 false
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = ...;
ByteBuf bodyBuf = ...;
// 將 ByteBuf 實體追加到 CompositeByteBuf
messageBuf.addComponents(headerBuf, bodyBuf);
...
// 洗掉第位于索引位置為 0 的 ByteBuf
messageBuf.removeComponent(0);
// 回圈遍歷所有的 ByteBuf 實體
for(ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}
位元組級操作
1. 隨機訪問索引
如同普通的 Java 位元組陣列一樣,ByteBuf 的索引是從零開始的:第一個位元組的索引是 0,最后一個位元組的索引總是 capacity() - 1
ByteBuf buffer = ...;
for(int i = 0; i < buffer.capacity(); i++) {
byte b = buffer.getByte(i);
System.out.println((char) b)
}
這種需要一個索引值引數的方法訪問資料不會改變 readerIndex 也不會改變 writerIndex,如果需要改變,也可以通過呼叫 readerIndex(index) 或者 writerIndex(index) 來手動移動這兩者
2. 順序訪問索引
雖然 ByteBuf 同時具有讀索引和寫索引,但是 JDK 的 ByteBuf 卻只有一個索引,這也就是為什么必須呼叫 flip() 方法來在讀模式和寫模式之間進行切換的原因

3. 可丟棄位元組
可丟棄位元組的分段包含了已經被讀過的位元組,通過呼叫 discardReadBytes() 方法,可以丟棄它們并回收空間,這個分段的初始大小為 0,存盤在 readerIndex 中,會隨著 read 操作的執行而增加
可能你會想到頻繁呼叫 discardReadBytes() 方法以確保可寫分段的最大化,但這極有可能會導致記憶體復制,因為可讀欄位必須被移動到緩沖區的開始位置
4. 可讀位元組
ByteBuf 的可讀位元組分段存盤了實際資料,新分配的、包裝的或者復制的緩沖區的默認的 readerIndex 值為 0,任何名稱以 read 或者 skip 開頭的操作都將檢索或者跳過位于當前 readerIndex 的資料,并且將它增加已讀位元組數
如果嘗試在緩沖區的可讀位元組數已經耗盡時從中讀取資料,那么將會引發一個 IndexOutOfBoundsException
ByteBuf buffer = ...;
while(buffer.isReadable()) {
System.out.println(buffer.readByte());
}
5. 可寫位元組
可寫位元組分段是指一個擁有未定義內容、寫入就緒的記憶體區域,新分配的緩沖區的 writerIndex 的默認值為 0.任何名稱以 write 開頭的操作都將從當前的 writerIndex 處開始寫資料,并將它增加已經寫入的位元組數,如果寫操作的目標是 ByteBuf,并且沒有指定源索引的值,則緩沖區的 readerIndex 也同樣會被增加相同的大小
writeBytes(ByteBuf dest)
如果嘗試往目標寫入超過目標容量的資料,將會引發一個 IndexOutOfBoundException
ByteBuf buffer = ...;
while(buffer.writableBytes() >= 4) {
buffer.writeInt(random.nextInt());
}
6. 索引管理
JDK 的 InputStream 定義了 mark(int readlimit) 和 reset() 方法,這些方法分別被用來將流中的當前位置標記為指定的值,以及將流重置到該位置
同樣,可以通過 markReaderIndex()、markWriterIndex()、resetWriterIndex() 和 resetReaderIndex() 來標記和重置 ByteBuf 的 readerIndex 和 writerIndex
也可以通過 readerIndex(int) 或者 writerIndex(int) 來將索引移動到指定位置,任何試圖將索引設定到無效位置都將導致 IndexOutOfBoundsException
可以通過呼叫 clear() 方法來將 readerIndex 和 writerIndex 都設定為 0,這樣并不會清除記憶體中的內容,呼叫 clear() 比呼叫 discardReadBytes() 輕量得多,因為它只是重置索引
7. 查找操作
在 ByteBuf 中有多種可以用來確定指定值的索引的方法,最簡單的是 indexOf() 方法,較為復雜的查找可以通過那些需要一個 ByteBufProcessor 作為引數的方法達成,這個介面只定義了一個方法
boolean process(byte value);
它將檢查輸入值是否是正在查找的值,ByteBufProcessor 針對一些常見的值定義了許多便利方法
ByteBuf buffer = ...;
// 查找回車符 \r
int index = buffer.forEachByte(ByteBufProcessor.FIND_CR);
8. 派生緩沖區
派生緩沖區為 ByteBuf 提供了以專門的方式來呈現其內容的視圖,這些視圖通過以下方法被創建
- duplicate()
- slice()
- slice(int, int)
- Unpooled.unmodifiableBuffer(...)
- order(ByteOrder)
- readSlice(int)
這些方法都將回傳一個新的 ByteBuf 實體,其內部存盤和 JDK 的 ByteBuffer 共享,這也意味著,如果你修改了它的內容,也即同時修改了其對應的源實體,如果需要一個現有緩沖區的真實副本,請使用 copy() 或 copy(int, int) 方法
// 對 ByteBuf 進行切片
Charset utf8 = Charset.forName(StandardCharsets.UTF_8);
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", urf8);
// 創建該 ByteBuf 從索引 0 到 15 結束的一個新切片
ByteBuf sliced = buf.slice(0, 15);
// 更新索引 0 處的位元組
buf.setByte(0, (byte) 'J');
// 成功,因為資料是共享的
assert buf.getByte(0) == sliced.getByte(0);
// 對 ByteBuf 進行切片
Charset utf8 = Charset.forName(StandardCharsets.UTF_8);
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", urf8);
// 創建該 ByteBuf 從索引 0 到 15 結束的一個新副本
ByteBuf sliced = buf.copy(0, 15);
// 更新索引 0 處的位元組
buf.setByte(0, (byte) 'J');
// 成功,因為資料不是共享的
assert buf.getByte(0) != sliced.getByte(0);
9. 讀/寫操作
有兩種類別的讀/寫操作:
- get() / set() 操作,從給定的索引開始,并且索引不會改變
- read() / write() 操作,從給定的索引開始,并且會根據已經訪問過的位元組數對索引進行調整
| 方法 | 描述 |
|---|---|
| setBoolean (int , boolean) | 設定給定索引處的 Boolean 值 |
| getBoolean(int) | 回傳給定索引處的 Boolean 值 |
| setByte(int index, int value) | 設定給定索引處的位元組值 |
| getByte(int) | 回傳給定索引處的位元組 |
| getUnsignedByte(int ) | 將給定索引處的無符號位元組值作為 short 回傳 |
| setMedium(int index , int value) | 設定給定索引處的 24 位的中等 int值 |
| getMedium(int) | 回傳給定索引處的 24 位的中等 int 值 |
| getUnsignedMedium (int) | 回傳給定索引處的無符號的 24 位的中等 int 值 |
| setint(int index , int value) | 設定給定索引處的 int 值 |
| getint (int) | 回傳給定索引處的 int 值 |
| getUnsignedint(int) | 將給定索引處的無符號 int 值作為 long 回傳 |
| setLong(int index, long value) | 設定給定索引處的 long 值 |
| getLong(int) | 回傳給定索引處的 long 值 |
| setShort(int index, int value) | 設定給定索引處的 short 值 |
| getShort(int) | 回傳給定索引處的 short 值 |
| getUnsignedShort(int) | 將給定索引處的無符號 short 值作為 int 回傳 |
| getBytes (int, …) | 將該緩沖區中從給定索引開始的資料傳送到指定的目的地 |
read/write 操作的 API 和 set/get 大同小異,只不過會增加索引值
ByteBuf 還提供了其他有用的操作
| 方法 | 描述 |
|---|---|
| isReadable () | 如果至少有一個位元組可供讀取,則回傳 true |
| isWritable () | 如果至少有一個位元組可被寫入,則回傳 true |
| readableBytes() | 回傳可被讀取的位元組數 |
| writableBytes() | 回傳可被寫入的位元組數 |
| capacity() | 回傳 ByteBuf 可容納的位元組數 ,在此之后,它會嘗試再次擴展直到達到maxCapacity () |
| maxCapacity() | 返問 ByteBuf 可以容納的最大位元組數 |
| hasArray() | 如果 ByteBuf 由一個位元組陣列支撐,則回傳 true |
| array () | 如果 ByteBuf 由一個位元組陣列支撐則返問該陣列;否則,它將拋出 一個 UnsupportedOperat工onException 例外 |
ByteBuf 分配
1. 按需分配
為了降低分配和釋放記憶體的開銷,Netty 通過 interface ByteBufAllocator 實作了 ByteBuf 的池化,用于分配 ByteBuf 實體
下面是 ByteBufAllocator 的一些 API
| 方法 | 描述 |
|---|---|
| buffer()buffer(int initialCapacity);buffer(int initialCapacity, int maxCapacity); | 回傳一個基于堆或者直接記憶體存盤的 ByteBuf |
| heapBuffer ()heapBuffer(int initialCapacity)heapBuffer(int initialCapacity, int maxCapacity) | 回傳一個基于堆記憶體存盤的 ByteBuf |
| directBuffer()directBuffer(int initialCapacity)directBuffer(int initialCapacity , int maxCapacity) | 回傳一個基于直接記憶體存盤的 ByteBuf |
| compositeBuffer()compositeBuffer(int maxNumComponents) compositeDirectBuffer()compositeDirectBuffer (int maxNumComponents); compositeHeapBuffer()compositeHeapBuffer(int maxNumComponents); | 回傳一個可以通過添加最大到指定數目的基于堆的或者直接記憶體存盤的緩沖區來擴展的 CompositeByteBuf |
| ioBuffer() | 回傳一個用于套接字的 I/O 操作的 ByteBuf,默認地, 當所運行的環境具有 sun.misc.Unsafe支持時,回傳基于直接記憶體存盤的 ByteBuf,否則回傳基于堆記憶體存盤的 ByteBuf;當指定使用 PreferHeapByteBufAllocator 時,則只會回傳基于堆記憶體存盤的 ByteBuf |
可以通過 Channel 或者系結到 ChannelHandler 的 ChannelHandlerContext 獲取一個 ByteBufAllocator 的參考
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
...
ChannelHandlerContext ctx = ...;
ByteBufAllocator allocator = ctx.alloc();
Netty 提供了兩種 ByteBufAllocator 的實作:PooledByteBufAllocator 和 UnpooledByteBufAllocator ,前者池化了 ByteBuf 實體以提供性能,最大限度減少記憶體碎片,后者不池化 ByteBuf 實體,每次呼叫都會回傳一個新的實體
2. Unpooled 緩沖區
如果你未能獲取 ByteBufAllocator 實體,Netty 也提供了名為 Unpooled 的工具類,它提供了靜態的輔助方法來創建未池化的 ByteBuf 實體
| 方法 | 描述 |
|---|---|
| buffer()buffer(int 工nitialCapacity)buffer(int initialCapacity, int maxCapacity) | 回傳一個未池化的基于堆記憶體存盤的ByteBuf |
| directBuffer()directBuffer(int initialCapacity)directBuffer(int initialCapacity, int maxCapacity) | 回傳一個未池化的基于直接記憶體存盤ByteBuf |
| wrappedBuffer() | 回傳一個包裝了給定資料的ByteBuf |
| copiedBuffer() | 回傳一個復制了給定資料的 ByteBuf |
3. ByteBufUtil 類
ByteBufUtil 提供了用于操作 ByteBuf 的靜態的輔助方法,因為這個 API 是通用的,并且和池化無關,所以這些方法已然在分配類的外部實作
這些靜態方法中最有價值的可能就是 hexdump() 方法,它以十六進制的表示形式列印 ByteBuf 的內容, 這在各種情況下都很有用,例如,出于除錯 的目的記錄 ByteBuf 的內容,十六進制的表示通常會提供一個比位元組值的直接表示形式更加有用的日志條目,此外,十六進制的版本還可以很容易地轉換回實際的位元組表示
另一個有用的方法是 boolean equals(ByteBuf , ByteBuf),它被用來判斷兩個 ByteBuf 實體的相等性, 如果你實作自己的 ByteBuf 子類,你可能會發現 ByteBufUtil 的其他有用方法
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/287064.html
標籤:其他
