主頁 > 軟體設計 > Websocket+RabbitMQ實作訊息推送系統

Websocket+RabbitMQ實作訊息推送系統

2021-09-19 09:45:06 軟體設計

一、用戶獲取新的訊息通知有兩種模式

  • 上線登錄后向系統主動索取

  • 在線時系統向接收者主動推送新訊息

設想下,用戶的通知訊息和新通知提醒資料都放在資料庫中,資料庫的讀寫操作頻繁,如果訊息量大,DB壓力較大,可能出現資料瓶頸,這時候就可以引入訊息佇列RabbitMQ進行流量削峰

向指定用戶發送WebSocket訊息并處理對方不在線的情況:

  • 如果接收者在線,則直接發送訊息;

  • 否則將訊息存盤到redis,等用戶上線后主動拉取未讀訊息,


二、Websocket+RabbitMQ訊息推送架構圖

從圖中可以看出訊息通知系統的基本流程是客戶端A請求服務端核心模塊,核心模塊生產一條訊息到訊息佇列,然后服務端訊息模塊消費訊息,消費完之后就將訊息推送給客戶端B,流程很簡單,沒有太多技巧,唯一的巧妙之處就在訊息模塊這邊的處理上,

一般還需要在「個人中心」需要有一個設定是否接收訊息的設定項,滿足用戶個性化需求,

我們當前的流程有些取巧,原本應該是消費者發訊息之前就去請求「用戶訊息設定」,用戶設定成接收,才去產生訊息的,而我們現在的流程中消費者不去關注用戶設定,把所有訊息都往「佇列」里塞,讓主流程去做過濾處理,這樣各個生產者就不用每個都去單獨處理,同時也少了一次網路互動,


三、訊息通知的型別

幾乎每個站點都有訊息通知系統,可見通知系統的重要性不言而喻,通知系統看似簡單,實際上比較復雜,那么本篇主要講解常見的訊息通知系統的設計和具體實作,包括資料庫設計、邏輯關系分析等,

常見的站內通知類別:

  • 公告 Announcement
  • 提醒 Remind

    • 資源訂閱提醒「我關注的資源有更新、評論等事件時通知我」
    • 資源發布提醒「我發布的資源有評論、收藏等事件時通知我」
    • 系統提醒「平臺會根據一些演算法、規則等可能會對你的資源做一些事情,這時你會收到系統通知」
  • 私信 Mailbox

以上三種訊息有各自特點,實作也各不相同,其中「提醒」類通知是最復雜的:

通知事件

通知事件就是當用戶在網站或應用上產生了支付行為之后,如果你想給用戶一個通知,告訴她系統已收到她的付款,那么你就要把這個「支付行為」定義為一個通知事件,并且保存這個通知事件到「通知事件表」里,以便通知系統作異步處理,通知系統會不斷的處理「通知事件表」里的資料,分析每一個事件應該通知和不通知哪些人,

通知事件表「notify_event」

記錄每一個用戶行為產生的通知事件資訊

表結構如下:

id: {type: 'integer', primaryKey: true, autoIncrement:true} 
userID: {type: 'string', required: true} //用戶ID
action: {type: 'string', required: true} //動作,如:捐款/更新/評論/收藏
objectID: {type: 'string', required: true}, //物件ID,如:文章ID;
objectType: {type: 'string', required: true} //物件所屬型別,如:人、文章、活動、視頻等;
createdAt:{type: 'timestamp', required: true} //創建時間;

用戶行為定義

「action」即用戶行為,如:贊了、評論了、喜歡了、捐款了、收藏了;一般來講,我們把一個用戶行為定義為一個通知型別,那么用戶行為必須是需要提前定義好的,

由訊息系統內部定義,為后臺提供介面,用于通知設定,如下:

notify_action_type := ["donated","conllected","commented","updated"]

物件型別定義

「objectType」即用戶行為作用的物件的所屬型別,簡單的說就是資源型別,如:專案、文章、評論、商品、視頻、圖片、用戶,

由訊息系統內部定義,為后臺提供介面,用于通知設定,如下:

notify_object_type := ["project","comment"]

四、訊息通知系統注意事項

4.1、Nginx代理webSocket時60s自動斷開, 怎么保持長連接

利用nginx代理websocket的時候,發現客戶端和服務器握手成功后,如果在60s時間內沒有資料互動,連接就會自動斷開,

nginx.conf 檔案里location 中的proxy_read_timeout 默認60s斷開,可以把他設定大一點,你可以設定成自己需要的時間,我這里設定的是十分鐘(600s).

nginx配置如下:

server {
        listen 80;
        server_name carrefourzone.senguo.cc;
        #error_page 502 /static/502.html;

        location /static/ {
            root /home/chenming/Carrefour/carrefour.senguo.cc/source;
            expires 7d;
            }

        location / {
            proxy_pass_header Server;
            proxy_set_header Host $http_host;
            proxy_redirect off;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Scheme $scheme;
            proxy_pass       http://127.0.0.1:9887;
            proxy_http_version  1.1;
            proxy_set_header    Upgrade    "websocket";
            proxy_set_header    Connection "Upgrade";
            proxy_read_timeout 600s; 
        }
    }

按照上述方法設定好后,我們可以發現,如果在10分鐘之內沒有資料互動的話,websocket連接就會自動斷開,所以這種方式還是有點問題,如果我頁面停留時間超過十分鐘而且又沒有資料互動的話,連接還是會斷開的,所以需要同時結合WebSocket心跳機制的方法.

4.2、WebSocket的心跳激活機制

心跳機制是每隔一段時間會向服務器發送一個資料包,告訴服務器自己還活著,同時客戶端會確認服務器端是否還活著,如果還活著的話,就會回傳一個資料包給客戶端來確定服務器端也還活著,否則的話,有可能是網路斷開連接了,需要重連,

WebSocket 長連接需要在弱網環境和網路暫時斷開的情況下,需要有一個穩定的重連機制,保證在網路不穩定的時候,客戶端和服務端能夠重連,繼續通信,

在nginx延長超時時間的基礎上,前端在超時時間內發心跳包,重繪再讀時間,前端具體實作見如下代碼(此處代碼包含了前端整個websocket的實作程序,其中紅色重點標注了發心跳包的內容):

// websocket連接
var websocket_connected_count = 0;
var onclose_connected_count = 0;
function newWebSocket(){
    var websocket = null;
    // 判斷當前環境是否支持websocket
    if(window.WebSocket){
        if(!websocket){
            var ws_url ="wss://"+domain+"/updatewebsocket";
            websocket = new WebSocket(ws_url);
        }
    }else{
        Tip("not support websocket");
    }
 
    // 連接成功建立的回呼方法
    websocket.onopen = function(e){
        heartCheck.reset().start();   // 成功建立連接后,重置心跳檢測
        Tip("connected successfully")
    }
    // 連接發生錯誤,連接錯誤時會繼續嘗試發起連接(嘗試5次)
    websocket.onerror = function() {
        console.log("onerror連接發生錯誤")
        websocket_connected_count++;
        if(websocket_connected_count <= 5){
            newWebSocket()
        }
    }
    // 接受到訊息的回呼方法
    websocket.onmessage = function(e){
        console.log("接受到訊息了")
        heartCheck.reset().start();    // 如果獲取到訊息,說明連接是正常的,重置心跳檢測
        var message = e.data;
        if(message){
           //執行接收到訊息的操作,一般是重繪UI
        }
    }
 
    // 接受到服務端關閉連接時的回呼方法
    websocket.onclose = function(){
        Tip("onclose斷開連接");
    }
    // 監聽視窗事件,當視窗關閉時,主動斷開websocket連接,防止連接沒斷開就關閉視窗,server端報錯
    window.onbeforeunload = function(){
        websocket.close();
    }
 
    // 心跳檢測, 每隔一段時間檢測連接狀態,如果處于連接中,就向server端主動發送訊息,來重置server端與客戶端的最大連接時間,如果已經斷開了,發起重連,
    var heartCheck = {
        timeout: 55000,        // 9分鐘發一次心跳,比server端設定的連接時間稍微小一點,在接近斷開的情況下以通信的方式去重置連接時間,
        serverTimeoutObj: null,
        reset: function(){
            clearTimeout(this.timeoutObj);
            clearTimeout(this.serverTimeoutObj);
            return this;
        },
        start: function(){
            var self = this;
            this.serverTimeoutObj = setInterval(function(){
                if(websocket.readyState == 1){
                    console.log("連接狀態,發送訊息保持連接");
                    websocket.send("ping");
                    heartCheck.reset().start();    // 如果獲取到訊息,說明連接是正常的,重置心跳檢測
                }else{
                    console.log("斷開狀態,嘗試重連");
                    newWebSocket();
                }
            }, this.timeout)
        }
    }
}

4.3、RabbitMQ消費端的必須捕捉例外

我遇到的Bug是對前端輸入沒有Emoj表情過濾導致JPA插入報錯,且消費者程式沒有try-catch捕捉例外,最終導致消費者程式例外終止,

4.4、WebSocket實作Web聊天室的多頁面跨面問題

一般能想到的有:

  • 通過IFrame

  • 通過web都做成單頁

  • 通過sharedworker(也解決不了多瀏覽器),但是不是所有瀏覽器都支持

4.5、使用websocket實作群聊

需要很多用戶(在不同的房間)進行實時聊天,也就是一個簡單的聊天室,這里用的是WebSocket實作,這里需要對每一個連接都指定兩個引數:用戶的userId和所加入的房間id(roomId);

這里是用到一個map<房間id, 用戶set>來保存房間對應的用戶連接串列,當有用戶進入一個房間的時候,就會先檢測房間是否存在,如果不存在那就新建一個空的用戶set,再加入本身到這個set中;這里需要考慮執行緒安全問題,因為用到的是一個hashMap,如同時又AB兩個用戶加入一個空房間,同時訪問friends為空,然后都會新建一個set再加入進去,那么可能會出現一個情況就是A檢測不存在房間,然后創建加入進去,B也同時檢測到不存在,也重新創建一個用戶set,這樣就會覆寫原來的set,也就是說A用戶就加入失敗

推薦閱讀:使用websocket實作群聊(多個群)


五、自定義HandshakeInterceptor,用于禁止未登錄用戶連接WebSocket

package cn.zifangsky.stompwebsocket.interceptor.websocket;
 
import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
 
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.Map;
 
/**
 * 自定義{@link org.springframework.web.socket.server.HandshakeInterceptor},實作“需要登錄才允許連接WebSocket”
 *
 */
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
    private final Logger logger = LoggerFactory.getLogger(getClass());
 
    @Override
    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
 
        if(loginUser != null){
            logger.debug(MessageFormat.format("用戶{0}請求建立WebSocket連接", loginUser.getUsername()));
            return true;
        }else{
            logger.error("未登錄系統,禁止連接WebSocket");
            return false;
        }
 
    }
 
    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
 
    }
 
}

參考鏈接:

Web 端訊息通知機制現實方案

訊息通知系統模型設計

Nginx代理webSocket時60s自動斷開, 怎么保持長連接

消費端channel主動斷開后,可能存在的bug

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/301254.html

標籤:其他

上一篇:什么是HTTP/2?HTTP/2和HTTP/1.1區別是什么?

下一篇:stm32啟動程序、cortex-m3架構、堆疊代碼位置、編譯匯編鏈接分析

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more