參考資料
Reactor單執行緒模型詳解與實作
異步回呼原理詳解與實作
Reactor(主從)原理詳解與實作
NioEventLoopGroup原始碼分析
netty服務端啟動流程原始碼分析
netty服務端啟動流程總結
netty處理客戶端新連接原始碼分析
先看處理器鏈初始化整體流程圖
![[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-i0Xn9sCi-1600503012123)(/Users/wuxinxin/Library/Application Support/typora-user-images/image-20200919160229930.png)]](https://img.uj5u.com/2020/09/21/90641210453581.png)
配置處理器
//使用配置構建器,配置需要系結的處理器
serverBootstrap.group(bossEventLoopGroup, workEventExecutors)
.channel(NioServerSocketChannel.class)
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new MyHandler()).addLast(new MyHandler2());
}
});
//1.childHandler配置項是給worker的channel系結的Handler(子通道所屬)
//2.Handler配置項是給boss的channel系結的Handler(父通道所屬)
初始化處理器鏈
//ServerBootstrapAcceptor的channelRead方法
//這個是父通道處理器的處理邏輯,我們這直接分析子通道的處理器鏈,這也是我們比較關心的
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//ServerBootstrapAcceptor是我們ServerBootstra的靜態內部類,直接參考childHandler
//新連接的通道直接把我們構建時候的ChannelInitializer物件添加到處理器鏈中
//同時看到,ChannelHandler是添加到pipeline中的
child.pipeline().addLast(childHandler);
//省去不需要代碼
}
//DefaultChannelPipeline的addLast方法
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
//將處理器包裝為一個AbstractChannelHandlerContext,這種處理器包裝了一些背景關系資訊在里面
//處理器鏈的所有處理器都是AbstractChannelHandlerContext型別
newCtx = newContext(group, filterName(name, handler), handler);
//將新的handler添加到處理鏈中
addLast0(newCtx);
//省去不需要代碼
return this;
}
//將新的處理器添加到一個雙向鏈表中
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
//1.這里只是對處理器鏈做了一個簡單的初始化,只是把ChannelInitializer加入到處理器鏈中
//2.ChannelInitializer可以理解為我們配置的handler的一個包裝
//3.為什么不直接將我們所有自定義的處理器添加到處理器鏈中,而是將一個處理器包裝暫時放入
// 處理器鏈中?

將自定義處理器回圈放入處理器鏈中
//ServerBootstrapAcceptor的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//省去不需要代碼
try {
//這里將完成一個新子通道所有初始化的動作,其實這里直接是做了異步去直接做這些事情的
//處理器鏈的真正添加是在這個異步任務里面的
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
//AbstractUnsafe的register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//省去不需要代碼
//使用異步任務去做所有初始化通道的事情,像回圈添加所有處理器鏈這種動作,也很耗時
//容易影響處理新連接的效率,不適合放在主執行緒中
eventLoop.execute(new Runnable() {
@Override
public void run() {
//回圈添加所有自定義處理器,放入處理器鏈中
register0(promise);
}
});
//省去不需要代碼
}
//AbstractUnsafe的register0
private void register0(ChannelPromise promise) {
//省去不需要代碼
//回圈添加所有自定義處理器,放入處理器鏈中
pipeline.invokeHandlerAddedIfNeeded();
}
//ChannelInitializer的initChannel
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
//回呼我們的內部類實作initChannel(SocketChannel socketChannel)方法
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
//添加完畢后移除之前處理器包裝
remove(ctx);
}
return true;
}
return false;
}
//我們自定義啟動類里面
//將包裝處理里面的自定義處理器放入處理器鏈中
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//將每個處理器放入處理器鏈中
socketChannel.pipeline().addLast(new MyHandler()).addLast(new MyHandler2());
}
});
//回圈處理
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
//回圈處理
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
//洗掉處理鏈中的包裝處理器
private void remove(ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
}
}
總結:現在知道為什么在這里正真添加我們的處理器了吧,注冊一個channel,整體都是在異步任務的子執行緒中的
這樣不會阻塞或者影響連接處理

總結
- boss的執行緒依然是只是為了快速處理連接,具體邏輯到worker的執行緒中去做,這是ChannelInitializer存在的原因, boss和worker做了很好的邊界處理
- 處理器擴展方式使用了雙向鏈表,這樣有利于程式的擴展,使用了經典的責任鏈模式
- 不影響主流程的業務盡量使用異步任務去做,盡量不要占用主執行緒資源
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/94158.html
標籤:其他
