快速預覽
- 執行緒模型圖
- 流程講解
- 快速上手
- 添加依賴
- 服務端代碼
- 創建自定義的處理器,寫我們自己的業務邏輯
- 客戶端代碼
- 創客戶端自定義處理器
- netty相關面試知識拓展
- 什么是拆包和粘包
- 名詞解釋
- 解釋下什么是零拷貝
在寫代碼之前,我們先看下netty的執行緒模型,這比那固定格式的代碼將會更有趣,看完執行緒模型,你就知道netty寫的那幾段固定代碼的意義了,
執行緒模型圖

這個執行緒模型圖里面大概包含了這幾個組件:bossGroup,workGroup,selectot(accept),selector(讀寫),pipline,NioSocketChannel,NioServerSocketChannel;
-
bossgroup,workgroup
在netty中,處理客戶端的請求會被注冊在兩類selector上,這兩類selector分別對應兩個執行緒池bossGroup和workgroup,bossGroup主要處理客戶端與服務端建立連接注冊的selector;
workgroup看名字也知道了,是用來干活的執行緒池,它主要負責處理客戶端讀事件的selector邏輯;
在創建netty的第一行代碼中,就是創建這兩個執行緒池,一般情況下bossgroup會設定成一個執行緒,workgroup會設定多個執行緒,默認不寫的話,netty會獲取當前服務器中的cpu核數*2作為默認創建的執行緒數量, -
selector(accepet),selector(讀寫)
selector和NIO中的selector是同一種組件,不過在netty中會分為兩種型別的selector:專門處理連接事件的selector和專門處理讀寫事件的selector;
但是在NIO中處理這些事件都是使用的同一個selector,NIO中通過遍歷key的方式,來判斷是連接事件還是讀寫事件,然后交給后端執行緒處理的邏輯; -
NioServerSocketChannel
這是服務端啟動之后創建的一個channel,然后會把這個channel注冊到selector中,并添加自己感興趣的accept的事件,后續所有客戶端發起的連接都會被該channel監聽到,具體用來做什么,我們會結合下個組件介紹
-
NioSocketChannel
客戶端在發起連接請求之后,服務端會通過呼叫NioServerSocketChannel的accepet方法,生成一個NioSocketChannel,接著會從workGroup中挑選一個eventLoop,然后把channel注冊到該eventLoop執行緒的selector上,并添加感興趣的讀事件;
后續客戶端與服務端所有的讀寫操作都會在該channel中進行, -
pipline
pipline是一個實作了職責鏈模式的管道處理器,在初始化之后,會添加一些處理器,例如:編碼器、解碼器、業務邏輯處理器,select在得到客戶端發送過來的資料后,會把資料丟到這個管道里面,然后從頭到尾依次執行這些處理器;
如果是服務端把資料發往客戶端,會從尾部到頭部依次執行處理器,但是從服務端發資料到客戶端,只會執行出站處理器;客戶端發送資料到服務端,只會執行入站處理器,
介紹完這些基本組件之后,我們對netty的執行緒模型應該有了初步的認識,現在我們大概梳理下netty的整個處理程序:
流程講解
-
服務端初始化時,會創建兩個執行緒組bossGroup,workGoup;
-
創建一個NioServerSocketChannel 注冊到bossGroup中eventLoop的selector上面,添加自己感興趣的accept事件, 并監聽指定埠;
-
client1發起連接請求,在服務端會產生一個accept事件,通過遍歷selector中的key得到accept事件;
-
服務端的NioServerSocketChannel通過accept方法進行阻塞(其實該事件已經來了,不需要阻塞),回傳一個客戶端的channel1(NioSocketChannel);
-
獲得了chnnel1之后,服務端會從workgroup挑選一個eventloop1,并將channel1注冊到該eventloop1的selector1上面,并添加感興趣的讀事件;這時候已經初始化好了該通道中的pipline1,并將所有的處理器都添加到了pipline1中;
-
這個時候又新加入一個client2發起連接,會執行同樣的操作,最終將chnnel2注冊到另外一個eventloop2里面的selector2上面,并添加感興趣的讀事件;這時候已經初始化好了該通道中的pipline2,并將所有的處理器都添加到了pipline2中;
-
如果client1發送資料到服務端,服務端生成的selector1會監聽到該事件(讀事件),讀取通道中的資料,并將資料交給pipline1中,執行后續邏輯處理;
-
如果client2發送資料到服務端,服務端生成的selector2會監聽到該事件(讀事件),讀取通道中的資料,并將資料交給pipline2中,執行后續邏輯處理;
快速上手
前面已經將netty的基本組成和其執行緒模型大概說了下,現在我們演示下如何使用netty進行開發:代碼已經放到碼云:穿云箭
添加依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
服務端代碼
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) {
// 創建 處理連接請求的執行緒組 1個
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 創建作業組執行緒 默認為 cpu核數*2 個
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//在pipline中添加自定義的handle處理器
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start");
// 系結9000 埠號 sync指的是 創建完埠監聽后,才執行后續操作
ChannelFuture cf = serverBootstrap.bind(9000).sync();
// 添加監聽器
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("服務啟動完成");
}
});
// 注冊chnnel的關閉事件,sync是只有當關閉事件發生后才結束該執行緒,否則一直阻塞
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
創建自定義的處理器,寫我們自己的業務邏輯
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf msg1 = (ByteBuf) msg;
System.out.println(String.format("收到客戶端(%s)訊息:%s", ctx.channel().remoteAddress().toString(), msg1.toString(CharsetUtil.UTF_8)));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient", CharsetUtil.UTF_8);
ctx.writeAndFlush(buf);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(String.format("有新的客戶端連接:%s", ctx.channel().remoteAddress().toString()));
}
}
# 這里的ChannelInboundHandlerAdapter已經被廢棄了,大家后續可以繼承SimpleChannelInboundHandler,支持傳入泛型,然后配合解碼器使用,這里只是做個簡單的演示,
客戶端代碼
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class NettyClient {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(bossGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(String.format("有新的客戶端連接:%s", ctx.channel().remoteAddress().toString()));
}
});
System.out.println("netty client start");
ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
cf.addListener((ChannelFutureListener) channelFuture -> System.out.println("客戶端啟動完成"));
String msg = "";
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
do {
try {
msg = br.readLine();
} catch (IOException e) {
e.printStackTrace();
}
ByteBuf buf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
cf.channel().writeAndFlush(buf);
} while (!msg.equals("end"));
System.out.println("您已退出");
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
}
}
}
創客戶端自定義處理器
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
ctx.writeAndFlush(buf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf msg1 = (ByteBuf) msg;
System.out.println(String.format("收到服戶端(%s)訊息:%s", ctx.channel().remoteAddress().toString(), msg1.toString(CharsetUtil.UTF_8)));
}
}
netty相關面試知識拓展
什么是拆包和粘包
名詞解釋
客戶端與服務端建立了TCP/UDP連接,如果連接中限制了發送資料的報文大小,此時 將要發送的資料大于這個限制,就會產生拆包現象;
截取后的資料包會等待下次發送資料的時候一起發送,如果這個時候這部分資料和其他資料包一起發到服務端,又會產生粘包的現象;

解決方案
- 自己定義資料發送的資料格式,包括資料長度和資料內容兩個,通過長度來判斷資料有沒有結束
- 使用定長解碼器實作
- 使用指定開始符和結束符實作
解釋下什么是零拷貝
說零拷貝之前,我們需要引入一個名詞“直接記憶體”,我們知道java代碼都運行在jvm虛擬機中,分配的記憶體資料都是在jvm中分配的,如果想直接訪問jvm之外的記憶體資料,那就叫直接記憶體訪問;
在netty中,直接使用直接記憶體進行socket進行讀寫,不需要將資料拷貝到jvm中的緩沖區中,而是將資料直接發送到socket中,不需要再執行中間的拷貝操作;

微信搜一搜【AI碼師】關注帥氣的我,回復【干貨領取】,領取2021最新面試資料一份
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/251748.html
標籤:java
