原文轉載自「劉悅的技術博客」https://v3u.cn/a_id_202
“表達欲”是人類成長史上的強大“源動力”,恩格斯早就直截了當地指出,處在蒙昧時代即低級階段的人類,“以果實、堅果、根作為食物;音節清晰的語言的產生是這一時期的主要成就”,而在網路時代人們的表達欲往往更容易被滿足,因為有聊天軟體的存在,通常意義上,聊天大抵都基于兩種形式:群聊和單聊,群聊或者群組聊天我們可以理解為聊天室,可以有人數上限,而單聊則可以認為是上限為2個人的特殊聊天室,
為了開發高質量的聊天系統,開發者應該具備客戶機和服務器如何通信的基本知識,在聊天系統中,客戶端可以是移動應用程式(C端)或web應用程式(B端),客戶端之間不直接通信,相反,每個客戶端都連接到一個聊天服務,該服務支撐雙方通信的功能,所以該服務在業務上必須支持的最基本功能:
1.能夠實時接收來自其他客戶端的資訊,
2.能夠將每條資訊實時推送給收件人,
當客戶端打算啟動聊天時,它會使用一個或多個網路協議連接聊天服務,對于聊天服務,網路協議的選擇至關重要,這里,我們選擇Tornado框架內置Websocket協議的介面,簡單而又方便,安裝tornado6.1
pip3 install tornado==6.1
隨后撰寫程式啟動檔案main.py:
import tornado.httpserver
import tornado.websocket
import tornado.ioloop
import tornado.web
import redis
import threading
import asyncio
# 用戶串列
users = []
# websocket協議
class WB(tornado.websocket.WebSocketHandler):
# 跨域支持
def check_origin(self,origin):
return True
# 開啟鏈接
def open(self):
users.append(self)
# 接收訊息
def on_message(self,message):
self.write_message(message['data'])
# 斷開
def on_close(self):
users.remove(self)
# 建立torando實體
app = tornado.web.Application(
[
(r'/wb/',WB)
],debug=True
)
if __name__ == '__main__':
# 宣告服務器
http_server_1 = tornado.httpserver.HTTPServer(app)
# 監聽埠
http_server_1.listen(8000)
# 開啟事件回圈
tornado.ioloop.IOLoop.instance().start()
如此,就在短時間搭建起了一套websocket協議服務,每一次有客戶端發起websocket連接請求,我們都會將它添加到用戶串列中,等待用戶的推送或者接收資訊的動作,
下面我們需要通過某種形式將訊息的發送方和接收方聯系起來,以達到“聊天”的目的,這里選擇Redis的發布訂閱模式(pubsub),以一個demo來實體說明,server.py
import redis
r = redis.Redis()
r.publish("test",'hello')
隨后撰寫 client.py:
import redis
r = redis.Redis()
ps = r.pubsub()
ps.subscribe('test')
for item in ps.listen():
if item['type'] == 'message':
print(item['data'])
可以這么理解:訂閱者(listener)負責訂閱頻道(channel);發送者(publisher)負責向頻道(channel)發送二進制的字串訊息,然后頻道收到訊息時,推送給訂閱者,
頻道不僅可以聯系發布者和訂閱者,同時,也可以利用頻道進行“訊息隔離”,即不同頻道的訊息只會給訂閱該頻道的用戶進行推送:

根據發布者訂閱者邏輯,改寫main.py:
import tornado.httpserver
import tornado.websocket
import tornado.ioloop
import tornado.web
import redis
import threading
import asyncio
# 用戶串列
users = []
# 頻道串列
channels = ["channel_1","channel_2"]
# websocket協議
class WB(tornado.websocket.WebSocketHandler):
# 跨域支持
def check_origin(self,origin):
return True
# 開啟鏈接
def open(self):
users.append(self)
# 接收訊息
def on_message(self,message):
self.write_message(message['data'])
# 斷開
def on_close(self):
users.remove(self)
# 基于redis監聽發布者發布訊息
def redis_listener(loop):
asyncio.set_event_loop(loop)
async def listen():
r = redis.Redis(decode_responses=True)
# 宣告pubsb實體
ps = r.pubsub()
# 訂閱聊天室頻道
ps.subscribe(["channel_1","channel_2"])
# 監聽訊息
for message in ps.listen():
print(message)
# 遍歷鏈接上的用戶
for user in users:
print(user)
if message["type"] == "message" and message["channel"] == user.get_cookie("channel"):
user.write_message(message["data"])
future = asyncio.gather(listen())
loop.run_until_complete(future)
# 介面 發布資訊
class Msg(tornado.web.RequestHandler):
# 重寫父類方法
def set_default_headers(self):
# 設定請求頭資訊
print("開始設定")
# 域名資訊
self.set_header("Access-Control-Allow-Origin","*")
# 請求資訊
self.set_header("Access-Control-Allow-Headers","x-requested-with")
# 請求方式
self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE")
# 發布資訊
async def post(self):
data = self.get_argument("data",None)
channel = self.get_argument("channel","channel_1")
print(data)
# 發布
r = redis.Redis()
r.publish(channel,data)
return self.write("ok")
# 建立torando實體
app = tornado.web.Application(
[
(r'/send/',Msg),
(r'/wb/',WB)
],debug=True
)
if __name__ == '__main__':
loop = asyncio.new_event_loop()
# 單執行緒啟動訂閱者服務
threading.Thread(target=redis_listener,args=(loop,)).start()
# 宣告服務器
http_server_1 = tornado.httpserver.HTTPServer(app)
# 監聽埠
http_server_1.listen(8000)
# 開啟事件回圈
tornado.ioloop.IOLoop.instance().start()
這里假設默認有兩個頻道,邏輯是這樣的:由前端控制websocket鏈接用戶選擇將訊息發布到那個頻道上,同時每個用戶通過前端cookie的設定具備頻道屬性,當具備頻道屬性的用戶對該頻道發布了一條訊息之后,所有其他具備該頻道屬性的用戶通過redis進行訂閱后主動推送剛剛發布的訊息,而頻道的推送只匹配訂閱該頻道的用戶,達到訊息隔離的目的,
需要注意的一點是,通過執行緒啟動redis訂閱服務時,需要將當前的loop實體傳遞給協程物件,否則在訂閱方法內將會獲取不到websocket實體,報這個錯誤:
IOLoop.current() doesn't work in non-main
這是因為Tornado底層基于事件回圈ioloop,而同步框架模式的Django或者Flask則沒有這個問題,
下面撰寫前端代碼,這里我們使用時下最流行的vue3.0框架,撰寫chat.vue:
<template>
<div>
<h1>聊天視窗</h1>
<van-tabs v-model:active="active" @click="change_channel">
<van-tab title="客服1號">
<table>
<tr v-for="item,index in msglist" :key="index">
{{ item }}
</tr>
</table>
</van-tab>
<van-tab title="客服2號">
<table>
<tr v-for="item,index in msglist" :key="index">
{{ item }}
</tr>
</table>
</van-tab>
</van-tabs>
<van-field label="聊天資訊" v-model="msg" />
<van-button color="gray" @click="commit">發送</van-button>
</div>
</template>
<script>
export default {
data() {
return {
auditlist:[],
//聊天記錄
msglist:[],
msg:"",
websock: null, //建立的連接
lockReconnect: false, //是否真正建立連接
timeout: 3 * 1000, //30秒一次心跳
timeoutObj: null, //外層心跳倒計時
serverTimeoutObj: null, //內層心跳檢測
timeoutnum: null, //斷開 重連倒計時
active:0,
channel:"channel_1"
}
},
methods:{
//切換頻道
change_channel:function(){
if(this.active === 0){
this.channel = "channel_1";
var name = "channel";
var value = "channel_1";
}else{
this.channel = "channel_2";
var name = "channel";
var value = "channel_2";
}
//清空聊天記錄
this.msglist = [];
var d = new Date();
d.setTime(d.getTime() + (24 * 60 * 60 * 1000));
var expires = "expires=" + d.toGMTString();
document.cookie = name + "=" + value + "; " + expires;
this.reconnect();
},
initWebSocket() {
//初始化weosocket
const wsuri = "ws://localhost:8000/wb/";
this.websock = new WebSocket(wsuri);
this.websock.onopen = this.websocketonopen;
this.websock.onmessage = this.websocketonmessage;
this.websock.onerror = this.websocketonerror;
this.websock.onclose = this.websocketclose;
},
reconnect() {
//重新連接
var that = this;
if (that.lockReconnect) {
// 是否真正建立連接
return;
}
that.lockReconnect = true;
//沒連接上會一直重連,設定延遲避免請求過多
that.timeoutnum && clearTimeout(that.timeoutnum);
// 如果到了這里斷開重連的倒計時還有值的話就清除掉
that.timeoutnum = setTimeout(function() {
//然后新連接
that.initWebSocket();
that.lockReconnect = false;
}, 5000);
},
reset() {
//重置心跳
var that = this;
//清除時間(清除內外兩個心跳計時)
clearTimeout(that.timeoutObj);
clearTimeout(that.serverTimeoutObj);
//重啟心跳
that.start();
},
start() {
//開啟心跳
var self = this;
self.timeoutObj && clearTimeout(self.timeoutObj);
// 如果外層心跳倒計時存在的話,清除掉
self.serverTimeoutObj && clearTimeout(self.serverTimeoutObj);
// 如果內層心跳檢測倒計時存在的話,清除掉
self.timeoutObj = setTimeout(function() {
// 重新賦值重新發送 進行心跳檢測
//這里發送一個心跳,后端收到后,回傳一個心跳訊息,
if (self.websock.readyState == 1) {
//如果連接正常
// self.websock.send("heartCheck");
} else {
//否則重連
self.reconnect();
}
self.serverTimeoutObj = setTimeout(function() {
// 在三秒一次的心跳檢測中如果某個值3秒沒回應就關掉這次連接
//超時關閉
// self.websock.close();
}, self.timeout);
}, self.timeout);
// 3s一次
},
websocketonopen(e) {
//連接建立之后執行send方法發送資料
console.log("成功");
// this.websock.send("123");
// this.websocketsend(JSON.stringify(actions));
},
websocketonerror() {
//連接建立失敗重連
console.log("失敗");
this.initWebSocket();
},
websocketonmessage(e) {
console.log(e);
//資料接收
//const redata = JSON.parse(e.data);
const redata = e.data;
//累加
this.msglist.push(redata);
console.log(redata);
},
websocketsend(Data) {
//資料發送
this.websock.send(Data);
},
websocketclose(e) {
//關閉
this.reconnect()
console.log("斷開連接", e);
},
//提交表單
commit:function(){
//發送請求
this.myaxios("http://localhost:8000/send/","post",{"data":this.msg,channel:this.channel}).then(data =>{
console.log(data);
});
},
},
mounted(){
//連接后端websocket服務
this.initWebSocket();
var d = new Date();
d.setTime(d.getTime() + (24 * 60 * 60 * 1000));
var expires = "expires=" + d.toGMTString();
document.cookie = "channel" + "=" + "channel_1" + "; " + expires;
}
}
</script>
<style scoped>
@import url("../assets/style.css");
.chatbox{
color:black;
}
.mymsg{
background-color:green;
}
</style>
這里前端在線客戶端定期向狀態服務器發送心跳事件,如果服務端在特定時間內(例如x秒)從客戶端接收到心跳事件,則認為用戶處于聯機狀態,否則,它將處于脫機狀態,脫機后在閾值時間內可以進行重新連接的動作,同時利用vant框架的標簽頁可以同步切換頻道,切換后將頻道標識寫入cookie,便于后端服務識別后匹配推送,
效果是這樣的:

誠然,功能業已實作,但是如果我們處在一個高并發場景之下呢?試想一下如果一個頻道有10萬人同時在線,每秒有100條新訊息,那么后臺tornado的websocket服務推送頻率是100w*10/s = 1000w/s ,
這樣的系統架構如果不做負載均衡的話,很難抗住壓力,那么瓶頸在哪里呢?沒錯,就是資料庫redis,這里我們需要異步redis庫aioredis的幫助:
pip3 install aioredis
aioredis通過協程異步操作redis讀寫,避免了io阻塞問題,使訊息的發布和訂閱操作非阻塞,
此時,可以新建一個異步訂閱服務檔案main_with_aioredis.py:
import asyncio
import aioredis
from tornado import web, websocket
from tornado.ioloop import IOLoop
import tornado.httpserver
import async_timeout
之后主要的修改邏輯是,通過aioredis異步建立redis鏈接,并且異步訂閱多個頻道,隨后通過原生協程的asyncio.create_task方法(也可以使用asyncio.ensure_future)注冊訂閱消費的異步任務reader:
async def setup():
r = await aioredis.from_url("redis://localhost", decode_responses=True)
pubsub = r.pubsub()
print(pubsub)
await pubsub.subscribe("channel_1","channel_2")
#asyncio.ensure_future(reader(pubsub))
asyncio.create_task(reader(pubsub))
在訂閱消費方法中,異步監聽所訂閱頻道中的發布資訊,同時和之前的同步方法一樣,比對用戶的頻道屬性并且進行按頻道推送:
async def reader(channel: aioredis.client.PubSub):
while True:
try:
async with async_timeout.timeout(1):
message = await channel.get_message(ignore_subscribe_messages=True)
if message is not None:
print(f"(Reader) Message Received: {message}")
for user in users:
if user.get_cookie("channel") == message["channel"]:
user.write_message(message["data"])
await asyncio.sleep(0.01)
except asyncio.TimeoutError:
pass
最后,利用tornado事件回圈IOLoop傳遞中執行回呼方法,將setup方法加入到事件回呼中:
if __name__ == '__main__':
# 監聽埠
application.listen(8000)
loop = IOLoop.current()
loop.add_callback(setup)
loop.start()
完整的異步訊息發布、訂閱、推送服務改造 main_aioredis.py:
import asyncio
import aioredis
from tornado import web, websocket
from tornado.ioloop import IOLoop
import tornado.httpserver
import async_timeout
users = []
# websocket協議
class WB(tornado.websocket.WebSocketHandler):
# 跨域支持
def check_origin(self,origin):
return True
# 開啟鏈接
def open(self):
users.append(self)
# 接收訊息
def on_message(self,message):
self.write_message(message['data'])
# 斷開
def on_close(self):
users.remove(self)
class Msg(web.RequestHandler):
# 重寫父類方法
def set_default_headers(self):
# 設定請求頭資訊
print("開始設定")
# 域名資訊
self.set_header("Access-Control-Allow-Origin","*")
# 請求資訊
self.set_header("Access-Control-Allow-Headers","x-requested-with")
# 請求方式
self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE")
# 發布資訊
async def post(self):
data = self.get_argument("data",None)
channel = self.get_argument("channel","channel_1")
print(data)
# 發布
r = await aioredis.from_url("redis://localhost", decode_responses=True)
await r.publish(channel,data)
return self.write("ok")
async def reader(channel: aioredis.client.PubSub):
while True:
try:
async with async_timeout.timeout(1):
message = await channel.get_message(ignore_subscribe_messages=True)
if message is not None:
print(f"(Reader) Message Received: {message}")
for user in users:
if user.get_cookie("channel") == message["channel"]:
user.write_message(message["data"])
await asyncio.sleep(0.01)
except asyncio.TimeoutError:
pass
async def setup():
r = await aioredis.from_url("redis://localhost", decode_responses=True)
pubsub = r.pubsub()
print(pubsub)
await pubsub.subscribe("channel_1","channel_2")
#asyncio.ensure_future(reader(pubsub))
asyncio.create_task(reader(pubsub))
application = web.Application([
(r'/send/',Msg),
(r'/wb/', WB),
],debug=True)
if __name__ == '__main__':
# 監聽埠
application.listen(8000)
loop = IOLoop.current()
loop.add_callback(setup)
loop.start()
從程式設計角度上講,充分利用了協程的異步執行思想,更加地絲滑流暢,
結語:實踐操作來看,Redis發布訂閱模式,非常契合這種實時(websocket)通信聊天系統的場景,但是發布的訊息如果沒有對應的頻道或者消費者,訊息則會被丟棄,假如我們在生產環境在消費的時候,突然斷網,導致其中一個訂閱者掛掉了一段時間,那么當它重新連接上的時候,中間這一段時間產生的訊息也將不會存在,所以如果想要保證系統的健壯性,還需要其他服務來設計高可用的實時存盤方案,不過那就是另外一個故事了,最后奉上專案地址,與眾鄉親同饗:https://github.com/zcxey2911/tornado_redis_vue3_chatroom
原文轉載自「劉悅的技術博客」 https://v3u.cn/a_id_202
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/390623.html
標籤:其他
