作者:京東物流 王奕龍
Netty是一個異步基于事件驅動的高性能網路通信框架,可以看做是對NIO和BIO的封裝,并提供了簡單易用的API、Handler和工具類等,用以快速開發高性能、高可靠性的網路服務端和客戶端程式,
1. 創建服務端
服務端啟動需要創建 ServerBootstrap 物件,并完成初始化執行緒模型,配置IO模型和添加業務處理邏輯(Handler) ,在添加業務處理邏輯時,呼叫的是 childHandler() 方法添加了一個ChannelInitializer,代碼示例如下
// 負責服務端的啟動
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 以下兩個物件可以看做是兩個執行緒組
// boss執行緒組負責監聽埠,接受新的連接
NioEventLoopGroup boss = new NioEventLoopGroup();
// worker執行緒組負責讀取資料
NioEventLoopGroup worker = new NioEventLoopGroup();
// 配置執行緒組并指定NIO模型
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
// 定義后續每個 新連接 的讀寫業務邏輯
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
// 添加業務處理邏輯
.addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println(msg);
}
});
}
});
// 系結埠號
serverBootstrap.bind(2002);
通過呼叫 .channel(NioServerSocketChannel.class) 方法指定 Channel 型別為NIO型別,如果要指定為BIO型別,引數改成 OioServerSocketChannel.class 即可,
其中 nioSocketChannel.pipeline() 用來獲取 PipeLine 物件,呼叫方法 addLast() 添加必要的業務處理邏輯,這里采用的是責任鏈模式,會將每個Handler作為一個節點進行處理,
1.1 創建客戶端
客戶端與服務端啟動類似,不同的是,客戶端需要創建 Bootstrap 物件來啟動,并指定一個客戶端執行緒組,相同的是都需要完成初始化執行緒模型,配置IO模型和添加業務處理邏輯(Handler) , 代碼示例如下
// 負責客戶端的啟動
Bootstrap bootstrap = new Bootstrap();
// 客戶端的執行緒模型
NioEventLoopGroup group = new NioEventLoopGroup();
// 指定執行緒組和NIO模型
bootstrap.group(group).channel(NioSocketChannel.class)
// handler() 方法封裝業務處理邏輯
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline()
// 添加業務處理邏輯
.addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println(msg);
}
});
}
});
// 連接服務端IP和埠
bootstrap.connect("127.0.0.1", 2002);
(注意:下文中內容均以服務端代碼示例為準)
2. 編碼和解碼
客戶端與服務端進行通信,通信的訊息是以二進制位元組流的形式通過 Channel 進行傳遞的,所以當我們在客戶端封裝好Java業務物件后,需要將其按照協議轉換成位元組陣列,并且當服務端接受到該二進制位元組流時,需要將其根據協議再次解碼成Java業務物件進行邏輯處理,這就是編碼和解碼的程序,Netty 為我們提供了MessageToByteEncoder 用于編碼,ByteToMessageDecoder 用于解碼,
2.1 MessageToByteEncoder
用于將Java物件編碼成位元組陣列并寫入 ByteBuf,代碼示例如下
public class TcpEncoder extends MessageToByteEncoder<Message> {
/**
* 序列化器
*/
private final Serializer serializer;
public TcpEncoder(Serializer serializer) {
this.serializer = serializer;
}
/**
* 編碼的執行邏輯
*
* @param message 需要被編碼的訊息物件
* @param byteBuf 將位元組陣列寫入ByteBuf
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
// 通過自定義的序列化器將物件轉換成位元組陣列
byte[] bytes = serializer.serialize(message);
// 將位元組陣列寫入 ByteBuf 便完成了物件的編碼流程
byteBuf.writeBytes(bytes);
}
}
2.2 ByteToMessageDecoder
它用于將接收到的二進制資料流解碼成Java物件,與上述代碼類似,只不過是將該程序反過來了而已,代碼示例如下
public class TcpDecoder extends ByteToMessageDecoder {
/**
* 序列化器
*/
private final Serializer serializer;
public TcpDecoder(Serializer serializer) {
this.serializer = serializer;
}
/**
* 解碼的執行邏輯
*
* @param byteBuf 接收到的ByteBuf物件
* @param list 任何完成解碼的Java物件添加到該List中即可
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
// 根據協議自定義的解碼邏輯將其解碼成Java物件
Message message = serializer.deSerialize(byteBuf);
// 解碼完成后添加到List中即可
list.add(message);
}
}
2.3 注意要點
ByteBuf默認情況下使用的是堆外記憶體,不進行記憶體釋放會發生記憶體溢位,不過 ByteToMessageDecoder 和 MessageToByteEncoder 這兩個解碼和編碼Handler 會自動幫我們完成記憶體釋放的操作,無需再次手動釋放,因為我們實作的 encode() 和 decode() 方法只是這兩個 Handler 原始碼中執行的一個環節,最侄訓在 finally 代碼塊中完成對記憶體的釋放,具體內容可閱讀 MessageToByteEncoder 中第99行 write() 方法原始碼,
2.4 在服務端中添加編碼解碼Handler
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
// 接收到請求時進行解碼
.addLast(new TcpDecoder(serializer))
// 發送請求時進行編碼
.addLast(new TcpEncoder(serializer));
}
});
3. 添加業務處理Handler
在Netty框架中,客戶端與服務端的每個連接都對應著一個 Channel,而這個 Channel 的所有處理邏輯都封裝在一個叫作ChannelPipeline 的物件里,ChannelPipeline 是一個雙向鏈表,它使用的是責任鏈模式,每個鏈表節點都是一個 Handler,能通它能獲取 Channel 相關的背景關系資訊(ChannelHandlerContext),
Netty為我們提供了多種讀取 Channel 中資料的 Handler,其中比較常用的是 ChannelInboundHandlerAdapter 和SimpleChannelInboundHandler,下文中我們以讀取心跳訊息為例,
3.1 ChannelInboundHandlerAdapter
如下為處理心跳業務邏輯的 Handler,具體執行邏輯參考代碼和注釋即可
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
/**
* channel中有資料可讀時,會回呼該方法
*
* @param msg 如果在該Handler前沒有解碼Handler節點處理,該物件型別為ByteBuf;否則為解碼后的Java物件
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Message message = (Message) msg;
// 處理心跳訊息
processHeartBeatMessage(message);
// 初始化Ack訊息
Message ackMessage = initialAckMessage();
// 回寫給客戶端
ctx.channel().writeAndFlush(ackMessage);
}
}
3.2 SimpleChannelInboundHandler
SimpleChannelInboundHandler 是ChannelInboundHandlerAdapter 的實作類,SimpleChannelInboundHandler 能夠指定泛型,這樣在處理業務邏輯時,便無需再添加上文代碼中物件強轉的邏輯,這部分代碼實作是在 SimpleChannelInboundHandler 的 channelRead() 方法中完成的,它是一個模版方法,我們僅僅需要實作 channelRead0() 方法即可,代碼示例如下
public class HeartBeatHandler extends SimpleChannelInboundHandler<Message> {
/**
* @param msg 注意這里的物件型別即為 Message
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
// 處理心跳訊息
processHeartBeatMessage(message);
// 初始化Ack訊息
Message ackMessage = initialAckMessage();
// 回寫給客戶端
ctx.channel().writeAndFlush(ackMessage);
}
}
3.3 在服務端中添加心跳處理Handler
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
// 接收到進行解碼
.addLast(new TcpDecoder(serializer))
// 心跳業務處理Handler
.addLast(new HeartBeatHandler())
// 發送請求時進行編碼
.addLast(new TcpEncoder(serializer));
}
});
4. ChannelHandler的生命周期
在 ChannelInboundHandlerAdapter 可以通過實作不同的方法來完成指定時機的方法回呼,具體可參考如下代碼
public class LifeCycleHandler extends ChannelInboundHandlerAdapter {
/**
* 當檢測到新連接之后,呼叫 ch.pipeline().addLast(...); 之后的回呼
* 表示當前channel中成功添加了 Handler
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("邏輯處理器被添加時回呼:handlerAdded()");
super.handlerAdded(ctx);
}
/**
* 表示當前channel的所有邏輯處理已經和某個NIO執行緒建立了系結關系
* 這里的NIO執行緒通常指的是 NioEventLoop
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 系結到執行緒(NioEventLoop)時回呼:channelRegistered()");
super.channelRegistered(ctx);
}
/**
* 當Channel的所有業務邏輯鏈準備完畢,連接被激活時
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 準備就緒時回呼:channelActive()");
super.channelActive(ctx);
}
/**
* 客戶端向服務端發送資料,表示有資料可讀時,就會回呼該方法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channel 有資料可讀時回呼:channelRead()");
super.channelRead(ctx, msg);
}
/**
* 服務端每完整的讀完一次資料,都會回呼該方法
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 某次資料讀完時回呼:channelReadComplete()");
super.channelReadComplete(ctx);
}
// ---斷開連接時---
/**
* 該客戶端與服務端的連接被關閉時回呼
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 被關閉時回呼:channelInactive()");
super.channelInactive(ctx);
}
/**
* 對應的NIO執行緒移除了對這個連接的處理
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 取消執行緒(NioEventLoop) 的系結時回呼: channelUnregistered()");
super.channelUnregistered(ctx);
}
/**
* 為該連接添加的所有業務邏輯Handler被移除時
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("邏輯處理器被移除時回呼:handlerRemoved()");
super.handlerRemoved(ctx);
}
}
5. 解決粘包和半包問題
即使我們發送訊息的時候是以 ByteBuf 的形式發送的,但是到了底層作業系統,仍然是以位元組流的形式對資料進行發送的,而且服務端也以位元組流的形式讀取,因此在服務端對位元組流進行拼接時,可能就會造成發送時 ByteBuf 與讀取時的 ByteBuf 不對等的情況,這就是所謂的粘包或半包現象,
以如下情況為例,當客戶端頻繁的向服務端發送心跳訊息時,讀取到的ByteBuf資訊如下,其中一個心跳請求是用紅框圈出的部分
可以發現多個心跳請求"粘"在了一起,那么我們需要對它進行拆包處理,否則只會讀取第一條心跳請求,之后的請求會全部失效
Netty 為我們提供了基于長度的拆包器LengthFieldBasedFrameDecoder 來進行拆包作業,它能對超過所需資料量的包進行拆分,也能在資料不足的時候等待讀取,直到資料足夠時,構成一個完整的資料包并進行業務處理,
5.1 LengthFieldBasedFrameDecoder
以標準介面檔案中的協議(圖示)為準,代碼示例如下,其中的四個引數比較重要,詳細資訊可見注釋描述
public class SplitHandler extends LengthFieldBasedFrameDecoder {
/**
* 在協議中表示資料長度的欄位在位元組流首尾中的偏移量
*/
private static final Integer LENGTH_FIELD_OFFSET = 10;
/**
* 表示資料長度的位元組長度
*/
private static final Integer LENGTH_FIELD_LENGTH = 4;
/**
* 資料長度后邊的頭資訊中的位元組偏移量
*/
private static final Integer LENGTH_ADJUSTMENT = 10;
/**
* 表示從第一個位元組開始需要舍去的位元組數,在我們的協議中,不需要進行舍去
*/
private static final Integer INITIAL_BYTES_TO_STRIP = 0;
public SplitHandler() {
super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);
}
}
之后將其添加到Handler中即可,如果遇到其他協議,更改其中引數或查看 LengthFieldBasedFrameDecoder 的JavaDoc中詳細描述,
5.2 在服務端中添加拆包Handler
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
// 拆包Handler
.addLast(new SplitHandler())
// 接收到進行解碼
.addLast(new TcpDecoder(serializer))
// 心跳業務處理Handler
.addLast(new HeartBeatHandler())
// 發送請求時進行編碼
.addLast(new TcpEncoder(serializer));
}
});
6. Netty性能優化
6.1 Handler對單例模式的應用
Netty 在每次有新連接到來的時候,都會呼叫 ChannelInitializer 的 initChannel() 方法,會將其中相關的 Handler 都創建一次,
如果其中的 Handler 是無狀態且能夠通用的,可以將其改成單例,這樣就能夠在每次連接建立時,避免多次創建相同的物件,
以如下服務端代碼為例,包含如下Handler,可以將編碼解碼、以及業務處理Handler都定義成Spring單例bean的形式注入進來,這樣就能夠完成物件的復用而無需每次建立連接都創建相同的物件了
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 拆包Handler
.addLast(new SplitHandler())
// 日志Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解碼Handler
.addLast(new TcpDecoder(serializer))
// 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
.addLast(new HeartBeatHandler(), new ChuteStatusHandler())
.addLast(new DeviceStatusReceiveHandler(), new RfidBindReceiveHandler())
.addLast(new ScanReceiveHandler(), new SortResultHandler())
// 編碼Handler
.addLast(new TcpEncoder(serializer));
}
});
改造完成后如下
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 拆包Handler
.addLast(new SplitHandler())
// 日志Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解碼Handler
.addLast(tcpDecoder)
// 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
.addLast(heartBeatHandler, chuteStatusHandler)
.addLast(deviceStatusReceiveHandler, rfidBindReceiveHandler)
.addLast(scanReceiveHandler, sortResultHandler)
// 編碼Handler
.addLast(tcpEncoder);
}
});
不過需要注意在每個單例Handler的類上標注 @ChannelHandler.Sharable 注解,否則會拋出如下例外
io.netty.channel.ChannelPipelineException: netty.book.practice.handler.server.LoginHandler is not a @Sharable handler, so can't be added or removed multiple times
另外,SplitHanlder 不能進行單例處理,因為它的內部實作與每個 Channel 都有關,每個 SplitHandler 都需要維持每個Channel 讀到的資料,即它是有狀態的,
6.2 縮短責任鏈呼叫
對服務端來說,每次解碼出來的Java物件在多個業務處理 Handler 中只會經過一個其中 Handler 完成業務處理,那么我們將所有業務相關的 Handler封裝起來到一個Map中,每次只讓它經過必要的Handler而不是經過整個責任鏈,那么便可以提高Netty處理請求的性能,
定義如下 ServerHandlers 單例bean,并使用 策略模式 將對應的 Handler 管理起來,每次處理時根據訊息型別獲取對應的 Handler 來完成業務邏輯
@ChannelHandler.Sharable
public class ServerHandlers extends SimpleChannelInboundHandler<Message> {
@Resourse
private HeartBeatHandler heartBeatHandler;
/**
* 策略模式封裝Handler,這樣就能在回呼 ServerHandler 的 channelRead0 方法時
* 找到具體的Handler,而不需要經過責任鏈的每個 Handler 節點,以此來提高性能
*/
private final Map<Command, SimpleChannelInboundHandler<Message>> map;
public ServerHandler() {
map = new HashMap();
// key: 訊息型別列舉 value: 對應的Handler
map.put(MessageType.HEART_BEAT, heartBeatHandler);
// ...
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
// 呼叫 channelRead() 方法完成業務邏輯處理
map.get(msg.getMessageType()).channelRead(ctx, msg);
}
}
改造完成后,服務端代碼如下,因為我們封裝了平行的業務處理Handler,所以代碼很清爽
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 拆包Handler
.addLast(new SplitHandler())
// 日志Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解碼Handler
.addLast(tcpDecoder)
// serverHandlers 封裝了 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
.addLast(serverHandlers)
// 編碼Handler
.addLast(tcpEncoder);
}
});
6.3 合并編碼、解碼Handler
Netty 對編碼解碼提供了統一處理Handler是MessageToMessageCodec,這樣我們就能將編碼和解碼的Handler合并成一個添加介面,代碼示例如下
@ChannelHandler.Sharable
public class MessageCodecHandler extends MessageToMessageCodec<ByteBuf, Message> {
/**
* 序列化器
*/
@Resourse
private Serializer serializer;
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
// 將位元組陣列寫入 ByteBuf
ByteBuf byteBuf = ctx.alloc().ioBuffer();
serializer.serialize(byteBuf, msg);
// 這個編碼也需要添加到List中
out.add(byteBuf);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
// 根據協議自定義的解碼邏輯將其解碼成Java物件,并添加到List中
out.add(serializer.deSerialize(msg));
}
}
改造完成后,服務端代碼如下,將其放在業務處理Handler前即可,呼叫完業務Handler邏輯,會執行編碼邏輯
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 拆包Handler
.addLast(new SplitHandler())
// 日志Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解碼、編碼Handler
.addLast(messageCodecHandler)
// serverHandlers 封裝了 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
.addLast(serverHandlers);
}
});
6.4 減少NIO執行緒阻塞
對于耗時的業務操作,需要將它們都丟到業務執行緒池中去處理,因為單個NIO執行緒會管理很多 Channel ,只要有一個 Channel 中的 Handler 的 channelRead() 方法被業務邏輯阻塞,那么它就會拖慢系結在該NIO執行緒上的其他所有 Channel,
為了避免上述情況,可以在包含長時間業務處理邏輯的Handler中創建一個執行緒池,并將其丟入執行緒池中進行執行,偽代碼如下
protected void channelRead(ChannelHandlerContext ctx, Object message) {
threadPool.submit(new Runnable() {
// 耗時的業務處理邏輯
doSomethingSependTooMuchTime();
writeAndFlush();
});
}
6.5 空閑"假死"檢測Handler
如果底層的TCP連接已經斷開,但是另一端服務并沒有捕獲到,在某一端(客戶端或服務端)看來會認為這條連接仍然存在,這就是連接"假死"現象,這造成的問題就是,對于服務端來說,每個連接連接都會耗費CPU和記憶體資源,過多的假死連接會造成性能下降和服務崩潰;對客戶端來說,
連接假死會使得發往服務端的請求都會超時,所以需要盡可能避免假死現象的發生,
造成假死的原因可能是公網丟包、客戶端或服務端網路故障等,Netty為我們提供了 IdleStateHandler 來解決超時假死問題,示例代碼如下
public class MyIdleStateHandler extends IdleStateHandler {
private static final int READER_IDLE_TIME = 15;
public MyIdleStateHandler() {
// 讀超時時間、寫超時時間、讀寫超時時間 指定0值不判斷超時
super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
System.out.println(READER_IDLE_TIME + "秒內沒有讀到資料,關閉連接");
ctx.channel().close();
}
}
其構造方法中有三個引數來分別指定讀、寫和讀寫超時時間,當指定0時不判斷超時,除此之外Netty也有專門用來處理讀和寫超時的Handler,分別為 ReadTimeoutHandler, WriteTimeoutHandler,
將其添加到服務端 Handler 的首位即可
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 超時判斷Handler
.addLast(new MyIdleStateHandler())
// 拆包Handler
.addLast(new SplitHandler())
// 日志Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解碼、編碼Handler
.addLast(messageCodecHandler)
// serverHandlers 封裝了 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
.addLast(serverHandlers);
}
});
7. ChannelPipeline
ChannelPipeline 與 Channel 密切相關,它可以看做是一條流水線,資料以位元組流的形式進來,經過不同 Handler 的"加工處理",
最終以位元組流的形式輸出,ChannelPipeline 在每條新連接建立的時候被創建,是一條雙向鏈表,其中每一個節點都是ChannelHadnlerContext 物件,能夠通過它拿到相關的背景關系資訊,默認它有頭節點 HeadContext 和尾結點 TailContext,
7.1 InboundHandler 和 OutboundHandler
定義在 ChannelPipeline 中的 Handler 是可插拔的,能夠完成動態編織,呼叫 ctx.pipeline().remove() 方法可移除,呼叫 ctx.pipeline().addXxx() 方法可進行添加,
InboundHandler 與 OutboundHandler 處理的事件不同,前者處理 Inbound事件,典型的就是讀取資料流并加工處理;后者會對呼叫 writeAndFlush() 方法的 Outbound事件 進行處理,
此外,兩者的傳播機制也是不同的:
InboundHandler 會從鏈表頭逐個向下呼叫,頭節點只是簡單的將該事件傳播下去(ctx.fireChannelRead(mug)),執行程序中呼叫findContextInbound() 方法來尋找 InboundHandler 節點,直到 TailContext 節點執行方法完畢,結束呼叫,
一般自定義的 ChannelInboundHandler 都繼承自ChannelInboundHandlerAdapter, 如果沒有覆寫channelXxx() 相關方法,那么該事件正常會遍歷雙向鏈表一直傳播到尾結點,否則就會在當前節點執行完結束;當然也可以呼叫 fireXxx() 方法讓事件從當前節點繼續向下傳播,
OutboundHandler 是從鏈表尾向鏈表頭呼叫,相當于反向遍歷 ChannelPipeline 雙向鏈表,Outbound事件 會先經過TailContext 尾節點,并在執行程序中不斷尋找OutboundHandler 節點加工處理,直到頭節點 HeadContext 呼叫 Unsafe.write() 方法結束,
7.2 例外傳播
例外的傳播機制和 Inbound事件 的傳播機制類似,在任何節點發生的例外都會向下一個節點傳遞,如果自定義的 Handler 沒有處理例外也沒有實作 exceptionCaught() 方法,最終則會落到 TailContext 節點,控制臺列印例外未處理的警告資訊,
通常例外處理,我們會定義一個例外處理器,繼承自ChannelDuplexHandler ,放在自定義鏈表節點的末尾,這樣就能夠一定捕獲和處理例外,
8. Reactor執行緒模型
8.1 NioEventLoopGroup
創建 new NioEventLoopGroup() 它的默認執行緒數是當前CPU執行緒數的2倍,最侄訓呼叫到如下原始碼
// 這里計算的執行緒數量
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
跟進到構造方法的最終實作,會執行如下業務邏輯
其中在第2步創建 NioEventLoop 時,值得關注的是創建了一個 Selector,以此來實作IO多路復用;另外它還創建了高性能 MPSC(多生產者單消費者)佇列,借助它來協調任務的異步執行,如此單條執行緒(NioEventLoop)、Selector和MPSC它們三者是一對一的關系,而每條連接都對應一個 Channel,每個 Channel 都系結唯一一個 NioEventLoop,因此單個連接的所有操作都是在一個執行緒中執行,是執行緒安全的,
第3步驟創建執行緒選擇器,它的作用是為連接在NioEventLoopGroup 中選擇一個 NioEventLoop,并將該連接與 NioEventLoop 中的 Selector 完成系結,
在底層有兩種選擇器的實作,分別是PowerOfTowEventExecutorChooser 和GenericEventExecutorChooser,它們的原理都是從執行緒池里回圈選擇執行緒,不同的是前者計算回圈的索引采用的是位運算而后者采用的是取余運算,
8.2 Reactor執行緒 select 操作
原始碼位置 NioEventLoop 的 run() 方法, select 操作會不斷輪詢是否有IO事件發生,并且在輪詢程序中不斷檢查是否有任務需要執行,保證Netty任務佇列中的任務能夠及時執行,輪詢程序使用一個計數器避開了 JDK 的空輪詢Bug
8.3 處理產生IO事件的Channel
在 Netty 的 Channel 中,有兩大型別的 Channel,一個是 NioServerSocketChannel,由 boss NioEventLoop 處理;另一個是 NioSocketChannel,由worker NioEventLoop 處理,所以
- 對于 boss NioEventLoop 來說,輪詢到的是連接事件,后續通過 NioServerSocketChannel 的 Pipeline 將連接交給一個 work NioEventLoop 處理
- 對于 work NioEventLoop 來說,輪詢到的是讀寫事件,后續通過 NioSocketChannel 的 Pipeline 將讀取到的資料傳遞給每個ChannelHandler 處理
注意任務的執行都是異步的,
8.4 任務的收集和執行
上文中提到了我們創建了高性能的MPSC佇列,它是用來聚集非Reactor執行緒創建的任務的,NioEventLoop 會在執行的程序中不斷檢測是否有事件發生,如果有事件發生就處理,處理完事件之后再處理非Reactor執行緒創建的任務,在檢測是否有事件發生的時候,為了保證異步任務的及時處理,只要有任務要處理,就會停止任務檢測,去處理任務,處理任務時是Reactor單執行緒執行,
8.5 注冊連接的流程
當 boss Reactor執行緒檢測到 ACCEPT 事件之后,創建一個 NioSocketChannel,并把用戶設定的 ChannelOption(Option引數配置)、ChannelAttr(Channel 引數)、ChannelHandler(ChannelInitializer)封裝到 NioSocketChannel 中,接著,使用執行緒選擇器在NioEventLoopGroup 中選擇一條 NioEventLoop (執行緒),把 NioSocketChannel 中包裝的JDK Channel 當做Key,自身(NioSocketChannel)作為 attachment,注冊 NioEventLoop 對應的 Selector上,這樣,后續有讀寫事件發生,就可以直接獲取 attachment 來處理讀寫資料的邏輯,
8.6 如何理解IO多路復用
簡單地說:IO多路復用是指可以在一個執行緒內處理多個連接的IO事件請求,以Java中的IO多路復用為例,服務端創建 Selector 物件不斷的呼叫 select() 方法來處理各個連接上的IO事件,之后將這些IO事件交給任務執行緒異步去執行,這就達到了在一個執行緒內同時處理多個連接的IO請求事件的目的,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/551690.html
標籤:其他
下一篇:返回列表
