場景
目前做了一個介面:邀請用戶成為某課程的管理員,于是我感覺有能在用戶被邀請之后能有個立馬通知他本人的機(類似微博、朋友圈被點贊后就有立馬能收到通知一樣),于是就閑來沒事搞了一套,
?
涉及技術堆疊
- Springboot
- Websocket 協議
- JWT
- (非必要)RabbitMQ 訊息中間件
Websocket 協議
?推薦閱讀:Websocket 協議簡介
WebSocket協議是基于TCP的一種新的網路協議,它實作了瀏覽器與服務器全雙工(full-duplex)通信——允許服務器主動發送資訊給客戶端,
為什么使用Websocket?
因為普通的http協議一個最大的問題就是:通信只能由客戶端發起,服務器回應(半雙工),而我們希望可以全雙工通信,
?
因此一句話總結就是:建立websocket(以下簡稱為ws)連接是為了讓服務器主動向前端發訊息,而無需等待前端的發起請求呼叫介面,
業務邏輯
我們現在有:
用戶A用戶BSpringboot服務器- 場景:
用戶A呼叫介面邀請用戶B成為課程成員 - 涉及資料庫
MySQL的資料表:course_member_invitation,記錄課程邀請記錄,其形式如下(忽略時間等列):
| id | course_id | account_id | admin_id | is_accepted | bind_message_id |
|---|---|---|---|---|---|
| 邀請id | 課程id | 受邀用戶id | 邀請人id(因其本身為課程管理員) | 受邀用戶是否接受了邀請 | 系結的訊息id |
course_message,記錄訊息記錄,其形式如下(忽略時間等列):
| id | type | account_id | source_id | is_read | is_ignored |
|---|---|---|---|---|---|
| 訊息id | 訊息型別 | 收信人用戶id | 發信人用戶id | 是否已讀 | 收信人是否忽略 |
- (圖中沒有體現)
course_message_type,記錄訊息型別,其形式如下
| id | name | description |
|---|---|---|
| 訊息型別id | 訊息型別名稱 | 描述 |
- 涉及
RabbitMQ(因不是重點,所以此處暫不討論,最后一章敘述)

業務步驟主要涉及兩個方法addCourseMemberInvitation與sendMessage和一個組件CourseMemberInvitationListener,分別做:
addCourseMemberInvitation:
用戶A呼叫介面,邀請用戶B成為某門課程的管理員Springboot服務器收到請求,將這一請求生成邀請記錄、訊息記錄,寫入下表:course_member_invitationcourse_message
- 寫入DB后,呼叫
sendMessage處理發送訊息的業務, - 將執行的結果回傳給
用戶A
sendMessage:
- 將訊息記錄放入
RabbitMQ中對應的訊息佇列,
CourseMemberInvitationListener:
- 持續監聽其系結的訊息佇列
- 一旦訊息佇列中有新訊息,就嘗試通過ws連接發送訊息,
- 若
用戶B在線,則可發送, - 否則,則消費掉該訊息,待用戶上線后從DB中讀入,
- 若
在Springboot中配置Websocket
pom.xml檔案
<!-- WebSocket相關 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
Websocket Server組件配置初步:com.xxxxx.course.webSocket.WebSocketServer
/**
* 進行前后端即時通信
* https://blog.csdn.net/qq_33833327/article/details/105415393
* session: https://www.codeleading.com/article/6950456772/
* @author jojo
*/
@ServerEndpoint(value = "https://www.cnblogs.com/ws/{uid}",configurator = WebSocketConfig.class) //回應路徑為 /ws/{uid} 的連接請求
@Component
public class WebSocketServer {
/**
* 靜態變數,用來記錄當前在線連接數,應該把它設計成執行緒安全的
*/
private static int onlineCount = 0;
/**
* concurrent 包的執行緒安全Set,用來存放每個客戶端對應的 myWebSocket物件
* 根據 用戶id 來獲取對應的 WebSocketServer 示例
*/
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 與某個客戶端的連接會話,需要通過它來給客戶端發送資料
*/
private Session session;
/**
* 用戶id
*/
private String accountId ="";
/**
* logger
*/
private static Logger LOGGER = LoggerUtil.getLogger();
/**
* 連接建立成功呼叫的方法
*
* @param session
* @param uid 用戶id
*/
@OnOpen
public void onOpen(Session session, @PathParam("uid") String uid) {
this.session = session;
//設定超時,同httpSession
session.setMaxIdleTimeout(3600000);
this.accountId = uid;
//存盤websocket連接,存在記憶體中,若有同一個用戶同時在線,也會存,不會覆寫原有記錄
webSocketMap.put(accountId, this);
LOGGER.info("webSocketMap -> " + JSON.toJSONString(webSocketMap.toString()));
addOnlineCount(); // 在線數 +1
LOGGER.info("有新視窗開始監聽:" + accountId + ",當前在線人數為" + getOnlineCount());
try {
sendMessage(JSON.toJSONString("連接成功"));
} catch (IOException e) {
e.printStackTrace();
throw new ApiException("websocket IO例外!!!!");
}
}
/**
* 關閉連接
*/
@OnClose
public void onClose() {
if (webSocketMap.get(this.accountId) != null) {
webSocketMap.remove(this.accountId);
subOnlineCount(); // 人數 -1
LOGGER.info("有一連接關閉,當前在線人數為:" + getOnlineCount());
}
}
/**
* 收到客戶端訊息后呼叫的方法
* 這段代碼尚未有在使用,可以先不看,在哪天有需求時再改寫啟用
* @param message 客戶端發送過來的訊息
* @param session
*/
@OnMessage
public void onMessage(String message, Session session) {
LOGGER.info("收到來自用戶 [" + this.accountId + "] 的資訊:" + message);
if (!StringTools.isNullOrEmpty(message)) {
try {
// 決議發送的報文
JSONObject jsonObject = JSON.parseObject(message);
// 追加發送人(防竄改)
jsonObject.put("fromUserId", this.accountId);
String toUserId = jsonObject.getString("toUserId");
// 傳送給對應 toUserId 用戶的 WebSocket
if (!StringTools.isNullOrEmpty(toUserId) && webSocketMap.containsKey(toUserId)) {
webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
} else {
// 否則不在這個服務器上,發送到 MySQL 或者 Redis
LOGGER.info("請求的userId:" + toUserId + "不在該服務器上");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* @param session
* @param error
*/
@OnError
public void one rror(Session session, Throwable error) {
LOGGER.error("用戶錯誤:" + this.accountId + ",原因:" + error);
}
/**
* 實作服務器主動推送
*
* @param message 訊息字串
* @throws IOException
*/
public void sendMessage(String message) throws IOException {
//需要使用同步機制,否則多并發時會因阻塞而報錯
synchronized(this.session) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
LOGGER.error("發送給用戶 ["+this.accountId +"] 的訊息出現錯誤",e.getMessage());
throw e;
}
}
}
/**
* 點對點發送
* 指定用戶id
* @param message 訊息字串
* @param userId 目標用戶id
* @throws IOException
*/
public static void sendInfo(String message, String userId) throws Exception {
Iterator entrys = webSocketMap.entrySet().iterator();
while (entrys.hasNext()) {
Map.Entry entry = (Map.Entry) entrys.next();
if (entry.getKey().toString().equals(userId)) {
webSocketMap.get(entry.getKey()).sendMessage(message);
LOGGER.info("發送訊息到用戶id為 [" + userId + "] ,訊息:" + message);
return;
}
}
//錯誤說明用戶沒有在線,不用記錄log
throw new Exception("用戶沒有在線");
}
private static synchronized int getOnlineCount() {
return onlineCount;
}
private static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
private static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
幾點說明:
- onOpen方法:服務器與前端建立ws連接成功時自動呼叫,
- sendInfo方法:是服務器通過用戶id向指定用戶發送訊息的方法,其為靜態公有方法,因此可供各service呼叫,呼叫的例子:
// WebSocket 通知前端
try {
//呼叫WebsocketServer向目標用戶推送訊息
WebSocketServer.sendInfo(JSON.toJSONString(courseMemberInvitation),courseMemberInvitation.getAccountId().toString());
LOGGER.info("send to "+courseMemberInvitation.getAccountId().toString());
}
@ServerEndpoint注解:
@ServerEndpoint(value = "https://www.cnblogs.com/ws/{uid}",configurator = WebSocketConfig.class) //回應路徑為 /ws/{uid} 的連接請求
這么注解之后,前端只用發起 ws://xxx.xxx:xxxx/ws/{uid} 即可開啟ws連接(或者wss協議,增加TLS), 比如前端js代碼這么寫:
<script>
var socket;
/* 啟動ws連接 */
function openSocket() {
if(typeof(WebSocket) == "undefined") {
console.log("您的瀏覽器不支持WebSocket");
}else{
console.log("您的瀏覽器支持WebSocket");
//實作化WebSocket物件,指定要連接的服務器地址與埠 建立連接
var socketUrl="http://xxx.xxx.xxx:xxxx/ws/"+$("#uid").val();
socketUrl=socketUrl.replace("https","ws").replace("http","ws"); //轉換成ws協議
console.log("正在連接:"+socketUrl);
if(socket!=null){
socket.close();
socket=null;
}
socket = new WebSocket(socketUrl);
/* websocket 基本方法 */
//打開事件
socket.onopen = function() {
console.log(new Date()+"websocket已打開,正在連接...");
//socket.send("這是來自客戶端的訊息" + location.href + new Date());
};
//獲得訊息事件
socket.onmessage = function(msg) {
console.log(msg.data);
//發現訊息進入 開始處理前端觸發邏輯
};
//關閉事件
socket.onclose = function() {
console.log(new Date()+"websocket已關閉,連接失敗...");
//重新請求token
};
//發生了錯誤事件
socket.onerror = function() {
console.log("websocket連接發生發生了錯誤");
}
}
}
/* 發送訊息 */
function sendMessage() {
if(typeof(WebSocket) == "undefined") {
console.log("您的瀏覽器不支持WebSocket");
}else {
console.log("您的瀏覽器支持WebSocket");
console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
}
}
</script>
存在的問題
一切看起來很順利,我只要放個用戶id進去,就可以想跟誰通訊就跟誰通訊咯!
但設想一個場景, 我是小明,uid為250,我想找uid為520的小花聊天,理論上我只要發起ws://xxx.xxx:xxxx/ws/250請求與服務器連接,小花也發起ws://xxx.xxx:xxxx/ws/520與服務器建立ws連接,我們就能互發訊息了吧!
這時候出現了uid為1的小黃,他竟然想挖墻腳!?他竟然學過js,自己發了ws://xxx.xxx:xxxx/ws/520跟服務器建立ws連接,而小花根本不想和我發訊息,所以實際上是小黃冒充了小花,把小花NTR了(實際上人家并不在乎??),跟我愉快地聊天?!
那怎么辦啊?我怎么才能知道在跟我Websocket的究竟是美女小花還是黃毛小黃啊??!
這就引入了JWT!
JWT——JSON WEB TOKEN
可以看到后端會回應/ws/{token}的連接請求,前端可以發/ws/{token}的連接請求,一開始寫的時候看網上的都是用/ws/{userId}來建立該id的用戶與服務器的ws連接,但這樣的話可能就很不安全,無法保證使用某個id建立的ws確實就是真實用戶發起的連接,(小花被小黃NTR的悲慘故事)
所以在調研了很多公開的解決方案,看到有改用令牌(token)來建立ws連接的說法,同時驗證用戶身份(事實上一些其他介面也可以用令牌(token)來保證介面安全性),于是打算自己試試看,未必是最好的,甚至可能有點頭痛醫頭,腳痛醫腳,但總歸是個經驗,記錄一下,,
//Websocket Server
@ServerEndpoint(value = "https://www.cnblogs.com/ws/{token}",configurator = WebSocketConfig.class) //回應路徑為 /ws/{token} 的連接請求
@Component
public class WebSocketServer {
...
}
js:
var socketUrl="http://xxx.xxx.xxx.xxx:xxxx/ws/"+$("#token").val();
socketUrl=socketUrl.replace("https","ws").replace("http","ws"); //轉換成ws協議
....
socket = new WebSocket(socketUrl);
業務邏輯

為什么用JWT
最初考慮的是用/ws/{userId}來建立ws連接,然后在后臺拿session中的user來對比用戶id,判斷合法性,
結果發現ws的session和http的session是不同的,還不好拿,可能得想辦法把http的session存到redis或者DB(也可以存在記憶體中,只是可能又要消耗記憶體資源),在建立ws連接之前去拿出來驗證合法性,后面查到了還有JWT這種好東西,
JWT好在哪里?
?推薦閱讀:什么是 JWT -- JSON WEB TOKEN?
我的總結:
- token可以過期
- 驗證token可以不用存在redis或者DB或者記憶體,完全依賴演算法即可
- 只要從前端請求中拿到token,后端就可以根據封裝好的演算法驗證這個token合不合法,有沒有被篡改過(這點很重要),過期了沒有
- 可以將用戶id、用戶名等非敏感資料一同封裝到token中,后端拿到token后可以解碼拿到資料,只要這個token合法,這些發來的資料就是可信的(小黃就算自己發明了token也不作數),是沒有被篡改的(小黃就算把我小花的token偷走把用戶id改成自己的也沒用,后臺可以算出來被改過),可以建立ws連接,呼叫websocket server進行通訊,
教程
JWT教程 整合到本專案中
pom.xml
<!-- JWT 相關 -->
<dependency>
<groupId>com.auth0</groupId>
<artifactId>java-jwt</artifactId>
<version>3.12.1</version>
</dependency>
<!-- base64 -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.12</version>
</dependency>
token的前兩個部分是由base64編碼的,所以需要codec進行解碼,
?
實作一個JWT工具類
目前基本當作工具使用
com.xxxx.course.util.JWTUtil
?
/**
* @author jojo
* JWT 令牌工具類
*/
public class JWTUtil {
/**
* 默認本地密鑰
* @notice: 非常重要,請勿泄露
*/
private static final String SECRET = "doyoulikevanyouxi?" //亂打的
/**
* 默認有效時間單位,為分鐘
*/
private static final int TIME_TYPE = Calendar.MINUTE;
/**
* 默認有效時間長度,同http Session時長,為60分鐘
*/
private static final int TIME_AMOUNT = 600;
/**
* 全自定生成令牌
* @param payload payload部分
* @param secret 本地密鑰
* @param timeType 時間型別:按Calender類中的常量傳入:
* Calendar.YEAR;
* Calendar.MONTH;
* Calendar.HOUR;
* Calendar.MINUTE;
* Calendar.SECOND;等
* @param expiredTime 過期時間,單位由 timeType 決定
* @return 令牌
*/
public static String generateToken(Map<String,String> payload,String secret,int timeType,int expiredTime){
JWTCreator.Builder builder = JWT.create();
//payload部分
payload.forEach((k,v)->{
builder.withClaim(k,v);
});
Calendar instance = Calendar.getInstance();
instance.add(timeType,expiredTime);
//設定超時時間
builder.withExpiresAt(instance.getTime());
//簽名
return builder.sign(Algorithm.HMAC256(secret)).toString();
}
/**
* 生成token
* @param payload payload部分
* @return 令牌
*/
public static String generateToken(Map<String,String> payload){
return generateToken(payload,SECRET,TIME_TYPE,TIME_AMOUNT);
}
省略了多載方法....
/**
* 驗證令牌合法性
* @param token 令牌
* @return
*/
public static void verify(String token) {
//如果有任何驗證例外,此處都會拋出例外
JWT.require(Algorithm.HMAC256(SECRET)).build().verify(token);
}
/**
* 自定義密鑰決議
* @param token 令牌
* @param secret 密鑰
* @return 結果
*/
public static DecodedJWT parseToken(String token,String secret) {
DecodedJWT decodedJWT = JWT.require(Algorithm.HMAC256(secret)).build().verify(token);
return decodedJWT;
}
/**
* 決議令牌
* 當令牌不合法將拋出錯誤
* @param token
* @return
*/
public static DecodedJWT parseToken(String token) {
return parseToken(token,SECRET);
}
/**
* 決議令牌獲得payload,值為claims形式
* @param token
* @param secret
* @return
*/
public static Map<String,Claim> getPayloadClaims(String token,String secret){
DecodedJWT decodedJWT = parseToken(token,secret);
return decodedJWT.getClaims();
}
/**
* 默認決議令牌獲得payload,值為claims形式
* @param token 令牌
* @return
*/
public static Map<String,Claim> getPayloadClaims(String token){
return getPayloadClaims(token,SECRET);
}
/**
* 決議令牌獲得payload,值為String形式
* @param token 令牌
* @return
*/
public static Map<String,String> getPayloadString(String token,String secret){
Map<String, Claim> claims = getPayloadClaims(token,secret);
Map<String,String> payload = new HashMap<>();
claims.forEach((k,v)->{
if("exp".equals(k)){
payload.put(k,v.asDate().toString());
}
else {
payload.put(k, v.asString());
}
});
return payload;
}
/**
* 默認決議令牌獲得payload,值為String形式
* @param token 令牌
* @return
*/
public static Map<String,String> getPayloadString(String token){
return getPayloadString(token,SECRET);
}
/**
* 通過用戶物體生成令牌
* @param user 用戶物體
* @return
*/
public static String generateUserToken(Account user){
return generateUserToken(user.getId());
}
/**
* 通過用戶id生成令牌
* @param accountId 用戶id
* @return
*/
public static String generateUserToken(Integer accountId){
return generateUserToken(accountId.toString());
}
/**
* 通過用戶id生成令牌
* @param accountId 用戶id
* @return
*/
public static String generateUserToken(String accountId){
Map<String,String> payload = new HashMap<>();
payload.put("accountId",accountId);
return generateToken(payload);
}
/**
* 從令牌中決議出用戶id
* @param token 令牌
* @return
*/
public static String parseUserToken(String token){
Map<String, String> payload = getPayloadString(token);
return payload.get("accountId");
}
}
調整登陸 service 中,登陸時回傳一個token
com.xxxx.course.service.impl.AccountServiceImpl
public JSONObject login(){
登陸成功...
...
//生成并放入通信令牌token,令牌中帶有用戶id,用以鑒別身份
String token = JWTUtil.generateUserToken(user);
jsonObject.put("token",token);
...
后續操作...
return jsonObject;
}
WebSocket 連接握手時進行身份驗證
之后前端只要攜帶token進行ws連接即可,寫了個ws的配置類,繼承了一個websocket連接的監聽器ServerEndpointConfig.Configurator,進行token的驗證,
com.XXXXX.course.config.webSocket.WebSocketConfig
/**
* 開啟 WebSocket 支持,進行前后端即時通訊
* https://blog.csdn.net/qq_33833327/article/details/105415393
* session配置:https://www.codeleading.com/article/6950456772/
* @author jojo
*/
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator implements WebSocketConfigurer {
/**
* logger
*/
private static final Logger LOGGER = LoggerUtil.getLogger();
/**
* 監聽websocket連接,處理握手前行為
* @param sec
* @param request
* @param response
*/
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
String[] path = request.getRequestURI().getPath().split("/");
String token = path[path.length-1];
//todo 驗證用戶令牌是否有效
try {
JWTUtil.verify(token);
} catch (Exception e) {
LOGGER.info("攔截了非法連接",e.getMessage());
return;
}
super.modifyHandshake(sec, request, response);
}
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
...
}
這樣,每次服務器建立ws連接前,都要驗證token的合法性,僅僅通過JWTUtil.verify(token);即可!當token不合法,就會拋出例外,
?
再配合重寫websocket server的onOpen方法,應該就能進行身份可信的通信了!
/**
* 連接建立成功呼叫的方法
*
* @param session
* @param token 用戶令牌
*/
@OnOpen
public void onOpen(Session session, @PathParam("token") String token) {
this.session = session;
this.token = token;
//設定超時,同httpSession
session.setMaxIdleTimeout(3600000);
//決議令牌,拿取用戶資訊
Map<String, String> payload = JWTUtil.getPayloadString(token);
String accountId = payload.get("accountId");
this.accountId = accountId;
//存盤websocket連接,存在記憶體中,若有同一個用戶同時在線,也會存,不會覆寫原有記錄
webSocketMap.put(accountId, this);
LOGGER.info("webSocketMap -> " + JSON.toJSONString(webSocketMap.toString()));
addOnlineCount(); // 在線數 +1
LOGGER.info("有新視窗開始監聽:" + accountId + ",當前在線人數為" + getOnlineCount());
...
(非必須)RabbitMQ訊息中間件
教程
- RabbitMQ 基本介紹+入門使用 : P14-P19,看完這個視頻 基本上能知道 RabbitMQ 是什么、怎么部署(推薦使用docker,學習時若使用centOS推薦使用7.x版本,
8.x我真的不會用)、怎么整合到springboot,
為什么我要用RabbitMQ
- 正經理由:
- 可以將寫DB與發送訊息兩件事情異步處理,這樣回應會更快,
- 未來可以拓展為集群
- 真正理由:
- 人傻
- 閑的
整合到本專案中
pom.xml
<!-- rabbitMQ 相關-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ 配置類
com.xxxx.course.config.rabbitMQ.RabbitMQConfig
/**
* @author jojo
*/
@Configuration
public class RabbitMQConfig {
/**
* 指定環境
*/
@Value("${spring.profiles.active}")
private String env;
/**
* logger
*/
public static final Logger LOGGER = LoggerUtil.getLogger();
/**
* 交換機名
*/
public String MEMBER_INVITATION_EXCHANGE = RabbitMQConst.MEMBER_INVITATION_EXCHANGE;
/**
* 交換機佇列
*/
public String MEMBER_INVITATION_QUEUE = RabbitMQConst.MEMBER_INVITATION_QUEUE;
/**
* 宣告 課程成員邀請訊息 交換機
* @return
*/
@Bean("memberInvitationDirectExchange")
public Exchange memberInvitationDirectExchange(){
//根據專案環境起名,比如開發環境會帶dev字樣
String exchangeName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_EXCHANGE);
return ExchangeBuilder.directExchange(exchangeName).durable(true).build();
}
/**
* 宣告 課程成員邀請訊息 佇列
* @return
*/
@Bean("memberInvitationQueue")
public Queue memberInvitationQueue(){
//同上
String queueName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_QUEUE);
return QueueBuilder.durable(queueName).build();
}
/**
* 課程成員邀請訊息的佇列與交換機系結
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding memberInvitationBinding(@Qualifier("memberInvitationQueue") Queue queue,@Qualifier("memberInvitationDirectExchange") Exchange exchange){
String queueName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_QUEUE);
return BindingBuilder.bind(queue).to(exchange).with(queueName).noargs();
}
/**
* Springboot啟動時, 驗證佇列名根據環境命名正確
*/
@Bean
public void verify(){
Queue memberInvitationQueue = SpringUtil.getBean("memberInvitationQueue", Queue.class);
Exchange memberInvitationDirectExchange = SpringUtil.getBean("memberInvitationDirectExchange", Exchange.class);
LOGGER.info("訊息佇列 ["+memberInvitationQueue.getName()+"] 創建成功");
LOGGER.info("訊息交換器 ["+memberInvitationDirectExchange.getName()+"] 創建成功");
//放入映射中存盤
RabbitMQConst.QUEUE_MAP.put(MessageConst.MEMBER_INVITATION,memberInvitationQueue.getName());
RabbitMQConst.EXCHANGE_MAP.put(MessageConst.MEMBER_INVITATION,memberInvitationDirectExchange.getName());
}
/**
* 自定義messageConverter使得訊息中攜帶的pojo序列化成json格式
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
專案運行后,在 RabbitMQ服務器 中就會出現剛剛注冊的佇列與交換器(圖是舊的,沒有體現根據環境命名佇列,但是其實做到了):


課程管理成員邀請介面
com.scholat.course.service.impl.CourseMemberInvitationServiceImpl- 首先在介面中注入操作rabbitMQ的Bean
@Autowired
RabbitTemplate rabbitTemplate;
- 在課程管理員業務代碼中加入向rabbitMQ發送訊息的邏輯
CourseMemberInvitationServiceImpl
@Override
@Transactional(rollbackFor = Exception.class) //開啟事務,以防萬一
public JSONObject addCourseMemberInvitation(Integer courseId, String username, String requestIp) {
//檢查課程是否存在
courseService.hasCourse(courseId);
//檢查用戶是否已加入課程平臺
accountService.hasAccount(username);
/* 若存在則查看邀請記錄是否已經存在 */
//獲取用戶id
Account account = accountService.getAccountByUsernameOrEmail(username);
//檢查用戶名是否存在
if(account==null){
JSONObject result = new JSONObject();
result.put(RESULT,FAILED);
result.put(MSG,"用戶不存在");
return result;
}
Integer accountId = account.getId();
//獲得發出邀請人的id
Account user = (Account) SecurityUtils.getSubject().getSession().getAttribute("user");
Integer adminId = user.getId();
//檢查是否自己邀請自己,是則不再執行
hasInvitedOneself(accountId,adminId);
//檢查是否已經邀請過,是則不再執行
hasInvited(courseId,accountId,adminId);
/* 若不存在則新建邀請記錄 */
CourseMemberInvitation courseMemberInvitation = new CourseMemberInvitation();
courseMemberInvitation.setCourseId(courseId);
courseMemberInvitation.setAccountId(accountId);
courseMemberInvitation.setAdminId(adminId);
courseMemberInvitation.setCreateTime(new Date());
courseMemberInvitation.setCreateIp(requestIp);
//新建訊息
CourseMessage courseMessage = courseMessageService.newMessage(MessageConst.MEMBER_INVITATION, accountId, adminId);
//系結邀請記錄與訊息記錄
courseMemberInvitation.setBindMessageId(courseMessage.getId());
//插入資料庫(這里用的是MybatisPlus)
int insertResult = courseMemberInvitationDao.insert(courseMemberInvitation);
//根據資料庫插入回傳值封裝json
JSONObject result = insertCourseMemberInvitationResult(insertResult, courseMemberInvitation);
if(result.get(RESULT).equals(FAILED)){
//若資料庫操作沒有成功,則直接回傳json
return result;
}
/* 發送訊息 */
courseMessageService.sendMessage(courseMessage);
//根據插入情況回傳json
return result;
}
courseMessageService中實作的sendMessage方法:
@Autowired
RabbitTemplate rabbitTemplate;
@Override
public void sendMessage(CourseMessage courseMessage) {
//嘗試發送
//將訊息放入rabbitMQ
storeInRabbitMQ(courseMessage);
}
private void storeInRabbitMQ(CourseMessage courseMessage){
//將訊息放入rabbitMQ
String exchangeName = (String) RabbitMQConst.EXCHANGE_MAP.get(courseMessage.getType());
String routeKey = (String) RabbitMQConst.QUEUE_MAP.get(courseMessage.getType());
try {
//送到rabbitMQ佇列中
rabbitTemplate.convertAndSend(exchangeName,routeKey,courseMessage);
}
catch (Exception e){
LOGGER.error("插入rabbitMQ失敗",e);
}
}
com.xxxx.course.rabbitMQ.listener.CourseMemberInvitationListener
該類是用以監聽_課程成員邀請_訊息的,即是在rabbitMQ服務器建立的member_invitation佇列,
/**
* @author jojo
*/
@Component
public class CourseMemberInvitationListener {
@Autowired
MessageHandler messageHandler;
/**
* logger
*/
public static final Logger LOGGER = LoggerUtil.getLogger();
/**
* spEL運算式
* 一旦佇列中有新訊息,這個方法就會被觸發
*/
@RabbitListener(queues = "#{memberInvitationQueue.name}")
public void listenCourseMemberInvitation(Message message){
messageHandler.handleMessage(message);
}
}
com.xxxx.course.rabbitMQ.MessageHandler, 該類是用來處理監聽事件的:
@Service
public class MessageHandler {
@Autowired
MessageConverter messageConverter;
@Autowired
RabbitTemplate rabbitTemplate;
/**
* logger
*/
public static final Logger LOGGER = LoggerUtil.getLogger();
/**
* 佇列訊息處理業務
* @param message
*/
public void handleMessage(Message message){
CourseMessage courseMessage = (CourseMessage) messageConverter.fromMessage(message);
// WebSocket 通知前端
try {
//將訊息發給指定用戶
WebSocketServer.sendInfo(JSON.toJSONString(courseMessage),courseMessage.getAccountId().toString());
} catch (Exception e) {
//訊息存在資料庫中了,待用戶上線后再獲取
LOGGER.info("發送訊息id為 ["+courseMessage.getId()+"] 的訊息給->訊息待收方id為 ["+courseMessage.getAccountId().toString()+"] 的用戶,但其不在線上,");
}
}
}
這樣做應該就可以用RabbitMQ了,
?
總結
本文的難點是ws的認證問題,雖然用超級好用的JWT解決了,但是隨之而來的還有很多問題,比如:
- 注銷登錄等場景下 token 還有效
與之類似的具體相關場景有:- 退出登錄;
- 修改密碼;
- 服務端修改了某個用戶具有的權限或者角色;
- 用戶的帳戶被洗掉/暫停,
- 用戶由管理員注銷;
- token 的續簽問題
token 有效期一般都建議設定的不太長,那么 token 過期后如何認證,如何實作動態重繪 token,避免用戶經常需要重新登錄?
這些問題還是有待解決是??
本文就當記錄一下自己的胡作非為吧??
總之,至少小花再也不怕被小黃NTR了
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/424876.html
標籤:Java

