一、用戶獲取新的訊息通知有兩種模式
-
上線登錄后向系統主動索取
-
在線時系統向接收者主動推送新訊息
設想下,用戶的通知訊息和新通知提醒資料都放在資料庫中,資料庫的讀寫操作頻繁,如果訊息量大,DB壓力較大,可能出現資料瓶頸,這時候就可以引入訊息佇列RabbitMQ進行流量削峰,
向指定用戶發送WebSocket訊息并處理對方不在線的情況:
-
如果接收者在線,則直接發送訊息;
-
否則將訊息存盤到redis,等用戶上線后主動拉取未讀訊息,
二、Websocket+RabbitMQ訊息推送架構圖
從圖中可以看出訊息通知系統的基本流程是客戶端A請求服務端核心模塊,核心模塊生產一條訊息到訊息佇列,然后服務端訊息模塊消費訊息,消費完之后就將訊息推送給客戶端B,流程很簡單,沒有太多技巧,唯一的巧妙之處就在訊息模塊這邊的處理上,


一般還需要在「個人中心」需要有一個設定是否接收訊息的設定項,滿足用戶個性化需求,

我們當前的流程有些取巧,原本應該是消費者發訊息之前就去請求「用戶訊息設定」,用戶設定成接收,才去產生訊息的,而我們現在的流程中消費者不去關注用戶設定,把所有訊息都往「佇列」里塞,讓主流程去做過濾處理,這樣各個生產者就不用每個都去單獨處理,同時也少了一次網路互動,
三、訊息通知的型別
幾乎每個站點都有訊息通知系統,可見通知系統的重要性不言而喻,通知系統看似簡單,實際上比較復雜,那么本篇主要講解常見的訊息通知系統的設計和具體實作,包括資料庫設計、邏輯關系分析等,
常見的站內通知類別:
- 公告 Announcement
-
提醒 Remind
- 資源訂閱提醒「我關注的資源有更新、評論等事件時通知我」
- 資源發布提醒「我發布的資源有評論、收藏等事件時通知我」
- 系統提醒「平臺會根據一些演算法、規則等可能會對你的資源做一些事情,這時你會收到系統通知」
- 私信 Mailbox
以上三種訊息有各自特點,實作也各不相同,其中「提醒」類通知是最復雜的:
通知事件:
通知事件就是當用戶在網站或應用上產生了支付行為之后,如果你想給用戶一個通知,告訴她系統已收到她的付款,那么你就要把這個「支付行為」定義為一個通知事件,并且保存這個通知事件到「通知事件表」里,以便通知系統作異步處理,通知系統會不斷的處理「通知事件表」里的資料,分析每一個事件應該通知和不通知哪些人,
通知事件表「notify_event」
記錄每一個用戶行為產生的通知事件資訊
表結構如下:
id: {type: 'integer', primaryKey: true, autoIncrement:true}
userID: {type: 'string', required: true} //用戶ID
action: {type: 'string', required: true} //動作,如:捐款/更新/評論/收藏
objectID: {type: 'string', required: true}, //物件ID,如:文章ID;
objectType: {type: 'string', required: true} //物件所屬型別,如:人、文章、活動、視頻等;
createdAt:{type: 'timestamp', required: true} //創建時間;
用戶行為定義
「action」即用戶行為,如:贊了、評論了、喜歡了、捐款了、收藏了;一般來講,我們把一個用戶行為定義為一個通知型別,那么用戶行為必須是需要提前定義好的,
由訊息系統內部定義,為后臺提供介面,用于通知設定,如下:
notify_action_type := ["donated","conllected","commented","updated"]
物件型別定義
「objectType」即用戶行為作用的物件的所屬型別,簡單的說就是資源型別,如:專案、文章、評論、商品、視頻、圖片、用戶,
由訊息系統內部定義,為后臺提供介面,用于通知設定,如下:
notify_object_type := ["project","comment"]
四、訊息通知系統注意事項
4.1、Nginx代理webSocket時60s自動斷開, 怎么保持長連接
利用nginx代理websocket的時候,發現客戶端和服務器握手成功后,如果在60s時間內沒有資料互動,連接就會自動斷開,
nginx.conf 檔案里location 中的proxy_read_timeout 默認60s斷開,可以把他設定大一點,你可以設定成自己需要的時間,我這里設定的是十分鐘(600s).
nginx配置如下:
server {
listen 80;
server_name carrefourzone.senguo.cc;
#error_page 502 /static/502.html;
location /static/ {
root /home/chenming/Carrefour/carrefour.senguo.cc/source;
expires 7d;
}
location / {
proxy_pass_header Server;
proxy_set_header Host $http_host;
proxy_redirect off;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Scheme $scheme;
proxy_pass http://127.0.0.1:9887;
proxy_http_version 1.1;
proxy_set_header Upgrade "websocket";
proxy_set_header Connection "Upgrade";
proxy_read_timeout 600s;
}
}
按照上述方法設定好后,我們可以發現,如果在10分鐘之內沒有資料互動的話,websocket連接就會自動斷開,所以這種方式還是有點問題,如果我頁面停留時間超過十分鐘而且又沒有資料互動的話,連接還是會斷開的,所以需要同時結合WebSocket心跳機制的方法.
4.2、WebSocket的心跳激活機制
心跳機制是每隔一段時間會向服務器發送一個資料包,告訴服務器自己還活著,同時客戶端會確認服務器端是否還活著,如果還活著的話,就會回傳一個資料包給客戶端來確定服務器端也還活著,否則的話,有可能是網路斷開連接了,需要重連,
WebSocket 長連接需要在弱網環境和網路暫時斷開的情況下,需要有一個穩定的重連機制,保證在網路不穩定的時候,客戶端和服務端能夠重連,繼續通信,
在nginx延長超時時間的基礎上,前端在超時時間內發心跳包,重繪再讀時間,前端具體實作見如下代碼(此處代碼包含了前端整個websocket的實作程序,其中紅色重點標注了發心跳包的內容):
// websocket連接
var websocket_connected_count = 0;
var onclose_connected_count = 0;
function newWebSocket(){
var websocket = null;
// 判斷當前環境是否支持websocket
if(window.WebSocket){
if(!websocket){
var ws_url ="wss://"+domain+"/updatewebsocket";
websocket = new WebSocket(ws_url);
}
}else{
Tip("not support websocket");
}
// 連接成功建立的回呼方法
websocket.onopen = function(e){
heartCheck.reset().start(); // 成功建立連接后,重置心跳檢測
Tip("connected successfully")
}
// 連接發生錯誤,連接錯誤時會繼續嘗試發起連接(嘗試5次)
websocket.onerror = function() {
console.log("onerror連接發生錯誤")
websocket_connected_count++;
if(websocket_connected_count <= 5){
newWebSocket()
}
}
// 接受到訊息的回呼方法
websocket.onmessage = function(e){
console.log("接受到訊息了")
heartCheck.reset().start(); // 如果獲取到訊息,說明連接是正常的,重置心跳檢測
var message = e.data;
if(message){
//執行接收到訊息的操作,一般是重繪UI
}
}
// 接受到服務端關閉連接時的回呼方法
websocket.onclose = function(){
Tip("onclose斷開連接");
}
// 監聽視窗事件,當視窗關閉時,主動斷開websocket連接,防止連接沒斷開就關閉視窗,server端報錯
window.onbeforeunload = function(){
websocket.close();
}
// 心跳檢測, 每隔一段時間檢測連接狀態,如果處于連接中,就向server端主動發送訊息,來重置server端與客戶端的最大連接時間,如果已經斷開了,發起重連,
var heartCheck = {
timeout: 55000, // 9分鐘發一次心跳,比server端設定的連接時間稍微小一點,在接近斷開的情況下以通信的方式去重置連接時間,
serverTimeoutObj: null,
reset: function(){
clearTimeout(this.timeoutObj);
clearTimeout(this.serverTimeoutObj);
return this;
},
start: function(){
var self = this;
this.serverTimeoutObj = setInterval(function(){
if(websocket.readyState == 1){
console.log("連接狀態,發送訊息保持連接");
websocket.send("ping");
heartCheck.reset().start(); // 如果獲取到訊息,說明連接是正常的,重置心跳檢測
}else{
console.log("斷開狀態,嘗試重連");
newWebSocket();
}
}, this.timeout)
}
}
}
4.3、RabbitMQ消費端的必須捕捉例外
我遇到的Bug是對前端輸入沒有Emoj表情過濾導致JPA插入報錯,且消費者程式沒有try-catch捕捉例外,最終導致消費者程式例外終止,
4.4、WebSocket實作Web聊天室的多頁面跨面問題
一般能想到的有:
-
通過IFrame
-
通過web都做成單頁
-
通過sharedworker(也解決不了多瀏覽器),但是不是所有瀏覽器都支持
4.5、使用websocket實作群聊
需要很多用戶(在不同的房間)進行實時聊天,也就是一個簡單的聊天室,這里用的是WebSocket實作,這里需要對每一個連接都指定兩個引數:用戶的userId和所加入的房間id(roomId);
這里是用到一個map<房間id, 用戶set>來保存房間對應的用戶連接串列,當有用戶進入一個房間的時候,就會先檢測房間是否存在,如果不存在那就新建一個空的用戶set,再加入本身到這個set中;這里需要考慮執行緒安全問題,因為用到的是一個hashMap,如同時又AB兩個用戶加入一個空房間,同時訪問friends為空,然后都會新建一個set再加入進去,那么可能會出現一個情況就是A檢測不存在房間,然后創建加入進去,B也同時檢測到不存在,也重新創建一個用戶set,這樣就會覆寫原來的set,也就是說A用戶就加入失敗
推薦閱讀:使用websocket實作群聊(多個群)
五、自定義HandshakeInterceptor,用于禁止未登錄用戶連接WebSocket
package cn.zifangsky.stompwebsocket.interceptor.websocket;
import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.Map;
/**
* 自定義{@link org.springframework.web.socket.server.HandshakeInterceptor},實作“需要登錄才允許連接WebSocket”
*
*/
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
HttpSession session = SpringContextUtils.getSession();
User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
if(loginUser != null){
logger.debug(MessageFormat.format("用戶{0}請求建立WebSocket連接", loginUser.getUsername()));
return true;
}else{
logger.error("未登錄系統,禁止連接WebSocket");
return false;
}
}
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
}
}
參考鏈接:
Web 端訊息通知機制現實方案
訊息通知系統模型設計
Nginx代理webSocket時60s自動斷開, 怎么保持長連接
消費端channel主動斷開后,可能存在的bug
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/301254.html
標籤:其他
