作者:rickiyang
出處:www.cnblogs.com/rickiyang/p/11074231.html
我們知道在TCP長連接或者WebSocket長連接中一般我們都會使用心跳機制–即發送特殊的資料包來通告對方自己的業務還沒有辦完,不要關閉鏈接,
那么心跳機制可以用來做什么呢?
我們知道網路的傳輸是不可靠的,當我們發起一個鏈接請求的程序之中會發生什么事情誰都無法預料,或者斷電,服務器重啟,斷網線之類,
如果有這種情況的發生對方也無法判斷你是否還在線,所以這時候我們引入心跳機制,在長鏈接中雙方沒有資料互動的時候互相發送資料(可能是空包,也可能是特殊資料),對方收到該資料之后也回復相應的資料用以確保雙方都在線,這樣就可以確保當前鏈接是有效的,
1. 如何實作心跳機制
一般實作心跳機制由兩種方式:
- TCP協議自帶的心跳機制來實作;
- 在應用層來實作,
但是TCP協議自帶的心跳機制系統默認是設定的是2小時的心跳頻率,它檢查不到機器斷電、網線拔出、防火墻這些斷線,而且邏輯層處理斷線可能也不是那么好處理,另外該心跳機制是與TCP協議系結的,那如果我們要是使用UDP協議豈不是用不了?所以一般我們都不用,
而一般我們自己實作呢大致的策略是這樣的:
- Client啟動一個定時器,不斷發送心跳;
- Server收到心跳后,做出回應;
- Server啟動一個定時器,判斷Client是否存在,這里做判斷有兩種方法:時間差和簡單標識,
時間差:
- 收到一個心跳包之后記錄當前時間;
- 判斷定時器到達時間,計算多久沒收到心跳時間=當前時間-上次收到心跳時間,如果改時間大于設定值則認為超時,
簡單標識:
- 收到心跳后設定連接標識為true;
- 判斷定時器到達時間,如果未收到心跳則設定連接標識為false;
今天我們來看一下Netty的心跳機制的實作,在Netty中提供了IdleStateHandler類來進行心跳的處理,它可以對一個 Channel 的 讀/寫設定定時器, 當 Channel 在一定事件間隔內沒有資料互動時(即處于 idle 狀態), 就會觸發指定的事件,
該類可以對三種型別的超時做心跳機制檢測:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
- readerIdleTimeSeconds:設定讀超時時間;
- writerIdleTimeSeconds:設定寫超時時間;
- allIdleTimeSeconds:同時為讀或寫設定超時時間;
下面我們還是通過一個例子來講解IdleStateHandler的使用,
服務端:
public class HeartBeatServer {
private int port;
public HeartBeatServer(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HeartBeatServerChannelInitializer());
try {
ChannelFuture future = server.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
HeartBeatServer server = new HeartBeatServer(7788);
server.start();
}
}
服務端Initializer:
public class HeartBeatServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("handler",new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatServerHandler());
}
}
在這里IdleStateHandler也是handler的一種,所以加入addLast,我們分別設定4個引數:讀超時時間為3s,寫超時和讀寫超時為0,然后加入時間控制單元,
服務端handler:
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
private int loss_connect_time = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "Server :" + msg.toString());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
//服務端對應著讀事件,當為READER_IDLE時觸發
IdleStateEvent event = (IdleStateEvent)evt;
if(event.state() == IdleState.READER_IDLE){
loss_connect_time++;
System.out.println("接收訊息超時");
if(loss_connect_time > 2){
System.out.println("關閉不活動的鏈接");
ctx.channel().close();
}
}else{
super.userEventTriggered(ctx,evt);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
我們看到在handler中呼叫了userEventTriggered方法,IdleStateEvent的state()方法一個有三個值:
READER_IDLE,WRITER_IDLE,ALL_IDLE,正好對應讀事件寫事件和讀寫事件,
再來寫一下客戶端:
public class HeartBeatsClient {
private int port;
private String address;
public HeartBeatsClient(int port, String address) {
this.port = port;
this.address = address;
}
public void start(){
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HeartBeatsClientChannelInitializer());
try {
ChannelFuture future = bootstrap.connect(address,port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
HeartBeatsClient client = new HeartBeatsClient(7788,"127.0.0.1");
client.start();
}
}
客戶端Initializer:
public class HeartBeatsClientChannelInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("handler", new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatClientHandler());
}
}
這里我們設定了IdleStateHandler的寫超時為3秒,客戶端執行的動作為寫訊息到服務端,服務端執行讀動作,
客戶端handler:
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
CharsetUtil.UTF_8));
private static final int TRY_TIMES = 3;
private int currentTime = 0;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("激活時間是:"+new Date());
System.out.println("鏈接已經激活");
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("停止時間是:"+new Date());
System.out.println("關閉鏈接");
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("當前輪詢時間:"+new Date());
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
if(currentTime <= TRY_TIMES){
System.out.println("currentTime:"+currentTime);
currentTime++;
ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
}
}
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println(message);
if (message.equals("Heartbeat")) {
ctx.write("has read message from server");
ctx.flush();
}
ReferenceCountUtil.release(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
啟動服務端和客戶端我們看到輸出為:

我們再來屢一下思路:
- 首先客戶端激活channel,因為客戶端中并沒有發送訊息所以會觸發客戶端的IdleStateHandler,它設定的寫超時時間為3s;
- 然后觸發客戶端的事件機制進入userEventTriggered方法,在觸發器中計數并向客戶端發送訊息;
- 服務端接收訊息;
- 客戶端觸發器繼續輪詢發送訊息,直到計數器滿不再向服務端發送訊息;
- 服務端在IdleStateHandler設定的讀訊息超時時間5s內未收到訊息,觸發了服務端中handler的userEventTriggered方法,于是關閉客戶端的鏈接,
大體我們的簡單心跳機制就是這樣的思路,通過事件觸發機制以及計數器的方式來實作,上面我們的案例中最后客戶端沒有發送訊息的時候我們是強制斷開了客戶端的鏈接,那么既然可以關閉,我們是不是也可是重新鏈接客戶端呢?因為萬一客戶端本身并不想關閉而是由于別的原因導致他無法與服務端通信,下面我們來說一下重連機制,
當我們的服務端在未讀到客戶端訊息超時而關閉客戶端的時候我們一般在客戶端的finally塊中方的是關閉客戶端的代碼,這時我們可以做一下修改的,finally是一定會被執行新的,所以我們可以在finally塊中重新呼叫一下啟動客戶端的代碼,這樣就又重新啟動了客戶端了,上客戶端代碼:
/**
* 本Client為測驗netty重連機制
* Server端代碼都一樣,所以不做修改
* 只用在client端中做一下判斷即可
*/
public class HeartBeatsClient2 {
private int port;
private String address;
ChannelFuture future;
public HeartBeatsClient2(int port, String address) {
this.port = port;
this.address = address;
}
public void start(){
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HeartBeatsClientChannelInitializer());
try {
future = bootstrap.connect(address,port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
//group.shutdownGracefully();
if (null != future) {
if (future.channel() != null && future.channel().isOpen()) {
future.channel().close();
}
}
System.out.println("準備重連");
start();
System.out.println("重連成功");
}
}
public static void main(String[] args) {
HeartBeatsClient2 client = new HeartBeatsClient2(7788,"127.0.0.1");
client.start();
}
}
其余部分的代碼與上面的實體并無異同,只需改造客戶端即可,我們再運行服務端和客戶端會看到客戶端雖然被關閉了,但是立馬又被重啟:

當然生產級別的代碼應該不是這樣實作的吧,哈哈,
近期熱文推薦:
1.1,000+ 道 Java面試題及答案整理(2021最新版)
2.終于靠開源專案弄到 IntelliJ IDEA 激活碼了,真香!
3.阿里 Mock 工具正式開源,干掉市面上所有 Mock 工具!
4.Spring Cloud 2020.0.0 正式發布,全新顛覆性版本!
5.《Java開發手冊(嵩山版)》最新發布,速速下載!
覺得不錯,別忘了隨手點贊+轉發哦!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/286872.html
標籤:其他
