編解碼器
每個網路應用程式都必須定義如何決議在兩個節點之間來回傳輸的原始位元組,以及如何將其和目標應用程式的資料格式做相互轉換,這種轉換邏輯由編解碼器處理,編解碼器由編碼器和解碼器組成,它們每種都可以將位元組流從一種格式轉換為另一種格式
- 編碼器將訊息轉換為適合于傳輸的格式(最有可能的就是位元組流)
- 解碼器則是將 網路位元組流轉換回應用程式的訊息格式
因此,編碼器操作出站資料,而解碼器處理入站資料
1. 解碼器
在這一節,我們將研究 Netty 所提供的解碼器類,并提供關于何時以及如何使用它們的具體示例,這些類覆寫了兩個不同的用例:
- 將位元組解碼為訊息 —— ByteToMessageDecoder 和 ReplayingDecoder
- 將一種訊息型別解碼為另一種 —— MessageToMessageDecoder
什么時候會用到解碼器呢?很簡單,每當需要為 ChannelPipeline 中的下一個 ChannelInboundHandler 轉換入站資料時會用到,此外,得益于 ChannelPipeline 的設計,可以將多個解碼器鏈接在一起,以實作任意復雜的轉換邏輯
1.1 抽象類 ByteToMessageDecoder
將位元組解碼為訊息是一項常見的任務,Netty 它提供了一個 抽象基類 ByteToMessageDecoder,這個類會對入站資料進行緩沖,直到它準備好處理

下面舉一個如何使用這個類的示例,假設你接收了一個包含簡單 int 的位元組流,每個 int 都需要被單獨處理,在這種情況下,你需要從入站 ByteBuf 中讀取每個 int,并將它傳遞給 ChannelPipeline 中的下一個 ChannelInboundHandler,為了解碼這個位元組流,你要擴展 ByteToMessageDecoder 類(需要注意的是,原子型別的 int 在被添加到 List 中時,會被自動裝箱為 Integer)
// 擴展 ByteToMessageDecoder,以將位元組解碼為特定的格式
public class ToIntegerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//檢查是否至少有 4 位元組可讀(1 個int的位元組長度)
if (in.readableBytes() >= 4) {
//從入站 ByteBuf 中讀取一個 int,并將其添加到解碼訊息的 List 中
out.add(in.readInt());
}
}
}
雖然 ByteToMessageDecoder 使得可以很簡單地實作這種模式,但是你可能會發現,在呼叫 readInt()方法前不得不驗證所輸入的 ByteBuf 是否具有足夠的資料有點繁瑣,下面說的 ReplayingDecoder,它是一個特殊的解碼器,以少量的開銷消除了這個步驟
1.2 抽象類 ReplayingDecoder
ReplayingDecoder 擴展了 ByteToMessageDecoder 類,使得我們不必呼叫 readableBytes() 方法,它通過使用一個自定義的 ByteBuf 實作,ReplayingDecoderByteBuf,包裝傳入的 ByteBuf 實作了這一點,其將在內部執行該呼叫
這個類的完整宣告是:
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
型別引數 S 指定了用于狀態管理的型別,其中 Void 代表不需要狀態管理,下述代碼展示了基于 ReplayingDecoder 重新實作的 ToIntegerDecoder
// 擴展ReplayingDecoder<Void> 以將位元組解碼為訊息
public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
// 傳入的 ByteBuf 是 ReplayingDecoderByteBuf
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 從入站 ByteBuf 中讀取一個 int,并將其添加到解碼訊息的 List 中
out.add(in.readInt());
}
}
和之前一樣,從 ByteBuf 中提取的int將會被添加到List中,如果沒有足夠的位元組可用,這 個 readInt() 方法的實作將會拋出一個 Error,其將在基類中被捕獲并處理,當有更多的資料可供讀取時,該 decode() 方法將會被再次呼叫
請注意 ReplayingDecoderByteBuf 的下面這些方面:
- 并不是所有的 ByteBuf 操作都被支持,如果呼叫了一個不被支持的方法,將會拋出一個 UnsupportedOperationException
- ReplayingDecoder 稍慢于 ByteToMessageDecoder
下面這些類用于處理更加復雜的用例:
- io.netty.handler.codec.LineBasedFrameDecoder —— 這個類在 Netty 內部也有使用,它使用了行尾控制字符(\n 或者 \r\n)來決議訊息資料
- io.netty.handler.codec.http.HttpObjectDecoder —— HTTP 資料解碼器
1.3 抽象類 MessageToMessageDecoder
在這一節,我們將解釋如何在兩個訊息格式之間進行轉換,例如,從一種 POJO 型別轉換為另一種
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
引數型別 I 指定了 decode() 方法的輸入引數 msg 的型別,它是你必須實作的唯一方法
我們將撰寫一個 IntegerToStringDecoder 解碼器來擴展 MessageToMessageDecoder,它的 decode() 方法會把 Integer 引數轉換為 String 表示,和之前一樣,解碼的 String 將被添加到傳出的 List 中,并轉發給下一個 ChannelInboundHandler
public class IntegerToStringDecoder extends MessageToMessageEncoder<Integer> {
@Override
protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
//將 Integer 訊息轉換為它的 String 表示,并將其添加到輸出的 List 中
out.add(String.valueOf(msg));
}
}
1.4 TooLongFrameException
由于 Netty 是一個異步框架,所以需要在位元組可以解碼之前在記憶體中緩沖它們,因此,不能讓解碼器緩沖大量的資料以至于耗盡可用的記憶體,為了解除這個常見的顧慮,Netty 提供了 TooLongFrameException 類,其將由解碼器在幀超出指定的大小限制時拋出
為了避免這種情況,你可以設定一個最大位元組數的閾值,如果超出該閾值,則會導致拋出一個 TooLongFrameException(隨后會被 ChannelHandler.exceptionCaught() 方法捕獲),然后,如何處理該例外則完全取決于該解碼器的用戶,某些協議(如 HTTP)可能允許你回傳一個特殊的回應,而在其他的情況下,唯一的選擇可能就是關閉對應的連接
下面的示例使用 TooLongFrameException 來通知 ChannelPipeline 中的其他 ChannelHandler 發生了幀大小溢位的,需要注意的是,如果你正在使用一個可變幀大小的協議,那么這種保護措施將是尤為重要的
public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
public static final int MAX_FRAME_SIZE = 1024;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readable = in.readableBytes();
// 檢查緩沖區是否有超過 MAX_FRAME_SIZE 個位元組
if (readable > MAX_FRAME_SIZE) {
// 跳過所有的可讀位元組,拋出 TooLongFrameException 并通知 ChannelHandler
in.skipBytes(readable);
throw new TooLongFrameException("Frame too big!");
}
//do something
}
}
2. 編碼器
編碼器實作了 ChannelOutboundHandler,并將出站資料從一種格式轉換為另一種格式,和我們方才學習的解碼器的功能正好相反,Netty 提供了一組類,用于幫助你撰寫具有以下功能的編碼器:
- 將訊息編碼為位元組
- 將訊息編碼為訊息
2.1 抽象類 MessageToByteEncoder
前面我們看到了如何使用 ByteToMessageDecoder 來將位元組轉換為訊息,現在我們使用 MessageToByteEncoder 來做逆向的事情

這個類只有一個方法,而解碼器有兩個,原因是解碼器通常需要在 Channel 關閉之后產生最后一個訊息(因此也就有了 decodeLast() 方法,顯然這不適用于編碼器的場景 —— 在連接被關閉之后仍然產生一個訊息是毫無意義的
下述代碼展示了 ShortToByteEncoder,其接受一個 Short 型別的實體作為訊息,將它編碼為Short的原子型別值,并將它寫入 ByteBuf 中,其將隨后被轉發給 ChannelPipeline 中的 下一個 ChannelOutboundHandler,每個傳出的 Short 值都將會占用 ByteBuf 中的 2 位元組,
public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
@Override
protected void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
// 將 Short 寫入 ByteBuf
out.writeShort(msg);
}
}
2.2 抽象類 MessageToMessageEncoder
MessageToMessageEncoder 類的 encode() 方法提供了將入站資料從一個訊息格式解碼為另一種
下述代碼使用 IntegerToStringEncoder 擴展了 MessageToMessageEncoder,編碼器將每個出站 Integer 的 String 表示添加到了該 List 中
public class IntegerToStringEncoder extends MessageToMessageEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
out.add(String.valueOf(msg));
}
}
抽象的編解碼器類
雖然我們一直將解碼器和編碼器作為單獨的物體討論,但是你有時將會發現在同一個類中管理入站和出站資料和訊息的轉換是很有用的,Netty 的抽象編解碼器類正好用于這個目的,因為它們每個都將捆綁一個解碼器/編碼器對,以處理我們一直在學習的這兩種型別的操作,正如同你可能已經猜想到的,這些類同時實作了 ChannelInboundHandler 和 ChannelOutboundHandler 介面
為什么我們并沒有一直優先于單獨的解碼器和編碼器使用這些復合類呢?因為通過盡可能地將這兩種功能分開,最大化了代碼的可重用性和可擴展性,這是 Netty 設計的一個基本原則
1. 抽象類 ByteToMessageCodec
讓我們來研究這樣的一個場景:我們需要將位元組解碼為某種形式的訊息,可能是 POJO,隨后再次對它進行編碼,ByteToMessageCodec 將為我們處理好這一切,因為它結合了 ByteToMessageDecoder 以及它的逆向 —— MessageToByteEncoder
任何的請求/回應協議都可以作為使用 ByteToMessageCodec 的理想選擇,例如,在某個 SMTP 的實作中,編解碼器將讀取傳入位元組,并將它們解碼為一個自定義的訊息型別,如 SmtpRequest,而在接收端,當一個回應被創建時,將會產生一個 SmtpResponse,其將被編碼回位元組以便進行傳輸

2. 抽象類 MessageToMessageCodec
通過使用 MessageToMessageCodec,我們可以在一個單個的類中實作該轉換的往返程序,MessageToMessageCodec 是一個引數化的類,定義如下:
public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>

decode() 方法是將 INBOUND_IN 型別的訊息轉換為 OUTBOUND_IN 型別的訊息,而 encode() 方法則進行它的逆向操作,將 INBOUND_IN 型別的訊息看作是通過網路發送的型別, 而將 OUTBOUND_IN 型別的訊息看作是應用程式所處理的型別,將可能有所裨益
WebSocket 協議
下面關于 MessageToMessageCodec 的示例參考了一個新出的 WebSocket 協議,這個協議能實作 Web 瀏覽器和服務器之間的全雙向通信
我們的 WebSocketConvertHandler 在引數化 MessageToMessageCodec 時將使用 INBOUND_IN 型別的 WebSocketFrame,以及 OUTBOUND_IN 型別的 MyWebSocketFrame,后者是 WebSocketConvertHandler 本身的一個靜態嵌套類
public class WebSocketConvertHandler
extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {
@Override
protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception {
// 實體化一個指定子型別的 WebSocketFrame
ByteBuf payload = msg.getData().duplicate().retain();
switch (msg.getType()) {
case BINARY:
out.add(new BinaryWebSocketFrame(payload));
break;
case TEXT:
out.add(new TextWebSocketFrame(payload));
break;
case CLOSE:
out.add(new CloseWebSocketFrame(true, 0, payload));
break;
case CONTINUATION:
out.add(new ContinuationWebSocketFrame(payload));
break;
case PONG:
out.add(new PongWebSocketFrame(payload));
break;
case PING:
out.add(new PingWebSocketFrame(payload));
break;
default:
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
}
// 將 WebSocketFrame 解碼為 MyWebSocketFrame,并設定 FrameType
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
ByteBuf paload = msg.content().duplicate().retain();
if (msg instanceof BinaryWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, paload));
} else
if (msg instanceof CloseWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, paload));
} else
if (msg instanceof PingWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, paload));
} else
if (msg instanceof PongWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, paload));
} else
if (msg instanceof TextWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, paload));
} else
if (msg instanceof ContinuationWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, paload));
} else {
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
}
public static final class MyWebSocketFrame {
public enum FrameType {
BINARY,
CLOSE,
PING,
PONG,
TEXT,
CONTINUATION
}
private final FrameType type;
private final ByteBuf data;
public MyWebSocketFrame(FrameType type, ByteBuf data) {
this.type = type;
this.data = https://www.cnblogs.com/Yee-Q/archive/2021/06/26/data;
}
public FrameType getType() {
return type;
}
public ByteBuf getData() {
return data;
}
}
}
3. CombinedChannelDuplexHandler 類
正如我們前面所提到的,結合一個解碼器和編碼器可能會對可重用性造成影響,但是,有一 種方法既能夠避免這種懲罰,又不會犧牲將一個解碼器和一個編碼器作為一個單獨的單元部署所 帶來的便利性,CombinedChannelDuplexHandler 提供了這個解決方案,其宣告為:
public class CombinedChannelDuplexHandler
<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
這個類充當了 ChannelInboundHandler 和 ChannelOutboundHandler(該類的型別引數 I 和 O)的容器,通過提供分別繼承了解碼器類和編碼器類的型別,我們可以實作一個編解碼器,而又不必直接擴展抽象的編解碼器類
首先,讓我們研究下述代碼,該實作擴展了 ByteToMessageDecoder,因為它要從 ByteBuf 讀取字符
public class ByteToCharDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
while (in.readableBytes() >= 2) {
out.add(in.readChar());
}
}
}
這里的 decode() 方法一次將從 ByteBuf 中提取 2 位元組,并將它們作為 char 寫入到 List 中,其將會被自動裝箱為 Character 物件
下述代碼將 Character 轉換回位元組,這個類擴展了 MessageToByteEncoder,因為它需要將 char 訊息編碼到 ByteBuf 中,這是通過直接寫入 ByteBuf 做到的
public class CharToByteEncoder extends MessageToByteEncoder<Character> {
@Override
protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception {
out.writeChar(msg);
}
}
既然我們有了解碼器和編碼器,我們可以結合它們來構建一個編解碼器
// 通過該解碼器和編碼器實作引數化CombinedByteCharCodec
public class CombinedChannelDuplexHandler extends
io.netty.channel.CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
public CombinedChannelDuplexHandler() {
// 將委托實體傳遞給父類
super(new ByteToCharDecoder(), new CharToByteEncoder());
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/288499.html
標籤:其他
