主頁 > 移動端開發 > 雨露均沾的OkHttp—WebSocket長連接的使用&原始碼決議

雨露均沾的OkHttp—WebSocket長連接的使用&原始碼決議

2020-10-22 17:01:35 移動端開發

前言

最近老板又來新需求了,要做一個物聯網相關的app,其中有個需求是客戶端需要收發服務器不定期發出的訊息,
內心OS:
?? 這咋整呢?通過介面輪詢?定時訪問介面,有資料就更新?
?? 不行不行,這樣浪費資源了,還耗電,會導致很多請求都是無效的網路操作,
?? 那就長連接唄?WebSocket協議好像不錯,通過握手建立長連接后,可以隨時收發服務器的訊息,那就它了!
?? 怎么集成呢?正好前段時間復習OkHttp原始碼的時候發現了它是支持Websocket協議的,那就用它試試吧!(戲好多,演不下去了??)

開淦!

WebSocket介紹

先簡單介紹下WebSocket
我們都知道Http是處于應用層的一個通信協議,但是只支持單向主動通信,做不到服務器主動向客戶端推送訊息,而且Http是無狀態的,即每次通信都沒有關聯性,導致跟服務器關系不緊密,

為了解決和服務器長時間通信的痛點呢,HTML5規范引出了WebSocket協議(知道這名字咋來的吧,人家HTML5規范引出的,隨爸姓),是一種建立在TCP協議基礎上的全雙工通信的協議,他跟Http同屬于應用層協議,下層還是需要通過TCP建立連接,

但是,WebSocketTCP連接建立后,還要通過Http進行一次握手,也就是通過Http發送一條GET請求訊息給服務器,告訴服務器我要建立WebSocket連接了,你準備好哦,具體做法就是在頭部資訊中添加相關引數,然后服務器回應我知道了,并且將連接協議改成WebSocket,開始建立長連接,

這里貼上請求頭和回應頭資訊,從網上找了一張圖:

3851594110877_.pic.jpg

簡單說明下引數:

  • URL一般是以ws或者wss開頭,ws對應Websocket協議,wss對應在TLS之上的WebSocket,類似于HttpHttps的關系,
  • 請求方法為GET方法,
  • Connection:Upgrade,表示客戶端要連接升級,不用Http協議,
  • Upgrade:websocket, 表示客戶端要升級建立Websocket連接,
  • Sec-Websocket-Key:key, 這個key是隨機生成的,服務器會通過這個引數驗證該請求是否有效,
  • Sec-WebSocket-Version:13, websocket使用的協議,一般就是13,
  • Sec-webSocket-Extension:permessage-deflate,客戶端指定的一些擴展協議,比如這里permessage-deflate就是WebSocket的一種壓縮協議,
  • 回應碼101,表示回應協議升級,后續的資料互動都按照Upgradet指定的WebSocket協議來,

OkHttp實作

添加OkHttp依賴

    implementation("com.squareup.okhttp3:okhttp:4.7.2")

實作代碼

首先是初始化OkHttpClientWebSocket實體:

    /**
     * 初始化WebSocket
     */
    public void init() {
        mWbSocketUrl = "ws://echo.websocket.org";
        mClient = new OkHttpClient.Builder()
                .pingInterval(10, TimeUnit.SECONDS)
                .build();
        Request request = new Request.Builder()
                .url(mWbSocketUrl)
                .build();
        mWebSocket = mClient.newWebSocket(request, new WsListener());
    }

這里主要是配置了OkHttp的一些引數,以及WebSocket的連接地址,其中newWebSocket方法就是進行WebSocket的初始化和連接,

這里要注意的點是pingInterval方法的配置,這個方法主要是用來設定WebSocket連接的保活,
相信做過長連接的同學都知道,一個長連接一般要隔幾秒發送一條訊息告訴服務器我在線,而服務器也會回復一個訊息表示收到了,這樣就確認了連接正常,客戶端和服務器端都在線,

如果服務器沒有按時收到這個訊息那么服務器可能就會主動關閉這個連接,節約資源,
客戶端沒有正常收到這個回傳的訊息,也會做一些類似重連的操作,所以這個保活訊息非常重要,

我們稱這個訊息叫作心跳包,一般用PING,PONG表示,像乒乓球一樣,一來一回,
所以這里的pingInterval就是設定心跳包發送的間隔時間,設定了這個方法之后,OkHttp就會自動幫我們發送心跳包事件,也就是ping包,當間隔時間到了,沒有收到pong包的話,監聽事件中的onFailure方法就會被呼叫,此時我們就可以進行重連,

但是由于實際業務需求不一樣,以及okhttp中心跳包事件給予我們權限較少,所以我們也可以自己完成心跳包事件,即在WebSocket連接成功之后,開始定時發送ping包,在下一次發送ping包之前檢查上一個pong包是否收到,如果沒收到,就視為例外,開始重連,感興趣的同學可以看看文末的相關原始碼,

建立連接后,我們就可以正常發送和讀取訊息了,也就是在上文WsListener監聽事件中表現:

    //監聽事件,用于收訊息,監聽連接的狀態
    class WsListener extends WebSocketListener {
        @Override
        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            super.onClosed(webSocket, code, reason);
        }

        @Override
        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            super.onClosing(webSocket, code, reason);
        }

        @Override
        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) {
            super.onFailure(webSocket, t, response);
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            super.onMessage(webSocket, text);
            Log.e(TAG, "客戶端收到訊息:" + text);
            onWSDataChanged(DATE_NORMAL, text);
           //測驗發訊息
            webSocket.send("我是客戶端,你好啊");
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) {
            super.onMessage(webSocket, bytes);
        }

        @Override
        public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
            super.onOpen(webSocket, response);
            Log.e(TAG,"連接成功!");
        }
    }
    
    
    //發送String訊息
    public void send(final String message) {
        if (mWebSocket != null) {
            mWebSocket.send(message);
        }
    }
    
    /**
     * 發送byte訊息
     * @param message
     */
    public void send(final ByteString message) {
        if (mWebSocket != null) {
            mWebSocket.send(message);
        }
    }    

    //主動斷開連接
    public void disconnect(int code, String reason) {
        if (mWebSocket != null)
            mWebSocket.close(code, reason);
    }
    

這里要注意,回呼的方法都是在子執行緒回呼的,如果需要更新UI,需要切換到主執行緒,

基本操作就這么多,還是很簡單的吧,初始化Websocket——連接——連接成功——收發訊息,

其中WebSocket類是一個操作介面,主要提供了以下幾個方法

  • send(text: String) 發送一個String型別的訊息
  • send(bytes: ByteString) 發送一個二進制型別的訊息
  • close(code: Int, reason: String?) 關閉WebSocket連接

如果有同學想測驗下WebSocket的功能但是又沒有實際的服務器,怎么辦呢?
其實OkHttp官方有一個MockWebSocket服務,可以用來模擬服務端,下面我們一起試一下:

模擬服務器

首先集成MockWebSocket服務庫:

    implementation 'com.squareup.okhttp3:mockwebserver:4.7.2'

然后就可以新建MockWebServer,并加入MockResponse作為接收訊息的回應,

        MockWebServer mMockWebServer = new MockWebServer();
        MockResponse response = new MockResponse()
                .withWebSocketUpgrade(new WebSocketListener() {
                    @Override
                    public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
                        super.onOpen(webSocket, response);
                        //有客戶端連接時回呼
                        Log.e(TAG, "服務器收到客戶端連接成功回呼:");
                        mWebSocket = webSocket;
                        mWebSocket.send("我是服務器,你好呀");
                    }

                    @Override
                    public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
                        super.onMessage(webSocket, text);

                        Log.e(TAG, "服務器收到訊息:" + text);
                    }

                    @Override
                    public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
                        super.onClosed(webSocket, code, reason);
                        Log.e(TAG, "onClosed:");
                    }
                });

        mMockWebServer.enqueue(response);

這里服務器端在收到客戶端連接成功訊息后,給客戶端發送了一條訊息,
要注意的是這段代碼要在子執行緒執行,因為主執行緒不能進行網路操作,

然后就可以去初始化Websocket客戶端了:

        //獲取連接url,初始化websocket客戶端
        String websocketUrl = "ws://" + mMockWebServer.getHostName() + ":" + mMockWebServer.getPort() + "/";
        WSManager.getInstance().init(websocketUrl);

ok,運行專案

    //運行結果
    E/jimu: mWbSocketUrl=ws://localhost:38355/
    E/jimu: 服務器收到客戶端連接成功回呼:
    E/jimu: 連接成功!
    E/jimu: 客戶端收到訊息:我是服務器,你好呀
    E/jimu: 服務器收到訊息:我是客戶端,你好啊

相關的WebSocket管理類和模擬服務器類我也上傳到github了,有需要的同學可以文末自取,

原始碼決議

WebSocket整個流程無非三個功能:連接,接收訊息,發送訊息,下面我們就從這三個方面分析下具體是怎么實作的,

連接

通過上面的代碼我們得知,WebSocket連接是通過newWebSocket方法,直接點進去看這個方法:

  override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket {
    val webSocket = RealWebSocket(
        taskRunner = TaskRunner.INSTANCE,
        originalRequest = request,
        listener = listener,
        random = Random(),
        pingIntervalMillis = pingIntervalMillis.toLong(),
        extensions = null, // Always null for clients.
        minimumDeflateSize = minWebSocketMessageToCompress
    )
    webSocket.connect(this)
    return webSocket
  }

這里做了兩件事:

  • 初始化RealWebSocket,主要是設定了一些引數(比如pingIntervalMillis心跳包時間間隔,還有監聽事件之類的)
  • connect 方法進行WebSocket連接

繼續查看connect方法:

connect(WebSocket連接握手)

  fun connect(client: OkHttpClient) {
    //***
    val webSocketClient = client.newBuilder()
        .eventListener(EventListener.NONE)
        .protocols(ONLY_HTTP1)
        .build()
    val request = originalRequest.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .header("Sec-WebSocket-Extensions", "permessage-deflate")
        .build()
    call = RealCall(webSocketClient, request, forWebSocket = true)
    call!!.enqueue(object : Callback {
      override fun onResponse(call: Call, response: Response) {
        
        //得到資料流
        val streams: Streams
        try {
          checkUpgradeSuccess(response, exchange)
          streams = exchange!!.newWebSocketStreams()
        } 
        
        //***
        // Process all web socket messages.
        try {
          val name = "$okHttpName WebSocket ${request.url.redact()}"
          initReaderAndWriter(name, streams)
          listener.onOpen(this@RealWebSocket, response)
          loopReader()
        } catch (e: Exception) {
          failWebSocket(e, null)
        }
      }
    })
  }

上一篇使用篇文章中說過,Websocket連接需要一次Http協議的握手,然后才能把協議升級成WebSocket,所以這段代碼就體現出這個功能了,

首先就new了一個用來進行Http連接的request,其中Header的引數就表示我要進行WebSocket連接了,引數決議如下:

  • Connection:Upgrade,表示客戶端要連接升級
  • Upgrade:websocket, 表示客戶端要升級建立Websocket連接
  • Sec-Websocket-Key:key, 這個key是隨機生成的,服務器會通過這個引數驗證該請求是否有效
  • Sec-WebSocket-Version:13, websocket使用的版本,一般就是13
  • Sec-webSocket-Extension:permessage-deflate,客戶端指定的一些擴展協議,比如這里permessage-deflate就是WebSocket的一種壓縮協議,

Header設定好之后,就呼叫了callenqueue方法,這個方法大家應該都很熟悉吧,OkHttp里面對于Http請求的異步請求就是這個方法,
至此,握手結束,服務器回傳回應碼101,表示協議升級,

然后我們繼續看看獲取服務器回應之后又做了什么?
在發送Http請求成功之后,onResponse回應方法里面主要表現為四個處理邏輯:

  • Http流轉換成WebSocket流,得到Streams物件,這個流后面會轉化成輸入流和輸出流,也就是進行發送和讀取的操作流
  • listener.onOpen(this@RealWebSocket, response),回呼了介面WebSocketListeneronOpen方法,告訴用戶WebSocket已經連接
  • initReaderAndWriter(name, streams)
  • loopReader()

前兩個邏輯還是比較好理解,主要是后兩個方法,我們分別決議下,
首先看initReaderAndWriter方法,

initReaderAndWriter(初始化輸入流輸出流)

  //RealWebSocket.kt

  @Throws(IOException::class)
  fun initReaderAndWriter(name: String, streams: Streams) {
    val extensions = this.extensions!!
    synchronized(this) {
      //***
      
      //寫資料,發送資料的工具類
      this.writer = WebSocketWriter()
      
      //設定心跳包事件
      if (pingIntervalMillis != 0L) {
        val pingIntervalNanos = MILLISECONDS.toNanos(pingIntervalMillis)
        taskQueue.schedule("$name ping", pingIntervalNanos) {
          writePingFrame()
          return@schedule pingIntervalNanos
        }
      }
      //***
    }

		//***
		
		//讀取資料的工具類
    reader = WebSocketReader(     
      ***
      frameCallback = this,
      ***
    )
  }
  
  internal fun writePingFrame() {
   //***
    try {
      writer.writePing(ByteString.EMPTY)
    } catch (e: IOException) {
      failWebSocket(e, null)
    }
  }  
  

這個方法主要干了兩件事:

  • 實體化輸出流輸入流工具類,也就是WebSocketWriterWebSocketReader,用來處理資料的收發,
  • 設定心跳包事件,如果pingIntervalMillis引數不為0,就通過計時器,每隔pingIntervalNanos發送一個ping訊息,其中writePingFrame方法就是發送了ping幀資料,

接收訊息處理訊息

loopReader

接著看看這個loopReader方法是干什么的,看這個名字我們大膽猜測下,難道這個方法就是用來回圈讀取資料的?去代碼里找找答案:

  fun loopReader() {
    while (receivedCloseCode == -1) {
      // This method call results in one or more onRead* methods being called on this thread.
      reader!!.processNextFrame()
    }
  }

代碼很簡單,一個while回圈,回圈條件是receivedCloseCode == -1的時候,做的事情是reader!!.processNextFrame()方法,繼續:

  //WebSocketWriter.kt
  fun processNextFrame() {
    //讀取頭部資訊
    readHeader()
    if (isControlFrame) {
      //如果是控制幀,讀取控制幀內容
      readControlFrame()
    } else {
      //讀取普通訊息內容
      readMessageFrame()
    }
  }
  
  //讀取頭部資訊
  @Throws(IOException::class, ProtocolException::class)
  private fun readHeader() {
    if (closed) throw IOException("closed")
    
    try {
     //讀取資料,獲取資料幀的前8位
      b0 = source.readByte() and 0xff
    } finally {
      source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS)
    }    
    //***
    //獲取資料幀的opcode(資料格式)
    opcode = b0 and B0_MASK_OPCODE
    //是否為最終幀
    isFinalFrame = b0 and B0_FLAG_FIN != 0
    //是否為控制幀(指令)
    isControlFrame = b0 and OPCODE_FLAG_CONTROL != 0

    //判斷最終幀,獲取幀長度等等
  }  
  
  
  //讀取控制幀(指令)
    @Throws(IOException::class)
  private fun readControlFrame() {
    if (frameLength > 0L) {
      source.readFully(controlFrameBuffer, frameLength)
    }

    when (opcode) {
      OPCODE_CONTROL_PING -> {
      //ping 幀
        frameCallback.onReadPing(controlFrameBuffer.readByteString())
      }
      OPCODE_CONTROL_PONG -> {
        //pong 幀
        frameCallback.onReadPong(controlFrameBuffer.readByteString())
      }
      OPCODE_CONTROL_CLOSE -> {
        //關閉 幀
        var code = CLOSE_NO_STATUS_CODE
        var reason = ""
        val bufferSize = controlFrameBuffer.size
        if (bufferSize == 1L) {
          throw ProtocolException("Malformed close payload length of 1.")
        } else if (bufferSize != 0L) {
          code = controlFrameBuffer.readShort().toInt()
          reason = controlFrameBuffer.readUtf8()
          val codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code)
          if (codeExceptionMessage != null) throw ProtocolException(codeExceptionMessage)
        }
        //回呼onReadClose方法
        frameCallback.onReadClose(code, reason)
        closed = true
      }
    }
  }
  
  //讀取普通訊息
  @Throws(IOException::class)
  private fun readMessageFrame() {
    
    readMessage()

    if (readingCompressedMessage) {
      val messageInflater = this.messageInflater
          ?: MessageInflater(noContextTakeover).also { this.messageInflater = it }
      messageInflater.inflate(messageFrameBuffer)
    }

    if (opcode == OPCODE_TEXT) {
      frameCallback.onReadMessage(messageFrameBuffer.readUtf8())
    } else {
      frameCallback.onReadMessage(messageFrameBuffer.readByteString())
    }
  }  
  

代碼還是比較直觀,這個processNextFrame其實就是讀取資料用的,首先讀取頭部資訊,獲取資料幀的型別,判斷是否為控制幀,再分別去讀取控制幀資料或者普通訊息幀資料,

資料幀格式

問題來了,什么是資料頭部資訊,什么是控制幀
這里就要說下WebSocket的資料幀了,先附上一個資料幀格式:


   0 1 2 3 4 5 6 7    0 1 2 3 4 5 6 7  0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
  +-+-+-+-+-------+  +-+-------------+ +-----------------------------+
  |F|R|R|R| OP    |  |M| LENGTH      |   Extended payload length
  |I|S|S|S| CODE  |  |A|             |  (if LENGTH=126)
  |N|V|V|V|       |  |S|             |
  | |1|2|3|       |  |K|             |
  +-+-+-+-+-------+  +-+-------------+
  |                      Extended payload length(if LENGTH=127)
  +                                  +-------------------------------
  |      Extended payload length     | Masking-key,if Mask set to 1
  +----------------------------------+-------------------------------
  |   Masking-key                    |       Data
  +----------------------------------+-------------------------------
  |                                Data
  +----------------------------------+-------------------------------


我承認,我懵逼了,
冷靜冷靜,一步一步分析下吧,

首先每一行代表4個位元組,一共也就是32位數,哦,那也就是幾個位元組而已嘛,每個位元組有他自己的代表意義唄,這樣想是不是就很簡單了,下面來具體看看每個位元組,

第1個位元組:

  • 第一位是FIN碼,其實就是一個標示位,因為資料可能多幀操作嘛,所以多幀情況下,只有最后一幀的FIN設定成1,標示結束幀,前面所有幀設定為0,
  • 第二位到第四位是RSV碼,一般通信兩端沒有設定自定義協議,就默認為0,
  • 后四位是opcode,我們叫它操作碼,這個就是判斷這個資料幀的型別了,一般有以下幾個被定義好的型別:

1) 0x0 表示附加資料幀
2) 0x1 表示文本資料幀
3) 0x2 表示二進制資料幀
4) 0x3-7 保留用于未來的非控制幀
5) 0x8 表示連接關閉
6) 0x9 表示ping
7) 0xA 表示pong
8) 0xB-F 保留用于未來的非控制幀

是不是發現了些什么,這不就對應了我們應用中的幾種格式嗎?2和3對應的是普通訊息幀,包括了文本和二進制資料,567對應的就是控制幀格式,包括了close,ping,pong

第2個位元組:

  • 第一位是Mask掩碼,其實就是標識資料是否加密混淆,1代表資料經過掩碼的,0是沒有經過掩碼的,如果是1的話,后續就會有4個位元組代表掩碼key,也就是資料幀中Masking-key所處的位置,
  • 后7位是LENGTH,用來標示資料長度,因為只有7位,所以最大只能儲存1111111對應的十進制數127長度的資料,如果需要更大的資料,這個儲存長度肯定就不夠了,
    所以規定來了,1) 小于126長度則資料用這七位表示實際長度,2) 如果長度設定為126,也就是二進制1111110,就代表取額外2個位元組表示資料長度,共是16位表示資料長度,3) 如果長度設定為127,也就是二進制1111111,就代表取額外8個位元組,共是64位表示資料長度,

需要注意的是LENGHT的三種情況在一個資料幀里面只會出現一種情況,不共存,所以在圖中是用if表示,同樣的,Masking-key也是當Mask為1的時候才存在,

所以也就有了資料幀里面的Extended payload length(LENGTH=126)所處的2個位元組,以及Extended payload length(LENGTH=127)所處的8個位元組,

最后的位元組部分自然就是掩碼key(Mask為1的時候才存在)和具體的傳輸資料了,
還是有點暈吧??,來張圖總結下:
資料幀格式.jpeg

好了,了解了資料幀格式后,我們再來讀原始碼就清晰多了,
先看看怎么讀的頭部資訊并決議的:

  //取資料幀前8位資料
  b0 = source.readByte() and 0xff
  //獲取資料幀的opcode(資料格式)
  opcode = b0 and B0_MASK_OPCODE(15)
  //是否為最終幀
  isFinalFrame = b0 and B0_FLAG_FIN(128) != 0
  //是否為控制幀(指令)
  isControlFrame = b0 and OPCODE_FLAG_CONTROL(8) != 0  
  • 第一句獲取頭資訊,and是按位與計算,and 0xff 意思就是按位與11111111,所以頭部資訊其實就是取了資料幀的前8位資料,一個位元組,
  • 第二句獲取opcodeand 15也就是按位與00001111,其實也就是取了后四位資料,剛好對應上opcode的位置,第一個位元組的后四位,
  • 第三句獲取是否為最終幀,剛才資料幀格式中說過,第一位FIN標識了是否為最后一幀資料,1代表結束幀,所以這里and 128 也就是按位與10000000,也就是取的第一位數,
  • 第四句獲取是否為控制幀,and 8也就是按位與00001000,取得是第五位,也就是opcode的第一位,這是什么意思呢?我們看看剛才的資料幀格式,發現從0x8開始就是所謂的控制幀了,0x8對應的二進制是1000,0x7對應的二進制是0111,發現了吧,如果為控制幀的時候,opcode第一位肯定是為1的,所以這里就判斷的第五位,

后面還有讀取第二個位元組的代碼,大家可以自己沿著這個思路自己看看,包括了讀取MASK,讀取資料長度的三種長度等,

所以這個processNextFrame方法主要做了三件事:

  • readHeader方法中,判斷了是否為控制幀,是否為結束幀,然后獲取了Mask標識,幀長度等引數
  • readControlFrame方法中,主要處理了該幀資料為ping,pong,close三種情況,并且在收到close關閉幀的情況下,回呼了onReadClose方法,這個待會要細看下,
  • readMessageFrame方法中,主要是讀取了訊息后,回呼了onReadMessage方法,

至此可以發現,其實WebSocket傳輸資料并不是一個簡單的事,只是OkHttp都幫我們封裝好了,我們只需要直接傳輸資料即可,感謝這些三方庫為我們開發作出的貢獻,不知道什么時候我也能做出點貢獻呢??,

對了,剛才說回呼也很重要,接著看看,onReadCloseonReadMessage回呼到哪了呢?還記得上文初始化WebSocketWriter的時候設定了回呼介面嗎,所以就是回呼給RealWebSocket了:

  //RealWebSocket.kt
  override fun onReadClose(code: Int, reason: String) {
    require(code != -1)

    var toClose: Streams? = null
    var readerToClose: WebSocketReader? = null
    var writerToClose: WebSocketWriter? = null
    synchronized(this) {
      check(receivedCloseCode == -1) { "already closed" }
      receivedCloseCode = code
      receivedCloseReason = reason 
      //...
    }

    try {
      listener.onClosing(this, code, reason)

      if (toClose != null) {
        listener.onClosed(this, code, reason)
      }
    } finally {
      toClose?.closeQuietly()
      readerToClose?.closeQuietly()
      writerToClose?.closeQuietly()
    }
  }
  
  @Throws(IOException::class)
  override fun onReadMessage(text: String) {
    listener.onMessage(this, text)
  }

  @Throws(IOException::class)
  override fun onReadMessage(bytes: ByteString) {
    listener.onMessage(this, bytes)
  }  

onReadClose回呼方法里面有個關鍵的引數,receivedCloseCode,還記得這個引數嗎?上文中決議訊息的回圈條件就是receivedCloseCode == -1,所以當收到關閉幀的時候,receivedCloseCode就不再等于-1(規定大于1000),也就不再去讀取決議訊息了,這樣整個流程就結束了,

其中還有一些WebSocketListener的回呼,比如onClosing,onClosed,onMessage等,就直接回呼給用戶使用了,至此,接收訊息處理訊息說完了,

發訊息

好了,接著說發送,看看send方法:

  @Synchronized private fun send(data: ByteString, formatOpcode: Int): Boolean {
    // ***
    // Enqueue the message frame.
    queueSize += data.size.toLong()
    messageAndCloseQueue.add(Message(formatOpcode, data))
    runWriter()
    return true
  }

首先,把要發送的data封裝成Message物件,然后入佇列messageAndCloseQueue,最后執行runWriter方法,這都不用猜了,runWriter肯定就要開始發送訊息了,繼續看:

  //RealWebSocket.kt
  private fun runWriter() {
    this.assertThreadHoldsLock()

    val writerTask = writerTask
    if (writerTask != null) {
      taskQueue.schedule(writerTask)
    }
  }
  
  private inner class WriterTask : Task("$name writer") {
    override fun runOnce(): Long {
      try {
        if (writeOneFrame()) return 0L
      } catch (e: IOException) {
        failWebSocket(e, null)
      }
      return -1L
    }
  }  
  
  //以下是schedule方法轉到WriterTask的runOnce方法程序

  //TaskQueue.kt
  fun schedule(task: Task, delayNanos: Long = 0L) {
    synchronized(taskRunner) {
      if (scheduleAndDecide(task, delayNanos, recurrence = false)) {
        taskRunner.kickCoordinator(this)
      }
    }
  }
  
  internal fun scheduleAndDecide(task: Task, delayNanos: Long, recurrence: Boolean): Boolean {
    //***
    if (insertAt == -1) insertAt = futureTasks.size
    futureTasks.add(insertAt, task)

    // Impact the coordinator if we inserted at the front.
    return insertAt == 0
  }  

  //TaskRunner.kt
  internal fun kickCoordinator(taskQueue: TaskQueue) {
    this.assertThreadHoldsLock()
    
    if (taskQueue.activeTask == null) {
      if (taskQueue.futureTasks.isNotEmpty()) {
        readyQueues.addIfAbsent(taskQueue)
      } else {
        readyQueues.remove(taskQueue)
      }
    }    
    
    if (coordinatorWaiting) {
      backend.coordinatorNotify(this@TaskRunner)
    } else {
      backend.execute(runnable)
    }
  }  
  
  private val runnable: Runnable = object : Runnable {
    override fun run() {
      while (true) {
        val task = synchronized(this@TaskRunner) {
          awaitTaskToRun()
        } ?: return

        logElapsed(task, task.queue!!) {
          var completedNormally = false
          try {
            runTask(task)
            completedNormally = true
          } finally {
            // If the task is crashing start another thread to service the queues.
            if (!completedNormally) {
              backend.execute(this)
            }
          }
        }
      }
    }
  }
  
  private fun runTask(task: Task) {
    try {
      delayNanos = task.runOnce()
    } 
  }  
  

代碼有點長,這里是從runWriter開始跟的幾個方法,拿到writerTask實體后,存到TaskQueuefutureTasks串列里,然后到runnable這里可以看到是一個while死回圈,不斷的從futureTasks中取出Task并執行runTask方法,直到Task為空,回圈停止,

其中涉及到兩個新的類:

  • TaskQueue類主要就是管理訊息任務串列,保證按順序執行
  • TaskRunner類主要就是做一些任務的具體操作,比如執行緒池里執行任務,記錄訊息任務的狀態(準備發送的任務佇列readyQueues,正在執行的任務佇列busyQueues等等)

而每一個Task最后都是執行到了WriterTaskrunOnce方法,也就是writeOneFrame方法:

  internal fun writeOneFrame(): Boolean {
    synchronized(this@RealWebSocket) {
      if (failed) {
        return false // Failed web socket.
      }
      writer = this.writer
      pong = pongQueue.poll()
      if (pong == null) {
        messageOrClose = messageAndCloseQueue.poll()
        if (messageOrClose is Close) {
        } else if (messageOrClose == null) {
            return false // The queue is exhausted.
        }
      }
    }

   //發送訊息邏輯,包括`pong`訊息,普通訊息,關閉訊息
    try {
      if (pong != null) {
        writer!!.writePong(pong)
      } else if (messageOrClose is Message) {
        val message = messageOrClose as Message
        writer!!.writeMessageFrame(message.formatOpcode, message.data)
        synchronized(this) {
          queueSize -= message.data.size.toLong()
        }
      } else if (messageOrClose is Close) {
        val close = messageOrClose as Close
        writer!!.writeClose(close.code, close.reason)
        // We closed the writer: now both reader and writer are closed.
        if (streamsToClose != null) {
          listener.onClosed(this, receivedCloseCode, receivedCloseReason!!)
        }
      } 
      return true
    } finally {
      streamsToClose?.closeQuietly()
      readerToClose?.closeQuietly()
      writerToClose?.closeQuietly()
    }
  }

這里就會執行發送訊息的邏輯了,主要有三種訊息情況處理:

  • pong訊息,這個主要是為服務器端準備的,發送給客戶端回應心跳包,
  • 普通訊息,就會把資料型別Opcode和具體資料發送過去
  • 關閉訊息,其實當用戶執行close方法關閉WebSocket的時候,也是發送了一條Close控制幀訊息給服務器告知這個關閉需求,并帶上code狀態碼reason關閉原因,然后服務器端就會關閉當前連接,

好了,最后一步了,就是把這些資料組裝成WebSocket資料幀并寫入流,分成控制幀資料和普通訊息資料幀


  //寫入(發送)控制幀
  private fun writeControlFrame(opcode: Int, payload: ByteString) {
    if (writerClosed) throw IOException("closed")
    
    val length = payload.size
    require(length <= PAYLOAD_BYTE_MAX) {
      "Payload size must be less than or equal to $PAYLOAD_BYTE_MAX"
    }
    val b0 = B0_FLAG_FIN or opcode
    sinkBuffer.writeByte(b0)

    var b1 = length
    if (isClient) {
      b1 = b1 or B1_FLAG_MASK
      sinkBuffer.writeByte(b1)
      random.nextBytes(maskKey!!)
      sinkBuffer.write(maskKey)

      if (length > 0) {
        val payloadStart = sinkBuffer.size
        sinkBuffer.write(payload)
        sinkBuffer.readAndWriteUnsafe(maskCursor!!)
        maskCursor.seek(payloadStart)
        toggleMask(maskCursor, maskKey)
        maskCursor.close()
      }
    } else {
      sinkBuffer.writeByte(b1)
      sinkBuffer.write(payload)
    }

    sink.flush()
  }


  //寫入(發送)普通訊息資料幀
  @Throws(IOException::class)
  fun writeMessageFrame(formatOpcode: Int, data: ByteString) {
    if (writerClosed) throw IOException("closed")

    messageBuffer.write(data)

    var b0 = formatOpcode or B0_FLAG_FIN
    val dataSize = messageBuffer.size
    sinkBuffer.writeByte(b0)

    var b1 = 0
    if (isClient) {
      b1 = b1 or B1_FLAG_MASK
    }
    when {
      dataSize <= PAYLOAD_BYTE_MAX -> {
        b1 = b1 or dataSize.toInt()
        sinkBuffer.writeByte(b1)
      }
      dataSize <= PAYLOAD_SHORT_MAX -> {
        b1 = b1 or PAYLOAD_SHORT
        sinkBuffer.writeByte(b1)
        sinkBuffer.writeShort(dataSize.toInt())
      }
      else -> {
        b1 = b1 or PAYLOAD_LONG
        sinkBuffer.writeByte(b1)
        sinkBuffer.writeLong(dataSize)
      }
    }

    if (isClient) {
      random.nextBytes(maskKey!!)
      sinkBuffer.write(maskKey)

      if (dataSize > 0L) {
        messageBuffer.readAndWriteUnsafe(maskCursor!!)
        maskCursor.seek(0L)
        toggleMask(maskCursor, maskKey)
        maskCursor.close()
      }
    }

    sinkBuffer.write(messageBuffer, dataSize)
    sink.emit()
  }


大家應該都能看懂了吧,其實就是組裝資料幀,包括Opcode,mask,資料長度等等,兩個方法的不同就在于普通資料需要判斷資料長度的三種情況,再組裝資料幀,最后都會通過sinkBuffer寫入到輸出資料流,

終于,基本的流程說的差不多了,其中還有很多細節,同學們可以自己花時間看看琢磨琢磨,比如Okio部分,還是那句話,希望大家有空自己也讀一讀相關原始碼,這樣理解才能深刻,而且你肯定會發現很多我沒說到的細節,歡迎大家討論,我也會繼續努力,最后大家給我加個油點個贊吧,感謝感謝,

總結

再來個圖總結下吧!??
OkHttp-WebSocket原始碼.jpg

參考

OkHttp原始碼
《WebSocket協議翻譯》

附件

OkHttp原始碼
WebSocket功能實作原始碼


我的公眾號:碼上積木,每天三問面試題,詳細剖析,助你成為offer收割機,

謝謝你的閱讀,如果你覺得寫的還行,就點個贊支持下吧!感謝!
你的一個??,就是我分享的動力??,

轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/185795.html

標籤:Android

上一篇:android studio前言中不允許有內容錯誤解決

下一篇:android studio前言中不允許有內容錯誤解決

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【從零開始擼一個App】Dagger2

    Dagger2是一個IOC框架,一般用于Android平臺,第一次接觸的朋友,一定會被搞得暈頭轉向。它延續了Java平臺Spring框架代碼碎片化,注解滿天飛的傳統。嘗試將各處代碼片段串聯起來,理清思緒,真不是件容易的事。更不用說還有各版本細微的差別。 與Spring不同的是,Spring是通過反射 ......

    uj5u.com 2020-09-10 06:57:59 more
  • Flutter Weekly Issue 66

    新聞 Flutter 季度調研結果分享 教程 Flutter+FaaS一體化任務編排的思考與設計 詳解Dart中如何通過注解生成代碼 GitHub 用對了嗎?Flutter 團隊分享如何管理大型開源專案 插件 flutter-bubble-tab-indicator A Flutter librar ......

    uj5u.com 2020-09-10 06:58:52 more
  • Proguard 常用規則

    介紹 Proguard 入口,如何查看輸出,如何使用 keep 設定入口以及使用實體,如何配置壓縮,混淆,校驗等規則。

    ......

    uj5u.com 2020-09-10 06:59:00 more
  • Android 開發技術周報 Issue#292

    新聞 Android即將獲得類AirDrop功能:可向附近設備快速分享檔案 谷歌為安卓檔案管理應用引入可安全隱藏資料的Safe Folder功能 Android TV新主界面將顯示電影、電視節目和應用推薦內容 泄露的Android檔案暗示了傳說中的谷歌Pixel 5a與折疊屏新機 谷歌發布Andro ......

    uj5u.com 2020-09-10 07:00:37 more
  • AutoFitTextureView Error inflating class

    報錯: Binary XML file line #0: Binary XML file line #0: Error inflating class xxx.AutoFitTextureView 解決: <com.example.testy2.AutoFitTextureView android: ......

    uj5u.com 2020-09-10 07:00:41 more
  • 根據Uri,Cursor沒有獲取到對應的屬性

    Android: 背景:呼叫攝像頭,拍攝視頻,指定保存的地址,但是回傳的Cursor檔案,只有名稱和大小的屬性,沒有其他諸如時長,連ID屬性都沒有 使用 cursor.getInt(cursor.getColumnIndexOrThrow(MediaStore.Video.Media.DURATIO ......

    uj5u.com 2020-09-10 07:00:44 more
  • Android連載29-持久化技術

    一、持久化技術 我們平時所使用的APP產生的資料,在記憶體中都是瞬時的,會隨著斷電、關機等丟失資料,因此android系統采用了持久化技術,用于存盤這些“瞬時”資料 持久化技術包括:檔案存盤、SharedPreference存盤以及資料庫存盤,還有更復雜的SD卡記憶體儲。 二、檔案存盤 最基本存盤方式, ......

    uj5u.com 2020-09-10 07:00:47 more
  • Android Camera2Video整合到自己專案里

    背景: Android專案里呼叫攝像頭拍攝視頻,原本使用的 MediaStore.ACTION_VIDEO_CAPTURE, 后來因專案需要,改成了camera2 1.Camera2Video 官方demo有點問題,下載后,不能直接整合到專案 問題1.多次拍攝視頻崩潰 問題2.雙擊record按鈕, ......

    uj5u.com 2020-09-10 07:00:50 more
  • Android 開發技術周報 Issue#293

    新聞 谷歌為Android TV開發者提供多種新功能 Android 11將自動填表功能整合到鍵盤輸入建議中 谷歌宣布Android Auto即將支持更多的導航和數字停車應用 谷歌Pixel 5只有XL版本 搭載驍龍765G且將比Pixel 4更便宜 [圖]Wear OS將迎來重磅更新:應用啟動時間 ......

    uj5u.com 2020-09-10 07:01:38 more
  • 海豚星空掃碼投屏 Android 接收端 SDK 集成 六步驟

    掃碼投屏,開放網路,獨占設備,不需要額外下載軟體,微信掃碼,發現設備。支持標準DLNA協議,支持倍速播放。視頻,音頻,圖片投屏。好點意思。還支持自定義基于 DLNA 擴展的操作動作。好像要收費,沒體驗。 這里簡單記錄一下集成程序。 一 跟目錄的build.gradle添加私有mevan倉庫 mave ......

    uj5u.com 2020-09-10 07:01:43 more
最新发布
  • 歡迎頁輪播影片

    如圖,引導開始,球從上落下,同時淡入文字,然后文字開始輪播,最后一頁時停止,點擊進入首頁。 在來看看效果圖。 重力球先不講,主要歡迎輪播簡單實作 首先新建一個類 TextTranslationXGuideView,用于影片展示 文本是類似的,最后會有個圖片箭頭影片,布局很簡單,就是一個 TextVi ......

    uj5u.com 2023-04-20 08:40:31 more
  • 【FAQ】關于華為推送服務因營銷訊息頻次管控導致服務通訊類訊息

    一. 問題描述 使用華為推送服務下發IM訊息時,下發訊息請求成功且code碼為80000000,但是手機總是收不到訊息; 在華為推送自助分析(Beta)平臺查看發現,訊息發送觸發了頻控。 二. 問題原因及背景 2023年1月05日起,華為推送服務對咨詢營銷類訊息做了單個設備每日推送數量上限管理,具體 ......

    uj5u.com 2023-04-20 08:40:11 more
  • 歡迎頁輪播影片

    如圖,引導開始,球從上落下,同時淡入文字,然后文字開始輪播,最后一頁時停止,點擊進入首頁。 在來看看效果圖。 重力球先不講,主要歡迎輪播簡單實作 首先新建一個類 TextTranslationXGuideView,用于影片展示 文本是類似的,最后會有個圖片箭頭影片,布局很簡單,就是一個 TextVi ......

    uj5u.com 2023-04-20 08:39:36 more
  • 【FAQ】關于華為推送服務因營銷訊息頻次管控導致服務通訊類訊息

    一. 問題描述 使用華為推送服務下發IM訊息時,下發訊息請求成功且code碼為80000000,但是手機總是收不到訊息; 在華為推送自助分析(Beta)平臺查看發現,訊息發送觸發了頻控。 二. 問題原因及背景 2023年1月05日起,華為推送服務對咨詢營銷類訊息做了單個設備每日推送數量上限管理,具體 ......

    uj5u.com 2023-04-20 08:39:13 more
  • iOS從UI記憶體地址到讀取成員變數(oc/swift)

    開發除錯時,我們發現bug時常首先是從UI顯示發現例外,下一步才會去定位UI相關連的資料的。XCode有給我們提供一系列debug工具,但是很多人可能還沒有形成一套穩定的除錯流程,因此本文嘗試解決這個問題,順便提出一個暴論:UI顯示例外問題只需要兩個步驟就能完成定位作業的80%: 定位例外 UI 組 ......

    uj5u.com 2023-04-19 09:16:23 more
  • FIDE重磅更新!性能飛躍!體驗有禮!

    FIDE 開發者工具重構升級啦!實作500%性能提升,誠邀體驗! 一直以來不少開發者朋友在社區反饋,在使用 FIDE 工具的程序中,時常會遇到諸如加載不及時、代碼預覽/渲染性能不如意的情況,十分影響開發體驗。 作為技術團隊,我們深知一件趁手的開發工具對開發者的重要性,因此,在2023年開年,FinC ......

    uj5u.com 2023-04-19 09:16:15 more
  • 游戲內嵌社區服務開放,助力開發者提升玩家互動與留存

    華為 HMS Core 游戲內嵌社區服務提供快速訪問華為游戲中心論壇能力,支持玩家直接在游戲內瀏覽帖子和交流互動,助力開發者擴展內容生產和觸達的場景。 一、為什么要游戲內嵌社區? 二、游戲內嵌社區的典型使用場景 1、游戲內打開論壇 您可以在游戲內繪制論壇入口,為玩家提供沉浸式發帖、瀏覽、點贊、回帖、 ......

    uj5u.com 2023-04-19 09:15:46 more
  • iOS從UI記憶體地址到讀取成員變數(oc/swift)

    開發除錯時,我們發現bug時常首先是從UI顯示發現例外,下一步才會去定位UI相關連的資料的。XCode有給我們提供一系列debug工具,但是很多人可能還沒有形成一套穩定的除錯流程,因此本文嘗試解決這個問題,順便提出一個暴論:UI顯示例外問題只需要兩個步驟就能完成定位作業的80%: 定位例外 UI 組 ......

    uj5u.com 2023-04-19 09:14:53 more
  • FIDE重磅更新!性能飛躍!體驗有禮!

    FIDE 開發者工具重構升級啦!實作500%性能提升,誠邀體驗! 一直以來不少開發者朋友在社區反饋,在使用 FIDE 工具的程序中,時常會遇到諸如加載不及時、代碼預覽/渲染性能不如意的情況,十分影響開發體驗。 作為技術團隊,我們深知一件趁手的開發工具對開發者的重要性,因此,在2023年開年,FinC ......

    uj5u.com 2023-04-19 09:14:08 more
  • 游戲內嵌社區服務開放,助力開發者提升玩家互動與留存

    華為 HMS Core 游戲內嵌社區服務提供快速訪問華為游戲中心論壇能力,支持玩家直接在游戲內瀏覽帖子和交流互動,助力開發者擴展內容生產和觸達的場景。 一、為什么要游戲內嵌社區? 二、游戲內嵌社區的典型使用場景 1、游戲內打開論壇 您可以在游戲內繪制論壇入口,為玩家提供沉浸式發帖、瀏覽、點贊、回帖、 ......

    uj5u.com 2023-04-19 09:08:34 more