首發CSDN:徐同學呀,原創不易,轉載請注明源鏈接,我是徐同學,用心輸出高質量文章,希望對你有所幫助, 本篇基于Tomcat10.0.6,建議收藏起來慢慢看,
文章目錄
- 一、前言
- 二、什么是WebSocket
- 1、HTTP/1.1的缺陷
- 2、WebSocket發展歷史
- (1)背景
- (2)歷史
- 3、WebSocket握手和雙向通信
- (1)定義
- (2)握手(建立連接)
- (3)訊息幀
- (4)揮手(關閉連接)
- 4、WebSocket優點
- 三、Java API for WebSocket(JSR356)
- 1、服務端API
- (1)注解方式@ServerEndpoint
- (2)繼承抽象類Endpoint
- 2、客戶端API
- 3、背景關系Session
- 4、HandshakeRequest 和 HandshakeResponse
- (1)HandshakeRequest
- (2)HandshakeResponse
- 5、WebSocketContainer
- 四、WebSocket基于Tomcat應用
- 1、服務器端實作
- (1)@ServerEndpoint注解方式
- (2)繼承抽象類Endpoint方式
- (3)早期Tomcat7中Server端實作對比
- 2、客戶端實作
- (1)前端js版
- (2)@ClientEndpoint注解方式
- (3)繼承抽象類Endpoint方式
- 3、基于Nginx反向代理注意事項
- 五、WebSocket在Tomcat中的原始碼實作
- 1、WsSci初始化
- (1)WsSci#onStartup
- (2)WsServerContainer#addEndpoint
- (3)PojoMethodMapping方法映射和形參決議
- 2、協議升級(握手)
- (1)WsFilter
- (2)UpgradeUtil#doUpgrade
- (3)Request#upgrade
- (4)回呼機制ActionHook#action
- (5)ConnectionHandler#process
- (6)WsHttpUpgradeHandler#init握手成功
- 3、資料傳輸和決議
- (1)接收客戶端訊息
- (2)發送訊息給客戶端
- 六、要點回顧
- 七、參考文獻
一、前言
WebSocket是一種全雙工通信協議,即客戶端可以向服務端發送請求,服務端也可以主動向客戶端推送資料,這樣的特點,使得它在一些實時性要求比較高的場景效果斐然(比如微信朋友圈實時通知、在線協同編輯等),主流瀏覽器以及一些常見服務端通信框架(Tomcat、netty、undertow、webLogic等)都對WebSocket進行了技術支持,那么,WebSocket具體是什么?為什么會出現WebSocket?如何做到全雙工通信?解決了什么問題?

二、什么是WebSocket
1、HTTP/1.1的缺陷
HTTP/1.1最初是為網路中超文本資源(HTML),請求-回應傳輸而設計的,后來支持了傳輸更多型別的資源,如圖片、視頻等,但都沒有改變它單向的請求-回應模式,
隨著互聯網的日益壯大,HTTP/1.1功能使用上已體現捉襟見肘的疲態,雖然可以通過某些方式滿足需求(如Ajax、Comet),但是性能上還是局限于HTTP/1.1,那么HTTP/1.1有哪些缺陷呢:
- 請求-回應模式,只能客戶端發送請求給服務端,服務端才可以發送回應資料給客戶端,
- 傳輸資料為文本格式,且請求/回應頭部冗長重復,
(為了區分HTTP/1.1和HTTP/1.2,下面描述中,HTTP均代表HTTP/1.1)
2、WebSocket發展歷史
(1)背景
在WebSocket出現之前,主要通過長輪詢和HTTP長連接實作實時資料更新,這種方式有個統稱叫Comet,Tomcat8.5之前有對Comet基于流的HTTP長連接做支持,后來因為WebSocket的成熟和標準化,以及Comet自身依然是基于HTTP,在性能消耗和瓶頸上無法跳脫HTTP,就把Comet廢棄了,
還有一個SPDY技術,也對HTTP進行了改進,多路復用流、服務器推送等,后來演化成HTTP/2.0,因為適用場景和解決的問題不同,暫不對HTTP/2.0做過多解釋,不過對于HTTP/2.0和WebSocket在Tomcat實作中都是作為協議升級來處理的,
(Comet和SPDY的原理不是本篇重點,沒有展開講解,感興趣的同學可自行百度)
(2)歷史
在這種背景下,HTML5制定了WebSocket
- 籌備階段,
WebSocket被劃分為HTML5標準的一部分,2008年6月,Michael Carter進行了一系列討論,最終形成了稱為WebSocket的協議, - 2009年12月,Google Chrome 4是第一個提供標準支持的瀏覽器,默認情況下啟用了
WebSocket, - 2010年2月,
WebSocket協議的開發從W3C和WHATWG小組轉移到IETF(TheInternet Engineering Task Force),并在Ian Hickson的指導下進行了兩次修訂, - 2011年,
IETF將WebSocket協議標準化為RFC 6455起,大多數Web瀏覽器都在實作支持WebSocket協議的客戶端API,此外,已經開發了許多實作WebSocket協議的Java庫, - 2013年,發布JSR356標準,Java API for WebSocket,
(為什么要去了解WebSocket的發展歷史和背景呢?個人認為可以更好的理解某個技術實作的演變歷程,比如Tomcat,早期有Comet沒有WebSocket時,Tomcat就對Comet做了支持,后來有WebSocket了,但是還沒出JSR356標準,Tomcat就對Websocket做了支持,自定義API,再后來有了JSR356,Tomcat立馬緊跟潮流,廢棄自定義的API,實作JSR356那一套,這就使得在Tomcat7使用WebSocket的同學,想升為Tomcat8(其實Tomcat7.0.47之后就是JSR356標準了),發現WebSocket接入方式變了,而且一些細節也變了,)
3、WebSocket握手和雙向通信
(1)定義
WebSocket全雙工通信協議,在客戶端和服務端建立連接后,可以持續雙向通信,和HTTP同屬于應用層協議,并且都依賴于傳輸層的TCP/IP協議,
雖然WebSocket有別于HTTP,是一種新協議,但是RFC 6455中規定:
it is designed to work over HTTP ports 80 and 443 as well as to support HTTP proxies and intermediaries.
WebSocket通過HTTP埠80和443進行作業,并支持HTTP代理和中介,從而使其與HTTP協議兼容,- 為了實作兼容性,
WebSocket握手使用HTTPUpgrade頭從HTTP協議更改為WebSocket協議, Websocket使用ws或wss的統一資源標志符(URI),分別對應明文和加密連接,
(2)握手(建立連接)
在雙向通信之前,必須通過握手建立連接,Websocket通過 HTTP/1.1 協議的101狀態碼進行握手,首先客戶端(如瀏覽器)發出帶有特殊訊息頭(Upgrade、Connection)的請求到服務器,服務器判斷是否支持升級,支持則回傳回應狀態碼101,表示協議升級成功,對于WebSocket就是握手成功,
客戶端請求示例:
GET /test HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: tFGdnEL/5fXMS9yKwBjllg==
Origin: http://example.com
Sec-WebSocket-Protocol: v10.stomp, v11.stomp, v12.stomp
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Version: 13
Connection必須設定Upgrade,表示客戶端希望連接升級,Upgrade: websocket表明協議升級為websocket,Sec-WebSocket-Key欄位內記錄著握手程序中必不可少的鍵值,由客戶端(瀏覽器)生成,可以盡量避免普通HTTP請求被誤認為Websocket協議,Sec-WebSocket-Version表示支持的Websocket版本,RFC6455要求使用的版本是13,Origin欄位是必須的,如果缺少origin欄位,WebSocket服務器需要回復HTTP 403 狀態碼(禁止訪問),通過Origin可以做安全校驗,
服務端回應示例:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HaA6EjhHRejpHyuO0yBnY4J4n3A=
Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15
Sec-WebSocket-Protocol: v12.stomp
Sec-WebSocket-Accept的欄位值是由握手請求中的Sec-WebSocket-Key的欄位值生成的,成功握手確立WebSocket連接之后,通信時不再使用HTTP的資料幀,而采用WebSocket獨立的資料幀,

(3)訊息幀
WebSocket使用二進制訊息幀作為雙向通信的媒介,何為訊息幀?發送方將每個應用程式訊息拆分為一個或多個幀,通過網路將它們傳輸到目的地,并重新組裝決議出一個完整訊息,
有別于HTTP/1.1文本訊息格式(冗長的訊息頭和分隔符等),WebSocket訊息幀規定一定的格式,以二進制傳輸,更加短小精悍,二者相同之處就是都是基于TCP/IP流式協議(沒有規定訊息邊界),
如下是訊息幀的基本結構圖:

FIN: 1 bit,表示該幀是否為訊息的最后一幀,1-是,0-否,RSV1,RSV2,RSV3: 1 bit each,預留(3位),擴展的預留標志,一般情況為0,除非協商的擴展定義為非零值,如果接收到非零值且不為協商擴展定義,接收端必須使連接失敗,Opcode: 4 bits,定義訊息幀的操作型別,如果接收到一個未知Opcode,接收端必須使連接失敗,(0x0-延續幀,0x1-文本幀,0x2-二進制幀,0x8-關閉幀,0x9-PING幀,0xA-PONG幀(在接收到PING幀時,終端必須發送一個PONG幀回應,除非它已經接收到關閉幀),0x3-0x7保留給未來的非控制幀,0xB-F保留給未來的控制幀)Mask: 1 bit,表示該幀是否為隱藏的,即被加密保護的,1-是,0-否,Mask=1時,必須傳一個Masking-key,用于解除隱藏(客戶端發送訊息給服務器端,Mask必須為1),Payload length: 7 bits, 7+16 bits, or 7+64 bits,有效載荷資料的長度(擴展資料長度+應用資料長度,擴展資料長度可以為0),
if 0-125, that is the payload length. If 126, the following 2 bytes interpreted as a 16-bit unsigned integer are the payload length. If 127, the following 8 bytes interpreted as a 64-bit unsigned integer (the most significant bit MUST be 0) are the payload length.
Masking-key: 0 or 4 bytes,用于解除幀隱藏(加密)的key,Mask=1時不為空,Mask=0時不用傳,Payload data: (x+y) bytes,有效載荷資料包括擴展資料(x bytes)和應用資料(y bytes),有效載荷資料是用戶真正要傳輸的資料,
這樣的二進制訊息幀設計,與HTTP協議相比,WebSocket協議可以提供約500:1的流量減少和3:1的延遲減少,
(4)揮手(關閉連接)
揮手相對于握手要簡單很多,客戶端和服務器端任何一方都可以通過發送關閉幀來發起揮手請求,發送關閉幀的一方,之后不再發送任何資料給對方;接收到關閉幀的一方,如果之前沒有發送過關閉幀,則必須發送一個關閉幀作為回應,關閉幀中可以攜帶關閉原因,
在發送和接收一個關閉幀訊息之后,就認為WebSocket連接已關閉,且必須關閉底層TCP連接,
除了通過關閉握手來關閉連接外,WebSocket連接也可能在另一方離開或底層TCP連接關閉時突然關閉,
4、WebSocket優點
-
較少的控制開銷,在連接建立后,服務器和客戶端之間交換資料時,用于協議控制的資料包頭部相對于HTTP請求每次都要攜帶完整的頭部,顯著減少,
-
更強的實時性,由于協議是全雙工的,所以服務器可以隨時主動給客戶端下發資料,相對于HTTP請求需要等待客戶端發起請求服務端才能回應,延遲明顯更少,
-
保持連接狀態,與HTTP不同的是,
Websocket需要先建立連接,這就使得其成為一種有狀態的協議,之后通信時可以省略部分狀態資訊,而HTTP請求可能需要在每個請求都攜帶狀態資訊(如身份認證等), -
更好的二進制支持,
Websocket定義了二進制幀,相對HTTP,可以更輕松地處理二進制內容, -
支持擴展,
Websocket定義了擴展,用戶可以擴展協議、實作部分自定義的子協議, -
更好的壓縮效果,相對于HTTP壓縮,
Websocket在適當的擴展支持下,可以沿用之前內容的背景關系,在傳遞類似的資料時,可以顯著提高壓縮率,

三、Java API for WebSocket(JSR356)
JSR356在Java EE7時歸為Java EE標準的一部分(后來Java EE更名為Jakarta EE,世上再無Java EE,以下統一稱Jakarta EE),所有兼容Jakarta EE的應用服務器,都必須遵循JSR356標準的WebSocket協議API,

根據JSR356規定, 建立WebSocket連接的服務器端和客戶端,兩端對稱,可以互相通信,差異性較小,抽象成API,就是一個個Endpoint(端點),只不過服務器端的叫ServerEndpoint,客戶端的叫ClientEndpoint,客戶端向服務端發送WebSocket握手請求,建立連接后就創建一個ServerEndpoint物件,(這里的Endpoint和Tomcat連接器里的AbstractEndpoint名稱上有點像,但是兩個毫不相干的東西,就像周杰倫和周杰的關系,)
ServerEndpoint和ClientEndpoint在API上差異也很小,有相同的生命周期事件(OnOpen、OnClose、OnError、OnMessage),不同之處是ServerEndpoint作為服務器端點,可以指定一個URI路徑供客戶端連接,ClientEndpoint沒有,
1、服務端API
服務器端的Endpoint有兩種實作方式,一種是注解方式@ServerEndpoint,一種是繼承抽象類Endpoint,
(1)注解方式@ServerEndpoint
首先看看@ServerEndpoint有哪些要素:
value,可以指定一個URI路徑標識一個Endpoint,subprotocols,用戶在WebSocket協議下自定義擴展一些子協議,decoders,用戶可以自定義一些訊息解碼器,比如通信的訊息是一個物件,接收到訊息可以自動解碼封裝成訊息物件,encoders,有解碼器就有編碼器,定義解碼器和編碼器的好處是可以規范使用層訊息的傳輸,configurator,ServerEndpoint配置類,主要提供ServerEndpoint物件的創建方式擴展(如果使用Tomcat的WebSocket實作,默認是反射創建ServerEndpoint物件),

@ServerEndpoint可以注解到任何類上,但是想實作服務端的完整功能,還需要配合幾個生命周期的注解使用,這些生命周期注解只能注解在方法上:
@OnOpen建立連接時觸發,@OnClose關閉連接時觸發,@OnError發生例外時觸發,@OnMessage接收到訊息時觸發,
(2)繼承抽象類Endpoint
繼承抽象類Endpoint,重寫幾個生命周期方法,

怎么沒有onMessage方法,實作onMessage還需要繼承實作一個介面jakarta.websocket.MessageHandler,MessageHandler介面又分為Partial和Whole,實作的MessageHandler需要在onOpen觸發時注冊到jakarta.websocket.Session中,

繼承抽象類Endpoint的方式相對于注解方式要麻煩的多,除了繼承Endpoint和實作介面MessageHandler外,還必須實作一個jakarta.websocket.server.ServerApplicationConfig來管理Endpoint,比如給Endpoint分配URI路徑,

而encoders、decoders、configurator等配置資訊由jakarta.websocket.server.ServerEndpointConfig管理,默認實作jakarta.websocket.server.DefaultServerEndpointConfig,
所以如果使用 Java 版WebSocket服務器端實作首推注解方式,
2、客戶端API
對于客戶端API,也是有注解方式和繼承抽象類Endpoint方式,
- 注解方式,只需要將
@ServerEndpoint換成@ClientEndpoint, - 繼承抽象類
Endpoint方式,需要一個jakarta.websocket.ClientEndpointConfig來管理encoders、decoders、configurator等配置資訊,默認實作jakarta.websocket.DefaultClientEndpointConfig,
3、背景關系Session
WebSocket是一個有狀態的連接,建立連接后的通信都是通過jakarta.websocket.Session保持狀態,一個連接一個Session,每一個Session有一個唯一標識Id,
Session的主要職責涉及:
- 基礎資訊管理(
request資訊(getRequestURI、getRequestParameterMap、getPathParameters等)、協議版本getProtocolVersion、子協議getNegotiatedSubprotocol等), - 連接管理(狀態判斷
isOpen、接收訊息的MessageHandler、發送訊息的異步遠程端點RemoteEndpoint.Async和同步遠程端點RemoteEndpoint.Basic等),
4、HandshakeRequest 和 HandshakeResponse
HandshakeRequest 和 HandshakeResponse了解即可,這兩個介面主要用于WebScoket握手升級程序中握手請求回應的封裝,如果只是單純使用WebSocket,不會接觸到這兩個介面,
(1)HandshakeRequest

(2)HandshakeResponse

Sec-WebSocket-Accept根據客戶端傳的Sec-WebSocket-Key生成,如下是Tomcat10.0.6 WebSocket原始碼實作中生成Sec-WebSocket-Accept的演算法:
private static String getWebSocketAccept(String key) {
byte[] digest = ConcurrentMessageDigest.digestSHA1(
key.getBytes(StandardCharsets.ISO_8859_1), WS_ACCEPT);
return Base64.encodeBase64String(digest);
}
5、WebSocketContainer
jakarta.websocket.WebSocketContainer顧名思義,就是WebSocket的容器,集大成者,其主要職責包括但不限于connectToServer,客戶端連接服務器端,基于瀏覽器的WebSocket客戶端連接服務器端,由瀏覽器支持,但是基于Java版的WebSocket客戶端就可以通過WebSocketContainer#connectToServer向服務端發起連接請求,


四、WebSocket基于Tomcat應用
(如下使用的是javax.websocket包,未使用最新的jakarta.websocket,主要是測驗專案基于SpringBoot+Tomcat9.x的,Java API for WebSocket版本需要保持一致,)
1、服務器端實作
(1)@ServerEndpoint注解方式
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@ServerEndpoint(value = "/ws/test/{userId}", encoders = {MessageEncoder.class}, decoders = {MessageDecoder.class}, configurator = MyServerConfigurator.class)
public class WebSocketServerEndpoint {
private Session session;
private String userId;
@OnOpen
public void OnOpen(Session session, @PathParam(value = "userId") String userId) {
this.session = session;
this.userId = userId;
// 建立連接后,將連接存到一個map里
endpointMap.put(userId, this);
Message message = new Message(0, "connected, hello " + userId);
sendMsg(message);
}
@OnClose
public void OnClose() {
// 關閉連接時觸發,從map中洗掉連接
endpointMap.remove(userId);
System.out.println("server closed...");
}
@OnMessage
public void onMessage(Message message) {
System.out.println("server recive message=" + message.toString());
}
@OnError
public void onError(Throwable t) throws Throwable {
this.session.close(new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, "系統例外"));
t.printStackTrace();
}
/**
* 群發
* @param data
*/
public void sendAllMsg(Message data) {
for (WebSocketServerEndpoint value : endpointMap.values()) {
value.sendMsgAsync(data);
}
}
/**
* 推送訊息給指定 userId
* @param data
* @param userId
*/
public void sendMsg(Message data, String userId) {
WebSocketServerEndpoint endpoint = endpointMap.get(userId);
if (endpoint == null) {
System.out.println("not conected to " + userId);
return;
}
endpoint.sendMsgAsync(data);
}
private void sendMsg(Message data) {
try {
this.session.getBasicRemote().sendObject(data);
} catch (IOException ioException) {
ioException.printStackTrace();
} catch (EncodeException e) {
e.printStackTrace();
}
}
private void sendMsgAsync(Message data) {
this.session.getAsyncRemote().sendObject(data);
}
// 存盤建立連接的Endpoint
private static ConcurrentHashMap<String, WebSocketServerEndpoint> endpointMap = new ConcurrentHashMap<String, WebSocketServerEndpoint>();
}
每一個客戶端與服務器端建立連接后,都會生成一個WebSocketServerEndpoint,可以通過一個Map將其與userId對應存起來,為后續群發廣播和單獨推送訊息給某個客戶端提供便利,
注意:@ServerEndpoint的encoders、decoders、configurator等配置資訊在實際使用中可以不定義,如果專案簡單,完全可以用默認的,
如果通信訊息被封裝成一個物件,如示例的Message(因為原始碼過于簡單就不展示了,屬性主要有code、msg、data),就必須提供編碼器和解碼器,也可以在每次發送訊息時硬編碼轉為字串,在接收到訊息時轉為Message,有了編碼器和解碼器,顯得比較規范,轉為字串由編碼器做,字串轉為物件由解碼器做,但也使得架構變復雜了,視專案需求而定,


Configurator的用處就是自定義Endpoint物件創建方式,默認Tomcat提供的是通過反射,WebScoket是每個連接都會創建一個Endpoint物件,如果連接比較多,很頻繁,通過反射創建,用后即毀,可能不是一個好主意,所以可以搞一個物件池,用過回收,用時先從物件池中拿,有就重置,省去實體化分配記憶體等消耗程序,

如果使用SpringBoot內置Tomcat、undertow、Netty等,接入WebSocket時除了加@ServerEndpoint還需要加一個@Component,再給Spring注冊一個ServerEndpointExporter類,這樣,服務端Endpoint就交由Spring去掃描注冊了,
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
ServerEndpointExporter serverEndpointExporter = new ServerEndpointExporter();
return serverEndpointExporter;
}
}
外置Tomcat就不需要這么麻煩,Tomcat會默認掃描classpath下帶有@ServerEndpoint注解的類,(SpringBoot接入Websocket后續會單獨出文章講解,也挺有意思的)
(2)繼承抽象類Endpoint方式
import javax.websocket.*;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
public class WebSocketServerEndpoint extends Endpoint {
private Session session;
private String userId;
@Override
public void onOpen(Session session, EndpointConfig endpointConfig) {
this.session = session;
this.userId = session.getPathParameters().get("userId");
session.addMessageHandler(new MessageHandler());
endpointMap.put(userId, this);
Message message = new Message(0, "connected, hello " + userId);
sendMsg(message);
}
@Override
public void onClose(Session session, CloseReason closeReason) {
endpointMap.remove(userId);
}
@Override
public void one rror(Session session, Throwable throwable) {
throwable.printStackTrace();
}
/**
* 群發
* @param data
*/
public void sendAllMsg(Message data) {
for (WebSocketServerEndpoint value : endpointMap.values()) {
value.sendMsgAsync(data);
}
}
/**
* 推送訊息給指定 userId
* @param data
* @param userId
*/
public void sendMsg(Message data, String userId) {
WebSocketServerEndpoint endpoint = endpointMap.get(userId);
if (endpoint == null) {
System.out.println("not conected to " + userId);
return;
}
endpoint.sendMsgAsync(data);
}
private void sendMsg(Message data) {
try {
this.session.getBasicRemote().sendObject(data);
} catch (IOException ioException) {
ioException.printStackTrace();
} catch (EncodeException e) {
e.printStackTrace();
}
}
private void sendMsgAsync(Message data) {
this.session.getAsyncRemote().sendObject(data);
}
private class MessageHandler implements javax.websocket.MessageHandler.Whole<Message> {
@Override
public void onMessage(Message message) {
System.out.println("server recive message=" + message.toString());
}
}
private static ConcurrentHashMap<String, WebSocketServerEndpoint> endpointMap = new ConcurrentHashMap<String, WebSocketServerEndpoint>();
}
繼承抽象類Endpoint方式比加注解@ServerEndpoint方式麻煩的很,主要是需要自己實作MessageHandler和ServerApplicationConfig,@ServerEndpoint的話都是使用默認的,原理上差不多,只是注解更自動化,更簡潔,
MessageHandler做的事情,一個@OnMessage就搞定了,ServerApplicationConfig做的URI映射、decoders、encoders,configurator等,一個@ServerEndpoint就可以了,
import javax.websocket.Decoder;
import javax.websocket.Encoder;
import javax.websocket.Endpoint;
import javax.websocket.server.ServerApplicationConfig;
import javax.websocket.server.ServerEndpointConfig;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class MyServerApplicationConfig implements ServerApplicationConfig {
@Override
public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> set) {
Set<ServerEndpointConfig> result = new HashSet<ServerEndpointConfig>();
List<Class<? extends Decoder>> decoderList = new ArrayList<Class<? extends Decoder>>();
decoderList.add(MessageDecoder.class);
List<Class<? extends Encoder>> encoderList = new ArrayList<Class<? extends Encoder>>();
encoderList.add(MessageEncoder.class);
if (set.contains(WebSocketServerEndpoint3.class)) {
ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder
.create(WebSocketServerEndpoint3.class, "/ws/test3")
.decoders(decoderList)
.encoders(encoderList)
.configurator(new MyServerConfigurator())
.build();
result.add(serverEndpointConfig);
}
return result;
}
@Override
public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> set) {
return set;
}
}
如果使用SpringBoot內置Tomcat,則不需要ServerApplicationConfig了,但是需要給Spring注冊一個ServerEndpointConfig,
@Bean
public ServerEndpointConfig serverEndpointConfig() {
List<Class<? extends Decoder>> decoderList = new ArrayList<Class<? extends Decoder>>();
decoderList.add(MessageDecoder.class);
List<Class<? extends Encoder>> encoderList = new ArrayList<Class<? extends Encoder>>();
encoderList.add(MessageEncoder.class);
ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder
.create(WebSocketServerEndpoint3.class, "/ws/test3/{userId}")
.decoders(decoderList)
.encoders(encoderList)
.configurator(new MyServerConfigurator())
.build();
return serverEndpointConfig;
}
(3)早期Tomcat7中Server端實作對比
Tomcat7早期版本7.0.47之前還沒有出JSR 356時,自己搞了一套介面,其實就是一個Servlet,
和遵循JSR356標準的版本對比,有一個比較大的變化是,createWebSocketInbound創建生命周期事件處理器StreamInbound的時機是WebSocket協議升級之前,此時還可以通過用戶執行緒快取(ThreadLocal等)的HttpServletRequest物件,獲取一些請求頭等資訊,
而遵循JSR356標準的版本實作,創建生命周期事件處理的Endpoint是在WebSocket協議升級完成(經過HTTP握手)之后創建的,而WebSocket握手成功給客戶端回應101前,會結束銷毀HttpServletRequest物件,此時是獲取不到請求頭等資訊的,
import org.apache.catalina.websocket.StreamInbound;
import org.apache.catalina.websocket.WebSocketServlet;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServletRequest;
@WebServlet(urlPatterns = "/ws/test")
public class MyWeSocketServlet extends WebSocketServlet {
@Override
protected StreamInbound createWebSocketInbound(String subProtocol, HttpServletRequest request) {
MyMessageInbound messageInbound = new MyMessageInbound(subProtocol, request);
return messageInbound;
}
}
import org.apache.catalina.websocket.MessageInbound;
import org.apache.catalina.websocket.WsOutbound;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
public class MyMessageInbound extends MessageInbound {
private String subProtocol;
private HttpServletRequest request;
public MyMessageInbound(String subProtocol, HttpServletRequest request) {
this.subProtocol = subProtocol;
this.request = request;
}
@Override
protected void onOpen(WsOutbound outbound) {
String msg = "connected, hello";
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
try {
outbound.writeBinaryMessage(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
protected void onClose(int status) {
}
@Override
protected void onBinaryMessage(ByteBuffer byteBuffer) throws IOException {
// 接收到客戶端資訊
}
@Override
protected void onTextMessage(CharBuffer charBuffer) throws IOException {
// 接收到客戶端資訊
}
}
2、客戶端實作
(1)前端js版
js版的客戶端主要依托瀏覽器對WebScoket的支持,在生命周期事件觸發上和服務器端的差不多,這也應證了建立WebSocket連接的兩端是對等的,
撰寫WebSocket客戶端需要注意以下幾點:
- 和服務器端商議好傳輸的訊息的格式,一般為json字串,比較直觀,編碼解碼都很簡單,也可以是其他商定的格式,
- 需要心跳檢測,定時給服務器端發送訊息,保持連接正常,
- 正常關閉連接,即關閉瀏覽器視窗前主動關閉連接,以免服務器端拋例外,
- 如果因為例外斷開連接,支持重連,
// 對websocket進行簡單封裝
WebSocketOption.prototype = {
// 創建websocket操作
createWebSocket: function () {
try {
if('WebSocket' in window) {
this.ws = new WebSocket(this.wsUrl);
} else if('MozWebSocket' in window) {
this.ws = new MozWebSocket(this.wsUrl);
} else {
alert("您的瀏覽器不支持websocket協議,建議使用新版谷歌、火狐等瀏覽器,請勿使用IE10以下瀏覽器,360瀏覽器請使用極速模式,不要使用兼容模式!");
}
this.lifeEventHandle();
} catch(e) {
this.reconnect(this.wsUrl);
console.log(e);
}
},
// 生命周期事件操作
lifeEventHandle: function() {
var self = this;
this.ws.onopen = function (event) {
self.connectCount = 1;
//心跳檢測重置
if (self.heartCheck == null) {
self.heartCheck = new HeartCheckObj(self.ws);
}
self.sendMsg(5, "")
self.heartCheck.reset().start();
console.log("websocket連接成功!" + new Date().toUTCString());
};
this.ws.onclose = function (event) {
// 全部設定為初始值
self.heartCheck = null;
self.reconnect(self.wsUrl);
console.log("websocket連接關閉!" + new Date().toUTCString());
};
this.ws.onerror = function () {
self.reconnect(self.wsUrl);
console.log("websocket連接錯誤!");
};
//如果獲取到訊息,心跳檢測重置
this.ws.onmessage = function (event) {
//心跳檢測重置
if (self.heartCheck == null) {
self.heartCheck = new HeartCheckObj(self.ws);
}
self.heartCheck.reset().start();
console.log("websocket收到訊息啦:" + event.data);
// 業務處理
// 接收到的訊息可以放到localStorage里,然后在其他地方取出來
}
},
// 斷線重連操作
reconnect: function() {
var self = this;
if (this.lockReconnect) return;
console.log(this.lockReconnect)
this.lockReconnect = true;
//沒連接上會一直重連,設定延遲避免請求過多,重連時間設定按倍數增加
setTimeout(function () {
self.createWebSocket(self.wsUrl);
self.lockReconnect = false;
self.connectCount++;
}, 10000 * (self.connectCount));
},
// 發送訊息操作
sendMsg: function(cmd, data) {
var sendData = {"cmd": cmd, "msg": data};
try {
this.ws.send(JSON.stringify(sendData));
} catch(err) {
console.log("發送資料失敗, err=" + err)
}
},
// 關閉websocket介面操作
closeWs: function() {
this.ws.close();
}
}
/**
* 封裝心跳檢測物件<p>
*/
function HeartCheckObj(ws) {
this.ws = ws;
// 心跳時間
this.timeout = 10000;
// 定時事件
this.timeoutObj = null;
// 自動斷開事件
this.serverTimeoutObj = null;
}
HeartCheckObj.prototype = {
setWs: function(ws) {
this.ws = ws;
},
reset: function() {
clearTimeout(this.timeoutObj);
clearTimeout(this.serverTimeoutObj);
return this;
},
// 開始心跳檢測
start: function() {
var self = this;
this.timeoutObj = setTimeout(function() {
//這里發送一個心跳,后端收到后,回傳一個心跳訊息,
//onmessage拿到回傳的心跳就說明連接正常
var ping = {"cmd":1, "msg": "ping"};
self.ws.send(JSON.stringify(ping));
//如果onmessage那里超過一定時間還沒重置,說明后端主動斷開了
self.serverTimeoutObj = setTimeout(function() {
//如果onclose會執行reconnect,我們執行ws.close()就行了.如果直接執行reconnect 會觸發onclose導致重連兩次
self.ws.close();
}, self.timeout)
}, self.timeout)
}
}
/**
* -------------------------
* 創建websocket的主流程 *
* -------------------------
*/
var currentDomain = document.domain;
var wsUrl = "ws://" + currentDomain + "/test"
var webSocketOption = new WebSocketOption(wsUrl)
webSocketOption.createWebSocket()
// 監聽視窗關閉事件,當視窗關閉時,主動去關閉websocket連接,防止連接還沒斷開就關閉視窗,server端會拋例外,
window.onbeforeunload = function() {
webSocketOption.closeWs();
}
這里推薦一個在線測驗WebSocket連接和發送訊息的網站easyswoole.com/wstool.html:

真的很牛逼,很方便,很簡單,還有原始碼github:https://github.com/easy-swoole/wstool,感興趣可以看看,
(2)@ClientEndpoint注解方式
Java版客戶端不用多說,把@ServerEndpoint換成@ClientEndpoint就可以了,其他都一樣,@ClientEndpoint比@ServerEndpoint就少了一個value,不需要設定URI,
@ClientEndpoint(encoders = {MessageEncoder.class}, decoders = {MessageDecoder.class})
public class WebSocketClientEndpoint {
private Session session;
@OnOpen
public void OnOpen(Session session) {
this.session = session;
Message message = new Message(0, "connecting...");
sendMsg(message);
}
@OnClose
public void OnClose() {
Message message = new Message(0, "client closed...");
sendMsg(message);
System.out.println("client closed");
}
@OnMessage
public void onMessage(Message message) {
System.out.println("client recive message=" + message.toString());
}
@OnError
public void onError(Throwable t) throws Throwable {
t.printStackTrace();
}
public void sendMsg(Message data) {
try {
this.session.getBasicRemote().sendObject(data);
} catch (IOException ioException) {
ioException.printStackTrace();
} catch (EncodeException e) {
e.printStackTrace();
}
}
public void sendMsgAsync(Message data) {
this.session.getAsyncRemote().sendObject(data);
}
}
連接服務器端:
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
container.connectToServer(WebSocketClientEndpoint.class,
new URI("ws://localhost:8080/ws/test"));
(3)繼承抽象類Endpoint方式
繼承抽象類Endpoint方式也和服務器端的差不多,但是不需要實作ServerApplicationConfig,需要實體化一個ClientEndpointConfig,Endpoint實作類和服務器端的一樣,就省略了,如下是連接服務器端的代碼:
ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build();
container.connectToServer(new WebSocketClientEndpoint(),clientEndpointConfig,
new URI("ws://localhost:8080/websocket/hello"));
3、基于Nginx反向代理注意事項
一般web服務器會用Nginx做反向代理,經過Nginx反向轉發的HTTP請求不會帶上Upgrade和Connection訊息頭,所以需要在Nginx配置里顯式指定需要升級為WebSocket的URI帶上這兩個頭:
location /chat/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_connect_timeout 4s;
proxy_read_timeout 7200s;
proxy_send_timeout 12s;
}
默認情況下,如果代理服務器在60秒內沒有傳輸任何資料,連接將被關閉,這個超時可以通過proxy_read_timeout指令來增加,或者,可以將代理服務器配置為定期發送WebSocket PING幀以重置超時并檢查連接是否仍然活躍,
具體可參考:http://nginx.org/en/docs/http/websocket.html

五、WebSocket在Tomcat中的原始碼實作
所有兼容Java EE的應用服務器,必須遵循JSR356 WebSocket Java API標準,Tomcat也不例外,而且Tomcat也是支持WebSocket最早的Web應用服務器框架(之一),在還沒有出JSR356標準時,就已經自定義了一套WebSocket API,但是JSR356一出,不得不改弦更張,
通過前面的講解,在使用上完全沒有問題,但是有幾個問題完全是黑盒的:
Server Endpoint是如何被掃描加載的?WebSocket是如何借助HTTP 進行握手升級的?WebSocket建立連接后如何保持連接不斷,互相通信的?
(如下原始碼決議,需要對Tomcat連接器原始碼有一定了解)
1、WsSci初始化
Tomcat 提供了一個org.apache.tomcat.websocket.server.WsSci類來初始化、加載WebSocket,從類名上顧名思義,利用了Sci加載機制,何為Sci加載機制?就是實作介面 jakarta.servlet.ServletContainerInitializer,在Tomcat部署裝載Web專案(org.apache.catalina.core.StandardContext#startInternal)時主動觸發ServletContainerInitializer#onStartup,做一些擴展的初始化操作,
WsSci主要做了一件事,就是掃描加載Server Endpoint,并將其加到WebSocket容器里jakarta.websocket.WebSocketContainer,
WsSci主要會掃描三種類:
- 加了
@ServerEndpoint的類, Endpoint的子類,ServerApplicationConfig的子類,
(1)WsSci#onStartup
@HandlesTypes({ServerEndpoint.class, ServerApplicationConfig.class,
Endpoint.class})
public class WsSci implements ServletContainerInitializer {
@Override
public void onStartup(Set<Class<?>> clazzes, ServletContext ctx)
throws ServletException {
WsServerContainer sc = init(ctx, true);
if (clazzes == null || clazzes.size() == 0) {
return;
}
// Group the discovered classes by type
Set<ServerApplicationConfig> serverApplicationConfigs = new HashSet<>();
Set<Class<? extends Endpoint>> scannedEndpointClazzes = new HashSet<>();
Set<Class<?>> scannedPojoEndpoints = new HashSet<>();
try {
// wsPackage is "jakarta.websocket."
String wsPackage = ContainerProvider.class.getName();
wsPackage = wsPackage.substring(0, wsPackage.lastIndexOf('.') + 1);
for (Class<?> clazz : clazzes) {
JreCompat jreCompat = JreCompat.getInstance();
int modifiers = clazz.getModifiers();
if (!Modifier.isPublic(modifiers) ||
Modifier.isAbstract(modifiers) ||
Modifier.isInterface(modifiers) ||
!jreCompat.isExported(clazz)) {
// Non-public, abstract, interface or not in an exported
// package (Java 9+) - skip it.
continue;
}
// Protect against scanning the WebSocket API JARs
// 防止掃描WebSocket API jar
if (clazz.getName().startsWith(wsPackage)) {
continue;
}
if (ServerApplicationConfig.class.isAssignableFrom(clazz)) {
// 1、clazz是ServerApplicationConfig子類
serverApplicationConfigs.add(
(ServerApplicationConfig) clazz.getConstructor().newInstance());
}
if (Endpoint.class.isAssignableFrom(clazz)) {
// 2、clazz是Endpoint子類
@SuppressWarnings("unchecked")
Class<? extends Endpoint> endpoint =
(Class<? extends Endpoint>) clazz;
scannedEndpointClazzes.add(endpoint);
}
if (clazz.isAnnotationPresent(ServerEndpoint.class)) {
// 3、clazz是加了注解ServerEndpoint的類
scannedPojoEndpoints.add(clazz);
}
}
} catch (ReflectiveOperationException e) {
throw new ServletException(e);
}
// Filter the results
Set<ServerEndpointConfig> filteredEndpointConfigs = new HashSet<>();
Set<Class<?>> filteredPojoEndpoints = new HashSet<>();
if (serverApplicationConfigs.isEmpty()) {
// 從這里看出@ServerEndpoint的服務器端是可以不用ServerApplicationConfig的
filteredPojoEndpoints.addAll(scannedPojoEndpoints);
} else {
// serverApplicationConfigs不為空,
for (ServerApplicationConfig config : serverApplicationConfigs) {
Set<ServerEndpointConfig> configFilteredEndpoints =
config.getEndpointConfigs(scannedEndpointClazzes);
if (configFilteredEndpoints != null) {
filteredEndpointConfigs.addAll(configFilteredEndpoints);
}
// getAnnotatedEndpointClasses 對于 scannedPojoEndpoints起到一個過濾作用
// 不滿足條件的后面不加到WsServerContainer里
Set<Class<?>> configFilteredPojos =
config.getAnnotatedEndpointClasses(
scannedPojoEndpoints);
if (configFilteredPojos != null) {
filteredPojoEndpoints.addAll(configFilteredPojos);
}
}
}
try {
// 繼承抽象類Endpoint的需要使用者手動封裝成ServerEndpointConfig
// 而加了注解@ServerEndpoint的類 Tomcat會自動封裝成ServerEndpointConfig
// Deploy endpoints
for (ServerEndpointConfig config : filteredEndpointConfigs) {
sc.addEndpoint(config);
}
// Deploy POJOs
for (Class<?> clazz : filteredPojoEndpoints) {
sc.addEndpoint(clazz, true);
}
} catch (DeploymentException e) {
throw new ServletException(e);
}
}
static WsServerContainer init(ServletContext servletContext,
boolean initBySciMechanism) {
WsServerContainer sc = new WsServerContainer(servletContext);
servletContext.setAttribute(
Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE, sc);
// 注冊監聽器WsSessionListener給servletContext,
// 在http session銷毀時觸發 ws session的關閉銷毀
servletContext.addListener(new WsSessionListener(sc));
// Can't register the ContextListener again if the ContextListener is
// calling this method
if (initBySciMechanism) {
// 注冊監聽器WsContextListener給servletContext,
// 在 servletContext初始化時觸發WsSci.init
// 在 servletContext銷毀時觸發WsServerContainer的銷毀
// 不過呢,只在WsSci.onStartup時注冊一次
servletContext.addListener(new WsContextListener());
}
return sc;
}
}
從上述原始碼中可以看出ServerApplicationConfig起到一個過濾的作用:
- 當沒有
ServerApplicationConfig時,加了@ServerEndpoint的類會默認全部加到一個Set集合(filteredPojoEndpoints),所以加了@ServerEndpoint的類可以不需要自定義實作ServerApplicationConfig, - 當有
ServerApplicationConfig時,ServerApplicationConfig#getEndpointConfigs用來過濾Endpoint子類,并且Endpoint子類必須封裝成一個ServerEndpointConfig, ServerApplicationConfig#getAnnotatedEndpointClasses用來過濾加了注解@ServerEndpoint的類,一般空實作就行了(如果不想某個類被加到WsServerContainer里,那不加@ServerEndpoint不就可以了),
過濾之后的Endpoint子類和加了注解@ServerEndpoint的類會分別呼叫不同形參的WsServerContainer#addEndpoint,將其加到WsServerContainer里,
(2)WsServerContainer#addEndpoint
- 將
Endpoint子類加到WsServerContainer里,呼叫的是形參為ServerEndpointConfig的addEndpoint:
public void addEndpoint(ServerEndpointConfig sec) throws DeploymentException {
addEndpoint(sec, false);
}
因為Endpoint子類需要使用者封裝成ServerEndpointConfig,不需要Tomcat來封裝,
- 將加了注解
@ServerEndpoint的類加到WsServerContainer,呼叫的是形參為Class<?>的addEndpoint(fromAnnotatedPojo引數暫時在這個方法里沒什么用處):
該方法主要職責就是決議@ServerEndpoint,獲取path、decoders、encoders、configurator等構建一個ServerEndpointConfig物件

最終呼叫的都是如下這個比較復雜的方法,fromAnnotatedPojo表示是否是加了@ServerEndpoint的類,主要做了兩件事:
-
對加了
@ServerEndpoint類的生命周期方法(@OnOpen、@OnClose、@OnError、@OnMessage)的掃描和映射封裝, -
對
path的有效性檢查和path param決議,

(3)PojoMethodMapping方法映射和形參決議
PojoMethodMapping建構式比較長,主要是對加了@OnOpen、@OnClose、@OnError、@OnMessage的方法進行校驗和映射,以及對每個方法的形參進行決議和校驗,主要邏輯總結如下:
- 對當前類以及其父類中的方法進行掃描,
- 當前類中不能存在多個相同注解的方法,否則會拋出Duplicate annotation例外,
- 父類和子類中存在相同注解的方法,子類必須重寫該方法,否則會拋出Duplicate annotation例外,
- 對于
@OnMessage,可以有多個,但是接收訊息的型別必須不同,訊息型別大概分為三種:PongMessage心跳訊息、位元組型、字符型, - 如果掃描到對的注解都是父類的方法,子類重寫了該方法,但是沒有加回應的注解,則會被清除,
- 形參決議,
public PojoMethodMapping(Class<?> clazzPojo, List<Class<? extends Decoder>> decoderClazzes, String wsPath,
InstanceManager instanceManager) throws DeploymentException {
this.wsPath = wsPath;
List<DecoderEntry> decoders = Util.getDecoders(decoderClazzes, instanceManager);
Method open = null;
Method close = null;
Method error = null;
Method[] clazzPojoMethods = null;
Class<?> currentClazz = clazzPojo;
while (!currentClazz.equals(Object.class)) {
Method[] currentClazzMethods = currentClazz.getDeclaredMethods();
if (currentClazz == clazzPojo) {
clazzPojoMethods = currentClazzMethods;
}
for (Method method : currentClazzMethods) {
if (method.isSynthetic()) {
// Skip all synthetic methods.
// They may have copies of annotations from methods we are
// interested in and they will use the wrong parameter type
// (they always use Object) so we can't used them here.
continue;
}
if (method.getAnnotation(OnOpen.class) != null) {
checkPublic(method);
if (open == null) {
open = method;
} else {
if (currentClazz == clazzPojo ||
!isMethodOverride(open, method)) {
// Duplicate annotation
// 拋出Duplicate annotation例外的兩種情況:
// 1. 當前的類有多個相同注解的方法,如有兩個@OnOpen
// 2. 當前類時父類,有相同注解的方法,但是其子類沒有重寫這個方法
// 即 父類和子類有多個相同注解的方法,且沒有重寫關系
throw new DeploymentException(sm.getString(
"pojoMethodMapping.duplicateAnnotation",
OnOpen.class, currentClazz));
}
}
} else if (method.getAnnotation(OnClose.class) != null) {
checkPublic(method);
if (close == null) {
close = method;
} else {
if (currentClazz == clazzPojo ||
!isMethodOverride(close, method)) {
// Duplicate annotation
throw new DeploymentException(sm.getString(
"pojoMethodMapping.duplicateAnnotation",
OnClose.class, currentClazz));
}
}
} else if (method.getAnnotation(OnError.class) != null) {
checkPublic(method);
if (error == null) {
error = method;
} else {
if (currentClazz == clazzPojo ||
!isMethodOverride(error, method)) {
// Duplicate annotation
throw new DeploymentException(sm.getString(
"pojoMethodMapping.duplicateAnnotation",
OnError.class, currentClazz));
}
}
} else if (method.getAnnotation(OnMessage.class) != null) {
checkPublic(method);
MessageHandlerInfo messageHandler = new MessageHandlerInfo(method, decoders);
boolean found = false;
// 第一次掃描OnMessage時,onMessage為空,不會走下面的for,然后就把messageHandler加到onMessage里
// 如果非首次掃描到這里,即向上掃描父類,允許有多個接收訊息型別完全不同的onmessage
for (MessageHandlerInfo otherMessageHandler : onMessage) {
// 如果多個onmessage接收的訊息型別有相同的,則可能會拋出Duplicate annotation
// 1. 同一個類中多個onmessage有接收相同型別的訊息
// 2. 父子類中多個onmessage有接收相同型別的訊息,但不是重寫關系
if (messageHandler.targetsSameWebSocketMessageType(otherMessageHandler)) {
found = true;
if (currentClazz == clazzPojo ||
!isMethodOverride(messageHandler.m, otherMessageHandler.m)) {
// Duplicate annotation
throw new DeploymentException(sm.getString(
"pojoMethodMapping.duplicateAnnotation",
OnMessage.class, currentClazz));
}
}
}
if (!found) {
onMessage.add(messageHandler);
}
} else {
// Method not annotated
}
}
currentClazz = currentClazz.getSuperclass();
}
// If the methods are not on clazzPojo and they are overridden
// by a non annotated method in clazzPojo, they should be ignored
if (open != null && open.getDeclaringClass() != clazzPojo) {
// open 有可能是父類的,子類即clazzPojo有重寫該方法,但是沒有加OnOpen注解
// 則 open置為null
if (isOverridenWithoutAnnotation(clazzPojoMethods, open, OnOpen.class)) {
open = null;
}
}
if (close != null && close.getDeclaringClass() != clazzPojo) {
if (isOverridenWithoutAnnotation(clazzPojoMethods, close, OnClose.class)) {
close = null;
}
}
if (error != null && error.getDeclaringClass() != clazzPojo) {
if (isOverridenWithoutAnnotation(clazzPojoMethods, error, OnError.class)) {
error = null;
}
}
List<MessageHandlerInfo> overriddenOnMessage = new ArrayList<>();
for (MessageHandlerInfo messageHandler : onMessage) {
if (messageHandler.m.getDeclaringClass() != clazzPojo
&& isOverridenWithoutAnnotation(clazzPojoMethods, messageHandler.m, OnMessage.class)) {
overriddenOnMessage.add(messageHandler);
}
}
// 子類重寫了的onmessage方法,但沒有加OnMessage注解的需要從onMessage list 中洗掉
for (MessageHandlerInfo messageHandler : overriddenOnMessage) {
onMessage.remove(messageHandler);
}
this.onOpen = open;
this.onClose = close;
this.onError = error;
// 引數決議
onOpenParams = getPathParams(onOpen, MethodType.ON_OPEN);
onCloseParams = getPathParams(onClose, MethodType.ON_CLOSE);
one rrorParams = getPathParams(onError, MethodType.ON_ERROR);
}
雖然方法名可以隨意,但是形參卻有著強制限制:
@onOpen方法,可以有的引數Session、EndpointConfig、@PathParam,不能有其他引數,@onError方法,可以有的引數Session、@PathParam, 必須有Throwable,不能有其他引數,@onClose方法,可以有的引數Session,CloseReason,@PathParam,不能有其他引數,

2、協議升級(握手)
Tomcat中WebSocket是通過UpgradeToken機制實作的,其具體的升級處理器為WsHttpUpgradeHandler,WebSocket協議升級的程序比較曲折,首先要通過過濾器WsFilter進行升級判斷,然后呼叫org.apache.catalina.connector.Request#upgrade進行UpgradeToken的構建,最后通過org.apache.catalina.connector.Request#coyoteRequest回呼函式action將UpgradeToken回傳給連接器為后續升級處理做準備,

(1)WsFilter
WebSocket協議升級的程序比較曲折,帶有WebSocket握手的請求會平安經過Tomcat的Connector,被轉發到Servlet容器中,在業務處理之前經過過濾器WsFilter判斷是否需要升級(WsFilter 在 org.apache.catalina.core.ApplicationFilterChain過濾鏈中觸發):
- 首先判斷
WsServerContainer是否有進行Endpoint的掃描和注冊以及請頭中是否有Upgrade: websocket, - 獲取請求
path即uri在WsServerContainer中找對應的ServerEndpointConfig, - 呼叫
UpgradeUtil.doUpgrade進行升級,

(2)UpgradeUtil#doUpgrade
UpgradeUtil#doUpgrade主要做了如下幾件事情:
- 檢查
HttpServletRequest的一些請求頭的有效性,如Connection: upgrade、Sec-WebSocket-Version:13、Sec-WebSocket-Key等, - 給
HttpServletResponse設定一些回應頭,如Upgrade:websocket、Connection: upgrade、根據Sec-WebSocket-Key的值生成回應頭Sec-WebSocket-Accept的值, - 封裝
WsHandshakeRequest和WsHandshakeResponse, - 呼叫
HttpServletRequest#upgrade進行升級,并獲取WsHttpUpgradeHandler(具體的升級流程處理器),
// org.apache.tomcat.websocket.server.UpgradeUtil#doUpgrade
public static void doUpgrade(WsServerContainer sc, HttpServletRequest req,
HttpServletResponse resp, ServerEndpointConfig sec,
Map<String,String> pathParams)
throws ServletException, IOException {
// Validate the rest of the headers and reject the request if that
// validation fails
String key;
String subProtocol = null;
// 檢查請求頭中是否有 Connection: upgrade
if (!headerContainsToken(req, Constants.CONNECTION_HEADER_NAME,
Constants.CONNECTION_HEADER_VALUE)) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
return;
}
// 檢查請求頭中的 Sec-WebSocket-Version:13
if (!headerContainsToken(req, Constants.WS_VERSION_HEADER_NAME,
Constants.WS_VERSION_HEADER_VALUE)) {
resp.setStatus(426);
resp.setHeader(Constants.WS_VERSION_HEADER_NAME,
Constants.WS_VERSION_HEADER_VALUE);
return;
}
// 獲取 Sec-WebSocket-Key
key = req.getHeader(Constants.WS_KEY_HEADER_NAME);
if (key == null) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
return;
}
// Origin check,校驗 Origin 是否有權限
String origin = req.getHeader(Constants.ORIGIN_HEADER_NAME);
if (!sec.getConfigurator().checkOrigin(origin)) {
resp.sendError(HttpServletResponse.SC_FORBIDDEN);
return;
}
// Sub-protocols
List<String> subProtocols = getTokensFromHeader(req,
Constants.WS_PROTOCOL_HEADER_NAME);
subProtocol = sec.getConfigurator().getNegotiatedSubprotocol(
sec.getSubprotocols(), subProtocols);
// Extensions
// Should normally only be one header but handle the case of multiple
// headers
List<Extension> extensionsRequested = new ArrayList<>();
Enumeration<String> extHeaders = req.getHeaders(Constants.WS_EXTENSIONS_HEADER_NAME);
while (extHeaders.hasMoreElements()) {
Util.parseExtensionHeader(extensionsRequested, extHeaders.nextElement());
}
// Negotiation phase 1. By default this simply filters out the
// extensions that the server does not support but applications could
// use a custom configurator to do more than this.
List<Extension> installedExtensions = null;
if (sec.getExtensions().size() == 0) {
installedExtensions = Constants.INSTALLED_EXTENSIONS;
} else {
installedExtensions = new ArrayList<>();
installedExtensions.addAll(sec.getExtensions());
installedExtensions.addAll(Constants.INSTALLED_EXTENSIONS);
}
List<Extension> negotiatedExtensionsPhase1 = sec.getConfigurator().getNegotiatedExtensions(
installedExtensions, extensionsRequested);
// Negotiation phase 2. Create the Transformations that will be applied
// to this connection. Note than an extension may be dropped at this
// point if the client has requested a configuration that the server is
// unable to support.
List<Transformation> transformations = createTransformations(negotiatedExtensionsPhase1);
List<Extension> negotiatedExtensionsPhase2;
if (transformations.isEmpty()) {
negotiatedExtensionsPhase2 = Collections.emptyList();
} else {
negotiatedExtensionsPhase2 = new ArrayList<>(transformations.size());
for (Transformation t : transformations) {
negotiatedExtensionsPhase2.add(t.getExtensionResponse());
}
}
// Build the transformation pipeline
Transformation transformation = null;
StringBuilder responseHeaderExtensions = new StringBuilder();
boolean first = true;
for (Transformation t : transformations) {
if (first) {
first = false;
} else {
responseHeaderExtensions.append(',');
}
append(responseHeaderExtensions, t.getExtensionResponse());
if (transformation == null) {
transformation = t;
} else {
transformation.setNext(t);
}
}
// Now we have the full pipeline, validate the use of the RSV bits.
if (transformation != null && !transformation.validateRsvBits(0)) {
throw new ServletException(sm.getString("upgradeUtil.incompatibleRsv"));
}
// 設定resp的回應頭Upgrade:websocket、 Connection: upgrade 、Sec-WebSocket-Accept:
// If we got this far, all is good. Accept the connection.
resp.setHeader(Constants.UPGRADE_HEADER_NAME,
Constants.UPGRADE_HEADER_VALUE);
resp.setHeader(Constants.CONNECTION_HEADER_NAME,
Constants.CONNECTION_HEADER_VALUE);
// 通過Sec-WebSocket-Key生成Sec-WebSocket-Accept的值
resp.setHeader(HandshakeResponse.SEC_WEBSOCKET_ACCEPT,
getWebSocketAccept(key));
if (subProtocol != null && subProtocol.length() > 0) {
// RFC6455 4.2.2 explicitly states "" is not valid here
resp.setHeader(Constants.WS_PROTOCOL_HEADER_NAME, subProtocol);
}
if (!transformations.isEmpty()) {
resp.setHeader(Constants.WS_EXTENSIONS_HEADER_NAME, responseHeaderExtensions.toString());
}
WsHandshakeRequest wsRequest = new WsHandshakeRequest(req, pathParams);
WsHandshakeResponse wsResponse = new WsHandshakeResponse();
WsPerSessionServerEndpointConfig perSessionServerEndpointConfig =
new WsPerSessionServerEndpointConfig(sec);
sec.getConfigurator().modifyHandshake(perSessionServerEndpointConfig,
wsRequest, wsResponse);
wsRequest.finished();
// Add any additional headers
for (Entry<String,List<String>> entry :
wsResponse.getHeaders().entrySet()) {
for (String headerValue: entry.getValue()) {
resp.addHeader(entry.getKey(), headerValue);
}
}
// 呼叫 request.upgrade 進行升級
WsHttpUpgradeHandler wsHandler =
req.upgrade(WsHttpUpgradeHandler.class);
wsHandler.preInit(perSessionServerEndpointConfig, sc, wsRequest,
negotiatedExtensionsPhase2, subProtocol, transformation, pathParams,
req.isSecure());
}
(3)Request#upgrade
Request#upgrade主要做了三件事:
- 實體化
WsHttpUpgradeHandler并構建UpgradeToken, - 回呼
coyoteRequest.action,將UpgradeToken回傳給連接器, - 設定回應碼101,
// org.apache.catalina.connector.Request#upgrade
public <T extends HttpUpgradeHandler> T upgrade(
Class<T> httpUpgradeHandlerClass) throws java.io.IOException, ServletException {
T handler;
InstanceManager instanceManager = null;
try {
// Do not go through the instance manager for internal Tomcat classes since they don't
// need injection
if (InternalHttpUpgradeHandler.class.isAssignableFrom(httpUpgradeHandlerClass)) {
handler = httpUpgradeHandlerClass.getConstructor().newInstance();
} else {
instanceManager = getContext().getInstanceManager();
handler = (T) instanceManager.newInstance(httpUpgradeHandlerClass);
}
} catch (ReflectiveOperationException | NamingException | IllegalArgumentException |
SecurityException e) {
throw new ServletException(e);
}
// 構建 UpgradeToken,UpgradeToken主要包含WsHttpUpgradeHandler、context、協議名稱protocol
UpgradeToken upgradeToken = new UpgradeToken(handler, getContext(), instanceManager,
getUpgradeProtocolName(httpUpgradeHandlerClass));
// 回呼action 進行升級
coyoteRequest.action(ActionCode.UPGRADE, upgradeToken);
// Output required by RFC2616. Protocol specific headers should have
// already been set.
// 設定回應101
response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
return handler;
}
(4)回呼機制ActionHook#action
一些發生在Servlet容器的動作可能需要回傳給連接器做處理,比如WebSocket的握手升級,所以連接器就給org.apache.coyote.Request設定了一個動作鉤子``ActionHook#action,一些動作表示定義在列舉類ActionCode中,ActionCode.UPGRADE就代表協議升級動作,org.apache.coyote.AbstractProcessor實作了ActionHook介面,ActionCode.UPGRADE動作會呼叫org.apache.coyote.http11.Http11Processor#doHttpUpgrade,只是簡單將upgradeToken設定給Http11Processor`,


(5)ConnectionHandler#process
Tomcat連接器是同步呼叫容器業務處理,容器中的業務處理結束后還是回到連接器繼續往下執行,
連接器將請求轉發給容器處理是在配接器里完成的,容器中流程處理結束回傳到org.apache.catalina.connector.CoyoteAdapter#service,繼續往下執行,最終結束并回收HttpServletrequest、HttpServletreponse物件,

org.apache.catalina.connector.CoyoteAdapter#service是在org.apache.coyote.http11.Http11Processor#service中呼叫的,
Http11Processor#service是HTTP請求處理主流程,通過upgradeToken != null來判斷是否為升級操作,s是則回傳SocketState.UPGRADING,
最后來到org.apache.coyote.AbstractProtocol.ConnectionHandler#process一個連接處理的主流程,根據Http11Processor#service回傳SocketState.UPGRADING來進行升級操作,如下只截取了和WebSocket協議升級相關流程的代碼:
- 獲取
UpgradeToken,從中取出HttpUpgradeHandler,對于WebSocket來說是WsHttpUpgradeHandler, - 呼叫
WsHttpUpgradeHandler#init啟動協議升級處理,

(6)WsHttpUpgradeHandler#init握手成功
走到這里,基本上就是握手成功了,接下來就是創建WsSession和觸發onOpen,

WsSession的構建中會實體化Endpoint,如果實體化出來的物件不是Endpoint型別,即加了@ServerEndpoint的實體物件,則用一個PojoEndpointServer進行包裝,而PojoEndpointServer是繼承了抽象類Endpoint的,
觸發onOpen時會將WsSession傳進去,對于加PojoEndpointServer,因為用戶自定義的方法名和形參不確定,所以通過反射呼叫用戶自定義的onopen形式的方法,并且會將通過@onMessage決議出的MessageHandler設定給WsSession,

3、資料傳輸和決議
握手成功之后就建立了雙向通信的連接,該連接有別于HTTP/1.1長連接(應用服務器中作業執行緒回圈占用),而是占用一條TCP連接,在連接建立是進行TCP三次握手,之后全雙工互相通信,將不需要再進行耗時的TCP的三次握手和四次揮手,一方需要關閉WebSocket連接時,發送關閉幀,另一方接收到關閉幀之后,也發送個關閉幀作為回應,之后就認為WebSocket連接關閉了,并且關閉底層TCP連接(四次揮手),
實則WebSocket全雙工是建立在TCP的長鏈接上的,TCP長鏈接長時間沒有訊息通信,會定時保活,一般WebSocket會通過代理如nginx等進行連接通信,nginx有一個連接超時沒有任何資訊傳輸時,會斷開,所以需要WebSocket一端定時發送心跳保活,
(1)接收客戶端訊息
客戶端來了訊息,由連接器的Poller輪詢監測socket底層是否有資料到來,有資料可讀,則封裝成一個SocketProcessor扔到執行緒池里處理,org.apache.coyote.http11.upgrade.UpgradeProcessorInternal#dispatch具有處理升級協議連接,org.apache.tomcat.websocket.server.WsHttpUpgradeHandler#upgradeDispatch是專門處理WebSocket連接的處理器,
org.apache.tomcat.websocket.server.WsFrameServer是對服務器端訊息幀處理的封裝,包括讀取底層資料,按訊息幀格式決議、拼裝出有效載荷資料,觸發onMessage,
因為原始碼篇幅較多,只展示具體原始碼呼叫流程:

(2)發送訊息給客戶端
一般,客戶端發送WebSocket握手請求,和服務器端建立連接后,服務器端需要將連接(Endpoint+WsSession)保存起來,為后續主動推送訊息給客戶端提供方便,
Tomcat提供了可以發送三種資料型別(文本、二進制、Object物件)和兩種發送方式(同步、異步)的發送訊息的方法,
org.apache.tomcat.websocket.WsRemoteEndpointAsync異步發送,org.apache.tomcat.websocket.WsRemoteEndpointBasic同步發送,
發送訊息也同樣需要按訊息幀格式封裝,然后通過socket寫到網路里即可,

六、要點回顧
WebSocket的出現不是空穴來風,起初在HTTP/1.1基礎上通過輪詢和長連接達到資訊實時同步的功能,但是這并沒有跳出HTTP/1.1自身的缺陷,HTTP/1.1明顯的兩個缺陷:訊息頭冗長且為文本傳輸,請求回應模式,為此,WebSocket誕生了,跳出HTTP/1.1,建立一個新的真正全雙工通信協議,
不僅僅要會在專案中使用WebSocket,還要知道其通信原理和在應用服務器中的實作原理,很多注意事項都是在查閱了官方資源和原始碼之后恍然大悟的,
- 在Tomcat中使用
WebSocket不可以在Endpoint里獲取快取的HttpServletRequest物件,因為在WebSocket握手之前,HTTP/1.1請求就算結束了(HttpServletRequest物件被回收),建立連接之后就更是獨立于HTTP/1.1了, - 建立連接的
WebSocket,會生成新的Endpoint和WsSession, - 使用內置Tomcat需要注意,
WsSci做的事情交給了Spring做, WebSocket全雙工是建立在TCP長連接的基礎之上,- … …

七、參考文獻
- https://datatracker.ietf.org/doc/html/rfc6455(可能需要翻墻)
- https://www.oracle.com/technical-resources/articles/java/jsr356.html
- https://medium.com/swlh/websockets-with-spring-part-1-http-and-websocket-36c69df1c2ee(可能需要翻墻)
- http://nginx.org/en/docs/http/websocket.html
- https://zh.wikipedia.org/wiki/WebSocket
- 書籍:《Tomcat架構決議》劉光瑞(Tomcat8.5)11.3.4 Tomcat的WebSocket實作
- 書籍:《Tomcat內核設計剖析》汪建(Tomcat7)10.6 WebSocket協議的支持
- 書籍:《圖解HTTP》9.3 使用瀏覽器進行全雙工通信的WebSocket
- 極客時間:《深入拆解Tomcat & Jetty》李號雙(Tomcat9.x)18.新特性:Tomcat如何支持WebSocket?
- Tomcat注釋原始碼:https://gitee.com/stefanpy/tomcat-source-code-learning
如若文章有錯誤理解,歡迎批評指正,同時非常期待你的留言和點贊,如果覺得有用,不妨點個在看,讓更多人受益,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/296850.html
標籤:其他
上一篇:grpc-go原始碼剖析六十一之假設在一條呼叫鏈上,存在多個grpc服務的呼叫,如A服務呼叫B服務呼叫C服務,那么他們的超時時間如何?
