
文章目錄
- 環境配置與基本知識
- redis.hpp
- redis.cpp
- chatservice修改
- 從redis訊息佇列中獲取訂閱的訊息
環境配置與基本知識
C++搭建集群聊天室(十七):ngnix簡介及tcp負載均衡配置
Redis環境搭建與配置
hiredis從安裝到實操,一條龍服務
redis事務處理機制,但當涉獵
了解更多 redis 相關知識:我的redis專欄
上面該看的看完了,咱往下可就直接上碼啦!!!
這次的改動會有有點大,
redis.hpp
愿意放哪兒放哪兒,我覺著吧,怎么說redis也是個資料庫,就放 db 檔案夾下吧,
#ifndef REDIS_H
#define REDIS_H
#include <hiredis/hiredis.h>
#include <thread>
#include <functional>
using namespace std;
class Redis
{
public:
Redis();
~Redis();
// 連接redis服務器
bool connect();
// 向redis指定的通道channel發布訊息
bool publish(int channel, string message);
// 向redis指定的通道subscribe訂閱訊息
bool subscribe(int channel);
// 向redis指定的通道unsubscribe取消訂閱訊息
bool unsubscribe(int channel);
// 在獨立執行緒中接收訂閱通道中的訊息
void observer_channel_message();
// 初始化向業務層上報通道訊息的回呼物件
void init_notify_handler(function<void(int, string)> fn);
private:
// hiredis同步背景關系物件,負責publish訊息
redisContext *_publish_context;
// hiredis同步背景關系物件,負責subscribe訊息
redisContext *_subcribe_context;
// 回呼操作,收到訂閱的訊息,給service層上報
function<void(int, string)> _notify_message_handler;
};
#endif
redis.cpp
這一套,可以在做輕量級集群服務器間通信用,封裝好了的,
#include "redis.hpp"
#include <iostream>
using namespace std;
Redis::Redis():_publish_context(nullptr), _subcribe_context(nullptr){}
Redis::~Redis(){
if (_publish_context != nullptr){
redisFree(_publish_context);
}
if (_subcribe_context != nullptr){
redisFree(_subcribe_context);
}
}
bool Redis::connect(){
// 負責publish發布訊息的背景關系連接
_publish_context = redisConnect("127.0.0.1", 6379);
if (nullptr == _publish_context){
cerr << "connect redis failed!" << endl;
return false;
}
// 負責subscribe訂閱訊息的背景關系連接
_subcribe_context = redisConnect("127.0.0.1", 6379);
if (nullptr == _subcribe_context){
cerr << "connect redis failed!" << endl;
return false;
}
// 在單獨的執行緒中,監聽通道上的事件,有訊息給業務層進行上報
thread t([&]() {
observer_channel_message();
});
t.detach();
cout << "connect redis-server success!" << endl;
return true;
}
// 向redis指定的通道channel發布訊息
bool Redis::publish(int channel, string message){
redisReply *reply = (redisReply *)redisCommand(_publish_context, "PUBLISH %d %s", channel, message.c_str());
if (nullptr == reply){
cerr << "publish command failed!" << endl;
return false;
}
freeReplyObject(reply);
return true;
}
// 向redis指定的通道subscribe訂閱訊息
bool Redis::subscribe(int channel){
// SUBSCRIBE命令本身會造成執行緒阻塞等待通道里面發生訊息,這里只做訂閱通道,不接收通道訊息
// 通道訊息的接收專門在observer_channel_message函式中的獨立執行緒中進行
// 只負責發送命令,不阻塞接收redis server回應訊息,否則和notifyMsg執行緒搶占回應資源
if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel)){
cerr << "subscribe command failed!" << endl;
return false;
}
// redisBufferWrite可以回圈發送緩沖區,直到緩沖區資料發送完畢(done被置為1)
int done = 0;
while (!done){
if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){
cerr << "subscribe command failed!" << endl;
return false;
}
}
// redisGetReply
return true;
}
// 向redis指定的通道unsubscribe取消訂閱訊息
bool Redis::unsubscribe(int channel){
if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "UNSUBSCRIBE %d", channel)){
cerr << "unsubscribe command failed!" << endl;
return false;
}
// redisBufferWrite可以回圈發送緩沖區,直到緩沖區資料發送完畢(done被置為1)
int done = 0;
while (!done){
if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){
cerr << "unsubscribe command failed!" << endl;
return false;
}
}
return true;
}
// 在獨立執行緒中接收訂閱通道中的訊息
void Redis::observer_channel_message(){
redisReply *reply = nullptr;
while (REDIS_OK == redisGetReply(this->_subcribe_context, (void **)&reply)){
// 訂閱收到的訊息是一個帶三元素的陣列
if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr)
{
// 給業務層上報通道上發生的訊息
_notify_message_handler(atoi(reply->element[1]->str) , reply->element[2]->str);
}
freeReplyObject(reply);
}
cerr << ">>>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<<" << endl;
}
void Redis::init_notify_handler(function<void(int,string)> fn){
this->_notify_message_handler = fn;
}
chatservice修改
頭檔案里面自行修改吧,這里放出源檔案的修改范圍,
建構式中連接上redis:
// 連接redis服務器
if (_redis.connect()){
// 設定上報訊息的回呼
_redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage, this, _1, _2));
}
登錄成功后,向redis訊息佇列進行訂閱:
// id用戶登錄成功后,向redis訂閱channel(id)
_redis.subscribe(id);
用戶注銷之后,取消訂閱:
// 用戶注銷,相當于就是下線,在redis中取消訂閱通道
_redis.unsubscribe(userid);
(客戶端里以外掉線也給它來上這么一下)
單聊:
//一對一聊天
void ChatService::onechat(const TcpConnectionPtr &conn,json &js,Timestamp time){
// cout<<js<<endl;
int toid = js["toid"].get<int>(); //這里bug
// bool userstate = false;
//開辟鎖的作用域
{
lock_guard<mutex> lock(_connMutex);
auto it = _userConnMap.find(toid);
if(it != _userConnMap.end()){
//用戶在線,轉發訊息
it->second->send(js.dump());
return;
}
}
// 查詢toid是否在線
User user = _usermodel.query(toid);
if (user.getstate() == "online"){
_redis.publish(toid, js.dump());
return;
}
// toid不在線,存盤離線訊息
_offlineMsgmodel.insert(toid, js.dump());
}
群聊:
// 群組聊天業務
void ChatService::groupChat(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
int userid = js["id"].get<int>();
int groupid = js["groupid"].get<int>();
vector<int> useridVec = _groupModel.queryGroupUsers(userid, groupid);
lock_guard<mutex> lock(_connMutex);
for (int id : useridVec){
auto it = _userConnMap.find(id);
if (it != _userConnMap.end()){
// 轉發群訊息
it->second->send(js.dump());
}
else{
User user = _usermodel.query(id);
if (user.getstate() == "online"){
_redis.publish(id, js.dump());
}
else{
// 存盤離線群訊息
_offlineMsgmodel.insert(id, js.dump());
}
}
}
}
從redis訊息佇列中獲取訂閱的訊息
void ChatService::handleRedisSubscribeMessage(int userid, string msg){
lock_guard<mutex> lock(_connMutex);
auto it = _userConnMap.find(userid);
if (it != _userConnMap.end())
{
it->second->send(msg);
return;
}
// 存盤該用戶的離線訊息
_offlineMsgmodel.insert(userid, msg);
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/295369.html
標籤:其他
