SpringBoot-SSE開發實踐
SSE介紹
SSE(Server-SentEvents,即服務器發送事件)是圍繞只讀Comet互動推出的API或者模式,SSE API用于創建到服務器的單向連接,服務器通過這個連接可以發送任意數量的資料,服務器回應的MIME型別必須是text/event-stream,而且是瀏覽器中的JavaScript API能決議格式輸出,SSE支持短輪詢、長輪詢和HTTP流,而且能在斷開連接時自動確定何時重新連接,
- SSE特點:實作簡單、 單向通信、自動重連、···
- 業務場景:客戶端與服務端建立連接后,只需要服務端給客戶端發送資料,客戶端無需要給服務端發送資料
開發實踐
專案框架
Github地址
.
├── README.md
├── pom.xml
└── src
└── main
├── java
│ └── cn
│ └── zuster
│ └── sse
│ ├── SseApplication.java【啟動類】
│ ├── controller 【控制器】
│ ├── exception 【例外】
│ ├── service 【服務介面】
│ │ └── impl 【服務實作】
│ ├── session 【SESSION管理】
│ └── task 【任務管理】
└── resources
└── application.properties 【組態檔】
專案依賴
SpringBoot中已經有SseEmitter了,所以不需要額外引入其他包,
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.zuster</groupId>
<artifactId>my-demo-springboot-sse</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>my-demo-springboot-sse</name>
<description>Spring Boot And SSE Demo</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
Session管理
服務端和客戶端建立連接時,往往會保持很多個SSE會話,為此,我們需要統一的對會話進行管理,此處我們使用 ConcurrentHashMap進行會話管理,其中 key 為客戶端ID,value 為 SseEmitter 物件,當然 value 也可以按照業務進行封裝,
主要方法說明:
- boolean exist(String id):檢測指定的客戶端Session是否存在,
- boolean add(String id, SseEmitter emitter):添加Session,如果有相同的客戶端ID,則先結束掉之前的Session,重新建立新的Session,
- boolean del(String id):洗掉指定客戶端Session,
- boolean send(String id, Object msg):給指定的客戶端發送資料,注意,此處沒有指定 MediaType ,即默認發送的就是 data,如果需要發送其他型別的資料,可進行自由擴展,
- void onCompletion(String id, ScheduledFuture<?> future):當 SseEmitter 觸發 onCompletion時業務中需要處理的邏輯,包括停止執行緒池中的執行緒執行(比如心跳),移除快取的Session等,
- void onError(String id, SseException e):當 SseEmitter 觸發 one rror 和 onTimeout 時業務中需要處理的邏輯,這里我取到快取的Session,然后繼續觸發 completeWithError(),最侄訓是會執行到上面的 onCompletion() 方法中,
package cn.zuster.sse.session;
// 省略 import
/**
* SSE Session
*
* @author zuster
* @date 2021/1/5
*/
public class SseSession {
private static final Logger logger = LoggerFactory.getLogger(SseSession.class);
/**
* Session維護Map
*/
private static Map<String, SseEmitter> SESSION = new ConcurrentHashMap<>();
/**
* 判斷Session是否存在
*
* @param id 客戶端ID
* @return
*/
public static boolean exist(String id) {
return SESSION.get(id) == null;
}
/**
* 增加Session
*
* @param id 客戶端ID
* @param emitter SseEmitter
*/
public static void add(String id, SseEmitter emitter) {
final SseEmitter oldEmitter = SESSION.get(id);
if (oldEmitter != null) {
oldEmitter.completeWithError(new SseException("RepeatConnect(Id:" + id + ")"));
}
SESSION.put(id, emitter);
}
/**
* 洗掉Session
*
* @param id 客戶端ID
* @return
*/
public static boolean del(String id) {
final SseEmitter emitter = SESSION.remove(id);
if (emitter != null) {
emitter.complete();
return true;
}
return false;
}
/**
* 發送訊息
*
* @param id 客戶端ID
* @param msg 發送的訊息
* @return
*/
public static boolean send(String id, Object msg) {
final SseEmitter emitter = SESSION.get(id);
if (emitter != null) {
try {
emitter.send(msg);
return true;
} catch (IOException e) {
logger.error("MSG: SendMessageError-IOException | ID: " + id + " | Date: " + new Date() + " |", e);
return false;
}
}
return false;
}
/**
* SseEmitter onCompletion 后執行的邏輯
*
* @param id 客戶端ID
* @param future
*/
public static void onCompletion(String id, ScheduledFuture<?> future) {
SESSION.remove(id);
if (future != null) {
// SseEmitter斷開后需要中斷心跳發送
future.cancel(true);
}
}
/**
* SseEmitter onTimeout 或 one rror 后執行的邏輯
*
* @param id
* @param e
*/
public static void onError(String id, SseException e) {
final SseEmitter emitter = SESSION.get(id);
if (emitter != null) {
emitter.completeWithError(e);
}
}
}
業務介面
一般使用 SSE 時的業務包括:客戶端建立連接、給客戶端發送資料、客戶端終端連接,介面如下:
package cn.zuster.sse.service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* SSE 相關業務介面
*
* @author zuster
* @date 2021/1/5
*/
public interface SseService {
/**
* 新建連接
*
* @param clientId 客戶端ID
* @return
*/
SseEmitter start(String clientId);
/**
* 發送資料
*
* @param clientId 客戶端ID
* @return
*/
String send(String clientId);
/**
* 關閉連接
*
* @param clientId 客戶端ID
* @return
*/
String close(String clientId);
}
業務實作
業務實作簡要介紹:
- ScheduledExecutorService heartbeatExecutors :使用執行緒池來管理客戶端連接后給客戶端發送心跳,我們的業務場景是建立連接后服務端需每隔10秒給客戶單發送一個訊息,若連續3次未收到心跳,則客戶端中斷連接,重新進行連接,很多地方使用while(true)…Thread.sleep()方式來實作此業務,但是在真實業務中問題很多,沒有用執行緒池優雅和高效,
- SseEmitter start(String clientId) :客戶端建立連接,建立連接后,需要將快取Session,同時設定心跳(如果有其他業務也可以在這里設定),另外在onCompletion、onTimeout、onError回呼事件中處理相關的業務,強調:一定要在回呼中處理掉Session和之前設定的Task,否則很容易OOM!
- String send(String clientId):向指定客戶端發送訊息,
- String close(String clientId):關閉連接,
package cn.zuster.sse.service.impl;
// 省略 import
/**
* SSE 相關業務實作
*
* @author zuster
* @date 2021/1/5
*/
@Service
public class SseServiceImpl implements SseService {
private static final Logger logger = LoggerFactory.getLogger(SseServiceImpl.class);
/**
* 發送心跳執行緒池
*/
private static ScheduledExecutorService heartbeatExecutors = Executors.newScheduledThreadPool(8);
/**
* 新建連接
*
* @param clientId 客戶端ID
* @return
*/
@Override
public SseEmitter start(String clientId) {
// 設定為0L為永不超時
// 次數設定30秒超時,方便測驗 timeout 事件
SseEmitter emitter = new SseEmitter(30_000L);
logger.info("MSG: SseConnect | EmitterHash: {} | ID: {} | Date: {}", emitter.hashCode(), clientId, new Date());
SseSession.add(clientId, emitter);
final ScheduledFuture<?> future = heartbeatExecutors.scheduleAtFixedRate(new HeartBeatTask(clientId), 0, 10, TimeUnit.SECONDS);
emitter.onCompletion(() -> {
logger.info("MSG: SseConnectCompletion | EmitterHash: {} |ID: {} | Date: {}", emitter.hashCode(), clientId, new Date());
SseSession.onCompletion(clientId, future);
});
emitter.onTimeout(() -> {
logger.error("MSG: SseConnectTimeout | EmitterHash: {} |ID: {} | Date: {}", emitter.hashCode(), clientId, new Date());
SseSession.onError(clientId, new SseException("TimeOut(clientId: " + clientId + ")"));
});
emitter.onError(t -> {
logger.error("MSG: SseConnectError | EmitterHash: {} |ID: {} | Date: {}", emitter.hashCode(), clientId, new Date());
SseSession.onError(clientId, new SseException("Error(clientId: " + clientId + ")"));
});
return emitter;
}
/**
* 發送資料
*
* @param clientId 客戶端ID
* @return
*/
@Override
public String send(String clientId) {
if (SseSession.send(clientId, System.currentTimeMillis())) {
return "Succeed!";
}
return "error";
}
/**
* 關閉連接
*
* @param clientId 客戶端ID
* @return
*/
@Override
public String close(String clientId) {
logger.info("MSG: SseConnectClose | ID: {} | Date: {}", clientId, new Date());
if (SseSession.del(clientId)) return "Succeed!";
return "Error!";
}
}
任務
我們的業務為建立連接后發送心跳資料,此處我只設定了客戶端ID,如果業務中有其他資料可以擴充,
package cn.zuster.sse.task;
// 省略 import
/**
* 心跳任務
*
* @author zuster
* @date 2021/1/5
*/
public class HeartBeatTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
private final String clientId;
public HeartBeatTask(String clientId) {
// 這里可以按照業務傳入需要的資料
this.clientId = clientId;
}
@Override
public void run() {
logger.info("MSG: SseHeartbeat | ID: {} | Date: {}", clientId, new Date());
SseSession.send(clientId, "ping");
}
}
例外
package cn.zuster.sse.exception;
/**
* SSE例外資訊
*
* @author zuster
* @date 2021/1/5
*/
public class SseException extends RuntimeException {
public SseException() {
}
public SseException(String message) {
super(message);
}
public SseException(String message, Throwable cause) {
super(message, cause);
}
public SseException(Throwable cause) {
super(cause);
}
public SseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
控制器
package cn.zuster.sse.controller;
// 省略 import
/**
* SSE測驗控制器
*
* @author songyh
* @date 2021/1/5
*/
@RestController
@RequestMapping("sse")
public class SseTestController {
private static final Logger logger = LoggerFactory.getLogger(SseTestController.class);
@Autowired
private SseService sseService;
@RequestMapping("start")
public SseEmitter start(@RequestParam String clientId) {
return sseService.start(clientId);
}
/**
* 將SseEmitter物件設定成完成
*
* @param clientId
* @return
*/
@RequestMapping("/end")
public String close(String clientId) {
return sseService.close(clientId);
}
}
測驗
代碼就上面這么多了,啟動起來測驗一下吧,
- 建立連接:http://localhost:8080/sse/start?clientId=888
- 關閉連接:http://localhost:8080/sse/end?clientId=111
需要測驗的點包括:
- 同時開啟多個連接
- 啟動兩個相同的連接
- 啟動后直接關掉
- 啟動后等待30秒超時
- 發送訊息(我controller中刪了發送訊息的,可以自行加上試試)
- 通過關閉連接介面關閉連接
好了,敬請的玩吧👏👏👏
如果還有其他更加優雅的方式,歡迎留言🤝🤝🤝
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/246160.html
標籤:java
下一篇:手敲Java:列印菱形!
