主頁 >  其他 > netty客戶端斷線重連實作及問題思考

netty客戶端斷線重連實作及問題思考

2021-01-19 11:05:45 其他

前言

在實作TCP長連接功能中,客戶端斷線重連是一個很常見的問題,當我們使用netty實作斷線重連時,是否考慮過如下幾個問題:

  • 如何監聽到客戶端和服務端連接斷開 ?
  • 如何實作斷線后重新連接 ?
  • netty客戶端執行緒給多大比較合理 ?

其實上面都是筆者在做斷線重連時所遇到的問題,而 “netty客戶端執行緒給多大比較合理?” 這個問題更是筆者在做斷線重連時因一個例外引發的思考,下面講講整個程序:

因為本節講解內容主要涉及在客戶端,但是為了讀者能夠運行整個程式,所以這里先給出服務端及公共的依賴和物體類,

服務端及common代碼:

maven依賴:

<dependencies>
    <!--只是用到了spring-boot的日志框架-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.4.1</version>
    </dependency>

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.56.Final</version>
    </dependency>

    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling-serial</artifactId>
        <version>2.0.10.Final</version>
    </dependency>
</dependencies>

服務端業務處理代碼

com.bruce.netty.rpc.server.SimpleServerHandler
主要用于記錄列印當前客戶端連接數,當接收到客戶端資訊后回傳“hello netty”字串

@ChannelHandler.Sharable
public class SimpleServerHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleServerHandler.class);
    public static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        channels.add(ctx.channel());
        log.info("客戶端連接成功: client address :{}", ctx.channel().remoteAddress());
        log.info("當前共有{}個客戶端連接", channels.size());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("server channelRead:{}", msg);
        ctx.channel().writeAndFlush("hello netty");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive: client close");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof java.io.IOException) {
            log.warn("exceptionCaught: client close");
        } else {
            cause.printStackTrace();
        }
    }
}

服務端心跳檢查代碼

當接收心跳"ping"資訊后,回傳客戶端’'pong"資訊,如果客戶端在指定時間內沒有發送任何資訊則關閉客戶端,
com.bruce.netty.rpc.server.ServerHeartbeatHandler

public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(ServerHeartbeatHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("server channelRead:{}", msg);
        if (msg.equals("ping")) {
            ctx.channel().writeAndFlush("pong");
        } else {
            //由下一個handler處理,示例中則為SimpleServerHandler
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            //該事件需要配合 io.netty.handler.timeout.IdleStateHandler使用
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.READER_IDLE) {
                //超過指定時間沒有讀事件,關閉連接
                log.info("超過心跳時間,關閉和服務端的連接:{}", ctx.channel().remoteAddress());
                //ctx.channel().close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

編解碼工具類

主要使用jboss-marshalling-serial編解碼工具,可自行查詢其優缺點,這里只是示例使用,
com.bruce.netty.rpc.handler.codec.MarshallingCodeFactory

public final class MarshallingCodeFactory {
    /** 創建Jboss marshalling 解碼器 */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //引數serial表示創建的是Java序列化工廠物件,由jboss-marshalling-serial提供
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        DefaultUnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
        return new MarshallingDecoder(provider, 1024);
    }

    /** 創建Jboss marshalling 編碼器 */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        DefaultMarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
        return new MarshallingEncoder(provider);
    }
}

公共物體類

com.bruce.netty.rpc.entity.UserInfo

public class UserInfo implements Serializable {
    private static final long serialVersionUID = 6271330872494117382L;
 
    private String username;
    private int age;

    public UserInfo() {
    }

    public UserInfo(String username, int age) {
        this.username = username;
        this.age = age;
    }
   //省略getter/setter/toString
}

下面開始本文的重點,客戶端斷線重連以及問題思考,

客戶端實作

  1. 剛開始啟動時需要進行同步連接,指定連接次數內沒用通過則拋出例外,行程退出,
  2. 客戶端啟動后,開啟定時任務,模擬客戶端資料發送,

com.bruce.netty.rpc.client.SimpleClientHandler:
客戶端業務處理handler,接收到資料后,通過日志列印,

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
    private NettyClient client;

    public SimpleClientHandler(NettyClient client) {
        this.client = client;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("client receive:{}", msg);
    }
}

com.bruce.netty.rpc.client.NettyClient:
封裝連接方法、斷開連接方法、getChannel()回傳io.netty.channel.Channel用于向服務端發送資料,boolean connect()是一個同步連接方法,如果連接成功回傳true,連接失敗回傳false,

public class NettyClient {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);

    private EventLoopGroup workerGroup;
    private Bootstrap bootstrap;
    private volatile Channel clientChannel;

    public NettyClient() {
        this(-1);
    }

    public NettyClient(int threads) {
        workerGroup = threads > 0 ? new NioEventLoopGroup(threads) : new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
                .handler(new ClientHandlerInitializer(this));
    }

    public boolean connect() {
        log.info("嘗試連接到服務端: 127.0.0.1:8088");
        try {
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);

            boolean notTimeout = channelFuture.awaitUninterruptibly(30, TimeUnit.SECONDS);
            clientChannel = channelFuture.channel();
            if (notTimeout) {
                if (clientChannel != null && clientChannel.isActive()) {
                    log.info("netty client started !!! {} connect to server", clientChannel.localAddress());
                    return true;
                }
                Throwable cause = channelFuture.cause();
                if (cause != null) {
                    exceptionHandler(cause);
                }
            } else {
                log.warn("connect remote host[{}] timeout {}s", clientChannel.remoteAddress(), 30);
            }
        } catch (Exception e) {
            exceptionHandler(e);
        }
        clientChannel.close();
        return false;
    }

    private void exceptionHandler(Throwable cause) {
        if (cause instanceof ConnectException) {
            log.error("連接例外:{}", cause.getMessage());
        } else if (cause instanceof ClosedChannelException) {
            log.error("connect error:{}", "client has destroy");
        } else {
            log.error("connect error:", cause);
        }
    }

    public void close() {
        if (clientChannel != null) {
            clientChannel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    public Channel getChannel() {
        return clientChannel;
    }

    static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {
        private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);
        private NettyClient client;

        public ClientHandlerInitializer(NettyClient client) {
            this.client = client;
        }

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
            pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
            //pipeline.addLast(new IdleStateHandler(25, 0, 10));
            //pipeline.addLast(new ClientHeartbeatHandler());
            pipeline.addLast(new SimpleClientHandler(client));
        }
    }
}

com.bruce.netty.rpc.client.NettyClientMain:客戶端啟動類

public class NettyClientMain {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClientMain.class);
    private static final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

    public static void main(String[] args) {
        NettyClient nettyClient = new NettyClient();
        boolean connect = false;
        //剛啟動時嘗試連接10次,都無法建立連接則不在嘗試
        //如果想在剛啟動后,一直嘗試連接,需要放在執行緒中,異步執行,防止阻塞程式
        for (int i = 0; i < 10; i++) {
            connect = nettyClient.connect();
            if (connect) {
                break;
            }
            //連接不成功,隔5s之后重新嘗試連接
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        if (connect) {
            log.info("定時發送資料");
            send(nettyClient);
        } else {
            nettyClient.close();
            log.info("行程退出");
        }
    }

    /** 定時發送資料 */
    static void send(NettyClient client) {
        scheduledExecutor.schedule(new SendTask(client,scheduledExecutor), 2, TimeUnit.SECONDS);
    }
}

客戶端斷線重連

斷線重連需求:

  1. 服務端和客戶端之間網路例外,或回應超時(例如有個很長時間的fullGC),客戶端需要主動重連其他節點,
  2. 服務端宕機時或者和客戶端之間發生任何例外時,客戶端需要主動重連其他節點,
  3. 服務端主動向客戶端發送(服務端)下線通知時,客戶端需要主動重連其他節點,

如何監聽到客戶端和服務端連接斷開 ?

netty的io.netty.channel.ChannelInboundHandler介面中給我們提供了許多重要的介面方法,為了避免實作全部的介面方法,可以通過繼承io.netty.channel.ChannelInboundHandlerAdapter來重寫相應的方法即可,

  1. void channelInactive(ChannelHandlerContext ctx);則在客戶端關閉時被呼叫,表示客戶端斷開連接,當如下幾種情況發生時會觸發:

    • 客戶端在正常active狀態下,主動呼叫channel或者ctx的close方法,
    • 服務端主動呼叫channel或者ctx的close方法關閉客戶端的連接 ,
    • 發生java.io.IOException(一般情況下是雙方連接斷開)或者java.lang.OutOfMemoryError(4.1.52版本中新增)時
  2. void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;則是在入堆疊發生任何例外時被呼叫,如果例外是java.io.IOException或者java.lang.OutOfMemoryError(4.1.52版本新增)時,還會觸發channelInactive方法,也就是上面channelInactive被觸發的第3條情況,

  3. 心跳檢查也是檢查客戶端與服務端之間連接狀態的必要方式,因為在一些狀態下,兩端實際上已經斷開連接,但客戶端無法感知,這時候就需要通過心跳來判斷兩端的連接狀態,心跳可以是客戶端心跳服務端心跳
    客戶端心跳:即為客戶端發送心跳ping資訊,服務端回復pong資訊,這樣在指定時間內,雙方有資料互動則認為是正常連接狀態,
    服務端心跳:則是服務端向客戶端發送ping資訊,客戶端回復pong資訊,在指定時間內沒有收到回復,則認為對方下線,
    netty給我們提供了非常簡單的心跳檢查方式,只需要在channel的handler鏈上,添加io.netty.handler.timeout.IdleStateHandler即可實作,

    IdleStateHandler有如下幾個重要的引數:

    • readerIdleTimeSeconds, 讀超時. 即當在指定的時間間隔內沒有從 Channel 讀取到資料時, 會觸發一個READER_IDLE的IdleStateEvent 事件.
    • writerIdleTimeSeconds, 寫超時. 即當在指定的時間間隔內沒有資料寫入到 Channel 時, 會觸發一個WRITER_IDLE的IdleStateEvent 事件.
    • allIdleTimeSeconds, 讀/寫超時. 即當在指定的時間間隔內沒有讀或寫操作時, 會觸發一個ALL_IDLE的IdleStateEvent 事件.

    為了能夠監聽到這些事件的觸發,還需要重寫ChannelInboundHandler#userEventTriggered(ChannelHandlerContext ctx, Object evt)方法,通過引數evt判斷事件型別,在指定的時間類如果沒有讀寫則發送一條心跳的ping請求,在指定時間內沒有收到操作則任務已經和服務端斷開連接,則呼叫channel或者ctx的close方法,使客戶端Handler執行channelInactive方法,

到這里看來我們只要在channelInactiveexceptionCaught兩個方法中實作自己的重連邏輯即可,但是筆者遇到了第一個坑,重連方法執行了兩次
先看示例代碼和結果,在com.bruce.netty.rpc.client.SimpleClientHandler中添加如下代碼:

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
    //省略部分代碼......
    /** 客戶端正常下線時執行該方法 */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("channelInactive:{}", ctx.channel().localAddress());
        reconnection(ctx);
    }

    /** 入堆疊發生例外時執行exceptionCaught */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn("exceptionCaught:客戶端[{}]和遠程斷開連接", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        reconnection(ctx);
    }

    private void reconnection(ChannelHandlerContext ctx) {
        log.info("5s之后重新建立連接");
        //暫時為空實作
    }
}

ClientHandlerInitializer 中添加io.netty.handler.timeout.IdleStateHandler用于心跳檢查,ClientHeartbeatHandler用于監聽心跳事件,接收心跳pong回復,

static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);
    private NettyClient client;

    public ClientHandlerInitializer(NettyClient client) {
        this.client = client;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
        pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
        //25s內沒有read操作則觸發READER_IDLE事件
        //10s內既沒有read又沒有write操作則觸發ALL_IDLE事件
        pipeline.addLast(new IdleStateHandler(25, 0, 10));
        pipeline.addLast(new ClientHeartbeatHandler());
        pipeline.addLast(new SimpleClientHandler(client));
    }
}

com.bruce.netty.rpc.client.ClientHeartbeatHandler

public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(ClientHeartbeatHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg.equals("pong")) {
            log.info("收到心跳回復");
        } else {
            super.channelRead(ctx, msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            //該事件需要配合 io.netty.handler.timeout.IdleStateHandler使用
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.ALL_IDLE) {
                //向服務端發送心跳檢測
                ctx.writeAndFlush("ping");
                log.info("發送心跳資料");
            } else if (idleStateEvent.state() == IdleState.READER_IDLE) {
                //超過指定時間沒有讀事件,關閉連接
                log.info("超過心跳時間,關閉和服務端的連接:{}", ctx.channel().remoteAddress());
                ctx.channel().close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

先啟動server端,再啟動client端,待連接成功之后kill掉 server端行程,
在這里插入圖片描述
通過客戶端日志可以看出,先是執行了exceptionCaught方法然后執行了channelInactive方法,但是這兩個方法中都呼叫了reconnection方法,導致同時執行了兩次重連

為什么執行了exceptionCaught方法又執行了channelInactive方法呢?

我們可以在exceptionCaught和channelInactive方法添加斷點一步步查看原始碼
在這里插入圖片描述
當NioEventLoop執行select操作之后,處理相應的SelectionKey,發生例外后,會呼叫AbstractNioByteChannel.NioByteUnsafe#handleReadException方法進行處理,并觸發pipeline.fireExceptionCaught(cause),最終呼叫到用戶handler的fireExceptionCaught方法,

private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
		RecvByteBufAllocator.Handle allocHandle) {
	if (byteBuf != null) {
		if (byteBuf.isReadable()) {
			readPending = false;
			pipeline.fireChannelRead(byteBuf);
		} else {
			byteBuf.release();
		}
	}
	allocHandle.readComplete();
	pipeline.fireChannelReadComplete();
	pipeline.fireExceptionCaught(cause);

	// If oom will close the read event, release connection.
	// See https://github.com/netty/netty/issues/10434
	if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
		closeOnRead(pipeline);
	}
}

該方法最后會判斷例外型別,執行close連接的方法,在連接斷線的場景中,這里即為java.io.IOException,所以執行了close方法,當debug到AbstractChannel.AbstractUnsafe#close(ChannelPromise, Throwable, ClosedChannelException, notify)方法中會發現最后又呼叫了AbstractUnsafe#fireChannelInactiveAndDeregister方法,繼續debug最后則會執行自定義的fireChannelInactive方法,

到這里可以總結一個知識點:netty中當執行到handler地fireExceptionCaught方法時,可能會繼續觸發到fireChannelInactive,也可能不會觸發fireChannelInactive

除了netty根據例外型別判斷是否執行close方法外,其實開發人員也可以自己通過ctx或者channel去呼叫close方法,代碼如下:

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    if (cause instanceof IOException) {
        log.warn("exceptionCaught:客戶端[{}]和遠程斷開連接", ctx.channel().localAddress());
    } else {
        log.error(cause);
    }
    //ctx.close();
    ctx.channel().close();
}

但這種顯示呼叫close方法,是否一定會觸發呼叫fireChannelInactive呢?
如果是,那么只需要在exceptionCaught中呼叫close方法,fireChannelInactive中做重連的邏輯即可!!

在筆者通過日志觀察到,在exceptionCaught中呼叫close方法每次都會呼叫fireChannelInactive方法,但是查看原始碼,筆者認為這是不一定的,因為在AbstractChannel.AbstractUnsafe#close(ChannelPromise,Throwable, ClosedChannelException, notify)中會呼叫io.netty.channel.Channel#isActive進行判斷,只有為true,才會執行fireChannelInactive方法,

//io.netty.channel.socket.nio.NioSocketChannel#isActive
@Override
public boolean isActive() {
    SocketChannel ch = javaChannel();
    return ch.isOpen() && ch.isConnected();
}

如何解決同時執行兩次問題呢?
在netty初始化時,我們都會添加一系列的handler處理器,這些handler實際上會在netty創建Channel物件(NioSocketChannel)時,被封裝在DefaultChannelPipeline中,而DefaultChannelPipeline實際上是一個雙向鏈表,頭節點為TailContext,尾節點為TailContext,而中間的節點則是我們添加的一個個handler(被封裝成DefaultChannelHandlerContext),當執行Pipeline上的方法時,會從鏈表上遍歷handler執行,因此當執行exceptionCaught方法時,我們只需要提前移除自定義的Handler則無法執行fireChannelInactive方法,
在這里插入圖片描述
最后實作代碼如下:com.bruce.netty.rpc.client.SimpleClientHandler

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("channelInactive:{}", ctx.channel().localAddress());
		ctx.pipeline().remove(this);
        ctx.channel().close();
        reconnection(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn("exceptionCaught:客戶端[{}]和遠程斷開連接", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        ctx.pipeline().remove(this);
        //ctx.close();
        ctx.channel().close();
        reconnection(ctx);
    }
}

執行效果如下,可以看到當發生例外時,只是執行了exceptionCaught方法,并且通過channel關閉了上一次連接資源,也沒有執行當前handler的fireChannelInactive方法,
在這里插入圖片描述

如何實作斷線后重新連接 ?

通過上面分析,我們已經知道在什么方法中實作自己的重連邏輯,但是具體該怎么實作呢,懷著好奇的心態搜索了一下各大碼友的實作方案,大多做法是通過ctx.channel().eventLoop().schedule添加一個定時任務呼叫客戶端的連接方法,筆者也參考該方式實作代碼如下:,

private void reconnection(ChannelHandlerContext ctx) {
	log.info("5s之后重新建立連接");
	ctx.channel().eventLoop().schedule(new Runnable() {
		@Override
		public void run() {
			boolean connect = client.connect();
			if (connect) {
				log.info("重新連接成功");
			} else {
				reconnection(ctx);
			}
		}
	}, 5, TimeUnit.SECONDS);
}

測驗:先啟動server端,再啟動client端,待連接成功之后kill掉 server端行程,客戶端如期定時執行重連,但也就去茶水間倒杯水的時間,回來后發現了如下例外,

......省略14條相同的重試日志
[2021-01-17 18:46:45.032] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : 5s之后重新建立連接
[2021-01-17 18:46:48.032] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 嘗試連接到服務端: 127.0.0.1:8088
[2021-01-17 18:46:50.038] ERROR   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 連接例外:Connection refused: no further information: /127.0.0.1:8088
[2021-01-17 18:46:50.038] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : 5s之后重新建立連接
[2021-01-17 18:46:53.040] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 嘗試連接到服務端: 127.0.0.1:8088
[2021-01-17 18:46:53.048] ERROR   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : connect error:
io.netty.util.concurrent.BlockingOperationException: DefaultChannelPromise@10122121(incomplete)
	at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:462)
	at io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:159)
	at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:667)
	at io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:305)
	at com.bruce.netty.rpc.client.NettyClient.connect(NettyClient.java:49)
	at com.bruce.netty.rpc.client.SimpleClientHandler$1.run(SimpleClientHandler.java:65)
	at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)

根據例外堆疊,可以發現是com.bruce.netty.rpc.client.NettyClient#connect方法中呼叫了等待方法

boolean notTimeout = channelFuture.awaitUninterruptibly(20, TimeUnit.SECONDS);

而該方法內部會進行檢測,是否在io執行緒上執行了同步等待,這會導致拋出例外BlockingOperationException
io.netty.channel.DefaultChannelPromise#checkDeadLock

@Override
protected void checkDeadLock() {
    if (channel().isRegistered()) {
        super.checkDeadLock();
    }
}

io.netty.util.concurrent.DefaultPromise#checkDeadLock

protected void checkDeadLock() {
    EventExecutor e = executor();
    if (e != null && e.inEventLoop()) {
        throw new BlockingOperationException(toString());
    }
}

奇怪的是為什么不是每次嘗試重連都拋出該例外,而是每隔16次拋出一次呢?
這讓我連想到自己的筆記本是8核處理器,而netty默認執行緒池是2 * c,就是16條執行緒,這之間似乎有些關聯,
實際上在呼叫ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);,netty首先會創建一個io.netty.channel.Channel(示例中是NioSocketChannel),然后通過io.netty.util.concurrent.EventExecutorChooserFactory.EventExecutorChooser依次選擇一個NioEventLoop,將Channel系結到NioEventLoop上,
在這里插入圖片描述
io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop

//Return true if the given Thread is executed in the event loop, false otherwise.
@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

重連的方法是在一個NioEventLoop(也就是io執行緒)上被呼叫,第1次重連實際上是選擇了第2個NioEventLoop,第2次重連實際上是選擇了第3個NioEventLoop,以此類推,當一輪選擇過后,重新選到第一個NioEventLoop時,boolean inEventLoop()回傳true,則拋出了BlockingOperationException

方案1
不要在netty的io執行緒上執行同步連接,使用單獨的執行緒池定時執行重試,重試成功之后銷毀執行緒池,

com.bruce.netty.rpc.client.SimpleClientHandler 修改reconnection方法

private static ScheduledExecutorService SCHEDULED_EXECUTOR;

private void initScheduledExecutor() {
	if (SCHEDULED_EXECUTOR == null) {
		synchronized (SimpleClientHandler.class) {
			if (SCHEDULED_EXECUTOR == null) {
				SCHEDULED_EXECUTOR = Executors.newSingleThreadScheduledExecutor(r -> {
					Thread t = new Thread(r, "Client-Reconnect-1");
					t.setDaemon(true);
					return t;
				});
			}
		}
	}
}

private void reconnection(ChannelHandlerContext ctx) {
	log.info("5s之后重新建立連接");
	initScheduledExecutor();

	SCHEDULED_EXECUTOR.schedule(() -> {
		boolean connect = client.connect();
		if (connect) {
			//連接成功,關閉執行緒池
			SCHEDULED_EXECUTOR.shutdown();
			log.info("重新連接成功");
		} else {
			reconnection(ctx);
		}
	}, 3, TimeUnit.SECONDS);
}

方案2
可以在io執行緒上使用異步重連:

com.bruce.netty.rpc.client.NettyClient添加方法connectAsync方法,兩者的區別在于connectAsync方法中沒有呼叫channelFuture的同步等待方法,而是改成監聽器(ChannelFutureListener)的方式,實際上這個監聽器是運行在io執行緒上

 public void connectAsync() {
    log.info("嘗試連接到服務端: 127.0.0.1:8088");
    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);
    channelFuture.addListener((ChannelFutureListener) future -> {
        Throwable cause = future.cause();
        if (cause != null) {
            exceptionHandler(cause);
            log.info("等待下一次重連");
            channelFuture.channel().eventLoop().schedule(this::connectAsync, 5, TimeUnit.SECONDS);
        } else {
            clientChannel = channelFuture.channel();
            if (clientChannel != null && clientChannel.isActive()) {
                log.info("Netty client started !!! {} connect to server", clientChannel.localAddress());
            }
        }
    });
}

com.bruce.netty.rpc.client.SimpleClientHandler

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
    private NettyClient client;

    public SimpleClientHandler(NettyClient client) {
        this.client = client;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("client receive:{}", msg);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("channelInactive:{}", ctx.channel().localAddress());
        ctx.pipeline().remove(this);
        ctx.channel().close();
        reconnectionAsync(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn("exceptionCaught:客戶端[{}]和遠程斷開連接", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        ctx.pipeline().remove(this);
        ctx.close();
        reconnectionAsync(ctx);
    }

    private void reconnectionAsync(ChannelHandlerContext ctx) {
        log.info("5s之后重新建立連接");
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                client.connectAsync();
            }
        }, 5, TimeUnit.SECONDS);
    }
}

netty客戶端執行緒給多大比較合理 ?

netty中一個NioEventLoopGroup默認創建的執行緒數是cpu核心數 * 2 ,這些執行緒都是用于io操作,那么對于客戶端應用程式來說真的需要這么多io執行緒么?

通過上面分析BlockingOperationException例外時我們分析到,實際上netty在創建一個Channel物件后只會從NioEventLoopGroup中選擇一個NioEventLoop來系結,只有創建多個Channel才會依次選擇下一個NioEventLoop,也就是說一個Channel只會對應一個NioEventLoop,而NioEventLoop可以系結多個Channel

  1. 對于客戶端來說,如果只是連接的一個server節點,那么只要設定1條執行緒即可,即使出現了斷線重連,在連接斷開之后,之前的Channel會從NioEventLoop移除,重連之后,仍然只會在僅有的一個NioEventLoop注冊一個新的Channel
  2. 如果客戶端同時如下方式多次呼叫io.netty.bootstrap.Bootstrap#connect(String inetHost, int inetPort)連接多個Server節點,那么執行緒可以設定大一點,但不要超過2*c,而且只要出現斷線重連,同樣不能保證每個NioEventLoop都會系結一個客戶端Channel
     public boolean connect() {
          try {
              ChannelFuture channelFuture1 = bootstrap.connect("127.0.0.1", 8088);
              ChannelFuture channelFuture2 = bootstrap.connect("127.0.0.1", 8088);
              ChannelFuture channelFuture3 = bootstrap.connect("127.0.0.1", 8088);
          } catch (Exception e) {
              exceptionHandler(e);
          }
          clientChannel.close();
          return false;
      }
      ```
    
  3. 如果netty客戶端執行緒數設定大于1有什么影響么?
    明顯的例外肯定是不會有的,但是照成資源浪費,首先會創建多個NioEventLoop物件,但是這些對于的NioEventLoop是處于非運行狀態,一旦出現斷線重連,那么重新連接時,下一個NioEventLoop則會被選中,并啟動執行緒一直處于runnable狀態,而上一個NioEventLoop也是一直處于runnable狀態,由于上一個Channel已經被close,所以會造成每次select結果都是空的,沒有意義的空輪詢,
    如下則是netty客戶端使用默認執行緒數,4次斷線重連后一共創建的5條NioEventLoop執行緒,但是實際上只有第5條執行緒在執行讀寫操作,
    在這里插入圖片描述
    在這里插入圖片描述
  4. 如果客戶端存在耗時的業務邏輯,應該單獨使用業務執行緒池,避免在netty的io執行緒中執行耗時邏輯處理,

總結

本篇主要講解了,netty斷線重連的兩種實作方案,已經實作程序中遇到的例外問題,通過分析問題,讓大家了解netty的實作細節,

下一節:將分析,netty服務端boss執行緒設定多少比較合理?(個人比較喜歡稱為accept執行緒,即接收客戶端連接的執行緒)

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/250633.html

標籤:其他

上一篇:基于HIP6601的MOS的半橋電路測驗

下一篇:52單片機IO口輸出點亮你的LED

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more