主頁 >  其他 > Netty服務端開發及性能優化

Netty服務端開發及性能優化

2023-05-05 09:53:11 其他

作者:京東物流 王奕龍

Netty是一個異步基于事件驅動高性能網路通信框架,可以看做是對NIO和BIO的封裝,并提供了簡單易用的API、Handler和工具類等,用以快速開發高性能、高可靠性的網路服務端和客戶端程式,

1. 創建服務端

服務端啟動需要創建 ServerBootstrap 物件,并完成初始化執行緒模型配置IO模型添加業務處理邏輯(Handler) ,在添加業務處理邏輯時,呼叫的是 childHandler() 方法添加了一個ChannelInitializer,代碼示例如下

// 負責服務端的啟動
ServerBootstrap serverBootstrap = new ServerBootstrap();

// 以下兩個物件可以看做是兩個執行緒組
// boss執行緒組負責監聽埠,接受新的連接
NioEventLoopGroup boss = new NioEventLoopGroup();
// worker執行緒組負責讀取資料
NioEventLoopGroup worker = new NioEventLoopGroup();

// 配置執行緒組并指定NIO模型
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
        // 定義后續每個 新連接 的讀寫業務邏輯
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline()
                        // 添加業務處理邏輯
                        .addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
                                System.out.println(msg);
                            }
                        });
            }
        });

// 系結埠號
serverBootstrap.bind(2002);

通過呼叫 .channel(NioServerSocketChannel.class) 方法指定 Channel 型別為NIO型別,如果要指定為BIO型別,引數改成 OioServerSocketChannel.class 即可,

其中 nioSocketChannel.pipeline() 用來獲取 PipeLine 物件,呼叫方法 addLast() 添加必要的業務處理邏輯,這里采用的是責任鏈模式,會將每個Handler作為一個節點進行處理,

1.1 創建客戶端

客戶端與服務端啟動類似,不同的是,客戶端需要創建 Bootstrap 物件來啟動,并指定一個客戶端執行緒組,相同的是都需要完成初始化執行緒模型配置IO模型添加業務處理邏輯(Handler) , 代碼示例如下

// 負責客戶端的啟動
Bootstrap bootstrap = new Bootstrap();
// 客戶端的執行緒模型
NioEventLoopGroup group = new NioEventLoopGroup();

// 指定執行緒組和NIO模型
bootstrap.group(group).channel(NioSocketChannel.class)
        // handler() 方法封裝業務處理邏輯
        .handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) throws Exception {
                channel.pipeline()
                    // 添加業務處理邏輯
                    .addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
                                System.out.println(msg);
                            }
                    });
            }
        });

// 連接服務端IP和埠
bootstrap.connect("127.0.0.1", 2002);

(注意:下文中內容均以服務端代碼示例為準)

2. 編碼和解碼

客戶端與服務端進行通信,通信的訊息是以二進制位元組流的形式通過 Channel 進行傳遞的,所以當我們在客戶端封裝好Java業務物件后,需要將其按照協議轉換成位元組陣列,并且當服務端接受到該二進制位元組流時,需要將其根據協議再次解碼成Java業務物件進行邏輯處理,這就是編碼和解碼的程序,Netty 為我們提供了MessageToByteEncoder 用于編碼,ByteToMessageDecoder 用于解碼,

2.1 MessageToByteEncoder

用于將Java物件編碼成位元組陣列并寫入 ByteBuf,代碼示例如下

public class TcpEncoder extends MessageToByteEncoder<Message> {

    /**
     * 序列化器
     */
    private final Serializer serializer;

    public TcpEncoder(Serializer serializer) {
        this.serializer = serializer;
    }

    /**
     * 編碼的執行邏輯
     *
     * @param message 需要被編碼的訊息物件
     * @param byteBuf 將位元組陣列寫入ByteBuf
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
        // 通過自定義的序列化器將物件轉換成位元組陣列
        byte[] bytes = serializer.serialize(message);
        // 將位元組陣列寫入 ByteBuf 便完成了物件的編碼流程
        byteBuf.writeBytes(bytes);
    }
}

2.2 ByteToMessageDecoder

它用于將接收到的二進制資料流解碼成Java物件,與上述代碼類似,只不過是將該程序反過來了而已,代碼示例如下

public class TcpDecoder extends ByteToMessageDecoder {
    /**
     * 序列化器
     */
    private final Serializer serializer;

    public TcpDecoder(Serializer serializer) {
        this.serializer = serializer;
    }

    /**
     * 解碼的執行邏輯
     *
     * @param byteBuf 接收到的ByteBuf物件
     * @param list    任何完成解碼的Java物件添加到該List中即可
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 根據協議自定義的解碼邏輯將其解碼成Java物件
        Message message = serializer.deSerialize(byteBuf);
        // 解碼完成后添加到List中即可
        list.add(message);
    }
}

2.3 注意要點

ByteBuf默認情況下使用的是堆外記憶體,不進行記憶體釋放會發生記憶體溢位,不過 ByteToMessageDecoder 和 MessageToByteEncoder 這兩個解碼和編碼Handler 會自動幫我們完成記憶體釋放的操作,無需再次手動釋放,因為我們實作的 encode() 和 decode() 方法只是這兩個 Handler 原始碼中執行的一個環節,最侄訓在 finally 代碼塊中完成對記憶體的釋放,具體內容可閱讀 MessageToByteEncoder 中第99行 write() 方法原始碼,

2.4 在服務端中添加編碼解碼Handler

serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline()
                        // 接收到請求時進行解碼
                        .addLast(new TcpDecoder(serializer))
                        // 發送請求時進行編碼
                        .addLast(new TcpEncoder(serializer));
            }
        });

3. 添加業務處理Handler

在Netty框架中,客戶端與服務端的每個連接都對應著一個 Channel,而這個 Channel 的所有處理邏輯都封裝在一個叫作ChannelPipeline 的物件里,ChannelPipeline 是一個雙向鏈表,它使用的是責任鏈模式,每個鏈表節點都是一個 Handler,能通它能獲取 Channel 相關的背景關系資訊(ChannelHandlerContext),
Netty為我們提供了多種讀取 Channel 中資料的 Handler,其中比較常用的是 ChannelInboundHandlerAdapter 和SimpleChannelInboundHandler,下文中我們以讀取心跳訊息為例,

3.1 ChannelInboundHandlerAdapter

如下為處理心跳業務邏輯的 Handler,具體執行邏輯參考代碼和注釋即可

public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    /**
     * channel中有資料可讀時,會回呼該方法
     *
     * @param msg 如果在該Handler前沒有解碼Handler節點處理,該物件型別為ByteBuf;否則為解碼后的Java物件
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message message = (Message) msg;
        // 處理心跳訊息
        processHeartBeatMessage(message);

        // 初始化Ack訊息
        Message ackMessage = initialAckMessage();
        // 回寫給客戶端
        ctx.channel().writeAndFlush(ackMessage);
    }
}

3.2 SimpleChannelInboundHandler

SimpleChannelInboundHandler 是ChannelInboundHandlerAdapter 的實作類,SimpleChannelInboundHandler 能夠指定泛型,這樣在處理業務邏輯時,便無需再添加上文代碼中物件強轉的邏輯,這部分代碼實作是在 SimpleChannelInboundHandler 的 channelRead() 方法中完成的,它是一個模版方法,我們僅僅需要實作 channelRead0() 方法即可,代碼示例如下

public class HeartBeatHandler extends SimpleChannelInboundHandler<Message> {

    /**
     * @param msg 注意這里的物件型別即為 Message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        // 處理心跳訊息
        processHeartBeatMessage(message);

        // 初始化Ack訊息
        Message ackMessage = initialAckMessage();
        // 回寫給客戶端
        ctx.channel().writeAndFlush(ackMessage);
    }
}

3.3 在服務端中添加心跳處理Handler

serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline()
                        // 接收到進行解碼
                        .addLast(new TcpDecoder(serializer))
                        // 心跳業務處理Handler
                        .addLast(new HeartBeatHandler())
                        // 發送請求時進行編碼
                        .addLast(new TcpEncoder(serializer));
            }
        });

4. ChannelHandler的生命周期

在 ChannelInboundHandlerAdapter 可以通過實作不同的方法來完成指定時機的方法回呼,具體可參考如下代碼

public class LifeCycleHandler extends ChannelInboundHandlerAdapter {
    /**
     * 當檢測到新連接之后,呼叫 ch.pipeline().addLast(...); 之后的回呼
     * 表示當前channel中成功添加了 Handler
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("邏輯處理器被添加時回呼:handlerAdded()");
        super.handlerAdded(ctx);
    }

    /**
     * 表示當前channel的所有邏輯處理已經和某個NIO執行緒建立了系結關系
     * 這里的NIO執行緒通常指的是 NioEventLoop
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 系結到執行緒(NioEventLoop)時回呼:channelRegistered()");
        super.channelRegistered(ctx);
    }

    /**
     * 當Channel的所有業務邏輯鏈準備完畢,連接被激活時
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 準備就緒時回呼:channelActive()");
        super.channelActive(ctx);
    }

    /**
     * 客戶端向服務端發送資料,表示有資料可讀時,就會回呼該方法
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channel 有資料可讀時回呼:channelRead()");
        super.channelRead(ctx, msg);
    }

    /**
     * 服務端每完整的讀完一次資料,都會回呼該方法
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 某次資料讀完時回呼:channelReadComplete()");
        super.channelReadComplete(ctx);
    }

    // ---斷開連接時---

    /**
     * 該客戶端與服務端的連接被關閉時回呼
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 被關閉時回呼:channelInactive()");
        super.channelInactive(ctx);
    }

    /**
     * 對應的NIO執行緒移除了對這個連接的處理
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 取消執行緒(NioEventLoop) 的系結時回呼: channelUnregistered()");
        super.channelUnregistered(ctx);
    }

    /**
     * 為該連接添加的所有業務邏輯Handler被移除時
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("邏輯處理器被移除時回呼:handlerRemoved()");
        super.handlerRemoved(ctx);
    }
}

5. 解決粘包和半包問題

即使我們發送訊息的時候是以 ByteBuf 的形式發送的,但是到了底層作業系統,仍然是以位元組流的形式對資料進行發送的,而且服務端也以位元組流的形式讀取,因此在服務端對位元組流進行拼接時,可能就會造成發送時 ByteBuf 與讀取時的 ByteBuf 不對等的情況,這就是所謂的粘包或半包現象,

以如下情況為例,當客戶端頻繁的向服務端發送心跳訊息時,讀取到的ByteBuf資訊如下,其中一個心跳請求是用紅框圈出的部分

粘包日志.png

可以發現多個心跳請求"粘"在了一起,那么我們需要對它進行拆包處理,否則只會讀取第一條心跳請求,之后的請求會全部失效

Netty 為我們提供了基于長度的拆包器LengthFieldBasedFrameDecoder 來進行拆包作業,它能對超過所需資料量的包進行拆分,也能在資料不足的時候等待讀取,直到資料足夠時,構成一個完整的資料包并進行業務處理,

5.1 LengthFieldBasedFrameDecoder

以標準介面檔案中的協議(圖示)為準,代碼示例如下,其中的四個引數比較重要,詳細資訊可見注釋描述

標準協議.png

public class SplitHandler extends LengthFieldBasedFrameDecoder {

    /**
     * 在協議中表示資料長度的欄位在位元組流首尾中的偏移量
     */
    private static final Integer LENGTH_FIELD_OFFSET = 10;

    /**
     * 表示資料長度的位元組長度
     */
    private static final Integer LENGTH_FIELD_LENGTH = 4;

    /**
     * 資料長度后邊的頭資訊中的位元組偏移量
     */
    private static final Integer LENGTH_ADJUSTMENT = 10;

    /**
     * 表示從第一個位元組開始需要舍去的位元組數,在我們的協議中,不需要進行舍去
     */
    private static final Integer INITIAL_BYTES_TO_STRIP = 0;

    public SplitHandler() {
        super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);
    }
}

之后將其添加到Handler中即可,如果遇到其他協議,更改其中引數或查看 LengthFieldBasedFrameDecoder 的JavaDoc中詳細描述,

5.2 在服務端中添加拆包Handler

serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline()
                    // 拆包Handler
                    .addLast(new SplitHandler())
                    // 接收到進行解碼
                    .addLast(new TcpDecoder(serializer))
                    // 心跳業務處理Handler
                    .addLast(new HeartBeatHandler())
                    // 發送請求時進行編碼
                    .addLast(new TcpEncoder(serializer));
            }
        });

6. Netty性能優化

6.1 Handler對單例模式的應用

Netty 在每次有新連接到來的時候,都會呼叫 ChannelInitializer 的 initChannel() 方法,會將其中相關的 Handler 都創建一次,
如果其中的 Handler 是無狀態且能夠通用的,可以將其改成單例,這樣就能夠在每次連接建立時,避免多次創建相同的物件,

以如下服務端代碼為例,包含如下Handler,可以將編碼解碼、以及業務處理Handler都定義成Spring單例bean的形式注入進來,這樣就能夠完成物件的復用而無需每次建立連接都創建相同的物件了

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // 拆包Handler
                        .addLast(new SplitHandler())
                        // 日志Handler
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        // 解碼Handler
                        .addLast(new TcpDecoder(serializer))
                        // 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
                        .addLast(new HeartBeatHandler(), new ChuteStatusHandler())
                        .addLast(new DeviceStatusReceiveHandler(), new RfidBindReceiveHandler())
                        .addLast(new ScanReceiveHandler(), new SortResultHandler())
                        // 編碼Handler
                        .addLast(new TcpEncoder(serializer));
            }
        });

改造完成后如下

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // 拆包Handler
                        .addLast(new SplitHandler())
                        // 日志Handler
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        // 解碼Handler
                        .addLast(tcpDecoder)
                        // 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
                        .addLast(heartBeatHandler, chuteStatusHandler)
                        .addLast(deviceStatusReceiveHandler, rfidBindReceiveHandler)
                        .addLast(scanReceiveHandler, sortResultHandler)
                        // 編碼Handler
                        .addLast(tcpEncoder);
            }
        });

不過需要注意在每個單例Handler的類上標注 @ChannelHandler.Sharable 注解,否則會拋出如下例外

io.netty.channel.ChannelPipelineException: netty.book.practice.handler.server.LoginHandler is not a @Sharable handler, so can't be added or removed multiple times

另外,SplitHanlder 不能進行單例處理,因為它的內部實作與每個 Channel 都有關,每個 SplitHandler 都需要維持每個Channel 讀到的資料,即它是有狀態的

6.2 縮短責任鏈呼叫

對服務端來說,每次解碼出來的Java物件在多個業務處理 Handler 中只會經過一個其中 Handler 完成業務處理,那么我們將所有業務相關的 Handler封裝起來到一個Map中,每次只讓它經過必要的Handler而不是經過整個責任鏈,那么便可以提高Netty處理請求的性能,

定義如下 ServerHandlers 單例bean,并使用 策略模式 將對應的 Handler 管理起來,每次處理時根據訊息型別獲取對應的 Handler 來完成業務邏輯

@ChannelHandler.Sharable
public class ServerHandlers extends SimpleChannelInboundHandler<Message> {

    @Resourse
    private HeartBeatHandler heartBeatHandler;

    /**
     * 策略模式封裝Handler,這樣就能在回呼 ServerHandler 的 channelRead0 方法時
     * 找到具體的Handler,而不需要經過責任鏈的每個 Handler 節點,以此來提高性能
     */
    private final Map<Command, SimpleChannelInboundHandler<Message>> map;

    public ServerHandler() {
        map = new HashMap();

        // key: 訊息型別列舉 value: 對應的Handler
        map.put(MessageType.HEART_BEAT, heartBeatHandler);
        // ...
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        // 呼叫 channelRead() 方法完成業務邏輯處理
        map.get(msg.getMessageType()).channelRead(ctx, msg);
    }
}

改造完成后,服務端代碼如下,因為我們封裝了平行的業務處理Handler,所以代碼很清爽

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // 拆包Handler
                        .addLast(new SplitHandler())
                        // 日志Handler
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        // 解碼Handler
                        .addLast(tcpDecoder)
                        // serverHandlers 封裝了 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
                        .addLast(serverHandlers)
                        // 編碼Handler
                        .addLast(tcpEncoder);
            }
        });

6.3 合并編碼、解碼Handler

Netty 對編碼解碼提供了統一處理Handler是MessageToMessageCodec,這樣我們就能將編碼和解碼的Handler合并成一個添加介面,代碼示例如下

@ChannelHandler.Sharable
public class MessageCodecHandler extends MessageToMessageCodec<ByteBuf, Message> {

    /**
     * 序列化器
     */
    @Resourse
    private Serializer serializer;

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
        // 將位元組陣列寫入 ByteBuf 
        ByteBuf byteBuf = ctx.alloc().ioBuffer();
        serializer.serialize(byteBuf, msg);

        // 這個編碼也需要添加到List中
        out.add(byteBuf);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
        // 根據協議自定義的解碼邏輯將其解碼成Java物件,并添加到List中
        out.add(serializer.deSerialize(msg));
    }
}

改造完成后,服務端代碼如下,將其放在業務處理Handler前即可,呼叫完業務Handler邏輯,會執行編碼邏輯

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // 拆包Handler
                        .addLast(new SplitHandler())
                        // 日志Handler
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        // 解碼、編碼Handler
                        .addLast(messageCodecHandler)
                        // serverHandlers 封裝了 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
                        .addLast(serverHandlers);
            }
        });

6.4 減少NIO執行緒阻塞

對于耗時的業務操作,需要將它們都丟到業務執行緒池中去處理,因為單個NIO執行緒會管理很多 Channel ,只要有一個 Channel 中的 Handler 的 channelRead() 方法被業務邏輯阻塞,那么它就會拖慢系結在該NIO執行緒上的其他所有 Channel

為了避免上述情況,可以在包含長時間業務處理邏輯的Handler中創建一個執行緒池,并將其丟入執行緒池中進行執行,偽代碼如下

protected void channelRead(ChannelHandlerContext ctx, Object message) {
    threadPool.submit(new Runnable() {
        // 耗時的業務處理邏輯
        doSomethingSependTooMuchTime();
        
        writeAndFlush();
    });
}

6.5 空閑"假死"檢測Handler

如果底層的TCP連接已經斷開,但是另一端服務并沒有捕獲到,在某一端(客戶端或服務端)看來會認為這條連接仍然存在,這就是連接"假死"現象,這造成的問題就是,對于服務端來說,每個連接連接都會耗費CPU和記憶體資源,過多的假死連接會造成性能下降和服務崩潰;對客戶端來說,
連接假死會使得發往服務端的請求都會超時,所以需要盡可能避免假死現象的發生,

造成假死的原因可能是公網丟包、客戶端或服務端網路故障等,Netty為我們提供了 IdleStateHandler 來解決超時假死問題,示例代碼如下

public class MyIdleStateHandler extends IdleStateHandler {

    private static final int READER_IDLE_TIME = 15;

    public MyIdleStateHandler() {
        // 讀超時時間、寫超時時間、讀寫超時時間 指定0值不判斷超時
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        System.out.println(READER_IDLE_TIME + "秒內沒有讀到資料,關閉連接");
        ctx.channel().close();
    }
}

其構造方法中有三個引數來分別指定讀、寫和讀寫超時時間,當指定0時不判斷超時,除此之外Netty也有專門用來處理讀和寫超時的Handler,分別為 ReadTimeoutHandlerWriteTimeoutHandler

將其添加到服務端 Handler 的首位即可

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // 超時判斷Handler
                        .addLast(new MyIdleStateHandler())
                        // 拆包Handler
                        .addLast(new SplitHandler())
                        // 日志Handler
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        // 解碼、編碼Handler
                        .addLast(messageCodecHandler)
                        // serverHandlers 封裝了 心跳、格口狀態、設備狀態、RFID上報、掃碼上報和分揀結果上報Handler
                        .addLast(serverHandlers);
            }
        });

7. ChannelPipeline

ChannelPipeline 與 Channel 密切相關,它可以看做是一條流水線,資料以位元組流的形式進來,經過不同 Handler 的"加工處理",
最終以位元組流的形式輸出,ChannelPipeline 在每條新連接建立的時候被創建,是一條雙向鏈表,其中每一個節點都是ChannelHadnlerContext 物件,能夠通過它拿到相關的背景關系資訊,默認它有頭節點 HeadContext 和尾結點 TailContext

7.1 InboundHandler 和 OutboundHandler

定義在 ChannelPipeline 中的 Handler 是可插拔的,能夠完成動態編織,呼叫 ctx.pipeline().remove() 方法可移除,呼叫 ctx.pipeline().addXxx() 方法可進行添加,

InboundHandler 與 OutboundHandler 處理的事件不同,前者處理 Inbound事件,典型的就是讀取資料流并加工處理;后者會對呼叫 writeAndFlush() 方法的 Outbound事件 進行處理,

此外,兩者的傳播機制也是不同的:

InboundHandler 會從鏈表頭逐個向下呼叫,頭節點只是簡單的將該事件傳播下去(ctx.fireChannelRead(mug)),執行程序中呼叫findContextInbound() 方法來尋找 InboundHandler 節點,直到 TailContext 節點執行方法完畢,結束呼叫,

一般自定義的 ChannelInboundHandler 都繼承自ChannelInboundHandlerAdapter, 如果沒有覆寫channelXxx() 相關方法,那么該事件正常會遍歷雙向鏈表一直傳播到尾結點,否則就會在當前節點執行完結束;當然也可以呼叫 fireXxx() 方法讓事件從當前節點繼續向下傳播,

OutboundHandler 是從鏈表尾向鏈表頭呼叫,相當于反向遍歷 ChannelPipeline 雙向鏈表,Outbound事件 會先經過TailContext 尾節點,并在執行程序中不斷尋找OutboundHandler 節點加工處理,直到頭節點 HeadContext 呼叫 Unsafe.write() 方法結束,

7.2 例外傳播

例外的傳播機制和 Inbound事件 的傳播機制類似,在任何節點發生的例外都會向下一個節點傳遞,如果自定義的 Handler 沒有處理例外也沒有實作 exceptionCaught() 方法,最終則會落到 TailContext 節點,控制臺列印例外未處理的警告資訊,

通常例外處理,我們會定義一個例外處理器,繼承自ChannelDuplexHandler ,放在自定義鏈表節點的末尾,這樣就能夠一定捕獲和處理例外,

8. Reactor執行緒模型

8.1 NioEventLoopGroup

創建 new NioEventLoopGroup() 它的默認執行緒數是當前CPU執行緒數的2倍,最侄訓呼叫到如下原始碼

// 這里計算的執行緒數量
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

跟進到構造方法的最終實作,會執行如下業務邏輯

NioEventLoopGroup構造方法.jpg

其中在第2步創建 NioEventLoop 時,值得關注的是創建了一個 Selector,以此來實作IO多路復用;另外它還創建了高性能 MPSC(多生產者單消費者)佇列,借助它來協調任務的異步執行,如此單條執行緒(NioEventLoop)、Selector和MPSC它們三者是一對一的關系,而每條連接都對應一個 Channel,每個 Channel 都系結唯一一個 NioEventLoop,因此單個連接的所有操作都是在一個執行緒中執行,是執行緒安全的,

第3步驟創建執行緒選擇器,它的作用是為連接在NioEventLoopGroup 中選擇一個 NioEventLoop,并將該連接與 NioEventLoop 中的 Selector 完成系結,

在底層有兩種選擇器的實作,分別是PowerOfTowEventExecutorChooser 和GenericEventExecutorChooser,它們的原理都是從執行緒池里回圈選擇執行緒,不同的是前者計算回圈的索引采用的是位運算而后者采用的是取余運算

8.2 Reactor執行緒 select 操作

原始碼位置 NioEventLoop 的 run() 方法, select 操作會不斷輪詢是否有IO事件發生,并且在輪詢程序中不斷檢查是否有任務需要執行,保證Netty任務佇列中的任務能夠及時執行,輪詢程序使用一個計數器避開了 JDK 的空輪詢Bug

8.3 處理產生IO事件的Channel

在 Netty 的 Channel 中,有兩大型別的 Channel,一個是 NioServerSocketChannel,由 boss NioEventLoop 處理;另一個是 NioSocketChannel,由worker NioEventLoop 處理,所以

  1. 對于 boss NioEventLoop 來說,輪詢到的是連接事件,后續通過 NioServerSocketChannel 的 Pipeline 將連接交給一個 work NioEventLoop 處理
  2. 對于 work NioEventLoop 來說,輪詢到的是讀寫事件,后續通過 NioSocketChannel 的 Pipeline 將讀取到的資料傳遞給每個ChannelHandler 處理

注意任務的執行都是異步的,

8.4 任務的收集和執行

上文中提到了我們創建了高性能的MPSC佇列,它是用來聚集非Reactor執行緒創建的任務的,NioEventLoop 會在執行的程序中不斷檢測是否有事件發生,如果有事件發生就處理,處理完事件之后再處理非Reactor執行緒創建的任務,在檢測是否有事件發生的時候,為了保證異步任務的及時處理,只要有任務要處理,就會停止任務檢測,去處理任務,處理任務時是Reactor單執行緒執行,

8.5 注冊連接的流程

當 boss Reactor執行緒檢測到 ACCEPT 事件之后,創建一個 NioSocketChannel,并把用戶設定的 ChannelOption(Option引數配置)、ChannelAttr(Channel 引數)、ChannelHandler(ChannelInitializer)封裝到 NioSocketChannel 中,接著,使用執行緒選擇器在NioEventLoopGroup 中選擇一條 NioEventLoop (執行緒),把 NioSocketChannel 中包裝的JDK Channel 當做Key,自身(NioSocketChannel)作為 attachment,注冊 NioEventLoop 對應的 Selector上,這樣,后續有讀寫事件發生,就可以直接獲取 attachment 來處理讀寫資料的邏輯,

8.6 如何理解IO多路復用

簡單地說:IO多路復用是指可以在一個執行緒內處理多個連接的IO事件請求,以Java中的IO多路復用為例,服務端創建 Selector 物件不斷的呼叫 select() 方法來處理各個連接上的IO事件,之后將這些IO事件交給任務執行緒異步去執行,這就達到了在一個執行緒內同時處理多個連接的IO請求事件的目的,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/551690.html

標籤:其他

上一篇:優化演算法-從梯度下降到深度學習非凸優化

下一篇:返回列表

標籤雲
其他(158466) Python(38117) JavaScript(25400) Java(18013) C(15222) 區塊鏈(8261) C#(7972) AI(7469) 爪哇(7425) MySQL(7162) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5871) 数组(5741) R(5409) Linux(5335) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4565) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2432) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1965) Web開發(1951) HtmlCss(1932) python-3.x(1918) 弹簧靴(1913) C++(1912) xml(1889) PostgreSQL(1874) .NETCore(1857) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • Netty服務端開發及性能優化

    Netty是一個異步基于事件驅動的高性能網路通信框架,可以看做是對NIO和BIO的封裝,并提供了簡單易用的API、Handler和工具類等,用以快速開發高性能、高可靠性的網路服務端和客戶端程式。 ......

    uj5u.com 2023-05-05 09:53:11 more
  • 優化演算法-從梯度下降到深度學習非凸優化

    一、數學優化 1.1 定義 Mathematical Optimization(數學優化)問題,亦稱最優化問題,是指在一定約束條件下,求解一個目標函式的最大值(或最小值)問題。 根據輸入變數 𝑿 的值域是否為實數域,數學優化問題可以分為離散優化問題和連續優化問題. 在連續優化問題中,根據是否有變數 ......

    uj5u.com 2023-05-05 09:18:16 more
  • Codeforces Round 867 (Div. 3)

    A. TubeTube Feed 分析: 從所有a[i]+i-1<=t的選擇種取個max即可 code: #include <bits/stdc++.h> using namespace std; const int N = 55; int a[N], b[N]; int main() { std: ......

    uj5u.com 2023-05-05 09:18:02 more
  • 【動手學深度學習】第十二章筆記:異步計算、資料并行

    為了更好的閱讀體驗,請點擊這里 12.1 編譯器和解釋器 原書主要關注的是命令式編程(imperative programming)。Python 是一種解釋性語言,因此沒有編譯器給代碼優化,代碼會跑得很慢。 12.1.1 符號式編程 考慮另一種選擇符號式編程(symbolic programmin ......

    uj5u.com 2023-05-05 09:17:57 more
  • FreeSWITCH對接vosk實作實時語音識別

    環境:CentOS 7.6_x64 FreeSWITCH版本 :1.10.9 Python版本:3.9.2 一、背景描述 vosk是一個開源語音識別工具,可識別中文,之前介紹過python使用vosk進行中文語音識別,今天記錄下FreeSWITCH對接vosk實作實時語音識別。 vosk離線語音識別 ......

    uj5u.com 2023-05-05 09:12:31 more
  • 基于MobileNet的人臉表情識別系統(MATLAB GUI版+原理詳解)

    本篇博客介紹了基于MobileNet的人臉表情識別系統,支持圖片識別、視頻識別、攝像頭識別等多種形式,通過GUI界面實作表情識別可視化展示。首先介紹了表情識別任務的背景與意義,總結近年來利用深度學習進行表情識別的相關技術和作業。在資料集選擇上,本文選擇了Fer2013和CK+兩個資料集,并使用MAT... ......

    uj5u.com 2023-05-05 09:06:47 more
  • 從功能測驗轉型測驗開發,薪資漲了20K,1000字講述轉型必經之路...

    身處職場之中,猶如逆水行舟不進則退,想要不被后浪拍死在沙灘上,就要不斷學習新知識,接受新事物。 要得到更好的發展,就要緊跟發展趨勢,不斷轉型才能保持競爭力,在職場中占有一席之地。 轉型不是一件容易的事,涉及到轉型、革新,就要突破現有的框架,必然會經歷陣痛。 我剛作業時就是一名月薪4000軟體測驗工程 ......

    uj5u.com 2023-05-05 09:06:23 more
  • 基于YOLOv5的目標檢測系統詳解(附MATLAB GUI版代碼)

    本文重點介紹了基于YOLOv5目標檢測系統的MATLAB實作,用于智能檢測物體種類并記錄和保存結果,對各種物體檢測結果可視化,提高目標識別的便捷性和準確性。本文詳細闡述了目標檢測系統的原理,并給出MATLAB的實作代碼、預訓練模型,以及GUI界面設計。基于YOLOv5目標檢測演算法,在界面中可以選擇各... ......

    uj5u.com 2023-05-05 09:05:47 more
  • 為什么說測驗崗位是巨坑?10年測驗人告訴你千萬別上當

    每次都有人問我軟體測驗的前景是什么樣的,每年也會有人很多人紛紛涌入測驗的崗位上,希望自己能夠進入阿里、華為等大廠
    但是測驗崗位真的那么吃香嗎?今天我結合從零基礎小白到測驗開發的成長經歷,來說下這個行業的發展前景,以及要入行的同學應該從哪個地方入手學習 ......

    uj5u.com 2023-05-05 09:05:13 more
  • 基于YOLOv4的目標檢測系統(附MATLAB代碼+GUI實作)

    本文介紹了一種MATLAB實作的目標檢測系統代碼,采用 YOLOv4 檢測網路作為核心模型,用于訓練和檢測各種任務下的目標,并在GUI界面中對各種目標檢測結果可視化。文章詳細介紹了YOLOv4的實作程序,包括演算法原理、MATLAB 實作代碼、訓練資料集、訓練程序和圖形用戶界面。在GUI界面中,用戶可... ......

    uj5u.com 2023-05-05 09:04:34 more