WebSocket和nginx使用
http&ws https&wss
WebSocket 協議在2008年誕生,2011年成為國際標準,所有瀏覽器都已經支持了,
它的最大特點就是,服務器可以主動向客戶端推送資訊,客戶端也可以主動向服務器發送資訊,是真正的雙向平等對話,屬于服務器推送技術的一種,
WebSocket 協議在2008年誕生,2011年成為國際標準,所有瀏覽器都已經支持了,
它的最大特點就是,服務器可以主動向客戶端推送資訊,客戶端也可以主動向服務器發送資訊,是真正的雙向平等對話,屬于服務器推送技術的一種,

其他特點包括:
(1)建立在 TCP 協議之上,服務器端的實作比較容易,
(2)與 HTTP 協議有著良好的兼容性,默認埠也是80和443,并且握手階段采用 HTTP 協議,因此握手時不容易屏蔽,能通過各種 HTTP 代理服務器,
(3)資料格式比較輕量,性能開銷小,通信高效,
(4)可以發送文本,也可以發送二進制資料,
(5)沒有同源限制,客戶端可以與任意服務器通信,
(6)協議識別符號是ws(如果加密,則為wss),服務器網址就是 URL,
ws://example.com:80/some/path # 與http協議并行
wss://example.com:80/some/path # 與https協議并行

前端后端示例
vue
window.webSocket = new WebSocket(`${wsUrl}${id}`);
// window.webSocket = new Rwebsocket(`${wsUrl}${id}`, null, { debug: true, reconnectInterval: 3000,automaticOpen:false })
/*建立連接*/
webSocket.onopen = e => {
console.log('建立連接')
heartCheck.reset().start(); // 成功建立連接后,重置心跳檢測
};
/*連接關閉*/
webSocket.onclose = e => {
console.log("webSocket連接關閉");
};
// /*接收服務器推送訊息*/
webSocket.onmessage = e => {
heartCheck.reset().start(); // 如果獲取到訊息,說明連接是正常的,重置心跳檢測
console.log(e,'接收服務器推送訊息')
let res=JSON.parse(e.data)
callBack(res)
};
// /*連接發生錯誤時*/
webSocket.onerror = e => {
console.log('webSocket連接失敗');
}
// 心跳檢測, 每隔一段時間檢測連接狀態,如果處于連接中,就向server端主動發送訊息,來重置server端與客戶端的最大連接時間,如果已經斷開了,發起重連,
let heartCheck = {
timeout: 55000, // 發一次心跳,比server端設定的連接時間稍微小一點,在接近斷開的情況下以通信的方式去重置連接時間,
serverTimeoutObj: null,
reset: function () {
// clearTimeout(this.timeoutObj);
clearTimeout(this.serverTimeoutObj);
return this;
},
start: function () {
this.serverTimeoutObj = setInterval(function () {
if (window.webSocket&&window.webSocket.readyState == 1) {
console.log("連接狀態,發送訊息保持連接");
window.webSocket.send(`{"toUserId":"${id}"}`);
heartCheck.reset().start(); // 如果獲取到訊息,說明連接是正常的,重置心跳檢測
} else {
console.log("斷開狀態,嘗試重連");
window.webSocket.close();
Socket();
}
}, this.timeout)
}
}
java
pom.xml依賴
<!--websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
pojo訊息物體
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SocketMessage {
/** 型別 (1-預警訊息、2-通知通告、3-系統訊息)*/
private Object type;
/** 業務主鍵 */
private String businessID;
/** 訊息標題 */
private String title;
/** 訊息內容 */
private String message;
/** 提報資料 */
private String commitType;
/** 資料提報訊息-組織ID */
private String orgId;
}
WebSocketServer
/**
* @description: WebSocket服務類
* @autor: WJY
* @create: 2021-10-27 11:15
* @since: 1.8
*/
@ServerEndpoint("/ws/{userId}")
@Component
public class WebSocketServer {
static Log log= LogFactory.get(WebSocketServer.class);
/**靜態變數,用來記錄當前在線連接數,應該把它設計成執行緒安全的,*/
private static int onlineCount = 0;
/**concurrent包的執行緒安全Set,用來存放每個客戶端對應的MyWebSocket物件,*/
private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**與某個客戶端的連接會話,需要通過它來給客戶端發送資料*/
private Session session;
/**接收userId*/
private String userId="";
/**
* 連接建立成功呼叫的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("userId") String userId) {
this.session = session;
this.userId=userId;
if(webSocketMap.containsKey(userId)){
webSocketMap.remove(userId);
webSocketMap.put(userId,this);
//加入set中
}else{
webSocketMap.put(userId,this);
//加入set中
addOnlineCount();
//在線數加1
}
log.info("用戶連接:"+userId+",當前在線人數為:" + getOnlineCount());
try {
webSocketMap.get(userId).
sendMessage(JSONUtil.toJsonStr(new SocketMessage()));
session.setMaxIdleTimeout(3600000);
} catch (IOException e) {
log.error("用戶:"+userId+",網路例外!!!!!!");
}
}
/**
* 連接關閉呼叫的方法
*/
@OnClose
public void onClose() {
try {
if(webSocketMap.containsKey(userId)){
webSocketMap.remove(userId);
//從set中洗掉
subOnlineCount();
}
log.info("用戶退出:"+userId+",當前在線人數為:" + getOnlineCount());
}catch (Exception e){
log.error("用戶關閉連接!!!");
}
}
/**
* 實作服務器主動推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 發送自定義訊息
* */
public void sendInfo(Object message, @PathParam("userId") String userId) throws IOException {
log.info("發送訊息到:"+userId+",報文:"+message);
if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
webSocketMap.get(userId).sendMessage(JSONUtil.toJsonStr(message));
}else{
log.error("用戶"+userId+",不在線!");
}
}
/**
* 收到客戶端訊息后呼叫的方法
*
* @param message 客戶端發送過來的訊息*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("用戶訊息:"+userId+",報文:"+message);
//可以群發訊息
//訊息保存到資料庫、redis
if(StringUtils.isNotBlank(message)){
try {
//決議發送的報文
JSONObject jsonObject = JSON.parseObject(message);
//追加發送人(防止串改)
jsonObject.put("fromUserId",this.userId);
String toUserId = this.userId;
if (ObjectUtils.isNotEmpty(jsonObject) && ObjectUtils.isNotEmpty(jsonObject.getString("toUserId"))){
toUserId =jsonObject.getString("toUserId");
}
//傳送給對應toUserId用戶的websocket
if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
}else{
log.error("請求的userId:"+toUserId+"不在該服務器上");
//否則不在這個服務器上,發送到mysql或者redis
}
}catch (Exception e){
e.printStackTrace();
}
}
}
/**
* 錯誤訊息處理
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用戶錯誤:"+this.userId+",原因:"+error.getMessage());
error.printStackTrace();
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
WebSocketConfig
/**
* @description: webSocket物件配置類
* @autor: WJY
* @create: 2021-10-27 11:12
* @since: 1.8
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
WebSocketController
/**
* @description: WebSocket訊息推送
* @autor: WJY
* @create: 2021-10-27 11:18
* @since: 1.8
*/
@RestController
public class ForwardNewsController {
@Resource
private WebSocketUtils<SocketMessage> webSocketUtils;
/**
* 前端推送訊息(測驗訊息推送介面)
*
* @param message
* @param toUserId
* @return
*/
@GetMapping("/push/{toUserId}")
public ResponseEntity<String> pushToWeb(String message, @PathVariable String toUserId) {
// 模擬訊息實時訊息推送
SocketMessage socketMessage = new SocketMessage("業務型別(1-預警訊息、2-通知通告、3-系統訊息)", "業務主鍵", "訊息標題", "訊息內容", null, null);
webSocketUtils.sendCustomizeMessage(socketMessage, toUserId);
return ResponseEntity.ok("MSG SEND SUCCESS");
}
}
// @GetMapping("index")
// public ResponseEntity<String> index(){
// return ResponseEntity.ok("請求成功");
// }
//
// /**
// * 獲取頁面資訊
// * @return
// */
// @GetMapping("page")
// public ModelAndView page(){
// return new ModelAndView("websocket");
// }
WebSocketUtils
/**
* @description: webSocket工具類
* @autor: WJY
* @create: 2021-10-28 9:53
* @since: 1.8
*/
@Component
@Slf4j
public class WebSocketUtils<T> {
@Resource
private WebSocketServer webSocketServer;
/**
* 發送自定義訊息
* @param msg
*/
public void sendCustomizeMessage(T msg, String userId){
try {
webSocketServer.sendInfo(msg, userId);
}catch (Exception i){
log.error("WebSocketUtils.sendCustomizeMessage訊息發送例外:{}", i.getMessage());
}
}
}
權限路徑過濾(springboog+springSecurity)
security:
permit:
list:
- /ws/*
前端API
WebSocket 建構式
WebSocket 物件作為一個建構式,用于新建 WebSocket 實體,
var ws = new WebSocket('ws://localhost:8080');
執行上面陳述句之后,客戶端就會與服務器進行連接,
實體物件的所有屬性和方法清單,參見這里,
webSocket.readyState
readyState屬性回傳實體物件的當前狀態,共有四種,
CONNECTING:值為0,表示正在連接,
OPEN:值為1,表示連接成功,可以通信了,
CLOSING:值為2,表示連接正在關閉,
CLOSED:值為3,表示連接已經關閉,或者打開連接失敗,
下面是一個示例,
switch (ws.readyState) {
case WebSocket.CONNECTING:
// do something
break;
case WebSocket.OPEN:
// do something
break;
case WebSocket.CLOSING:
// do something
break;
case WebSocket.CLOSED:
// do something
break;
default:
// this never happens
break;
}
webSocket.onopen
實體物件的onopen屬性,用于指定連接成功后的回呼函式,
ws.onopen = function () {
ws.send('Hello Server!');
}
如果要指定多個回呼函式,可以使用addEventListener方法,
ws.addEventListener('open', function (event) {
ws.send('Hello Server!');
});
webSocket.onclose
實體物件的onclose屬性,用于指定連接關閉后的回呼函式,
ws.onclose = function(event) {
var code = event.code;
var reason = event.reason;
var wasClean = event.wasClean;
// handle close event
};
ws.addEventListener("close", function(event) {
var code = event.code;
var reason = event.reason;
var wasClean = event.wasClean;
// handle close event
});
webSocket.onmessage
實體物件的onmessage屬性,用于指定收到服務器資料后的回呼函式,
ws.onmessage = function(event) {
var data = event.data;
// 處理資料
};
ws.addEventListener("message", function(event) {
var data = event.data;
// 處理資料
});
注意,服務器資料可能是文本,也可能是二進制資料(blob物件或Arraybuffer物件),
ws.onmessage = function(event){
if(typeof event.data === String) {
console.log("Received data string");
}
if(event.data instanceof ArrayBuffer){
var buffer = event.data;
console.log("Received arraybuffer");
}
}
除了動態判斷收到的資料型別,也可以使用binaryType屬性,顯式指定收到的二進制資料型別,
// 收到的是 blob 資料
ws.binaryType = "blob";
ws.onmessage = function(e) {
console.log(e.data.size);
};
// 收到的是 ArrayBuffer 資料
ws.binaryType = "arraybuffer";
ws.onmessage = function(e) {
console.log(e.data.byteLength);
};
webSocket.send()
實體物件的send()方法用于向服務器發送資料,
發送文本的例子,
ws.send('your message');
發送 Blob 物件的例子,
var file = document
.querySelector('input[type="file"]')
.files[0];
ws.send(file);
發送 ArrayBuffer 物件的例子,
// Sending canvas ImageData as ArrayBuffer
var img = canvas_context.getImageData(0, 0, 400, 320);
var binary = new Uint8Array(img.data.length);
for (var i = 0; i < img.data.length; i++) {
binary[i] = img.data[i];
}
ws.send(binary.buffer);
webSocket.bufferedAmount
實體物件的bufferedAmount屬性,表示還有多少位元組的二進制資料沒有發送出去,它可以用來判斷發送是否結束,
var data = new ArrayBuffer(10000000);
socket.send(data);
if (socket.bufferedAmount === 0) {
// 發送完畢
} else {
// 發送還沒結束
}
webSocket.onerror
實體物件的onerror屬性,用于指定報錯時的回呼函式,
socket.onerror = function(event) {
// handle error event
};
socket.addEventListener("error", function(event) {
// handle error event
});
nginx相關配置
# 重點的兩行配置
# 將nginx的請求頭從http升級到websocket.
proxy_set_header Upgrade $http_upgrade;
# 進行nginx連接websocket
proxy_set_header Connection "upgrade";
user root;
worker_processes 2;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
#log_format main '$remote_addr - $remote_user [$time_local] "$request" '
# '$status $body_bytes_sent "$http_referer" '
# '"$http_user_agent" "$http_x_forwarded_for"';
#access_log logs/access.log main;
sendfile on;
#tcp_nopush on;
tcp_nodelay on;
proxy_http_version 1.1;
#keepalive_timeout 0; 0是禁止等待后臺服務回應時間
keepalive_timeout 65;
server {
listen 80;
server_name localhost;
client_max_body_size 2000M;
# 開啟gzip
gzip on;
# 啟用gzip壓縮的最小檔案,小于設定值的檔案將不會壓縮
gzip_min_length 1k;
# gzip 壓縮級別,1-9,數字越大壓縮的越好,也越占用CPU時間,后面會有詳細說明
gzip_comp_level 9;
# 進行壓縮的檔案型別,javascript有多種形式,后面的圖片壓縮不需要的可以自行洗掉
gzip_types text/plain application/javascript application/x-javascript text/css application/xml text/javascript application/x-httpd-php image/jpeg image/gif image/png;
# 是否在http header中添加Vary: Accept-Encoding,建議開啟
gzip_vary on;
# 設定壓縮所需要的緩沖區大小
gzip_buffers 4 16k;
#獲取用戶真實ip
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_connect_timeout 1800;
proxy_send_timeout 1800;
proxy_read_timeout 1800;
location / {
root /tmp/production;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root /usr/local/nginx/html;
}
# webSocket使用過必要的配置項項(單獨配置的路徑代理)
location /wsk/ {
proxy_pass 代理的ip地址;
# 重點的兩行配置
# 將nginx的請求頭從http升級到websocket.
proxy_set_header Upgrade $http_upgrade;
#進行nginx連接websocket
proxy_set_header Connection "upgrade";
}
}
}
參考文獻
https://www.ruanyifeng.com/blog/2017/05/websocket.html
https://www.tutorialspoint.com/websockets/websockets_communicating_server.htm
http://www.eclipse.org/jetty/documentation.php
https://www.cnblogs.com/mafly/p/websocket.html (nginx中502報錯)
http://nginx.org/en/docs/http/websocket.html (nginx配置詳解)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/356862.html
標籤:其他
上一篇:(4)安裝機器人并對接青龍面板
