本系列Netty原始碼決議文章基于 4.1.56.Final版本
在上篇文章《聊聊Netty那些事兒之從內核角度看IO模型》中我們花了大量的篇幅來從內核角度詳細講述了五種IO模型的演程序序以及ReactorIO執行緒模型的底層基石IO多路復用技術在內核中的實作原理,
最后我們引出了netty中使用的主從Reactor IO執行緒模型,

通過上篇文章的介紹,我們已經清楚了在IO呼叫的程序中內核幫我們搞了哪些事情,那么俗話說的好內核領進門,修行在netty,netty在用戶空間又幫我們搞了哪些事情?
那么從本文開始,筆者將從原始碼角度來帶大家看下上圖中的Reactor IO執行緒模型在Netty中是如何實作的,
本文作為Reactor在Netty中實作系列文章中的開篇文章,筆者先來為大家介紹Reactor的骨架是如何創建出來的,
在上篇文章中我們提到Netty采用的是主從Reactor多執行緒的模型,但是它在實作上又與Doug Lea在Scalable IO in Java論文中提到的經典主從Reactor多執行緒模型有所差異,

Netty中的Reactor是以Group的形式出現的,主從Reactor在Netty中就是主從Reactor組,每個Reactor Group中會有多個Reactor用來執行具體的IO任務,當然在netty中Reactor不只用來執行IO任務,這個我們后面再說,
Main Reactor Group中的Reactor數量取決于服務端要監聽的埠個數,通常我們的服務端程式只會監聽一個埠,所以Main Reactor Group只會有一個Main Reactor執行緒來處理最重要的事情:系結埠地址,接收客戶端連接,為客戶端創建對應的SocketChannel,將客戶端SocketChannel分配給一個固定的Sub Reactor,也就是上篇文章筆者為大家舉的例子,飯店最重要的作業就是先把客人迎接進來,“我家大門常打開,開放懷抱等你,擁抱過就有了默契你會愛上這里......”

Sub Reactor Group里有多個Reactor執行緒,Reactor執行緒的個數可以通過系統引數-D io.netty.eventLoopThreads指定,默認的Reactor的個數為CPU核數 * 2,Sub Reactor執行緒主要用來輪詢客戶端SocketChannel上的IO就緒事件,處理IO就緒事件,執行異步任務,Sub Reactor Group做的事情就是上篇飯店例子中服務員的作業,客人進來了要為客人分配座位,端茶送水,做菜上菜,“不管遠近都是客人,請不用客氣,相約好了在一起,我們歡迎您......”

一個
客戶端SocketChannel只能分配給一個固定的Sub Reactor,一個Sub Reactor負責處理多個客戶端SocketChannel,這樣可以將服務端承載的全量客戶端連接分攤到多個Sub Reactor中處理,同時也能保證客戶端SocketChannel上的IO處理的執行緒安全性,
由于文章篇幅的關系,作為Reactor在netty中實作的第一篇我們主要來介紹主從Reactor Group的創建流程,骨架脈絡先搭好,
下面我們來看一段Netty服務端代碼的撰寫模板,從代碼模板的流程中我們來決議下主從Reactor的創建流程以及在這個程序中所涉及到的Netty核心類,
Netty服務端代碼模板
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//創建主從Reactor執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主從Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel型別
.option(ChannelOption.SO_BACKLOG, 100)//設定主Reactor中channel的option選項
.handler(new LoggingHandler(LogLevel.INFO))//設定主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() {//設定從Reactor中注冊channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 系結埠啟動服務,開始監聽accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- 首先我們要創建Netty最核心的部分 ->
創建主從Reactor Group,在Netty中EventLoopGroup就是Reactor Group的實作類,對應的EventLoop就是Reactor的實作類,
//創建主從Reactor執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
- 創建用于
IO處理的ChannelHandler,實作相應IO事件的回呼函式,撰寫對應的IO處理邏輯,注意這里只是簡單示例哈,詳細的IO事件處理,筆者會單獨開一篇文章專門講述,
final EchoServerHandler serverHandler = new EchoServerHandler();
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
................省略IO處理邏輯................
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
-
創建
ServerBootstrapNetty服務端啟動類,并在啟動類中配置啟動Netty服務端所需要的一些必備資訊,-
通過
serverBootstrap.group(bossGroup, workerGroup)為Netty服務端配置主從Reactor Group實體, -
通過
serverBootstrap.channel(NioServerSocketChannel.class)配置Netty服務端的ServerSocketChannel用于系結埠地址以及創建客戶端SocketChannel,Netty中的NioServerSocketChannel.class就是對JDK NIO中ServerSocketChannel的封裝,而用于表示客戶端連接的NioSocketChannel是對JDK NIOSocketChannel封裝,
在上篇文章介紹
Socket內核結構小節中我們提到,在撰寫服務端網路程式時,我們首先要創建一個Socket用于listen和bind埠地址,我們把這個叫做監聽Socket,這里對應的就是NioServerSocketChannel.class,當客戶端連接完成三次握手,系統呼叫accept函式會基于監聽Socket創建出來一個新的Socket專門用于與客戶端之間的網路通信我們稱為客戶端連接Socket,這里對應的就是NioSocketChannel.class-
serverBootstrap.option(ChannelOption.SO_BACKLOG, 100)設定服務端ServerSocketChannel中的SocketOption,關于SocketOption的選項我們后邊的文章再聊,本文主要聚焦在NettyMain Reactor Group的創建及作業流程, -
serverBootstrap.handler(....)設定服務端NioServerSocketChannel中對應Pipieline中的ChannelHandler,
netty有兩種
Channel型別:一種是服務端用于監聽系結埠地址的NioServerSocketChannel,一種是用于客戶端通信的NioSocketChannel,每種Channel型別實體都會對應一個PipeLine用于編排對應channel實體上的IO事件處理邏輯,PipeLine中組織的就是ChannelHandler用于撰寫特定的IO處理邏輯,注意
serverBootstrap.handler設定的是服務端NioServerSocketChannel PipeLine中的ChannelHandler,serverBootstrap.childHandler(ChannelHandler childHandler)用于設定客戶端NioSocketChannel中對應Pipieline中的ChannelHandler,我們通常配置的編碼解碼器就是在這里,
ServerBootstrap啟動類方法帶有child前綴的均是設定客戶端NioSocketChannel屬性的,ChannelInitializer是用于當SocketChannel成功注冊到系結的Reactor上后,用于初始化該SocketChannel的Pipeline,它的initChannel方法會在注冊成功后執行,這里只是捎帶提一下,讓大家有個初步印象,后面我會專門介紹, -
-
ChannelFuture f = serverBootstrap.bind(PORT).sync()這一步會是下篇文章要重點分析的主題Main Reactor Group的啟動,系結埠地址,開始監聽客戶端連接事件(OP_ACCEPT),本文我們只關注創建流程, -
f.channel().closeFuture().sync()等待服務端NioServerSocketChannel關閉,Netty服務端到這里正式啟動,并準備好接受客戶端連接的準備, -
shutdownGracefully優雅關閉主從Reactor執行緒組里的所有Reactor執行緒,
Netty對IO模型的支持
在上篇文章中我們介紹了五種IO模型,Netty中支持BIO,NIO,AIO以及多種作業系統下的IO多路復用技術實作,
在Netty中切換這幾種IO模型也是非常的方便,下面我們來看下Netty如何對這幾種IO模型進行支持的,
首先我們介紹下幾個與IO模型相關的重要介面:
EventLoop
EventLoop就是Netty中的Reactor,可以說它就是Netty的引擎,負責Channel上IO就緒事件的監聽,IO就緒事件的處理,異步任務的執行驅動著整個Netty的運轉,
不同IO模型下,EventLoop有著不同的實作,我們只需要切換不同的實作類就可以完成對NettyIO模型的切換,
| BIO | NIO | AIO |
|---|---|---|
| ThreadPerChannelEventLoop | NioEventLoop | AioEventLoop |
在NIO模型下Netty會自動根據作業系統以及版本的不同選擇對應的IO多路復用技術實作,比如Linux 2.6版本以上用的是Epoll,2.6版本以下用的是Poll,Mac下采用的是Kqueue,
其中Linux kernel 在5.1版本引入的異步IO庫io_uring正在netty中范訓,
EventLoopGroup
Netty中的Reactor是以Group的形式出現的,EventLoopGroup正是Reactor組的介面定義,負責管理Reactor,Netty中的Channel就是通過EventLoopGroup注冊到具體的Reactor上的,
Netty的IO執行緒模型是主從Reactor多執行緒模型,主從Reactor執行緒組在Netty原始碼中對應的其實就是兩個EventLoopGroup實體,
不同的IO模型也有對應的實作:
| BIO | NIO | AIO |
|---|---|---|
| ThreadPerChannelEventLoopGroup | NioEventLoopGroup | AioEventLoopGroup |
ServerSocketChannel
用于Netty服務端使用的ServerSocketChannel,對應于上篇文章提到的監聽Socket,負責系結監聽埠地址,接收客戶端連接并創建用于與客戶端通信的SocketChannel,
不同的IO模型下的實作:
| BIO | NIO | AIO |
|---|---|---|
| OioServerSocketChannel | NioServerSocketChannel | AioServerSocketChannel |
SocketChannel
用于與客戶端通信的SocketChannel,對應于上篇文章提到的客戶端連接Socket,當客戶端完成三次握手后,由系統呼叫accept函式根據監聽Socket創建,
不同的IO模型下的實作:
| BIO | NIO | AIO |
|---|---|---|
| OioSocketChannel | NioSocketChannel | AioSocketChannel |
我們看到在不同IO模型的實作中,Netty這些圍繞IO模型的核心類只是前綴的不同:
- BIO對應的前綴為
Oio表示old io,現在已經廢棄不推薦使用, - NIO對應的前綴為
Nio,正是Netty推薦也是我們常用的非阻塞IO模型, - AIO對應的前綴為
Aio,由于Linux下的異步IO機制實作的并不成熟,性能提升表現上也不明顯,現已被洗掉,
我們只需要將IO模型的這些核心介面對應的實作類前綴改為對應IO模型的前綴,就可以輕松在Netty中完成對IO模型的切換,

多種NIO的實作
| Common | Linux | Mac |
|---|---|---|
| NioEventLoopGroup | EpollEventLoopGroup | KQueueEventLoopGroup |
| NioEventLoop | EpollEventLoop | KQueueEventLoop |
| NioServerSocketChannel | EpollServerSocketChannel | KQueueServerSocketChannel |
| NioSocketChannel | EpollSocketChannel | KQueueSocketChannel |
我們通常在使用NIO模型的時候會使用Common列下的這些IO模型核心類,Common類也會根據作業系統的不同自動選擇JDK在對應平臺下的IO多路復用技術的實作,
而Netty自身也根據作業系統的不同提供了自己對IO多路復用技術的實作,比JDK的實作性能更優,比如:
JDK的 NIO默認實作是水平觸發,Netty 是邊緣觸發(默認)和水平觸發可切換,,- Netty 實作的垃圾回收更少、性能更好,
我們撰寫Netty服務端程式的時候也可以根據作業系統的不同,采用Netty自身的實作來進一步優化程式,做法也很簡單,直接將上圖中紅框里的實作類替換成Netty的自身實作類即可完成切換,
經過以上對Netty服務端代碼撰寫模板以及IO模型相關核心類的簡單介紹,我們對Netty的創建流程有了一個簡單粗略的總體認識,下面我們來深入剖析下創建流程程序中的每一個步驟以及這個程序中涉及到的核心類實作,
以下原始碼決議部分我們均采用Common列下NIO相關的實作進行決議,
創建主從Reactor執行緒組
在Netty服務端程式撰寫模板的開始,我們首先會創建兩個Reactor執行緒組:

-
一個是主Reactor執行緒組
bossGroup用于監聽客戶端連接,創建客戶端連接NioSocketChannel,并將創建好的客戶端連接NioSocketChannel注冊到從Reactor執行緒組中一個固定的Reactor上, -
一個是從Reactor執行緒組
workerGroup,workerGroup中的Reactor負責監聽系結在其上的客戶端連接NioSocketChannel上的IO就緒事件,并處理IO就緒事件,執行異步任務,
//創建主從Reactor執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
Netty中Reactor執行緒組的實作類為NioEventLoopGroup,在創建bossGroup和workerGroup的時候用到了NioEventLoopGroup的兩個建構式:
- 帶
nThreads引數的建構式public NioEventLoopGroup(int nThreads), - 不帶
nThreads引數的默認建構式public NioEventLoopGroup()
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance using the default number of threads, the default {@link ThreadFactory} and
* the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup() {
this(0);
}
/**
* Create a new instance using the specified number of threads, {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
......................省略...........................
}
nThreads引數表示當前要創建的Reactor執行緒組內包含多少個Reactor執行緒,不指定nThreads引數的話采用默認的Reactor執行緒個數,用0表示,
最侄訓呼叫到建構式
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
下面簡單介紹下建構式中這幾個引數的作用,后面我們在講解本文主線的程序中還會提及這幾個引數,到時在詳細介紹,這里只是讓大家有個初步印象,不必做過多的糾纏,
Executor executor:負責啟動Reactor執行緒進而Reactor才可以開始作業,
Reactor執行緒組
NioEventLoopGroup負責創建Reactor執行緒,在創建的時候會將executor傳入,
-
RejectedExecutionHandler:當向Reactor添加異步任務添加失敗時,采用的拒絕策略,Reactor的任務不只是監聽IO活躍事件和IO任務的處理,還包括對異步任務的處理,這里大家只需有個這樣的概念,后面筆者會專門詳細介紹, -
SelectorProvider selectorProvider:Reactor中的IO模型為IO多路復用模型,對應于JDK NIO中的實作為java.nio.channels.Selector(就是我們上篇文章中提到的select,poll,epoll),每個Reator中都包含一個Selector,用于輪詢注冊在該Reactor上的所有Channel上的IO事件,SelectorProvider就是用來創建Selector的, -
SelectStrategyFactory selectStrategyFactory:Reactor最重要的事情就是輪詢注冊其上的Channel上的IO就緒事件,這里的SelectStrategyFactory用于指定輪詢策略,默認為DefaultSelectStrategyFactory.INSTANCE,
最侄訓將這些引數交給NioEventLoopGroup 的父類構造器,下面我們來看下NioEventLoopGroup類的繼承結構:

NioEventLoopGroup類的繼承結構乍一看比較復雜,大家不要慌,筆者會隨著主線的深入慢慢地介紹這些父類介面,我們現在重點關注Mutithread前綴的類,
我們知道NioEventLoopGroup是Netty中的Reactor執行緒組的實作,既然是執行緒組那么肯定是負責管理和創建多個Reactor執行緒的,所以Mutithread前綴的類定義的行為自然是對Reactor執行緒組內多個Reactor執行緒的創建和管理作業,
MultithreadEventLoopGroup
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
//默認Reactor個數
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
...................省略.....................
}
MultithreadEventLoopGroup類主要的功能就是用來確定Reactor執行緒組內Reactor的個數,
默認的Reactor的個數存放于欄位DEFAULT_EVENT_LOOP_THREADS 中,
從static {}靜態代碼塊中我們可以看出默認Reactor的個數的獲取邏輯:
-
可以通過系統變數
-D io.netty.eventLoopThreads"指定, -
如果不指定,那么默認的就是
NettyRuntime.availableProcessors() * 2
當nThread引數設定為0采用默認設定時,Reactor執行緒組內的Reactor個數則設定為DEFAULT_EVENT_LOOP_THREADS,
MultithreadEventExecutorGroup
MultithreadEventExecutorGroup這里就是本小節的核心,主要用來定義創建和管理Reactor的行為,
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//Reactor執行緒組中的Reactor集合
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
//從Reactor group中選擇一個特定的Reactor的選擇策略 用于channel注冊系結到一個固定的Reactor上
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
............................省略................................
}
首先介紹一個新的構造器引數EventExecutorChooserFactory chooserFactory,當客戶端連接完成三次握手后,Main Reactor會創建客戶端連接NioSocketChannel,并將其系結到Sub Reactor Group中的一個固定Reactor,那么具體要系結到哪個具體的Sub Reactor上呢?這個系結策略就是由chooserFactory來創建的,默認為DefaultEventExecutorChooserFactory,
下面就是本小節的主題Reactor執行緒組的創建程序:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
//用于創建Reactor執行緒
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
//回圈創建reaactor group中的Reactor
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//創建reactor
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
................省略................
}
}
}
//創建channel到Reactor的系結策略
chooser = chooserFactory.newChooser(children);
................省略................
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
1. 創建用于啟動Reactor執行緒的executor
在Netty Reactor Group中的單個Reactor的IO執行緒模型為上篇文章提到的單Reactor單執行緒模型,一個Reactor執行緒負責輪詢注冊其上的所有Channel中的IO就緒事件,處理IO事件,執行Netty中的異步任務等作業,正是這個Reactor執行緒驅動著整個Netty的運轉,可謂是Netty的核心引擎,

而這里的executor就是負責啟動Reactor執行緒的,從創建原始碼中我們可以看到executor的型別為ThreadPerTaskExecutor ,
ThreadPerTaskExecutor
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
我們看到ThreadPerTaskExecutor 做的事情很簡單,從它的命名前綴ThreadPerTask我們就可以猜出它的作業方式,就是來一個任務就創建一個執行緒執行,而創建的這個執行緒正是netty的核心引擎Reactor執行緒,
在Reactor執行緒啟動的時候,Netty會將Reactor執行緒要做的事情封裝成Runnable,丟給exexutor啟動,
而Reactor執行緒的核心就是一個死回圈不停的輪詢IO就緒事件,處理IO事件,執行異步任務,一刻也不停歇,堪稱996典范,
這里向大家先賣個關子,"Reactor執行緒是何時啟動的呢??"
2. 創建Reactor
Reactor執行緒組NioEventLoopGroup包含多個Reactor,存放于private final EventExecutor[] children陣列中,
所以下面的事情就是創建nThread個Reactor,并存放于EventExecutor[] children欄位中,
我們來看下用于創建Reactor的newChild(executor, args)方法:
newChild
newChild方法是MultithreadEventExecutorGroup中的一個抽象方法,提供給具體子類實作,
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
這里我們決議的是NioEventLoopGroup,我們來看下newChild在該類中的實作:
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
}
前邊提到的眾多構造器引數,這里會通過可變引數Object... args傳入到Reactor類NioEventLoop的構造器中,
這里介紹下新的引數EventLoopTaskQueueFactory queueFactory,前邊提到Netty中的Reactor主要作業是輪詢注冊其上的所有Channel上的IO就緒事件,處理IO就緒事件,除了這些主要的作業外,Netty為了極致的壓榨Reactor的性能,還會讓它做一些異步任務的執行作業,既然要執行異步任務,那么Reactor中就需要一個佇列來保存任務,
這里的EventLoopTaskQueueFactory就是用來創建這樣的一個佇列來保存Reactor中待執行的異步任務,
可以把Reactor理解成為一個單執行緒的執行緒池,類似于JDK中的SingleThreadExecutor,僅用一個執行緒來執行輪詢IO就緒事件,處理IO就緒事件,執行異步任務,同時待執行的異步任務保存在Reactor里的taskQueue中,
NioEventLoop
public final class NioEventLoop extends SingleThreadEventLoop {
//用于創建JDK NIO Selector,ServerSocketChannel
private final SelectorProvider provider;
//Selector輪詢策略 決定什么時候輪詢,什么時候處理IO事件,什么時候執行異步任務
private final SelectStrategy selectStrategy;
/**
* The NIO {@link Selector}.
*/
private Selector selector;
private Selector unwrappedSelector;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
}
這里就正式開始了Reactor的創建程序,我們知道Reactor的核心是采用的IO多路復用模型來對客戶端連接上的IO事件進行監聽,所以最重要的事情是創建Selector(JDK NIO 中IO多路復用技術的實作),
可以把
Selector理解為我們上篇文章介紹的Select,poll,epoll,它是JDK NIO對作業系統內核提供的這些IO多路復用技術的封裝,
openSelector
openSelector是NioEventLoop類中用于創建IO多路復用的Selector,并對創建出來的JDK NIO 原生的Selector進行性能優化,
首先會通過SelectorProvider#openSelector 創建JDK NIO原生的Selector,
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
//通過JDK NIO SelectorProvider創建Selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
..................省略.............
}
SelectorProvider會根據作業系統的不同選擇JDK在不同作業系統版本下的對應Selector的實作,Linux下會選擇Epoll,Mac下會選擇Kqueue,
下面我們就來看下SelectorProvider是如何做到自動適配不同作業系統下IO多路復用實作的
SelectorProvider
public NioEventLoopGroup(ThreadFactory threadFactory) {
this(0, threadFactory, SelectorProvider.provider());
}
SelectorProvider是在前面介紹的NioEventLoopGroup類建構式中通過呼叫SelectorProvider.provider()被加載,并通過NioEventLoopGroup#newChild方法中的可變長引數Object... args傳遞到NioEventLoop中的private final SelectorProvider provider欄位中,
SelectorProvider的加載程序:
public abstract class SelectorProvider {
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
}
從SelectorProvider加載原始碼中我們可以看出,SelectorProvider的加載方式有三種,優先級如下:
- 通過系統變數
-D java.nio.channels.spi.SelectorProvider指定SelectorProvider的自定義實作類全限定名,通過應用程式類加載器(Application Classloader)加載,
private static boolean loadProviderFromProperty() {
String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
if (cn == null)
return false;
try {
Class<?> c = Class.forName(cn, true,
ClassLoader.getSystemClassLoader());
provider = (SelectorProvider)c.newInstance();
return true;
}
.................省略.............
}
- 通過
SPI方式加載,在工程目錄META-INF/services下定義名為java.nio.channels.spi.SelectorProvider的SPI檔案,檔案中第一個定義的SelectorProvider實作類全限定名就會被加載,
private static boolean loadProviderAsService() {
ServiceLoader<SelectorProvider> sl =
ServiceLoader.load(SelectorProvider.class,
ClassLoader.getSystemClassLoader());
Iterator<SelectorProvider> i = sl.iterator();
for (;;) {
try {
if (!i.hasNext())
return false;
provider = i.next();
return true;
} catch (ServiceConfigurationError sce) {
if (sce.getCause() instanceof SecurityException) {
// Ignore the security exception, try the next provider
continue;
}
throw sce;
}
}
}
- 如果以上兩種方式均未被定義,那么就采用
SelectorProvider系統默認實作sun.nio.ch.DefaultSelectorProvider,筆者當前使用的作業系統是MacOS,從原始碼中我們可以看到自動適配了KQueue實作,
public class DefaultSelectorProvider {
private DefaultSelectorProvider() {
}
public static SelectorProvider create() {
return new KQueueSelectorProvider();
}
}
不同作業系統中JDK對于
DefaultSelectorProvider會有所不同,Linux內核版本2.6以上對應的Epoll,Linux內核版本2.6以下對應的Poll,MacOS對應的是KQueue,
下面我們接著回到io.netty.channel.nio.NioEventLoop#openSelector的主線上來,
Netty對JDK NIO 原生Selector的優化
首先在NioEventLoop中有一個Selector優化開關DISABLE_KEY_SET_OPTIMIZATION,通過系統變數-D io.netty.noKeySetOptimization指定,默認是開啟的,表示需要對JDK NIO原生Selector進行優化,
public final class NioEventLoop extends SingleThreadEventLoop {
//Selector優化開關 默認開啟 為了遍歷的效率 會對Selector中的SelectedKeys進行資料結構優化
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
}
如果優化開關DISABLE_KEY_SET_OPTIMIZATION 是關閉的,那么直接回傳JDK NIO原生的Selector,
private SelectorTuple openSelector() {
..........SelectorProvider創建JDK NIO 原生Selector..............
if (DISABLE_KEY_SET_OPTIMIZATION) {
//JDK NIO原生Selector ,Selector優化開關 默認開啟需要對Selector進行優化
return new SelectorTuple(unwrappedSelector);
}
}
下面為Netty對JDK NIO原生的Selector的優化程序:
- 獲取
JDK NIO原生Selector的抽象實作類sun.nio.ch.SelectorImpl,JDK NIO原生Selector的實作均繼承于該抽象類,用于判斷由SelectorProvider創建出來的Selector是否為JDK默認實作(SelectorProvider第三種加載方式),因為SelectorProvider可以是自定義加載,所以它創建出來的Selector并不一定是JDK NIO 原生的,
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
JDK NIO Selector的抽象類sun.nio.ch.SelectorImpl
public abstract class SelectorImpl extends AbstractSelector {
// The set of keys with data ready for an operation
// //IO就緒的SelectionKey(里面包裹著channel)
protected Set<SelectionKey> selectedKeys;
// The set of keys registered with this Selector
//注冊在該Selector上的所有SelectionKey(里面包裹著channel)
protected HashSet<SelectionKey> keys;
// Public views of the key sets
//用于向呼叫執行緒回傳的keys,不可變
private Set<SelectionKey> publicKeys; // Immutable
//當有IO就緒的SelectionKey時,向呼叫執行緒回傳,只可洗掉其中元素,不可增加
private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = new HashSet<SelectionKey>();
selectedKeys = new HashSet<SelectionKey>();
if (Util.atBugLevel("1.4")) {
publicKeys = keys;
publicSelectedKeys = selectedKeys;
} else {
//不可變
publicKeys = Collections.unmodifiableSet(keys);
//只可洗掉其中元素,不可增加
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
}
}
這里筆者來簡單介紹下JDK NIO中的Selector中這幾個欄位的含義,我們可以和上篇文章講到的epoll在內核中的結構做類比,方便大家后續的理解:

Set<SelectionKey> selectedKeys類似于我們上篇文章講解Epoll時提到的就緒佇列eventpoll->rdllist,Selector這里大家可以理解為Epoll,Selector會將自己監聽到的IO就緒的Channel放到selectedKeys中,
這里的
SelectionKey暫且可以理解為Channel在Selector中的表示,類比上圖中epitem結構里的epoll_event,封裝IO就緒Socket的資訊,
其實SelectionKey里包含的資訊不止是Channel還有很多IO相關的資訊,后面我們在詳細介紹,
HashSet<SelectionKey> keys:這里存放的是所有注冊到該Selector上的Channel,類比epoll中的紅黑樹結構rb_root
SelectionKey在Channel注冊到Selector中后生成,
-
Set<SelectionKey> publicSelectedKeys相當于是selectedKeys的視圖,用于向外部執行緒回傳IO就緒的SelectionKey,這個集合在外部執行緒中只能做洗掉操作不可增加元素,并且不是執行緒安全的, -
Set<SelectionKey> publicKeys相當于keys的不可變視圖,用于向外部執行緒回傳所有注冊在該Selector上的SelectionKey
這里需要重點關注抽象類sun.nio.ch.SelectorImpl中的selectedKeys和publicSelectedKeys這兩個欄位,注意它們的型別都是HashSet ,一會優化的就是這里!!!!
- 判斷由
SelectorProvider創建出來的Selector是否是JDK NIO原生的Selector實作,因為Netty優化針對的是JDK NIO 原生Selector,判斷標準為sun.nio.ch.SelectorImpl類是否為SelectorProvider創建出Selector的父類,如果不是則直接回傳,不在繼續下面的優化程序,
//判斷是否可以對Selector進行優化,這里主要針對JDK NIO原生Selector的實作類進行優化,因為SelectorProvider可以加載的是自定義Selector實作
//如果SelectorProvider創建的Selector不是JDK原生sun.nio.ch.SelectorImpl的實作類,那么無法進行優化,直接回傳
if (!(maybeSelectorImplClass instanceof Class) ||
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
通過前面對SelectorProvider的介紹我們知道,這里通過provider.openSelector()創建出來的Selector實作類為KQueueSelectorImpl類,它繼承實作了sun.nio.ch.SelectorImpl,所以它是JDK NIO 原生的Selector實作
class KQueueSelectorImpl extends SelectorImpl {
}
- 創建
SelectedSelectionKeySet通過反射替換掉sun.nio.ch.SelectorImpl類中selectedKeys和publicSelectedKeys的默認HashSet實作,
為什么要用SelectedSelectionKeySet替換掉原來的HashSet呢??
因為這里涉及到對HashSet型別的sun.nio.ch.SelectorImpl#selectedKeys集合的兩種操作:
-
插入操作: 通過前邊對
sun.nio.ch.SelectorImpl類中欄位的介紹我們知道,在Selector監聽到IO就緒的SelectionKey后,會將IO就緒的SelectionKey插入sun.nio.ch.SelectorImpl#selectedKeys集合中,這時Reactor執行緒會從java.nio.channels.Selector#select(long)阻塞呼叫中回傳(類似上篇文章提到的epoll_wait), -
遍歷操作:
Reactor執行緒回傳后,會從Selector中獲取IO就緒的SelectionKey集合(也就是sun.nio.ch.SelectorImpl#selectedKeys),Reactor執行緒遍歷selectedKeys,獲取IO就緒的SocketChannel,并處理SocketChannel上的IO事件,
我們都知道HashSet底層資料結構是一個哈希表,由于Hash沖突這種情況的存在,所以導致對哈希表進行插入和遍歷操作的性能不如對陣列進行插入和遍歷操作的性能好,
還有一個重要原因是,陣列可以利用CPU快取的優勢來提高遍歷的效率,后面筆者會有一篇專門的文章來講述利用CPU快取行如何為我們帶來性能優勢,
所以Netty為了優化對sun.nio.ch.SelectorImpl#selectedKeys集合的插入,遍歷性能,自己用陣列這種資料結構實作了SelectedSelectionKeySet ,用它來替換原來的HashSet實作,
SelectedSelectionKeySet
-
初始化
SelectionKey[] keys陣列大小為1024,當陣列容量不夠時,擴容為原來的兩倍大小, -
通過陣列尾部指標
size,在向陣列插入元素的時候可以直接定位到插入位置keys[size++],操作一步到位,不用像哈希表那樣還需要解決Hash沖突, -
對陣列的遍歷操作也是如絲般順滑,CPU直接可以在快取行中遍歷讀取陣列元素無需訪問記憶體,比
HashSet的迭代器java.util.HashMap.KeyIterator遍歷方式性能不知高到哪里去了,
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
//采用陣列替換到JDK中的HashSet,這樣add操作和遍歷操作效率更高,不需要考慮hash沖突
SelectionKey[] keys;
//陣列尾部指標
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
/**
* 陣列的添加效率高于 HashSet 因為不需要考慮hash沖突
* */
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
//時間復雜度O(1)
keys[size++] = o;
if (size == keys.length) {
//擴容為原來的兩倍大小
increaseCapacity();
}
return true;
}
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
/**
* 采用陣列的遍歷效率 高于 HashSet
* */
@Override
public Iterator<SelectionKey> iterator() {
return new Iterator<SelectionKey>() {
private int idx;
@Override
public boolean hasNext() {
return idx < size;
}
@Override
public SelectionKey next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return keys[idx++];
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
看到這里不禁感嘆,從各種小的細節可以看出Netty對性能的優化簡直淋漓盡致,對性能的追求令人發指,細節真的是魔鬼,
- Netty通過反射的方式用
SelectedSelectionKeySet替換掉sun.nio.ch.SelectorImpl#selectedKeys,sun.nio.ch.SelectorImpl#publicSelectedKeys這兩個集合中原來HashSet的實作,
- 反射獲取
sun.nio.ch.SelectorImpl類中selectedKeys和publicSelectedKeys,
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
Java9版本以上通過sun.misc.Unsafe設定欄位值的方式
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
}
- 通過反射的方式用
SelectedSelectionKeySet替換掉hashSet實作的sun.nio.ch.SelectorImpl#selectedKeys,sun.nio.ch.SelectorImpl#publicSelectedKeys,
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
//Java8反射替換欄位
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
- 將與
sun.nio.ch.SelectorImpl類中selectedKeys和publicSelectedKeys關聯好的Netty優化實作SelectedSelectionKeySet,設定到io.netty.channel.nio.NioEventLoop#selectedKeys欄位中保存,
//會通過反射替換selector物件中的selectedKeySet保存就緒的selectKey
//該欄位為持有selector物件selectedKeys的參考,當IO事件就緒時,直接從這里獲取
private SelectedSelectionKeySet selectedKeys;
后續
Reactor執行緒就會直接從io.netty.channel.nio.NioEventLoop#selectedKeys中獲取IO就緒的SocketChannel
- 用
SelectorTuple封裝unwrappedSelector和wrappedSelector回傳給NioEventLoop建構式,到此Reactor中的Selector就創建完畢了,
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
private static final class SelectorTuple {
final Selector unwrappedSelector;
final Selector selector;
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
-
所謂的
unwrappedSelector是指被Netty優化過的JDK NIO原生Selector, -
所謂的
wrappedSelector就是用SelectedSelectionKeySetSelector裝飾類將unwrappedSelector和與sun.nio.ch.SelectorImpl類關聯好的Netty優化實作SelectedSelectionKeySet封裝裝飾起來,
wrappedSelector會將所有對Selector的操作全部代理給unwrappedSelector,并在發起輪詢IO事件的相關操作中,重置SelectedSelectionKeySet清空上一次的輪詢結果,
final class SelectedSelectionKeySetSelector extends Selector {
//Netty優化后的 SelectedKey就緒集合
private final SelectedSelectionKeySet selectionKeys;
//優化后的JDK NIO 原生Selector
private final Selector delegate;
SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
this.delegate = delegate;
this.selectionKeys = selectionKeys;
}
@Override
public boolean isOpen() {
return delegate.isOpen();
}
@Override
public SelectorProvider provider() {
return delegate.provider();
}
@Override
public Set<SelectionKey> keys() {
return delegate.keys();
}
@Override
public Set<SelectionKey> selectedKeys() {
return delegate.selectedKeys();
}
@Override
public int selectNow() throws IOException {
//重置SelectedKeys集合
selectionKeys.reset();
return delegate.selectNow();
}
@Override
public int select(long timeout) throws IOException {
//重置SelectedKeys集合
selectionKeys.reset();
return delegate.select(timeout);
}
@Override
public int select() throws IOException {
//重置SelectedKeys集合
selectionKeys.reset();
return delegate.select();
}
@Override
public Selector wakeup() {
return delegate.wakeup();
}
@Override
public void close() throws IOException {
delegate.close();
}
}
到這里Reactor的核心Selector就創建好了,下面我們來看下用于保存異步任務的佇列是如何創建出來的,
newTaskQueue
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
//通過用SelectedSelectionKeySet裝飾后的unwrappedSelector
this.selector = selectorTuple.selector;
//Netty優化過的JDK NIO遠程Selector
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
我們繼續回到創建Reactor的主線上,到目前為止Reactor的核心Selector就創建好了,前邊我們提到Reactor除了需要監聽IO就緒事件以及處理IO就緒事件外,還需要執行一些異步任務,當外部執行緒向Reactor提交異步任務后,Reactor就需要一個佇列來保存這些異步任務,等待Reactor執行緒執行,
下面我們來看下Reactor中任務佇列的創建程序:
//任務佇列大小,默認是無界佇列
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
-
在
NioEventLoop的父類SingleThreadEventLoop中提供了一個靜態變數DEFAULT_MAX_PENDING_TASKS用來指定Reactor任務佇列的大小,可以通過系統變數-D io.netty.eventLoop.maxPendingTasks進行設定,默認為Integer.MAX_VALUE,表示任務佇列默認為無界佇列, -
根據
DEFAULT_MAX_PENDING_TASKS變數的設定,來決定創建無界任務佇列還是有界任務佇列,
//創建無界任務佇列
PlatformDependent.<Runnable>newMpscQueue()
//創建有界任務佇列
PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks)
public static <T> Queue<T> newMpscQueue() {
return Mpsc.newMpscQueue();
}
public static <T> Queue<T> newMpscQueue(final int maxCapacity) {
return Mpsc.newMpscQueue(maxCapacity);
}
Reactor內的異步任務佇列的型別為MpscQueue,它是由JCTools提供的一個高性能無鎖佇列,從命名前綴Mpsc可以看出,它適用于多生產者單消費者的場景,它支持多個生產者執行緒安全的訪問佇列,同一時刻只允許一個消費者執行緒讀取佇列中的元素,
我們知道Netty中的
Reactor可以執行緒安全的處理注冊其上的多個SocketChannel上的IO資料,保證Reactor執行緒安全的核心原因正是因為這個MpscQueue,它可以支持多個業務執行緒在處理完業務邏輯后,執行緒安全的向MpscQueue添加異步寫任務,然后由單個Reactor執行緒來執行這些寫任務,既然是單執行緒執行,那肯定是執行緒安全的了,
Reactor對應的NioEventLoop型別繼承結構

NioEventLoop的繼承結構也是比較復雜,這里我們只關注在Reactor創建程序中涉及的到兩個父類SingleThreadEventLoop,SingleThreadEventExecutor,
剩下的繼承體系,我們在后邊隨著Netty原始碼的深入在慢慢介紹,
前邊我們提到,其實Reactor我們可以看作是一個單執行緒的執行緒池,只有一個執行緒用來執行IO就緒事件的監聽,IO事件的處理,異步任務的執行,用MpscQueue 來存盤待執行的異步任務,
命名前綴為SingleThread的父類都是對Reactor這些行為的分層定義,也是本小節要介紹的物件
SingleThreadEventLoop
Reactor負責執行的異步任務分為三類:
普通任務:這是Netty最主要執行的異步任務,存放在普通任務佇列taskQueue中,在NioEventLoop建構式中創建,定時任務:存放在優先級佇列中,后續我們介紹,尾部任務:存放于尾部任務佇列tailTasks中,尾部任務一般不常用,在普通任務執行完后 Reactor執行緒會執行尾部任務,使用場景:比如對Netty 的運行狀態做一些統計資料,例如任務回圈的耗時、占用物理記憶體的大小等等都可以向尾部佇列添加一個收尾任務完成統計資料的實時更新,
SingleThreadEventLoop 負責對尾部任務佇列tailTasks進行管理,并且提供Channel向Reactor注冊的行為,
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
//任務佇列大小,默認是無界佇列
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
//尾部任務佇列
private final Queue<Runnable> tailTasks;
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
//尾部佇列 執行一些統計任務 不常用
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
@Override
public ChannelFuture register(Channel channel) {
//注冊channel到系結的Reactor上
return register(new DefaultChannelPromise(channel, this));
}
}
SingleThreadEventExecutor
SingleThreadEventExecutor主要負責對普通任務佇列的管理,以及異步任務的執行,Reactor執行緒的啟停,
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
//parent為Reactor所屬的NioEventLoopGroup Reactor執行緒組
super(parent);
//向Reactor添加任務時,是否喚醒Selector停止輪詢IO就緒事件,馬上執行異步任務
this.addTaskWakesUp = addTaskWakesUp;
//Reactor異步任務佇列的大小
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
//用于啟動Reactor執行緒的executor -> ThreadPerTaskExecutor
this.executor = ThreadExecutorMap.apply(executor, this);
//普通任務佇列
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
//任務佇列滿時的拒絕策略
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
}
到現在為止,一個完整的Reactor架構就被創建出來了,

3. 創建Channel到Reactor的系結策略
到這一步,Reactor執行緒組NioEventLoopGroup里邊的所有Reactor就已經全部創建完畢,
無論是Netty服務端NioServerSocketChannel關注的OP_ACCEPT事件也好,還是Netty客戶端NioSocketChannel關注的OP_READ和OP_WRITE事件也好,都需要先注冊到Reactor上,Reactor才能監聽Channel上關注的IO事件實作IO多路復用,
NioEventLoopGroup(Reactor執行緒組)里邊有眾多的Reactor,那么以上提到的這些Channel究竟應該注冊到哪個Reactor上呢?這就需要一個系結的策略來平均分配,
還記得我們前邊介紹MultithreadEventExecutorGroup類的時候提到的構造器引數EventExecutorChooserFactory 嗎?
這時候它就派上用場了,它主要用來創建Channel到Reactor的系結策略,默認為DefaultEventExecutorChooserFactory.INSTANCE,
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//從Reactor集合中選擇一個特定的Reactor的系結策略 用于channel注冊系結到一個固定的Reactor上
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
chooser = chooserFactory.newChooser(children);
}
下面我們來看下具體的系結策略:
DefaultEventExecutorChooserFactory
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
...................省略.................
}
我們看到在newChooser 方法系結策略有兩個分支,不同之處在于需要判斷Reactor執行緒組中的Reactor個數是否為2的次冪,
Netty中的系結策略就是采用round-robin輪詢的方式來挨個選擇Reactor進行系結,
采用round-robin的方式進行負載均衡,我們一般會用round % reactor.length取余的方式來挨個平均的定位到對應的Reactor上,
如果Reactor的個數reactor.length恰好是2的次冪,那么就可以用位操作&運算round & reactor.length -1來代替%運算round % reactor.length,因為位運算的性能更高,具體為什么&運算能夠代替%運算,筆者會在后面講述時間輪的時候為大家詳細證明,這里大家只需記住這個公式,我們還是聚焦本文的主線,
了解了優化原理,我們在看代碼實作就很容易理解了,
利用%運算的方式Math.abs(idx.getAndIncrement() % executors.length)來進行系結,
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicLong idx = new AtomicLong();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
}
利用&運算的方式idx.getAndIncrement() & executors.length - 1來進行系結,
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
又一次被Netty對性能的極致追求所折服~~~~
4. 向Reactor執行緒組中所有的Reactor注冊terminated回呼函式
當Reactor執行緒組NioEventLoopGroup中所有的Reactor已經創建完畢,Channel到Reactor的系結策略也創建完畢后,我們就來到了創建NioEventGroup的最后一步,
俗話說的好,有創建就有啟動,有啟動就有關閉,這里會創建Reactor關閉的回呼函式terminationListener,在Reactor關閉時回呼,
terminationListener回呼的邏輯很簡單:
-
通過
AtomicInteger terminatedChildren變數記錄已經關閉的Reactor個數,用來判斷NioEventLoopGroup中的Reactor是否已經全部關閉, -
如果所有
Reactor均已關閉,設定NioEventLoopGroup中的terminationFuture為success,表示Reactor執行緒組關閉成功,
//記錄關閉的Reactor個數,當Reactor全部關閉后,才可以認為關閉成功
private final AtomicInteger terminatedChildren = new AtomicInteger();
//關閉future
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
//當所有Reactor關閉后 才認為是關閉成功
terminationFuture.setSuccess(null);
}
}
};
//為所有Reactor添加terminationListener
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
我們在回到文章開頭Netty服務端代碼模板
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//創建主從Reactor執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
...........省略............
}
}
現在Netty的主從Reactor執行緒組就已經創建完畢,此時Netty服務端的骨架已經搭建完畢,骨架如下:

總結
本文介紹了首先介紹了Netty對各種IO模型的支持以及如何輕松切換各種IO模型,
還花了大量的篇幅介紹Netty服務端的核心引擎主從Reactor執行緒組的創建程序,在這個程序中,我們還提到了Netty對各種細節進行的優化,展現了Netty對性能極致的追求,
好了,Netty服務端的骨架已經搭好,剩下的事情就該系結埠地址然后接收連接了,我們下篇文章再見~~~
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/498424.html
標籤:Java
上一篇:java中重要關鍵字簡介說明
