一、環境準備
Netty需要的運行環境很簡單,只有2個,
- JDK 1.8+
- Apache Maven 3.3.9+
二、Netty 客戶端/服務器概覽

如圖,展示了一個我們將要撰寫的 Echo 客戶端和服務器應用程式,該圖展示是多個客戶端同時連接到一臺服務器,所能夠支持的客戶端數量,在理論上,僅受限于系統的可用資源(以及所使用的 JDK 版本可能會施加的限制),
Echo 客戶端和服務器之間的互動是非常簡單的;在客戶端建立一個連接之后,它會向服務器發送一個或多個訊息,反過來服務器又會將每個訊息回送給客戶端,雖然它本身看起來好像用處不大,但它充分地體現了客戶端/服務器系統中典型的請求-回應互動模式,
三、撰寫 Echo 服務器
所有的 Netty 服務器都需要以下兩部分,
- 至少一個 ChannelHandler—該組件實作了服務器對從客戶端接收的資料的處理,即它的業務邏輯,
- 引導—這是配置服務器的啟動代碼,至少,它會將服務器系結到它要監聽連接請求的埠上,
3.1 ChannelHandler 和業務邏輯
上一篇博文我們介紹了 Future 和回呼,并且闡述了它們在事件驅動設計中的應用,我們還討論了 ChannelHandler,它是一個介面族的父介面,它的實作負責接收并回應事件通知,
在 Netty 應用程式中,所有的資料處理邏輯都包含在這些核心抽象的實作中,因為你的 Echo 服務器會回應傳入的訊息,所以它需要實作ChannelInboundHandler 介面,用來定義回應入站事件的方法,簡單的應用程式只需要用到少量的這些方法,所以繼承 ChannelInboundHandlerAdapter 類也就足夠了,它提供了ChannelInboundHandler 的默認實作,
我們將要用到的方法是:
- channelRead() :對于每個傳入的訊息都要呼叫;
- channelReadComplete() : 通知ChannelInboundHandler最后一次對channelRead()的呼叫是當前批量讀取中的最后一條訊息;
- exceptionCaught() :在讀取操作期間,有例外拋出時會呼叫,
該 Echo 服務器的 ChannelHandler 實作是 EchoServerHandler,如代碼:
package com.example.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author lhd
* @date 2023/05/16 15:05
* @notes Netty Echo服務端簡單邏輯
*/
//表示channel可以并多個實體共享,它是執行緒安全的
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
//將訊息列印到控制臺
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
//將收到的訊息寫給發送者,而不沖刷出站訊息
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//將未決訊息沖刷到遠程節點,并且關閉該 Channe
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//列印例外堆疊跟蹤
cause.printStackTrace();
//關閉該channel
ctx.close();
}
}
ChannelInboundHandlerAdapter 有一個直觀的 API,并且它的每個方法都可以被重寫以掛鉤到事件生命周期的恰當點上,
因為需要處理所有接收到的資料,所以我們重寫了 channelRead() 方法,在這個服務器應用程式中,我們將資料簡單地回送給了遠程節點,
重寫 exceptionCaught() 方法允許我們對 Throwable 的任何子型別做出反應,在這里你記錄了例外并關閉了連接,
雖然一個更加完善的應用程式也許會嘗試從例外中恢復,但在這個場景下,只是通過簡單地關閉連接來通知遠程節點發生了錯誤,
ps:如果不捕獲例外,會發生什么呢?
每個 Channel 都擁有一個與之相關聯的 ChannelPipeline,其持有一個 ChannelHandler 的實體鏈,在默認的情況下,ChannelHandler 會把對它的方法的呼叫轉發給鏈中的下一個 ChannelHandler,因此,如果 exceptionCaught()方法沒有被該鏈中的某處實作,那么所接收的例外將會被傳遞到 ChannelPipeline 的尾端并被記錄,為此,你的應用程式應該提供至少有一個實作exceptionCaught()方法的 ChannelHandler,
除了 ChannelInboundHandlerAdapter 之外,還有很多需要學習ChannelHandler的子型別和實作,這些之后會一一說明,目前,我們只關注:
- 針對不同型別的事件來呼叫 ChannelHandler;
- 應用程式通過實作或者擴展 ChannelHandler 來掛鉤到事件的生命周期,并且提供自定義的應用程式邏輯;
- 在架構上,ChannelHandler 有助于保持業務邏輯與網路處理代碼的分離,這簡化了開發程序,因為代碼必須不斷地演化以回應不斷變化的需求,
3.2 引導服務器
下面我們準備開始構建服務器,構建服務器涉及到兩個內容:
- 系結到服務器將在其上監聽并接受傳入連接請求的埠;
- 配置 Channel,以將有關的入站訊息通知給 EchoServerHandler 實體,
package com.example.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
/**
* @author lhd
* @date 2023/05/16 15:21
* @notes Netty引導服務器
*/
public class EchoServer {
public static void main(String[] args) throws Exception {
//呼叫服務器的 start()方法
new EchoServer().start();
}
public void start() throws Exception {
final EchoServerHandler serverHandler = new EchoServerHandler();
//創建EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
try {
//創建ServerBootstra
ServerBootstrap b = new ServerBootstrap();
//指定服務器監視埠
int port = 8080;
b.group(group)
//指定所使用的 NIO 傳輸 Channel
//因為我們正在使用的是 NIO 傳輸,所以你指定了 NioEventLoopGroup 來接受和處理新的連接,
// 并且將 Channel 的型別指定為 NioServerSocketChannel ,
.channel(NioServerSocketChannel.class)
//使用指定的埠設定套接字地址
//將本地地址設定為一個具有選定埠的 InetSocketAddress ,服務器將系結到這個地址以監聽新的連接請求
.localAddress(new InetSocketAddress(port))
//添加一個EchoServerHandler 到子Channel的 ChannelPipeline
//這里使用了一個特殊的類——ChannelInitializer,這是關鍵,
// 當一個新的連接被接受時,一個新的子 Channel 將會被創建,而 ChannelInitializer 將會把一個你的
//EchoServerHandler 的實體添加到該 Channel 的 ChannelPipeline 中,正如我們之前所解釋的,
// 這個 ChannelHandler 將會收到有關入站訊息的通知,
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) throws Exception {
//EchoServerHandler 被標注為 @Shareable,所以我們可以總是使用同樣的實體
//實際上所有客戶端都是使用的同一個EchoServerHandler
ch.pipeline().addLast(serverHandler);
}
});
//異步地系結服務器,呼叫 sync()方法阻塞等待直到系結完成
//sync()方法的呼叫將導致當前 Thread阻塞,一直到系結操作完成為止
ChannelFuture f = b.bind().sync();
//獲取 Channel 的CloseFuture,并且阻塞當前線
//該應用程式將會阻塞等待直到服務器的 Channel關閉(因為你在 Channel 的 CloseFuture 上呼叫了 sync()方法)
f.channel().closeFuture().sync();
} finally {
//關閉 EventLoopGroup,釋放所有的資源,包括所有被創建的執行緒
group.shutdownGracefully().sync();
}
}
}
我們總結一下服務器實作中的重要步驟,下面這些是服務器的主要代碼組件:
- EchoServerHandler 實作了業務邏輯;
- main()方法引導了服務器;
引導程序中所需要的步驟如下:- 創建一個 ServerBootstrap 的實體以引導和系結服務器;
- 創建并分配一個 NioEventLoopGroup 實體以進行事件的處理,如接受新連接以及讀/寫資料;
- 指定服務器系結的本地的 InetSocketAddress;
- 使用一個 EchoServerHandler 的實體初始化每一個新的 Channel;
- 呼叫 ServerBootstrap.bind()方法以系結服務器,
到此我們的引導服務器已經完成,
四、撰寫 Echo 客戶端
Echo 客戶端將會:
(1)連接到服務器;
(2)發送一個或者多個訊息;
(3)對于每個訊息,等待并接收從服務器發回的相同的訊息;
(4)關閉連接,
撰寫客戶端所涉及的兩個主要代碼部分也是業務邏輯和引導,和你在服務器中看到的一樣,
4.1 通過 ChannelHandler 實作客戶端邏輯
如同服務器,客戶端將擁有一個用來處理資料的 ChannelInboundHandler,在這個場景下,我們將擴展 SimpleChannelInboundHandler 類以處理所有必須的任務,這要求重寫下面的方法:
- channelActive() : 在到服務器的連接已經建立之后將被呼叫;
- channelRead0() : 當從服務器接收到一條訊息時被呼叫;
- exceptionCaught() :在處理程序中引發例外時被呼叫,
具體代碼可以參考如下:
package com.example.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* @author lhd
* @date 2023/05/16 15:45
* @notes Netty 簡單的客戶端邏輯
*/
//標記該類的實體可以被多個 Channel 共享
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
//當被通知 Channel是活躍的時候,發送一條訊息
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
}
//記錄已接收訊息的轉儲
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
}
//在發生例外時,記錄錯誤并關閉Channel
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
首先,我們重寫了 channelActive() 方法,其將在一個連接建立時被呼叫,這確保了資料將會被盡可能快地寫入服務器,其在這個場景下是一個編碼了字串"Netty rocks!"的位元組緩沖區,
接下來,我們重寫了 channelRead0() 方法,每當接收資料時,都會呼叫這個方法,由服務器發送的訊息可能會被分塊接收,也就是說,如果服務器發送了 5 位元組,那么不能保證這 5 位元組會被一次性接收,即使是對于這么少量的資料,channelRead0()方法也可能會被呼叫兩次,第一次使用一個持有 3 位元組的 ByteBuf(Netty 的位元組容器),第二次使用一個持有 2 位元組的 ByteBuf,作為一個面向流的協議,TCP 保證了位元組陣列將會按照服務器發送它們的順序被接收,
ps:所以channelRead0()的呼叫次數不一定等于服務器發布訊息的次數
重寫的第三個方法是 exceptionCaught(),如同在 EchoServerHandler(3.1中的代碼示例)中所示,記錄 Throwable,關閉 Channel,在這個場景下,終止到服務器的連接,
ps:為什么客戶端繼承SimpleChannelInboundHandler 而不是ChannelInboundHandler?
在客戶端,當 channelRead0()方法完成時,我們已經有了傳入訊息,并且已經處理完它了,當該方法回傳時,SimpleChannelInboundHandler 負責釋放指向保存該訊息的 ByteBuf 的記憶體參考,
在 EchoServerHandler 中,我們仍然需要將傳入訊息回送給發送者,而 write()操作是異步的,直到 channelRead()方法回傳后可能仍然沒有完成,為此,EchoServerHandler擴展了 ChannelInboundHandlerAdapter,其在這個時間點上不會釋放訊息,訊息在 EchoServerHandler 的 channelReadComplete()方法中,當 writeAndFlush()方法被呼叫時被釋放,
4.2 引導客戶端
引導客戶端類似于引導服務器,不同的是,客戶端是使用主機和埠引數來連接遠程地址,也就是這里的 Echo 服務器的地址,而不是系結到一個一直被監聽的埠,
package com.example.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
/**
* @author lhd
* @date 2023/05/16 15:59
* @notes 引導客戶端
*/
public class EchoClient {
public void start() throws Exception {
//指定 EventLoopGroup 以處理客戶端事件;需要適用于 NIO 的實作
EventLoopGroup group = new NioEventLoopGroup();
try {
//創建 Bootstrap
Bootstrap b = new Bootstrap();
b.group(group)
//適用于 NIO 傳輸的 Channel 型別
.channel(NioSocketChannel.class)
//設定服務器的InetSocketAddress
.remoteAddress(new InetSocketAddress("127.0.0.1", 8080))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//在創建Channel時,向 ChannelPipeline中添加一個 EchoClientHandler 實體
ch.pipeline().addLast(new EchoClientHandler());}
});
//連接到遠程節點,阻塞等待直到連接完成
ChannelFuture f = b.connect().sync();
//阻塞,直到Channel 關閉
f.channel().closeFuture().sync();
} finally {
//關閉執行緒池并且釋放所有的資源
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
new EchoClient().start();
}
}
總結一下要點:
- 為初始化客戶端,創建了一個 Bootstrap 實體;
- 為進行事件處理分配了一個 NioEventLoopGroup 實體,其中事件處理包括創建新的連接以及處理入站和出站資料;
- 為服務器連接創建了一個 InetSocketAddress 實體;
- 當連接被建立時,一個 EchoClientHandler 實體會被安裝到(該 Channel 的)
ChannelPipeline 中; - 在一切都設定完成后,呼叫 Bootstrap.connect()方法連接到遠程節點;
綜上客戶端的構建已經完成,
五、構建和運行 Echo 服務器和客戶端
將我們上面的代碼復制到IDEA中運行,先啟動服務端在啟動客戶端會得到以下預期效果:
服務端控制臺列印:

客戶端控制臺列印:

我們關閉服務端后,客戶端控制臺列印:

因為服務端關閉,觸發了客戶端 EchoClientHandler 中的exceptionCaught()方法,列印出了例外堆疊并關閉了連接,
這只是一個簡單的應用程式,但是它可以伸縮到支持數千個并發連接——每秒可以比普通的基于套接字的 Java 應用程式處理多得多的訊息,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/553272.html
標籤:其他
上一篇:springboot~sharding-jdbc實作分庫分表
下一篇:返回列表
