主頁 > 軟體設計 > WebSocket服務端訊息推送

WebSocket服務端訊息推送

2021-09-20 10:31:46 軟體設計

前言:移動互聯網蓬勃發展的今天,大部分手機 APP和網站 都提供了訊息推送功能,如新聞客戶端的熱點新聞推薦,IM 工具的聊天訊息提醒,電商產品促銷資訊,企業應用的通知和審批流程等等,推送對于提高產品活躍度、提高功能模塊使用率、提升用戶粘性、提升用戶留存率起到了重要作用,作為 APP 和網站運營中一個關鍵的渠道,對訊息推送的合理運用能有效促進目標的實作,

一、淺析web端的訊息推送原理

股票曲線實時變化,在線IM聊天等等,Web系統里總是能見到訊息推送的應用,訊息推送用好了能增強用戶體驗,實作訊息推送有N種解決方案,

1.1、什么是訊息推送

訊息推送(Push)指運營人員通過自己的產品或第三方工具對用戶當前網頁或移動設備進行的主動訊息推送,用戶可以在網頁上或移動設備鎖定螢屏和通知欄看到push訊息通知,以此來實作用戶的多層次需求,使得用戶能夠自己設定所需要的資訊頻道,得到即時訊息,簡單說就是一種定制資訊的實作方式,我們平時瀏覽郵箱時突然彈出訊息提示收到新郵件就屬于web端訊息推送,在手機鎖屏上看到的微信訊息等等都屬于APP訊息推送,

Web網站推送:

當我們在瀏覽網站觀望猶豫時,突然看到了系統發來一條訊息,一位神秘的神豪老板竟然爆出了麻痹戒指!!!我的天,于是我果斷開始了游戲!這訊息很及時!

APP移動推送:

上述兩種經典場景,是生活中比較常見的場景,也引出了兩大推送種類,Web端訊息推送和移動端訊息推送,本篇博客主要介紹Web推送,順便提一句移動端App常見第三方推送SDK有極光推送、小米推送等等,

1.2、Web端實作訊息推送的四種方式

主要介紹web端其中的四種實作方式:短輪詢、Comet長輪詢、Server-sent、WebSocket,

(1)短輪詢

指在特定的的時間間隔(如每10秒),由瀏覽器對服務器發出HTTP request,然后由服務器回傳最新的資料給客戶端的瀏覽器,瀏覽器做處理后進行顯示,無論后端此時是否有新的訊息產生,都會進行回應,字面上看,這種方式是最簡單的,這種方式的優點是,后端撰寫非常簡單,邏輯不復雜,但是缺點是請求中大部分中是無用的,浪費了帶寬和服務器資源,總結來說,簡單粗暴,適用于小型(偷懶)應用,

(2)Comet長輪詢

長輪詢是客戶端向服務器發送Ajax請求,服務器接到請求后hold住連接,直到有新訊息才回傳回應資訊并關閉連接,客戶端處理完回應資訊后再向服務器發送新的請求;長連接是在頁面中的iframe發送請求到服務端,服務端hold住請求并不斷將需要回傳前端的資料封裝成呼叫javascript函式的形式回應到前端,前端不斷收到回應并處理,Comet的實作原理和短輪詢相比,很明顯少了很多無用請求,減少了帶寬壓力,實作起來比短輪詢復雜一丟丟,想比用短輪詢的同學有夢想時,就可以用Comet來實作自己的推送,

長輪詢的優點很明顯,在無訊息的情況下不會頻繁的請求,耗費資小并且實作了服務端主動向前端推送的功能,但是服務器hold連接會消耗資源,回傳資料順序無保證,難于管理維護,WebQQ(好像掛了)就是這樣實作的,

(3)Server-sent

服務器推指的是HTML5規范中提供的服務端事件EventSource,瀏覽器在實作了該規范的前提下創建一個EventSource連接后,便可收到服務端的發送的訊息,實作一個單向通信,客戶端進行監聽,并對回應的資訊處理顯示,該種方式已經實作了服務端主動推送至前端的功能,優點是在單項傳輸資料的場景中完全滿足需求,開發人員擴展起來基本不需要改后端代碼,直接用現有框架和技術就可以集成,

(4)WebSocket

WebSocket是HTML5下一種新的協議,是基于TCP的應用層協議,只需要一次連接,便可以實作全雙工通信,客戶端和服務端可以相互主動發送訊息,客戶端進行監聽,并對回應的訊息處理顯示,

這個技術相信基本都聽說過,就算沒寫過代碼,也大概知道干嘛的,通過名字就能知道,這是一個Socket連接,一個能在瀏覽器上用的Socket連接,WebSocket是HTML5標準中的一個內容,瀏覽器通過javascript腳本手動創建一個TCP連接與服務端進行通訊,優點是雙向通信,都可以主動發送訊息,既可以滿足“問”+“答”的回應機制,也可以實作主動推送的功能,缺點就是編碼相對來說會多點,服務端處理更復雜(我覺得當一條有情懷的咸魚就應該用這個!),

1.3、實作個性化的推送

上面說了很多實作方案,針對自己系統的應用場景選擇合適的推送方案才是合理的,因此最后簡單說一下實作個性化推送的兩種方式,第一種很簡單,直接使用第三方實作的推送,無需復雜的開發運維,直接可以使用,第二種就是自己封裝,可以選擇如今較為火熱的WebSocket來實作系統的訊息推送,

①直接用第三方的訊息推送服務(并發量多了會收費)

在這里推薦一個第三方推送平臺,GoEasy,

推薦理由是GoEasy的理念符合我們的選擇(可參考http://t.cn/Ex6jg3q):

(1)更簡單的方式將訊息從服務器端推送至客戶端
(2)更簡單的方式將訊息從各種客戶端推送至客戶端

GoEasy具體的使用方式這里不再贅述,詳見官網,對于后端后端開發者,可直接使用Rest方式呼叫推送,對于前端或web開發者,可以從web客戶端用javascript腳本進行呼叫推送,

②封裝自己的推送服務

如果是一個老系統進行擴展,那么更推薦使用Server-sent,服務端改動量不會很大,如果是新系統,更推薦websocket,實作的功能功能更全面,

我們如果需要使用websocket技術實作自己的推送服務,需要注意哪些點,或者說需要踩哪些坑呢,本文最后列出幾點供大家參考:

長連接的心跳激活處理;

服務端調優實作高并發量client同時在線(單機服務器可以實作百萬并發長連接);

群發訊息;

服務端維持多用戶的狀態;

從WebSocket中獲取HttpSession進行用戶相關操作;

等等等….


二、WebSocket簡介

2.1、websocket的由來

我們經常用的是HTTP協議,而HTTP協議是一種無狀態的協議,要實作有狀態的會話必須借助一些外部機制如session/cookie或者Token,這或多或少會帶來一些不便,尤其是服務端和客戶端需要實時交換資料的時候(監控,聊天),這個問題更加明顯,為了適應這種環境,websocket就產生了,目的是即時通訊,替代輪詢,

2.2、websocket概述

WebSocket 是HTML5一種新的協議,它實作了瀏覽器與服務器全雙工通信(full-duplex),一開始的握手需要借助HTTP請求完成,WebSocket 是一種在單個 TCP 連接上進行全雙工通信的協議,WebSocket 使得客戶端和服務器之間的資料交換變得更加簡單,允許服務端主動向客戶端推送資料,

websocket的特點或作用

  • 允許服務端主動向客戶端推送資料

  • 在WebSocket API中,瀏覽器和服務器只需要完成一次握手,兩者之間就直接可以創建持久性的連接,并進行雙向資料傳輸,

websocket使用的優點

  • 更強的實時性

  • 保持連接狀態,創建一次連接后,之后通信時可以省略部分狀態資訊,較少的控制開銷,在連接創建后,服務器和客戶端之間交換資料時,用于協議控制的資料包頭部相對較小,在不包含擴展的情況下,對于服務器到客戶端的內容,此頭部大小只有2至10位元組(和資料包長度有關);對于客戶端到服務器的內容,此頭部還需要加上額外的4位元組的掩碼,相對于HTTP請求每次都要攜帶完整的頭部,此項開銷顯著減少了

websocket使用的缺點

由于websocket使用的持久連接,與服務器一直保持連接,對服務器壓力很大

2.3、websocket請求頭和回應頭

瀏覽器發送websocket請求頭類似如下:


下面是對請求頭部解釋(比http協議多了Upgrade和Connection,是告訴服務器包協議設計ws):

  • Accept-Encoding:瀏覽器可以接受的資料的壓縮型別,

  • Accept-Language:瀏覽器可以接受的語言型別,

  • Cache-Control:no-cache不使用強快取,

  • Connection:Upgrade 通知服務器通信協議提升,

  • Host:主機名,

  • Origin:用于驗證瀏覽器域名是否在服務器許可范圍內,

  • Pragma:no-cache HTTP/1.0定義的不使用本地快取,

  • Sec-WebSocket-Extensions:permessage-deflate; client_max_window_bits

  • Sec-WebSocket-Key:lb69kw8CsB4CrSk9tKa3 g==
    握手協議密鑰,base64編碼的16位元組的隨機字串,

  • Sec-WebSocket-Version:13 版本號,

  • Upgrade:websocket 使用websocket協議進行傳輸資料,而不使用HTTP/1.1,

  • User-Agent:用戶代理字串,

服務器接收到客戶端請求后做出回應并回傳如下:

下面是服務器回傳的頭部解釋:

  • Connection:Upgrade 通信協議提升,

  • Date:通信時間

  • Upgrade: websocket 傳輸協議升級為websocket,

  • Sec-WebSocket-Extensions:permessage-deflate

  • Sec-WebSocket-Accept:q9g5u1WfIWaAjNgMmjlTQTqkS/k=
    將Sec-WebSocket-Key的值進行一定的運算和該值進行比較來判斷是否是目標服務器回應了WebSocket請求,

  • Upgrade: 使用websocket協議進行資料傳輸

2.4、WebSocket和Socket的區別

短答案:就像Java和JavaScript,并沒有什么太大的關系,但又不能說完全沒關系,可以這么說:

  • 命名方面,Socket是一個深入人心的概念,WebSocket借用了這一概念;

  • 使用方面,完全兩個東西,

Socket是應用層與TCP/IP協議族通信的中間軟體抽象層,它是一組介面(不是協議,為了方便使用TCP或UDP而抽象出來的一層,是位于應用層和傳輸控制層之間的一組介面),

WebSocket是應用層協議,

2.5、向指定用戶發送WebSocket訊息并處理對方不在線的情況

給指定用戶發送訊息:

  • 如果接收者在線,則直接發送訊息;

  • 否則將訊息存盤到redis,等用戶上線后主動拉取未讀訊息,

2.6、WebSocket心跳機制

在使用WebSocket的程序中,有時候會遇到網路例外斷開的情況,但是在網路斷開的時候服務器端并沒有觸發onclose的事件,這樣會有:服務器會繼續向客戶端發送多余的連接,并且這些資料還會丟失,所以就需要一種機制來檢測客戶端和服務端是否處于正常的連接狀態,因此就有了WebSocket的心跳機制了,還有心跳,說明還活著,沒有心跳說明已經掛掉了,

心跳機制

心跳機制是每隔一段時間會向服務器發送一個資料包,告訴服務器自己還活著,同時客戶端會確認服務器端是否還活著,如果還活著的話,就會回傳一個資料包給客戶端來確定服務器端也還活著,否則的話,有可能是網路斷開連接了,需要重連,

2.7、Netty可以實作WebSocket

Netty是由jboss提供的一款開源框架,常用于搭建RPC中的TCP服務器和WebSocket服務器,甚至是類似Tomcat的web服務器,反正就是各種網路服務器,在處理高并發的專案中,功能豐富且性能良好,基于Java中NIO的二次封裝,具有比原生NIO更好更穩健的體驗,


三、基于Netty實作WebSocket訊息推送

因為產品需求,要實作服務端推送訊息至客戶端,并且支持客戶端對用戶點對點訊息發送的社交功能,服務端給客戶端推送訊息,可以選擇原生的websocket,或者更加高級的netty框架實作,

在此我極力推薦netty,因為一款好的框架一般都是在原生的基礎上進行包裝成更加實用方便,很多我們需要自己考慮的問題都基本可以不用去考慮,不過此文不會去講netty有多么的高深莫測,因為這些概念性的東西隨處可見,而是通過實戰來達到推送訊息的目的,

這個小節,我們主要講解下如何整合Netty和WebSocket,我們需要使用netty對接websocket連接,實作雙向通信,這一步需要有服務端的netty程式,用來處理客戶端的websocket連接操作,例如建立連接,斷開連接,收發資料等,

WebSocket訊息推送實作思路:

前端使用WebSocket與服務端創建連接的時候,將用戶ID傳給服務端,服務端將用戶ID與channel關聯起來存盤,同時將channel放入到channel組中,

如果需要給所有用戶發送訊息,直接執行channel組的writeAndFlush()方法;

如果需要給指定用戶發送訊息,根據用戶ID查詢到對應的channel,然后執行writeAndFlush()方法;

前端獲取到服務端推送的訊息之后,將訊息內容展示到文本域中

下面是具體的代碼實作,基本上每一步操作都配有注釋說明,配合注釋看應該還是比較容易理解的,

3.1、引入Netty的依賴

netty-all包含了netty的所有封裝,hutool-all封裝了常用的一些依賴,如Json相關

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.33.Final</version>
</dependency>

<dependency>
	<groupId>cn.hutool</groupId>
	<artifactId>hutool-all</artifactId>
	<version>5.2.3</version>
</dependency>

3.2、修改組態檔application.yml

server:
  port: 8899

#netty的配置資訊(埠號,webSocket路徑)
webSocket:
  netty:
    port: 58080
    path: /webSocket
    readerIdleTime: 30 #讀空閑超時時間設定(Netty心跳檢測配置)
    writerIdleTime: 30 #寫空閑超時時間設定(Netty心跳檢測配置)
    allIdleTime: 30 #讀寫空閑超時時間設定(Netty心跳檢測配置)

3.3、創建NettyConfig

在NettyConfig中定義一個單例的channel組,管理所有的channel,再定義一個map,管理用戶與channel的對應關系

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;

/**
 * NettyConfig類
 *
 * @author hs
 * @date 2021-09-18
 */
public class NettyConfig {

    /**
     * 定義一個channel組,管理所有的channel
     * GlobalEventExecutor.INSTANCE 是全域的事件執行器,是一個單例
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 存放用戶與Chanel的對應資訊,用于給指定用戶發送訊息
     */
    private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();

    private NettyConfig() {}

    /**
     * 獲取channel組
     * @return
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }

    /**
     * 獲取用戶channel map
     * @return
     */
    public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
        return userChannelMap;
    }
}

3.4、創建Netty的初始化類NettyServer(重點)

定義兩個EventLoopGroup,bossGroup輔助客戶端的tcp連接請求, workGroup負責與客戶端之間的讀寫操作,需要說明的是,需要開啟一個新的執行緒來執行netty server,要不然會阻塞主執行緒,到時候就無法呼叫專案的其他controller介面了,

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

/**
 * Netty初始化服務
 *
 * @author hs
 */
@Component
public class NettyServer{

    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /**
     * webSocket協議名
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * 埠號
     */
    @Value("${webSocket.netty.port}")
    private int port;

    /**
     * webSocket路徑
     */
    @Value("${webSocket.netty.path}")
    private String webSocketPath;

    /**
     * 在Netty心跳檢測中配置 - 讀空閑超時時間設定
     */
    @Value("${webSocket.netty.readerIdleTime}")
    private long readerIdleTime;

    /**
     * 在Netty心跳檢測中配置 - 寫空閑超時時間設定
     */
    @Value("${webSocket.netty.writerIdleTime}")
    private long writerIdleTime;

    /**
     * 在Netty心跳檢測中配置 - 讀寫空閑超時時間設定
     */
    @Value("${webSocket.netty.allIdleTime}")
    private long allIdleTime;

    @Autowired
    private WebSocketHandler webSocketHandler;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;

    /**
     * 啟動
     * @throws InterruptedException
     */
    private void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        // bossGroup輔助客戶端的tcp連接請求, workGroup負責與客戶端之前的讀寫操作
        bootstrap.group(bossGroup,workGroup);
        // 設定NIO型別的channel
        bootstrap.channel(NioServerSocketChannel.class);
        // 設定監聽埠
        bootstrap.localAddress(new InetSocketAddress(port));
        // 連接到達時會創建一個通道
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // 心跳檢測(一般情況第一個設定,如果超時了,則會呼叫userEventTriggered方法,且會告訴你超時的型別)
                ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES));
                // 流水線管理通道中的處理程式(Handler),用來處理業務
                // webSocket協議本身是基于http協議的,所以這邊也要使用http編解碼器
                ch.pipeline().addLast(new HttpServerCodec());
                ch.pipeline().addLast(new ObjectEncoder());
                // 以塊的方式來寫的處理器
                ch.pipeline().addLast(new ChunkedWriteHandler());
                /*
                    說明:
                    1、http資料在傳輸程序中是分段的,HttpObjectAggregator可以將多個段聚合
                    2、這就是為什么,當瀏覽器發送大量資料時,就會發送多次http請求
                 */
                ch.pipeline().addLast(new HttpObjectAggregator(8192));
                /*
                    說明:
                    1、對應webSocket,它的資料是以幀(frame)的形式傳遞
                    2、瀏覽器請求時 ws://localhost:58080/xxx 表示請求的uri
                    3、核心功能是將http協議升級為ws協議,保持長連接
                */
                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
                // 自定義的handler,處理業務邏輯
                ch.pipeline().addLast(webSocketHandler);
            }
        });
        // 配置完成,開始系結server,通過呼叫sync同步方法阻塞直到系結成功
        ChannelFuture channelFuture = bootstrap.bind().sync();
        log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
        // 對關閉通道進行監聽
        channelFuture.channel().closeFuture().sync();
    }

    /**
     * 釋放資源
     * @throws InterruptedException
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if(bossGroup != null){
            bossGroup.shutdownGracefully().sync();
        }
        if(workGroup != null){
            workGroup.shutdownGracefully().sync();
        }
    }

    /**
     * 初始化(新執行緒開啟)
     */
    @PostConstruct()
    public void init() {
        //需要開啟一個新的執行緒來執行netty server 服務器
        new Thread(() -> {
            try {
                start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

注意:啟動方法需要開啟一個新執行緒執行netty server服務,服務中配置了IdleStateHandler心跳檢測,此類要在創建一個通道的第一個設定,如果超時了,則會呼叫userEventTriggered方法,且會告訴你超時的型別)

3.5、具體實作業務的WebSocketHandler(重點)

創建Netty配置的操作執行類WebSocketHandler,userEventTriggered為心跳檢測超時所呼叫的方法,超時后ctx.channel().close()執行完畢會主動呼叫handlerRemoved洗掉通道及用戶資訊,

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;


/**
 * 操作執行類
 *
 * TextWebSocketFrame型別,表示一個文本幀
 * @author hs
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);

    /**
     * 一旦連接,第一個被執行
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded 被呼叫"+ctx.channel().id().asLongText());
        // 添加到channelGroup 通道組
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * 讀取資料
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // 獲取用戶ID,關聯channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        // 當用戶ID已存入通道內,則不進行寫入,只有第一次建立連接時才會存入,其他情況發送uid則為心跳需求
        if(!NettyConfig.getUserChannelMap().containsKey(uid)){
            log.info("服務器收到訊息:{}",msg.text());
            NettyConfig.getUserChannelMap().put(uid,ctx.channel());
            // 將用戶ID作為自定義屬性加入到channel中,方便隨時channel中獲取用戶ID
            AttributeKey<String> key = AttributeKey.valueOf("userId");
            ctx.channel().attr(key).setIfAbsent(uid);
            // 回復訊息
            ctx.channel().writeAndFlush(new TextWebSocketFrame("服務器連接成功!"));
        }else{
            // 前端定時請求,保持心跳連接,避免服務端誤刪通道
            ctx.channel().writeAndFlush(new TextWebSocketFrame("keep alive success!"));
        }
    }

    /**
     * 移除通道及關聯用戶
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved 被呼叫"+ctx.channel().id().asLongText());
        // 洗掉通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    /**
     * 例外處理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("例外:{}",cause.getMessage());
        // 洗掉通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * 心跳檢測相關方法 - 會主動呼叫handlerRemoved
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if(event.state() == IdleState.ALL_IDLE){
                //清除超時會話
                ChannelFuture writeAndFlush = ctx.writeAndFlush("you will close");
                writeAndFlush.addListener(new ChannelFutureListener() {

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        ctx.channel().close();
                    }
                });
            }
        }else{
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * 洗掉用戶與channel的對應關系
     * @param ctx
     */
    private void removeUserId(ChannelHandlerContext ctx){
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getUserChannelMap().remove(userId);
        log.info("洗掉用戶與channel的對應關系,uid:{}",userId);
    }
}

3.6、具體訊息推送的介面

/**
 * 推送訊息介面
 *
 * @author hs
 */
public interface PushService {

    /**
     * 推送給指定用戶
     * @param userId 用戶ID
     * @param msg 訊息資訊
     */
    void pushMsgToOne(String userId,String msg);

    /**
     * 推送給所有用戶
     * @param msg 訊息資訊
     */
    void pushMsgToAll(String msg);

    /**
     * 獲取當前連接數
     * @return 連接數
     */
    int getConnectCount();
}

3.7、訊息推送介面實作類

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 推送訊息介面實作類
 *
 * @author hs
 */
@Service
public class PushServiceImpl implements PushService {

    /**
     * 推送給指定用戶
     * @param userId 用戶ID
     * @param msg 訊息資訊
     */
    @Override
    public void pushMsgToOne(String userId, String msg){
        ConcurrentHashMap<String, Channel> userChannelMap = NettyConfig.getUserChannelMap();
        Channel channel = userChannelMap.get(userId);
        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }

    /**
     * 推送給所有用戶
     * @param msg 訊息資訊
     */
    @Override
    public void pushMsgToAll(String msg){
        NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }

    /**
     * 獲取當前連接數
     * @return 連接數
     */
    @Override
    public int getConnectCount() {
        return NettyConfig.getChannelGroup().size();
    }
}

3.8、提供訊息推送服務的Controller

主要為了測驗

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * 請求Controller(用于postman測驗)
 *
 * @author hs
 */
@RestController
@RequestMapping("/push")
public class PushController {

    @Autowired
    private PushService pushService;

    /**
     * 推送給所有用戶
     * @param msg 訊息資訊
     */
    @PostMapping("/pushAll")
    public void pushToAll(@RequestParam("msg") String msg){
        pushService.pushMsgToAll(msg);
    }

    /**
     * 推送給指定用戶
     * @param userId 用戶ID
     * @param msg 訊息資訊
     */
    @PostMapping("/pushOne")
    public void pushMsgToOne(@RequestParam("userId") String userId,@RequestParam("msg") String msg){
        pushService.pushMsgToOne(userId,msg);
    }

    /**
     * 獲取當前連接數
     */
    @GetMapping("/getConnectCount")
    public int getConnectCout(){
        return pushService.getConnectCount();
    }
}

3.9、Web前端通過websocket與服務端連接

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script src="/static/jquery-2.2.4.min.js" charset="utf-8"></script>
</head>
<body>
<script>
    var socket;
    var userId = "123456";
    // 判斷當前瀏覽器是否支持webSocket
    if(window.WebSocket){
        socket = new WebSocket("ws://127.0.0.1:58080/webSocket")
        // 相當于channel的read事件,ev 收到服務器回送的訊息
        socket.onmessage = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" + ev.data;
        }
        // 相當于連接開啟
        socket.onopen = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = "連接開啟了..."
            socket.send(
                JSON.stringify({
                    // 連接成功將,用戶ID傳給服務端
                    uid: userId
                })
            );
        }
        // 相當于連接關閉
        socket.onclose = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" + "連接關閉了...";
        }
    }else{
        alert("當前瀏覽器不支持webSocket")
    }

    // 如果前端需要保持連接,則需要定時往服務器針對自己發送請求,回傳的引數和發送引數一致則證明時間段內有互動,服務端則不進行連接斷開操作
    var int = self.setInterval("clock()",10000);
    function clock() {
        socket.send(
            JSON.stringify({
                // 連接成功將,用戶ID傳給服務端
                uid: userId
            })
        );
    }

</script>
<form onsubmit="return false">
    <textarea id="responseText" style="height: 150px; width: 300px;"></textarea>
    <br>
    <input type="button" value="清空內容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
ws.onclose = function (e) {
  console.log('websocket 斷開: ' + e.code + ' ' + e.reason + ' ' + e.wasClean)
  console.log(e)
}

原因有很多,最好在WebSocket斷開時,將錯誤列印出來,

錯誤狀態碼:

WebSocket斷開時,會觸發CloseEvent, CloseEvent會在連接關閉時發送給使用 WebSockets 的客戶端. 它在 WebSocket 物件的 onclose 事件監聽器中使用,CloseEvent的code欄位表示了WebSocket斷開的原因,可以從該欄位中分析斷開的原因,

CloseEvent有三個欄位需要注意, 通過分析這三個欄位,一般就可以找到斷開原因

  • CloseEvent.code: code是錯誤碼,是整數型別

  • CloseEvent.reason: reason是斷開原因,是字串

  • CloseEvent.wasClean: wasClean表示是否正常斷開,是布林值,一般例外斷開時,該值為false

狀態碼名稱描述
0–999保留段, 未使用.
1000CLOSE_NORMAL正常關閉; 無論為何目的而創建, 該鏈接都已成功完成任務.
1001CLOSE_GOING_AWAY終端離開, 可能因為服務端錯誤, 也可能因為瀏覽器正從打開連接的頁面跳轉離開.
1002CLOSE_PROTOCOL_ERROR由于協議錯誤而中斷連接.
1003CLOSE_UNSUPPORTED由于接收到不允許的資料型別而斷開連接 (如僅接收文本資料的終端接收到了二進制資料).
1004保留. 其意義可能會在未來定義.
1005CLOSE_NO_STATUS保留. 表示沒有收到預期的狀態碼.
1006CLOSE_ABNORMAL保留. 用于期望收到狀態碼時連接非正常關閉 (也就是說, 沒有發送關閉幀).
1007Unsupported Data由于收到了格式不符的資料而斷開連接 (如文本訊息中包含了非 UTF-8 資料).
1008Policy Violation由于收到不符合約定的資料而斷開連接. 這是一個通用狀態碼, 用于不適合使用 1003 和 1009 狀態碼的場景.
1009CLOSE_TOO_LARGE由于收到過大的資料幀而斷開連接.
1010Missing Extension客戶端期望服務器商定一個或多個拓展, 但服務器沒有處理, 因此客戶端斷開連接.
1011Internal Error客戶端由于遇到沒有預料的情況阻止其完成請求, 因此服務端斷開連接.
1012Service Restart服務器由于重啟而斷開連接.
1013Try Again Later服務器由于臨時原因斷開連接, 如服務器過載因此斷開一部分客戶端連接.
1014由 WebSocket標準保留以便未來使用.
1015TLS Handshake保留. 表示連接由于無法完成 TLS 握手而關閉 (例如無法驗證服務器證書).
1016–1999由 WebSocket標準保留以便未來使用.
2000–2999由 WebSocket拓展保留使用.
3000–3999可以由庫或框架使用.? 不應由應用使用. 可以在 IANA 注冊, 先到先得.
4000–4999可以由應用使用.

四、WebSocket和Http之長連接和短連接區別

4.1、HTTP1.0、HTTP1.1 和 HTTP2.0 的區別

HTTP是一個應用層協議,無狀態的,埠號為80,主要的版本有1.0/1.1/2.0.

(1) HTTP/1.0

一次請求-回應,建立一個連接,用完關閉;

(2) HTTP/1.1

HTTP 1.1支持長連接(PersistentConnection)和請求的流水線(Pipelining)處理

串行化單執行緒處理,可以同時在同一個tcp鏈接上發送多個請求,但是只有回應是有順序的,只 有上一個請求完成后,下一個才 能回應,一旦有任務處理超時等,后續任務只能被阻塞(線頭阻塞);

(3)HTTP/2

HTTP2支持多路復用,所以通過同一個連接實作多個http請求傳輸變成了可能,請求并行執行,某任務耗時嚴重,不會影響到任務 正常執行,

4.2、什么是websocket?

Websocket是html5提出的一個協議規范,是為解決客戶端與服務端實時通信,本質上是一個基于tcp,先通過HTTP/HTTPS協議發起一條特殊的http請求進行握手后創建一個用于交換資料的TCP連接,

WebSocket優勢: 瀏覽器和服務器只需要要做一個握手的動作,在建立連接之后,雙方可以在任意時刻,相互推送資訊,同時,服務器與客戶端之間交換的頭資訊很小,

4.3、什么是長連接和短連接

短連接:

連接->傳輸資料->關閉連接

HTTP是無狀態的,瀏覽器和服務器每進行一次HTTP操作,就建立一次連接,但任務結束就中斷連接,也可以這樣說:短連接是指SOCKET連接后發送后接收完資料后馬上斷開連接,

長連接:

連接->傳輸資料->保持連接 -> 傳輸資料-> ,,, ->關閉連接,

長連接指建立SOCKET連接后不管是否使用都保持連接,但安全性較差,

4.4、http和websocket的長連接區別

HTTP1.1通過使用Connection:keep-alive進行長連接,HTTP 1.1默認進行持久連接,在一次 TCP 連接中可以完成多個 HTTP 請求,但是對每個請求仍然要單獨發 header,Keep-Alive不會永久保持連接,它有一個保持時間,可以在不同的服務器軟體(如Apache)中設定這個時間,這種長連接是一種“偽鏈接”

websocket的長連接,是一個真的全雙工,長連接第一次tcp鏈路建立之后,后續資料可以雙方都進行發送,不需要發送請求頭,

keep-alive雙方并沒有建立正真的連接會話,服務端可以在任何一次請求完成后關閉,WebSocket 它本身就規定了是正真的、雙工的長連接,兩邊都必須要維持住連接的狀態,

4.5、HTTP2.0和HTTP1.X相比的新特性

  • 新的二進制格式(Binary Format),HTTP1.x的決議是基于文本,基于文本協議的格式決議存在天然缺陷,文本的表現形式有多樣性,要做到健壯性考慮的場景必然很多,二進制則不同,只認0和1的組合,基于這種考慮HTTP2.0的協議決議決定采用二進制格式,實作方便且健壯,

  • 多路復用(MultiPlexing),即連接共享,即每一個request都是是用作連接共享機制的,一個request對應一個id,這樣一個連接上可以有多個request,每個連接的request可以隨機的混雜在一起,接收方可以根據request的 id將request再歸屬到各自不同的服務端請求里面,

  • header壓縮,如上文中所言,對前面提到過HTTP1.x的header帶有大量資訊,而且每次都要重復發送,HTTP2.0使用encoder來減少需要傳輸的header大小,通訊雙方各自cache一份header fields表,既避免了重復header的傳輸,又減小了需要傳輸的大小,

  • 服務端推送(server push),同SPDY一樣,HTTP2.0也具有server push功能,


參考鏈接:

WebSocket使用

SpringBoot+WebSocket+Netty實作訊息推送

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/301478.html

標籤:其他

上一篇:前端微服務架構下CI/CD構建單鏡像落地方案

下一篇:【手寫原始碼-設計模式15】-責任鏈模式-基于人事請假單作業流場景

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more