該賞金過期5天。此問題的答案有資格獲得 200聲望獎勵。 Blankman正在從信譽良好的來源尋找答案:
請解釋為什么修復與我現在的相比有效
我使用這個示例代碼客戶端連接到我的 websocket 服務,但目前它只是連接然后關閉。
如何保持此連接打開且永不關閉?
建立連接后,我希望它保持打開狀態,直到我關閉應用程式。
package docs.http.scaladsl
import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
object WebSocketClientFlow {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
import system.dispatcher
// Future[Done] is the materialized value of Sink.foreach,
// emitted when the stream completes
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
// ignore other message types
}
// send this as a message over the WebSocket
val outgoing = Source.single(TextMessage("hello world!"))
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))
}
}
Github 參考:https : //github.com/akka/akka-http/blob/v10.2.6/docs/src/test/scala/docs/http/scaladsl/WebSocketClientFlow.scala
更新 我有與上面相同的代碼,但我更新了我的源代碼,如下所示:
val source1 = Source.single(TextMessage("""{"action":"auth","params":"APIKEY_123"}"""))
val source2 = Source.single(TextMessage("""{"action":"subscribe","params":"topic123"}"""))
val sources: Source[Message, NotUsed] =
Source.combine(source1, source2, Source.maybe)(Concat(_))
所以我可以看到我的源 1 和源 2 被發送到 websocket,但是 websocket 沒有像它應該的那樣開始流式傳輸它的值,它只是掛起。
不知道我做錯了什么......
uj5u.com熱心網友回復:
Akka 檔案指出您的情況:
Akka HTTP WebSocket API 不支持半關閉連接,這意味著如果任一流完成,則整個連接將關閉(在交換“關閉握手”或 3 秒超時后)。
在您的情況下,outgoing(作為 a Source.single)在發出TextMessage. 該webSocketFlow接收完成訊息,然后眼淚就下來了連接。
解決方案是在outgoing完成時延遲,甚至可能永遠延遲(或至少直到應用程式被終止)。
在您不想通過 websocket 發送訊息的情況下,兩個標準源對于延遲完成可能很有用。
Source.maybe實作為 aPromise您可以使用可選的終止訊息來完成。除非承諾完成,否則它不會完成。Source.never永遠不會完成。您可以通過不完成來實作這一點Source.maybe,但這比這要少。
那么它在代碼中會是什么樣子呢?
val outgoing =
Source.single(TextMessage("hello world!"))
.concat(Source.never)
對于Source.maybe,你會想.concatMat,這樣的Promise可用于完成; 這確實意味著您將獲得類似于val (completionPromise, upgradeResponse, closed)整體物化價值的東西:
val outgoing =
Source.single(TextMessage("hello world!"))
.concatMat(Source.maybe[TextMessage])(Keep.right)
val ((completionPromise, upgradeResponse), closed) =
outgoing
.viaMat(websocketFlow)(Keep.both)
.toMat(incoming)(Keep.both)
.run()
在你想通過socket發送任意多條訊息的情況下,Source.actorRef或者Source.queue得心應手的情況下:將訊息發送到物化actor ref通過websocket連接發送(發送一個特殊的訊息來完成源)或者offer訊息到佇列然后complete它。
val outgoing =
Source.actorRef[TextMessage](
completionMatcher = {
case Done =>
CompletionStrategy.draining // send the messages already sent before completing
},
failureMatcher = PartialFunction.empty,
bufferSize = 100,
overflowStrategy = OverflowStrategy.dropNew
)
val ((sendToSocketRef, upgradeResponse), closed) =
outgoing
.viaMat(websocketFlow)(Keep.both)
.toMat(incoming)(Keep.both)
.run()
sendToSocketRef ! TextMessage("hello world!")
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/327844.html
上一篇:如何使用scala將整數串列中的ApproxQuanitiles計算到SparkDataFrame列中
下一篇:MVC中的正確結構與spring
