前言:移動互聯網蓬勃發展的今天,大部分手機 APP和網站 都提供了訊息推送功能,如新聞客戶端的熱點新聞推薦,IM 工具的聊天訊息提醒,電商產品促銷資訊,企業應用的通知和審批流程等等,推送對于提高產品活躍度、提高功能模塊使用率、提升用戶粘性、提升用戶留存率起到了重要作用,作為 APP 和網站運營中一個關鍵的渠道,對訊息推送的合理運用能有效促進目標的實作,
一、淺析web端的訊息推送原理
股票曲線實時變化,在線IM聊天等等,Web系統里總是能見到訊息推送的應用,訊息推送用好了能增強用戶體驗,實作訊息推送有N種解決方案,
1.1、什么是訊息推送
訊息推送(Push)指運營人員通過自己的產品或第三方工具對用戶當前網頁或移動設備進行的主動訊息推送,用戶可以在網頁上或移動設備鎖定螢屏和通知欄看到push訊息通知,以此來實作用戶的多層次需求,使得用戶能夠自己設定所需要的資訊頻道,得到即時訊息,簡單說就是一種定制資訊的實作方式,我們平時瀏覽郵箱時突然彈出訊息提示收到新郵件就屬于web端訊息推送,在手機鎖屏上看到的微信訊息等等都屬于APP訊息推送,
Web網站推送:
當我們在瀏覽網站觀望猶豫時,突然看到了系統發來一條訊息,一位神秘的神豪老板竟然爆出了麻痹戒指!!!我的天,于是我果斷開始了游戲!這訊息很及時!
APP移動推送:
上述兩種經典場景,是生活中比較常見的場景,也引出了兩大推送種類,Web端訊息推送和移動端訊息推送,本篇博客主要介紹Web推送,順便提一句移動端App常見第三方推送SDK有極光推送、小米推送等等,
1.2、Web端實作訊息推送的四種方式
主要介紹web端其中的四種實作方式:短輪詢、Comet長輪詢、Server-sent、WebSocket,
(1)短輪詢
指在特定的的時間間隔(如每10秒),由瀏覽器對服務器發出HTTP request,然后由服務器回傳最新的資料給客戶端的瀏覽器,瀏覽器做處理后進行顯示,無論后端此時是否有新的訊息產生,都會進行回應,字面上看,這種方式是最簡單的,這種方式的優點是,后端撰寫非常簡單,邏輯不復雜,但是缺點是請求中大部分中是無用的,浪費了帶寬和服務器資源,總結來說,簡單粗暴,適用于小型(偷懶)應用,
(2)Comet長輪詢
長輪詢是客戶端向服務器發送Ajax請求,服務器接到請求后hold住連接,直到有新訊息才回傳回應資訊并關閉連接,客戶端處理完回應資訊后再向服務器發送新的請求;長連接是在頁面中的iframe發送請求到服務端,服務端hold住請求并不斷將需要回傳前端的資料封裝成呼叫javascript函式的形式回應到前端,前端不斷收到回應并處理,Comet的實作原理和短輪詢相比,很明顯少了很多無用請求,減少了帶寬壓力,實作起來比短輪詢復雜一丟丟,想比用短輪詢的同學有夢想時,就可以用Comet來實作自己的推送,
長輪詢的優點很明顯,在無訊息的情況下不會頻繁的請求,耗費資小并且實作了服務端主動向前端推送的功能,但是服務器hold連接會消耗資源,回傳資料順序無保證,難于管理維護,WebQQ(好像掛了)就是這樣實作的,
(3)Server-sent
服務器推指的是HTML5規范中提供的服務端事件EventSource,瀏覽器在實作了該規范的前提下創建一個EventSource連接后,便可收到服務端的發送的訊息,實作一個單向通信,客戶端進行監聽,并對回應的資訊處理顯示,該種方式已經實作了服務端主動推送至前端的功能,優點是在單項傳輸資料的場景中完全滿足需求,開發人員擴展起來基本不需要改后端代碼,直接用現有框架和技術就可以集成,
(4)WebSocket
WebSocket是HTML5下一種新的協議,是基于TCP的應用層協議,只需要一次連接,便可以實作全雙工通信,客戶端和服務端可以相互主動發送訊息,客戶端進行監聽,并對回應的訊息處理顯示,
這個技術相信基本都聽說過,就算沒寫過代碼,也大概知道干嘛的,通過名字就能知道,這是一個Socket連接,一個能在瀏覽器上用的Socket連接,WebSocket是HTML5標準中的一個內容,瀏覽器通過javascript腳本手動創建一個TCP連接與服務端進行通訊,優點是雙向通信,都可以主動發送訊息,既可以滿足“問”+“答”的回應機制,也可以實作主動推送的功能,缺點就是編碼相對來說會多點,服務端處理更復雜(我覺得當一條有情懷的咸魚就應該用這個!),
1.3、實作個性化的推送
上面說了很多實作方案,針對自己系統的應用場景選擇合適的推送方案才是合理的,因此最后簡單說一下實作個性化推送的兩種方式,第一種很簡單,直接使用第三方實作的推送,無需復雜的開發運維,直接可以使用,第二種就是自己封裝,可以選擇如今較為火熱的WebSocket來實作系統的訊息推送,
①直接用第三方的訊息推送服務(并發量多了會收費)
在這里推薦一個第三方推送平臺,GoEasy,
推薦理由是GoEasy的理念符合我們的選擇(可參考http://t.cn/Ex6jg3q):
(1)更簡單的方式將訊息從服務器端推送至客戶端
(2)更簡單的方式將訊息從各種客戶端推送至客戶端
GoEasy具體的使用方式這里不再贅述,詳見官網,對于后端后端開發者,可直接使用Rest方式呼叫推送,對于前端或web開發者,可以從web客戶端用javascript腳本進行呼叫推送,
②封裝自己的推送服務
如果是一個老系統進行擴展,那么更推薦使用Server-sent,服務端改動量不會很大,如果是新系統,更推薦websocket,實作的功能功能更全面,
我們如果需要使用websocket技術實作自己的推送服務,需要注意哪些點,或者說需要踩哪些坑呢,本文最后列出幾點供大家參考:
長連接的心跳激活處理;
服務端調優實作高并發量client同時在線(單機服務器可以實作百萬并發長連接);
群發訊息;
服務端維持多用戶的狀態;
從WebSocket中獲取HttpSession進行用戶相關操作;
等等等….
二、WebSocket簡介
2.1、websocket的由來
我們經常用的是HTTP協議,而HTTP協議是一種無狀態的協議,要實作有狀態的會話必須借助一些外部機制如session/cookie或者Token,這或多或少會帶來一些不便,尤其是服務端和客戶端需要實時交換資料的時候(監控,聊天),這個問題更加明顯,為了適應這種環境,websocket就產生了,目的是即時通訊,替代輪詢,
2.2、websocket概述
WebSocket 是HTML5一種新的協議,它實作了瀏覽器與服務器全雙工通信(full-duplex),一開始的握手需要借助HTTP請求完成,WebSocket 是一種在單個 TCP 連接上進行全雙工通信的協議,WebSocket 使得客戶端和服務器之間的資料交換變得更加簡單,允許服務端主動向客戶端推送資料,
websocket的特點或作用
-
允許服務端主動向客戶端推送資料
-
在WebSocket API中,瀏覽器和服務器只需要完成一次握手,兩者之間就直接可以創建持久性的連接,并進行雙向資料傳輸,
websocket使用的優點
-
更強的實時性
-
保持連接狀態,創建一次連接后,之后通信時可以省略部分狀態資訊,較少的控制開銷,在連接創建后,服務器和客戶端之間交換資料時,用于協議控制的資料包頭部相對較小,在不包含擴展的情況下,對于服務器到客戶端的內容,此頭部大小只有2至10位元組(和資料包長度有關);對于客戶端到服務器的內容,此頭部還需要加上額外的4位元組的掩碼,相對于HTTP請求每次都要攜帶完整的頭部,此項開銷顯著減少了
websocket使用的缺點
由于websocket使用的持久連接,與服務器一直保持連接,對服務器壓力很大
2.3、websocket請求頭和回應頭
瀏覽器發送websocket請求頭類似如下:

下面是對請求頭部解釋(比http協議多了Upgrade和Connection,是告訴服務器包協議設計ws):
-
Accept-Encoding:瀏覽器可以接受的資料的壓縮型別,
-
Accept-Language:瀏覽器可以接受的語言型別,
-
Cache-Control:no-cache不使用強快取,
-
Connection:Upgrade 通知服務器通信協議提升,
-
Host:主機名,
-
Origin:用于驗證瀏覽器域名是否在服務器許可范圍內,
-
Pragma:no-cache HTTP/1.0定義的不使用本地快取,
-
Sec-WebSocket-Extensions:permessage-deflate; client_max_window_bits
-
Sec-WebSocket-Key:lb69kw8CsB4CrSk9tKa3 g==
握手協議密鑰,base64編碼的16位元組的隨機字串, -
Sec-WebSocket-Version:13 版本號,
-
Upgrade:websocket 使用websocket協議進行傳輸資料,而不使用HTTP/1.1,
-
User-Agent:用戶代理字串,
服務器接收到客戶端請求后做出回應并回傳如下:

下面是服務器回傳的頭部解釋:
-
Connection:Upgrade 通信協議提升,
-
Date:通信時間
-
Upgrade: websocket 傳輸協議升級為websocket,
-
Sec-WebSocket-Extensions:permessage-deflate
-
Sec-WebSocket-Accept:q9g5u1WfIWaAjNgMmjlTQTqkS/k=
將Sec-WebSocket-Key的值進行一定的運算和該值進行比較來判斷是否是目標服務器回應了WebSocket請求, -
Upgrade: 使用websocket協議進行資料傳輸
2.4、WebSocket和Socket的區別
短答案:就像Java和JavaScript,并沒有什么太大的關系,但又不能說完全沒關系,可以這么說:
-
命名方面,Socket是一個深入人心的概念,WebSocket借用了這一概念;
-
使用方面,完全兩個東西,
Socket是應用層與TCP/IP協議族通信的中間軟體抽象層,它是一組介面(不是協議,為了方便使用TCP或UDP而抽象出來的一層,是位于應用層和傳輸控制層之間的一組介面),
WebSocket是應用層協議,
2.5、向指定用戶發送WebSocket訊息并處理對方不在線的情況
給指定用戶發送訊息:
-
如果接收者在線,則直接發送訊息;
-
否則將訊息存盤到redis,等用戶上線后主動拉取未讀訊息,
2.6、WebSocket心跳機制
在使用WebSocket的程序中,有時候會遇到網路例外斷開的情況,但是在網路斷開的時候服務器端并沒有觸發onclose的事件,這樣會有:服務器會繼續向客戶端發送多余的連接,并且這些資料還會丟失,所以就需要一種機制來檢測客戶端和服務端是否處于正常的連接狀態,因此就有了WebSocket的心跳機制了,還有心跳,說明還活著,沒有心跳說明已經掛掉了,
心跳機制
心跳機制是每隔一段時間會向服務器發送一個資料包,告訴服務器自己還活著,同時客戶端會確認服務器端是否還活著,如果還活著的話,就會回傳一個資料包給客戶端來確定服務器端也還活著,否則的話,有可能是網路斷開連接了,需要重連,
2.7、Netty可以實作WebSocket
Netty是由jboss提供的一款開源框架,常用于搭建RPC中的TCP服務器和WebSocket服務器,甚至是類似Tomcat的web服務器,反正就是各種網路服務器,在處理高并發的專案中,功能豐富且性能良好,基于Java中NIO的二次封裝,具有比原生NIO更好更穩健的體驗,
三、基于Netty實作WebSocket訊息推送
因為產品需求,要實作服務端推送訊息至客戶端,并且支持客戶端對用戶點對點訊息發送的社交功能,服務端給客戶端推送訊息,可以選擇原生的websocket,或者更加高級的netty框架實作,
在此我極力推薦netty,因為一款好的框架一般都是在原生的基礎上進行包裝成更加實用方便,很多我們需要自己考慮的問題都基本可以不用去考慮,不過此文不會去講netty有多么的高深莫測,因為這些概念性的東西隨處可見,而是通過實戰來達到推送訊息的目的,
這個小節,我們主要講解下如何整合Netty和WebSocket,我們需要使用netty對接websocket連接,實作雙向通信,這一步需要有服務端的netty程式,用來處理客戶端的websocket連接操作,例如建立連接,斷開連接,收發資料等,
WebSocket訊息推送實作思路:
前端使用WebSocket與服務端創建連接的時候,將用戶ID傳給服務端,服務端將用戶ID與channel關聯起來存盤,同時將channel放入到channel組中,
如果需要給所有用戶發送訊息,直接執行channel組的writeAndFlush()方法;
如果需要給指定用戶發送訊息,根據用戶ID查詢到對應的channel,然后執行writeAndFlush()方法;
前端獲取到服務端推送的訊息之后,將訊息內容展示到文本域中
下面是具體的代碼實作,基本上每一步操作都配有注釋說明,配合注釋看應該還是比較容易理解的,
3.1、引入Netty的依賴
netty-all包含了netty的所有封裝,hutool-all封裝了常用的一些依賴,如Json相關
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.2.3</version>
</dependency>
3.2、修改組態檔application.yml
server:
port: 8899
#netty的配置資訊(埠號,webSocket路徑)
webSocket:
netty:
port: 58080
path: /webSocket
readerIdleTime: 30 #讀空閑超時時間設定(Netty心跳檢測配置)
writerIdleTime: 30 #寫空閑超時時間設定(Netty心跳檢測配置)
allIdleTime: 30 #讀寫空閑超時時間設定(Netty心跳檢測配置)
3.3、創建NettyConfig
在NettyConfig中定義一個單例的channel組,管理所有的channel,再定義一個map,管理用戶與channel的對應關系,
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;
/**
* NettyConfig類
*
* @author hs
* @date 2021-09-18
*/
public class NettyConfig {
/**
* 定義一個channel組,管理所有的channel
* GlobalEventExecutor.INSTANCE 是全域的事件執行器,是一個單例
*/
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 存放用戶與Chanel的對應資訊,用于給指定用戶發送訊息
*/
private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();
private NettyConfig() {}
/**
* 獲取channel組
* @return
*/
public static ChannelGroup getChannelGroup() {
return channelGroup;
}
/**
* 獲取用戶channel map
* @return
*/
public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
return userChannelMap;
}
}
3.4、創建Netty的初始化類NettyServer(重點)
定義兩個EventLoopGroup,bossGroup輔助客戶端的tcp連接請求, workGroup負責與客戶端之間的讀寫操作,需要說明的是,需要開啟一個新的執行緒來執行netty server,要不然會阻塞主執行緒,到時候就無法呼叫專案的其他controller介面了,
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
/**
* Netty初始化服務
*
* @author hs
*/
@Component
public class NettyServer{
private static final Logger log = LoggerFactory.getLogger(NettyServer.class);
/**
* webSocket協議名
*/
private static final String WEBSOCKET_PROTOCOL = "WebSocket";
/**
* 埠號
*/
@Value("${webSocket.netty.port}")
private int port;
/**
* webSocket路徑
*/
@Value("${webSocket.netty.path}")
private String webSocketPath;
/**
* 在Netty心跳檢測中配置 - 讀空閑超時時間設定
*/
@Value("${webSocket.netty.readerIdleTime}")
private long readerIdleTime;
/**
* 在Netty心跳檢測中配置 - 寫空閑超時時間設定
*/
@Value("${webSocket.netty.writerIdleTime}")
private long writerIdleTime;
/**
* 在Netty心跳檢測中配置 - 讀寫空閑超時時間設定
*/
@Value("${webSocket.netty.allIdleTime}")
private long allIdleTime;
@Autowired
private WebSocketHandler webSocketHandler;
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
/**
* 啟動
* @throws InterruptedException
*/
private void start() throws InterruptedException {
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
// bossGroup輔助客戶端的tcp連接請求, workGroup負責與客戶端之前的讀寫操作
bootstrap.group(bossGroup,workGroup);
// 設定NIO型別的channel
bootstrap.channel(NioServerSocketChannel.class);
// 設定監聽埠
bootstrap.localAddress(new InetSocketAddress(port));
// 連接到達時會創建一個通道
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 心跳檢測(一般情況第一個設定,如果超時了,則會呼叫userEventTriggered方法,且會告訴你超時的型別)
ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES));
// 流水線管理通道中的處理程式(Handler),用來處理業務
// webSocket協議本身是基于http協議的,所以這邊也要使用http編解碼器
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new ObjectEncoder());
// 以塊的方式來寫的處理器
ch.pipeline().addLast(new ChunkedWriteHandler());
/*
說明:
1、http資料在傳輸程序中是分段的,HttpObjectAggregator可以將多個段聚合
2、這就是為什么,當瀏覽器發送大量資料時,就會發送多次http請求
*/
ch.pipeline().addLast(new HttpObjectAggregator(8192));
/*
說明:
1、對應webSocket,它的資料是以幀(frame)的形式傳遞
2、瀏覽器請求時 ws://localhost:58080/xxx 表示請求的uri
3、核心功能是將http協議升級為ws協議,保持長連接
*/
ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
// 自定義的handler,處理業務邏輯
ch.pipeline().addLast(webSocketHandler);
}
});
// 配置完成,開始系結server,通過呼叫sync同步方法阻塞直到系結成功
ChannelFuture channelFuture = bootstrap.bind().sync();
log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
// 對關閉通道進行監聽
channelFuture.channel().closeFuture().sync();
}
/**
* 釋放資源
* @throws InterruptedException
*/
@PreDestroy
public void destroy() throws InterruptedException {
if(bossGroup != null){
bossGroup.shutdownGracefully().sync();
}
if(workGroup != null){
workGroup.shutdownGracefully().sync();
}
}
/**
* 初始化(新執行緒開啟)
*/
@PostConstruct()
public void init() {
//需要開啟一個新的執行緒來執行netty server 服務器
new Thread(() -> {
try {
start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
注意:啟動方法需要開啟一個新執行緒執行netty server服務,服務中配置了IdleStateHandler心跳檢測,此類要在創建一個通道的第一個設定,如果超時了,則會呼叫userEventTriggered方法,且會告訴你超時的型別)
3.5、具體實作業務的WebSocketHandler(重點)
創建Netty配置的操作執行類WebSocketHandler,userEventTriggered為心跳檢測超時所呼叫的方法,超時后ctx.channel().close()執行完畢會主動呼叫handlerRemoved洗掉通道及用戶資訊,
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* 操作執行類
*
* TextWebSocketFrame型別,表示一個文本幀
* @author hs
*/
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);
/**
* 一旦連接,第一個被執行
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("handlerAdded 被呼叫"+ctx.channel().id().asLongText());
// 添加到channelGroup 通道組
NettyConfig.getChannelGroup().add(ctx.channel());
}
/**
* 讀取資料
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 獲取用戶ID,關聯channel
JSONObject jsonObject = JSONUtil.parseObj(msg.text());
String uid = jsonObject.getStr("uid");
// 當用戶ID已存入通道內,則不進行寫入,只有第一次建立連接時才會存入,其他情況發送uid則為心跳需求
if(!NettyConfig.getUserChannelMap().containsKey(uid)){
log.info("服務器收到訊息:{}",msg.text());
NettyConfig.getUserChannelMap().put(uid,ctx.channel());
// 將用戶ID作為自定義屬性加入到channel中,方便隨時channel中獲取用戶ID
AttributeKey<String> key = AttributeKey.valueOf("userId");
ctx.channel().attr(key).setIfAbsent(uid);
// 回復訊息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服務器連接成功!"));
}else{
// 前端定時請求,保持心跳連接,避免服務端誤刪通道
ctx.channel().writeAndFlush(new TextWebSocketFrame("keep alive success!"));
}
}
/**
* 移除通道及關聯用戶
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("handlerRemoved 被呼叫"+ctx.channel().id().asLongText());
// 洗掉通道
NettyConfig.getChannelGroup().remove(ctx.channel());
removeUserId(ctx);
}
/**
* 例外處理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("例外:{}",cause.getMessage());
// 洗掉通道
NettyConfig.getChannelGroup().remove(ctx.channel());
removeUserId(ctx);
ctx.close();
}
/**
* 心跳檢測相關方法 - 會主動呼叫handlerRemoved
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if(event.state() == IdleState.ALL_IDLE){
//清除超時會話
ChannelFuture writeAndFlush = ctx.writeAndFlush("you will close");
writeAndFlush.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ctx.channel().close();
}
});
}
}else{
super.userEventTriggered(ctx, evt);
}
}
/**
* 洗掉用戶與channel的對應關系
* @param ctx
*/
private void removeUserId(ChannelHandlerContext ctx){
AttributeKey<String> key = AttributeKey.valueOf("userId");
String userId = ctx.channel().attr(key).get();
NettyConfig.getUserChannelMap().remove(userId);
log.info("洗掉用戶與channel的對應關系,uid:{}",userId);
}
}
3.6、具體訊息推送的介面
/**
* 推送訊息介面
*
* @author hs
*/
public interface PushService {
/**
* 推送給指定用戶
* @param userId 用戶ID
* @param msg 訊息資訊
*/
void pushMsgToOne(String userId,String msg);
/**
* 推送給所有用戶
* @param msg 訊息資訊
*/
void pushMsgToAll(String msg);
/**
* 獲取當前連接數
* @return 連接數
*/
int getConnectCount();
}
3.7、訊息推送介面實作類
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
/**
* 推送訊息介面實作類
*
* @author hs
*/
@Service
public class PushServiceImpl implements PushService {
/**
* 推送給指定用戶
* @param userId 用戶ID
* @param msg 訊息資訊
*/
@Override
public void pushMsgToOne(String userId, String msg){
ConcurrentHashMap<String, Channel> userChannelMap = NettyConfig.getUserChannelMap();
Channel channel = userChannelMap.get(userId);
channel.writeAndFlush(new TextWebSocketFrame(msg));
}
/**
* 推送給所有用戶
* @param msg 訊息資訊
*/
@Override
public void pushMsgToAll(String msg){
NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
}
/**
* 獲取當前連接數
* @return 連接數
*/
@Override
public int getConnectCount() {
return NettyConfig.getChannelGroup().size();
}
}
3.8、提供訊息推送服務的Controller
主要為了測驗
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* 請求Controller(用于postman測驗)
*
* @author hs
*/
@RestController
@RequestMapping("/push")
public class PushController {
@Autowired
private PushService pushService;
/**
* 推送給所有用戶
* @param msg 訊息資訊
*/
@PostMapping("/pushAll")
public void pushToAll(@RequestParam("msg") String msg){
pushService.pushMsgToAll(msg);
}
/**
* 推送給指定用戶
* @param userId 用戶ID
* @param msg 訊息資訊
*/
@PostMapping("/pushOne")
public void pushMsgToOne(@RequestParam("userId") String userId,@RequestParam("msg") String msg){
pushService.pushMsgToOne(userId,msg);
}
/**
* 獲取當前連接數
*/
@GetMapping("/getConnectCount")
public int getConnectCout(){
return pushService.getConnectCount();
}
}
3.9、Web前端通過websocket與服務端連接
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<script src="/static/jquery-2.2.4.min.js" charset="utf-8"></script>
</head>
<body>
<script>
var socket;
var userId = "123456";
// 判斷當前瀏覽器是否支持webSocket
if(window.WebSocket){
socket = new WebSocket("ws://127.0.0.1:58080/webSocket")
// 相當于channel的read事件,ev 收到服務器回送的訊息
socket.onmessage = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
}
// 相當于連接開啟
socket.onopen = function (ev) {
var rt = document.getElementById("responseText");
rt.value = "連接開啟了..."
socket.send(
JSON.stringify({
// 連接成功將,用戶ID傳給服務端
uid: userId
})
);
}
// 相當于連接關閉
socket.onclose = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "連接關閉了...";
}
}else{
alert("當前瀏覽器不支持webSocket")
}
// 如果前端需要保持連接,則需要定時往服務器針對自己發送請求,回傳的引數和發送引數一致則證明時間段內有互動,服務端則不進行連接斷開操作
var int = self.setInterval("clock()",10000);
function clock() {
socket.send(
JSON.stringify({
// 連接成功將,用戶ID傳給服務端
uid: userId
})
);
}
</script>
<form onsubmit="return false">
<textarea id="responseText" style="height: 150px; width: 300px;"></textarea>
<br>
<input type="button" value="清空內容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
ws.onclose = function (e) {
console.log('websocket 斷開: ' + e.code + ' ' + e.reason + ' ' + e.wasClean)
console.log(e)
}
原因有很多,最好在WebSocket斷開時,將錯誤列印出來,
錯誤狀態碼:
WebSocket斷開時,會觸發CloseEvent, CloseEvent會在連接關閉時發送給使用 WebSockets 的客戶端. 它在 WebSocket 物件的 onclose 事件監聽器中使用,CloseEvent的code欄位表示了WebSocket斷開的原因,可以從該欄位中分析斷開的原因,
CloseEvent有三個欄位需要注意, 通過分析這三個欄位,一般就可以找到斷開原因
-
CloseEvent.code: code是錯誤碼,是整數型別 -
CloseEvent.reason: reason是斷開原因,是字串 -
CloseEvent.wasClean: wasClean表示是否正常斷開,是布林值,一般例外斷開時,該值為false
| 狀態碼 | 名稱 | 描述 |
|---|---|---|
| 0–999 | 保留段, 未使用. | |
| 1000 | CLOSE_NORMAL | 正常關閉; 無論為何目的而創建, 該鏈接都已成功完成任務. |
| 1001 | CLOSE_GOING_AWAY | 終端離開, 可能因為服務端錯誤, 也可能因為瀏覽器正從打開連接的頁面跳轉離開. |
| 1002 | CLOSE_PROTOCOL_ERROR | 由于協議錯誤而中斷連接. |
| 1003 | CLOSE_UNSUPPORTED | 由于接收到不允許的資料型別而斷開連接 (如僅接收文本資料的終端接收到了二進制資料). |
| 1004 | 保留. 其意義可能會在未來定義. | |
| 1005 | CLOSE_NO_STATUS | 保留. 表示沒有收到預期的狀態碼. |
1006 | CLOSE_ABNORMAL | 保留. 用于期望收到狀態碼時連接非正常關閉 (也就是說, 沒有發送關閉幀). |
| 1007 | Unsupported Data | 由于收到了格式不符的資料而斷開連接 (如文本訊息中包含了非 UTF-8 資料). |
| 1008 | Policy Violation | 由于收到不符合約定的資料而斷開連接. 這是一個通用狀態碼, 用于不適合使用 1003 和 1009 狀態碼的場景. |
| 1009 | CLOSE_TOO_LARGE | 由于收到過大的資料幀而斷開連接. |
| 1010 | Missing Extension | 客戶端期望服務器商定一個或多個拓展, 但服務器沒有處理, 因此客戶端斷開連接. |
| 1011 | Internal Error | 客戶端由于遇到沒有預料的情況阻止其完成請求, 因此服務端斷開連接. |
| 1012 | Service Restart | 服務器由于重啟而斷開連接. |
| 1013 | Try Again Later | 服務器由于臨時原因斷開連接, 如服務器過載因此斷開一部分客戶端連接. |
| 1014 | 由 WebSocket標準保留以便未來使用. | |
| 1015 | TLS Handshake | 保留. 表示連接由于無法完成 TLS 握手而關閉 (例如無法驗證服務器證書). |
| 1016–1999 | 由 WebSocket標準保留以便未來使用. | |
| 2000–2999 | 由 WebSocket拓展保留使用. | |
| 3000–3999 | 可以由庫或框架使用.? 不應由應用使用. 可以在 IANA 注冊, 先到先得. | |
| 4000–4999 | 可以由應用使用. |
四、WebSocket和Http之長連接和短連接區別
4.1、HTTP1.0、HTTP1.1 和 HTTP2.0 的區別
HTTP是一個應用層協議,無狀態的,埠號為80,主要的版本有1.0/1.1/2.0.
(1) HTTP/1.0
一次請求-回應,建立一個連接,用完關閉;
(2) HTTP/1.1
HTTP 1.1支持長連接(PersistentConnection)和請求的流水線(Pipelining)處理
串行化單執行緒處理,可以同時在同一個tcp鏈接上發送多個請求,但是只有回應是有順序的,只 有上一個請求完成后,下一個才 能回應,一旦有任務處理超時等,后續任務只能被阻塞(線頭阻塞);
(3)HTTP/2
HTTP2支持多路復用,所以通過同一個連接實作多個http請求傳輸變成了可能,請求并行執行,某任務耗時嚴重,不會影響到任務 正常執行,
4.2、什么是websocket?
Websocket是html5提出的一個協議規范,是為解決客戶端與服務端實時通信,本質上是一個基于tcp,先通過HTTP/HTTPS協議發起一條特殊的http請求進行握手后創建一個用于交換資料的TCP連接,
WebSocket優勢: 瀏覽器和服務器只需要要做一個握手的動作,在建立連接之后,雙方可以在任意時刻,相互推送資訊,同時,服務器與客戶端之間交換的頭資訊很小,
4.3、什么是長連接和短連接
短連接:
連接->傳輸資料->關閉連接
HTTP是無狀態的,瀏覽器和服務器每進行一次HTTP操作,就建立一次連接,但任務結束就中斷連接,也可以這樣說:短連接是指SOCKET連接后發送后接收完資料后馬上斷開連接,
長連接:
連接->傳輸資料->保持連接 -> 傳輸資料-> ,,, ->關閉連接,
長連接指建立SOCKET連接后不管是否使用都保持連接,但安全性較差,
4.4、http和websocket的長連接區別
HTTP1.1通過使用Connection:keep-alive進行長連接,HTTP 1.1默認進行持久連接,在一次 TCP 連接中可以完成多個 HTTP 請求,但是對每個請求仍然要單獨發 header,Keep-Alive不會永久保持連接,它有一個保持時間,可以在不同的服務器軟體(如Apache)中設定這個時間,這種長連接是一種“偽鏈接”
websocket的長連接,是一個真的全雙工,長連接第一次tcp鏈路建立之后,后續資料可以雙方都進行發送,不需要發送請求頭,
keep-alive雙方并沒有建立正真的連接會話,服務端可以在任何一次請求完成后關閉,WebSocket 它本身就規定了是正真的、雙工的長連接,兩邊都必須要維持住連接的狀態,
4.5、HTTP2.0和HTTP1.X相比的新特性
-
新的二進制格式(Binary Format),HTTP1.x的決議是基于文本,基于文本協議的格式決議存在天然缺陷,文本的表現形式有多樣性,要做到健壯性考慮的場景必然很多,二進制則不同,只認0和1的組合,基于這種考慮HTTP2.0的協議決議決定采用二進制格式,實作方便且健壯,
-
多路復用(MultiPlexing),即連接共享,即每一個request都是是用作連接共享機制的,一個request對應一個id,這樣一個連接上可以有多個request,每個連接的request可以隨機的混雜在一起,接收方可以根據request的 id將request再歸屬到各自不同的服務端請求里面,
-
header壓縮,如上文中所言,對前面提到過HTTP1.x的header帶有大量資訊,而且每次都要重復發送,HTTP2.0使用encoder來減少需要傳輸的header大小,通訊雙方各自cache一份header fields表,既避免了重復header的傳輸,又減小了需要傳輸的大小,
-
服務端推送(server push),同SPDY一樣,HTTP2.0也具有server push功能,
參考鏈接:
WebSocket使用
SpringBoot+WebSocket+Netty實作訊息推送
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/301478.html
標籤:其他
