目錄
前言
Maven依賴
代碼
總結
前言
在工作中是否會遇到這種情況:使用 websocket 客戶端連接服務端時,因為網路波動或服務端斷連,導致客戶端被動斷開連接。為了解決這個問題,需要對被動斷開連接的情況進行捕獲,並重新創建連接。這篇文章主要提供可以直接使用的斷線重連 websocket 客戶端代碼。
Maven依賴
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.2</version>
</dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.1</version>
</dependency>
代碼
不廢話,上代碼,
package ai.guiji.csdn.ws.client;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ServerHandshake;
import javax.net.ssl.*;
import java.net.Socket;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/** @Author huyi @Date 2021/10/15 20:03 @Description: 重連websocket客戶端 */
@Slf4j
public class ReConnectWebSocketClient {
/** 字串訊息回呼 */
private Consumer<String> msgStr;
/** 位元組流訊息回呼 */
private Consumer<ByteBuffer> msgByte;
/** 例外回呼 */
private Consumer<Exception> error;
/** 連接標識 */
private String key;
/** ws服務端連接 */
private URI serverUri;
/** 嘗試重連標識 */
private AtomicBoolean tryReconnect;
/** 需要ping標識 */
private AtomicBoolean needPing;
/** websocket連接物體 */
private WebSocketClient webSocketClient;
/** 重連次數 */
private AtomicInteger reConnectTimes;
/** 連接結束標識 */
private AtomicBoolean end;
/** 連接后初始發送報文,這里也可以不需要,如果服務端主動斷開連接,重連后可以繼續推送報文的話, */
private String initReConnectReq;
/** 結束回呼 */
private Consumer<String> endConsumer;
public ReConnectWebSocketClient(
URI serverUri,
String key,
Consumer<String> msgStr,
Consumer<ByteBuffer> msgByte,
Consumer<Exception> error) {
this.msgStr = msgStr;
this.msgByte = msgByte;
this.error = error;
this.key = key;
this.serverUri = serverUri;
this.tryReconnect = new AtomicBoolean(false);
this.needPing = new AtomicBoolean(true);
this.reConnectTimes = new AtomicInteger(0);
this.end = new AtomicBoolean(false);
this.endConsumer = this::close;
init();
}
/** 初始化連接 */
public void init() {
// 創建連接
createWebSocketClient();
// ping執行緒
circlePing();
}
private void needReconnect() throws Exception {
ThreadUtil.sleep(10, TimeUnit.SECONDS);
int cul = reConnectTimes.incrementAndGet();
if (cul > 3) {
close("real stop");
throw new Exception("服務端斷連,3次重連均失敗");
}
log.warn("[{}]第[{}]次斷開重連", key, cul);
if (tryReconnect.get()) {
log.error("[{}]第[{}]次斷開重連結果 -> 連接正在重連,本次重連請求放棄", key, cul);
needReconnect();
return;
}
try {
tryReconnect.set(true);
if (webSocketClient.isOpen()) {
log.warn("[{}]第[{}]次斷開重連,關閉舊連接", key, cul);
webSocketClient.closeConnection(2, "reconnect stop");
}
webSocketClient = null;
createWebSocketClient();
connect();
if (StrUtil.hasBlank(initReConnectReq)) {
send(initReConnectReq);
}
} catch (Exception exception) {
log.error("[{}]第[{}]次斷開重連結果 -> 連接正在重連,重連例外:[{}]", key, cul, exception.getMessage());
needReconnect();
} finally {
tryReconnect.set(false);
}
}
private void createWebSocketClient() {
webSocketClient =
new WebSocketClient(serverUri) {
@Override
public void onOpen(ServerHandshake serverHandshake) {
log.info("[{}]ReConnectWebSocketClient [onOpen]連接成功{}", key, getRemoteSocketAddress());
tryReconnect.set(false);
}
@Override
public void onMessage(String text) {
log.info("[{}]ReConnectWebSocketClient [onMessage]接收到服務端資料:text={}", key, text);
msgStr.accept(text);
}
@Override
public void onMessage(ByteBuffer bytes) {
log.info("[{}]ReConnectWebSocketClient [onMessage]接收到服務端資料:bytes={}", key, bytes);
msgByte.accept(bytes);
}
@Override
public void onWebsocketPong(WebSocket conn, Framedata f) {
log.info(
"[{}]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode={}",
key,
f.getOpcode());
}
@Override
public void onClose(int i, String s, boolean b) {
log.info("[{}]ReConnectWebSocketClient [onClose]關閉,s={},b={}", key, s, b);
if (StrUtil.hasBlank(s) || s.contains("https")) {
if (end.get()) {
return;
}
try {
needReconnect();
} catch (Exception exception) {
endConsumer.accept("reconnect error");
error.accept(exception);
}
}
}
@Override
public void one rror(Exception e) {
log.info("[{}]ReConnectWebSocketClient [onError]例外,e={}", key, e);
endConsumer.accept("error close");
error.accept(e);
}
};
if (serverUri.toString().contains("wss://")) {
trustAllHosts(webSocketClient);
}
}
public void circlePing() {
new Thread(
() -> {
while (needPing.get()) {
if (webSocketClient.isOpen()) {
webSocketClient.sendPing();
}
ThreadUtil.sleep(5, TimeUnit.SECONDS);
}
log.warn("[{}]Ping回圈關閉", key);
})
.start();
}
/**
* 連接
*
* @throws Exception 例外
*/
public void connect() throws Exception {
webSocketClient.connectBlocking(10, TimeUnit.SECONDS);
}
/**
* 發送
*
* @param msg 訊息
* @throws Exception 例外
*/
public void send(String msg) throws Exception {
this.initReConnectReq = msg;
if (webSocketClient.isOpen()) {
webSocketClient.send(msg);
}
}
/**
* 關閉
*
* @param msg 關閉訊息
*/
public void close(String msg) {
needPing.set(false);
end.set(true);
if (webSocketClient != null) {
webSocketClient.closeConnection(3, msg);
}
}
/**
* 忽略證書
*
* @param client
*/
public void trustAllHosts(WebSocketClient client) {
TrustManager[] trustAllCerts =
new TrustManager[] {
new X509ExtendedTrustManager() {
@Override
public void checkClientTrusted(
X509Certificate[] x509Certificates, String s, Socket socket)
throws CertificateException {}
@Override
public void checkServerTrusted(
X509Certificate[] x509Certificates, String s, Socket socket)
throws CertificateException {}
@Override
public void checkClientTrusted(
X509Certificate[] x509Certificates, String s, SSLEngine sslEngine)
throws CertificateException {}
@Override
public void checkServerTrusted(
X509Certificate[] x509Certificates, String s, SSLEngine sslEngine)
throws CertificateException {}
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s)
throws CertificateException {}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s)
throws CertificateException {}
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
}
};
try {
SSLContext ssl = SSLContext.getInstance("SSL");
ssl.init(null, trustAllCerts, new java.security.SecureRandom());
SSLSocketFactory socketFactory = ssl.getSocketFactory();
client.setSocketFactory(socketFactory);
} catch (Exception e) {
log.error("ReConnectWebSocketClient trustAllHosts 例外,e={0}", e);
}
}
}
代碼說明:
1、重連次數在代碼中固定為3次,可依需要自行調整,
2、增加異步pingpong執行緒,一旦結束連接會自動關閉,
3、對字串、位元組流、例外都有回呼措施,
測驗代碼方法
/**
 * Demo entry point: connects to a test server, sends one text message and prints every text
 * message and error that comes back.
 *
 * @param args unused
 * @throws Exception if the URI is malformed or connecting/sending fails
 */
public static void main(String[] args) throws Exception {
  ReConnectWebSocketClient client =
      new ReConnectWebSocketClient(
          new URI("wss://192.168.1.77:24009"),
          "test",
          // Text-message callback. TODO: handle the text message.
          msg -> System.out.println("字串訊息:" + msg),
          // Binary-message callback: a no-op instead of null, so a binary frame from the server
          // cannot trigger a NullPointerException inside the client.
          bytes -> {},
          // Error callback. TODO: handle the error.
          error -> System.out.println("例外:" + error.getMessage()));
  client.connect();
  client.send("haha");
}
驗證結果
16:08:54.468 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]連接成功/192.168.1.77:24009
16:08:54.475 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服務端資料:text=connect success from tcp4:192.168.6.63:11018!
字串訊息:connect success from tcp4:192.168.6.63:11018!
16:08:56.080 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]關閉,s=,b=true
16:09:06.097 [WebSocketConnectReadThread-12] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[1]次斷開重連
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]連接成功/192.168.1.77:24009
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服務端資料:text=connect success from tcp4:192.168.6.63:11038!
字串訊息:connect success from tcp4:192.168.6.63:11038!
16:09:09.369 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:09:14.370 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:09:19.371 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:09:24.379 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:09:29.382 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:09:34.398 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:09:39.402 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:09:44.404 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:09:49.415 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:09:54.429 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:09:59.437 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:10:04.449 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:10:06.154 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:10:09.455 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:10:14.462 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:10:19.468 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端資料:opcode=PONG
16:10:19.644 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]關閉,s=,b=true
16:10:29.654 [WebSocketConnectReadThread-16] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[2]次斷開重連
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onError]例外,e={}
java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.connect0(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:673)
at org.java_websocket.client.WebSocketClient.run(WebSocketClient.java:461)
at java.lang.Thread.run(Thread.java:748)
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]關閉,s=error close,b=false
例外:Connection refused: connect
16:10:34.473 [Thread-0] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]Ping回圈關閉
這裡我採用的是手動關閉服務端的方式來觸發客戶端被動斷連的情況。共重連兩次,第二次重連時服務端還未啟動,因此觸發例外。
總結
沒啥好總結的,代碼注釋比較清楚,
如果對你有用,一鍵三連走一波!

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/336228.html
標籤:其他
下一篇:一看就懂的貪心演算法
