問題引入
相信很多人用過Netty寫過聊天室的簡單案例吧,可以說是模板代碼了,沒有結合業務,如果我們要做專案中的即時通訊模塊(IM),需要將用戶A發的訊息轉發給用戶B,將會不可避免的遇到一個問題:如何快速找到用戶B所建立的Channel (用戶 -> Channel 的映射)?圍繞我們的聊天業務,離線訊息又如何進行推送?一個用戶建立Channel之后,我們要想知道他有沒有未簽收的離線訊息,就必定要知道用戶標識,原則來講,我們又如何避免一個用戶重復創建Channel?
換言之,在IM業務中,我們要解決:Channel和用戶標識系結的問題,他們的映射關系是一對一的,
"系結"型別的訊息需要攜帶token
要想系結用戶標識,客戶端就必須在WebSocket建立之后(Channel建立之后),立馬發送一條系結型別的訊息給后端,該訊息必須要攜帶用戶唯一標識,后端建立并維護Channel和用戶的一對一映射關系,那么系結型別的訊息,攜帶的用戶標識是什么?客戶端本地存盤的userid?其實這并不合理,應該攜帶token!(我這里用的jwt,jwt里面的載荷有userid),
為什么攜帶token更加合理?因為token可以代表用戶的一次有效的登錄狀態,我們可以在后端驗證用戶登錄狀態有效性(嚴格的可以做單點登錄的驗證),并且可以查出用戶的身份資訊,包括userid,系結型別的訊息如果攜帶userid,之所以說不合理,是因為:假設用戶id就是自增長的unsigned int,那么userid就很容易猜到,就是一個純數字嘛,那么拿一個純數字,就可以隨便跟我后端建立websocket連接,對后端來說,必然不安全,
離線訊息和訊息簽收
什么是離線訊息?比如說用戶A給用戶B發送一條訊息,后端轉發的時候發現用戶B不在線(換言之,就是沒有建立WebSocket連接,沒有建立Channel),那么這條訊息對于B來說就是離線訊息,
什么是簽收?用戶A給用戶B發送一條訊息,B同時也在線,他就能收到這一條訊息,那么這條訊息就是“已簽收”,假如,此時B不在線,那他肯定就沒辦法收到,就稱這條訊息“未簽收”,
如果B不在線,我們顯然沒有辦法立馬將訊息轉發給B,需要將訊息暫存到資料庫,當用戶B上線(Channel建立之后),我們去資料庫查詢他是否有未簽收的訊息,如果有,則將未簽收的訊息立刻推送給B,
這里,我們遇到2個問題,
1、我們要去資料查詢用戶未簽收的訊息,就必須知道這是哪個用戶,(Channel -> 用戶),我們前面討論的 Channel和用戶標識的雙向系結 就解決了這個問題,
2、要想知道訊息有沒有成功被B收到,我們就必須給訊息(資料庫表)增設一個簽收狀態欄位,同時用戶在成功收到訊息之后,要立馬告訴后端,該訊息已經簽收了,所以我們還有一種型別的訊息,稱為 “簽收”型別的訊息
業務層面的訊息型別和訊息模型定義
除了我們上面討論到的
1、“系結”型別的訊息:攜帶token
2、“簽收”型別的訊息:攜帶訊息id,分為單簽和多簽,為了方便,如果是多簽,我們與前端約定,將多個訊息id之間以逗號作為分隔符拼接成字串
之外,我們的聊天業務中還涉及到其他一些型別的訊息,比如說:聊天訊息,好友申請訊息,拉取新好友型別的訊息,以及 心跳型別的訊息,等等,下面我們再來分析一下聊天型別的訊息,
3、“聊天”型別的訊息
這個基于業務,可以分為:單聊和群聊,根據訊息內容的不同,又可以分為:文字訊息、圖片訊息、語音訊息等,這里簡單起見,我們以 單聊、文字訊息 為例子進行討論,這種型別的訊息,需要攜帶哪些資料?顯然接收者的userid和文字訊息的內容(String型別)是必須的,如果是群聊,還得攜帶上groupid,
每種型別的訊息,攜帶的資料可能都不同,顯然需要定義泛型,見下面:
/**
* @author passerbyYSQ
* @create 2021-02-05 22:31
*/
public class MsgModel<T> implements Serializable {
// 訊息型別
private Integer action;
// 訊息物體
private T data;
public Integer getAction() {
return action;
}
public void setAction(Integer action) {
this.action = action;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
}
為了規范定義訊息型別,我們另外定義列舉類:
/**
*
* @Description: 發送訊息的動作 列舉
*/
public enum MsgActionEnum {
BIND(1, "第一次(或重連)初始化連接"),
CHAT(2, "聊天訊息"),
SIGNED(3, "訊息簽收"),
KEEP_ALIVE(4, "心跳訊息"),
PULL_FRIEND(5, "拉取好友"),
FRIEND_REQUEST(6, "請求添加為好友");
public final Integer type;
public final String content;
MsgActionEnum(Integer type, String content){
this.type = type;
this.content = content;
}
public Integer getType() {
return type;
}
}
核心代碼
維護用戶標識和Channel映射關系的UserChannelRepository
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import net.ysq.webchat.netty.entity.MsgModel;
import net.ysq.webchat.utils.SpringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 用戶id和channel關聯的倉庫
*
* @author passerbyYSQ
* @create 2021-02-05 23:20
*/
public class UserChannelRepository {
private final static Logger logger = LoggerFactory.getLogger(UserChannelRepository.class);
public static ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static Map<String, Channel> USER_CHANNEL = new ConcurrentHashMap<>();
public static synchronized void bind(String userId, Channel channel) {
// 此時channel一定已經在ChannelGroup中了
// 之前已經系結過了,移除并釋放掉之前系結的channel
if (USER_CHANNEL.containsKey(userId)) { // map userId --> channel
Channel oldChannel = USER_CHANNEL.get(userId);
CHANNEL_GROUP.remove(oldChannel);
oldChannel.close();
}
// 雙向系結
// channel -> userId
AttributeKey<String> key = AttributeKey.valueOf("userId");
channel.attr(key).set(userId);
// userId -> channel
USER_CHANNEL.put(userId, channel);
}
/**
* 從通道中獲取userId,只要userId和channel系結周,這個方法就一定能獲取的到
* @param channel
* @return
*/
public static String getUserId(Channel channel) {
AttributeKey<String> key = AttributeKey.valueOf("userId");
return channel.attr(key).get();
}
public static void add(Channel channel) {
CHANNEL_GROUP.add(channel);
}
public static synchronized void remove(Channel channel) {
String userId = getUserId(channel);
// userId有可能為空,可能chanelActive之后,由于前端原因(或者網路原因)沒有及時系結userId,
// 此時netty認為channelInactive了,就移除通道,這時userId就是null
if (!StringUtils.isEmpty(userId)) {
USER_CHANNEL.remove(userId); // map
}
CHANNEL_GROUP.remove(channel);
// 關閉channel
channel.close();
}
/**
* 判斷用戶是否在線
* map和channelGroup中均能找得到對應的channel說明用戶在線
* @return 在線就回傳對應的channel,不在線回傳null
*/
public static Channel isOnline(String userId) {
Channel channel = USER_CHANNEL.get(userId); // map
if (ObjectUtils.isEmpty(channel)) {
return null;
}
return CHANNEL_GROUP.find(channel.id());
}
/**
* 訊息推送
* @param receiverId
* @param msgModel
*/
public static void pushMsg(String receiverId, MsgModel msgModel) {
Channel receiverChannel = isOnline(receiverId);
if (!ObjectUtils.isEmpty(receiverChannel)) {
// 在線,就推送;離線,不做處理
ObjectMapper mapper = SpringUtils.getBean(ObjectMapper.class);
String json = null;
try {
json = mapper.writeValueAsString(msgModel);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
TextWebSocketFrame frame = new TextWebSocketFrame(json);
receiverChannel.writeAndFlush(frame);
} else {
// 離線狀態
logger.info("{} 用戶離線", receiverId);
}
}
public synchronized static void print() {
logger.info("所有通道的長id:");
for (Channel channel : CHANNEL_GROUP) {
logger.info(channel.id().asLongText());
}
logger.info("userId -> channel 的映射:");
for (Map.Entry<String, Channel> entry : USER_CHANNEL.entrySet()) {
logger.info("userId: {} ---> channelId: {}", entry.getKey(), entry.getValue().id().asLongText());
}
}
}
業務Handler:TextMsgHandler
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import net.ysq.webchat.netty.entity.MsgActionEnum;
import net.ysq.webchat.netty.entity.MsgModel;
import net.ysq.webchat.netty.entity.SingleChatMsgRequest;
import net.ysq.webchat.netty.entity.SingleChatMsgResponse;
import net.ysq.webchat.po.ChatMsg;
import net.ysq.webchat.service.ChatMsgService;
import net.ysq.webchat.utils.JwtUtils;
import net.ysq.webchat.utils.RedisUtils;
import net.ysq.webchat.utils.SpringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.util.HtmlUtils;
import java.util.ArrayList;
import java.util.List;
/**
* 用于處理文本訊息的handler
*
* @author passerbyYSQ
* @create 2021-02-05 21:23
*/
public class TextMsgHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// json串
logger.info("接收到的文本訊息:{}", msg.text());
ChatMsgService chatMsgService = (ChatMsgService) SpringUtils.getBean("chatMsgServiceImpl");
RedisUtils redisUtils = (RedisUtils) SpringUtils.getBean("redisUtils");
ObjectMapper objectMapper = SpringUtils.getBean(ObjectMapper.class);
// 訊息型別
JsonNode rootNode = objectMapper.readTree(msg.text());
Integer action = rootNode.get("action").asInt();
// 取出資料部分,不同的訊息型別,資料部分對應的泛型不一樣
JsonNode dataNode = rootNode.get("data");
Channel channel = ctx.channel();
// 判斷訊息型別
// 根據不同的訊息型別,處理不同的業務
if (action.equals(MsgActionEnum.BIND.type)) {
// 1、當websocket第一次open的時候,初始化channel,把channel和userId關聯起來
// 如果是CONNECT型別,與前端約定,data部分是token
String token = objectMapper.treeToValue(dataNode, String.class);
// 先驗證是否過期,如果過期會拋出例外,全域捕獲,之后的代碼不會執行
JwtUtils.verifyJwt(token, JwtUtils.DEFAULT_SECRET);
// 如果沒有拋出例外,表示token有效,則在Redis中尋找對應的登錄資訊
String userId = JwtUtils.getClaimByKey(token, "userId");
String redisToken = (String) redisUtils.get("user:" + userId);
if (!StringUtils.isEmpty(redisToken) && token.equals(redisToken)) {
UserChannelRepository.bind(userId, channel);
// 查詢是否有未簽收的訊息,如果有,就一次性全部推送(并不是逐條推送)
List<SingleChatMsgResponse> unsignedMsgList = chatMsgService.getUnsignedMsg(userId);
if (unsignedMsgList.size() > 0) { // 不為空才推送
MsgModel<List<SingleChatMsgResponse>> model = new MsgModel<>();
model.setAction(MsgActionEnum.CHAT.type);
model.setData(unsignedMsgList);
UserChannelRepository.pushMsg(userId, model);
}
}
// 測驗
UserChannelRepository.print();
} else if (action.equals(MsgActionEnum.CHAT.type)) {
// 2、聊天型別的訊息,把訊息保存到資料庫,同時標記訊息狀態為[未簽收]
SingleChatMsgRequest data = objectMapper.treeToValue(dataNode, SingleChatMsgRequest.class);
// 由于是通過websocket,而并非http協議,所以mica-xss的攔截器沒有作用,此處需要我們自己轉義
data.setContent(HtmlUtils.htmlEscape(data.getContent(), "UTF-8"));
// 對于聊天訊息,channel所系結的user是發送者
String senderId = UserChannelRepository.getUserId(channel);
// 如果是空的,說明系結失敗了(可能是token過期了),不做處理
if (!StringUtils.isEmpty(senderId)) {
// 往訊息表插入資料
ChatMsg chatMsg = chatMsgService.saveMsg(senderId, data);
// 構建訊息物體
MsgModel<List<SingleChatMsgResponse>> model = new MsgModel<>();
model.setAction(MsgActionEnum.CHAT.type);
List<SingleChatMsgResponse> unsignedMsgList = new ArrayList<>();
unsignedMsgList.add(new SingleChatMsgResponse(chatMsg));
model.setData(unsignedMsgList);
// 推送訊息
UserChannelRepository.pushMsg(data.getReceiverId(), model);
}
} else if (action.equals(MsgActionEnum.SIGNED.type)) {
// 3、簽收訊息的型別,針對具體的訊息進行簽收,修改資料庫對應的訊息狀態為[已簽收]
// 簽收狀態并非是指用戶有沒有讀了訊息,而是訊息是否已經被推送到達用戶的手機設備
// 在簽收型別的訊息中,代表需要簽收的訊息的id,多個id之間用,分隔
String msgIdsStr = objectMapper.treeToValue(dataNode, String.class);
// 對于要簽收型別訊息,只有是我收到的訊息,我才能簽收,所以我是接收者
String receiverId = UserChannelRepository.getUserId(channel);
if (!StringUtils.isEmpty(msgIdsStr)) {
String[] msgIds = msgIdsStr.split(",");
if (!ObjectUtils.isEmpty(msgIds)) {
chatMsgService.signMsg(receiverId, msgIds);
}
}
} else if (action.equals(MsgActionEnum.KEEP_ALIVE.type)) {
// 4、心跳型別的訊息
// 假如客戶端行程被正常退出,websocket主動斷開連接,那么服務端對應的channel是會釋放的
// 但是如果客戶端關閉網路后,重啟網路,會導致服務端會再新建一個channel
// 而舊的channel已經沒用了,但是并沒有被移除
// logger.info("收到來自于channel {} 的心跳包", channel.id().asLongText());
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
UserChannelRepository.add(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
UserChannelRepository.remove(ctx.channel());
logger.info("剩余通道個數:{}", UserChannelRepository.CHANNEL_GROUP.size());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
UserChannelRepository.remove(ctx.channel());
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/259988.html
標籤:其他
