我正在使用可重新啟動的源和接收器處理 akka 流中的錯誤。
object Main extends App {
implicit val system: ActorSystem = ActorSystem("akka-streams-system")
val restartSettings =
RestartSettings(1.seconds, 10.seconds, 0.2d)
val restartableSource = RestartSource.onFailuresWithBackoff(restartSettings) {() => {
Source(0 to 10)
.map(n =>
if (n < 5) n.toString
else throw new RuntimeException("Boom!"))
}}
val restartableSink: Sink[String, NotUsed] = RestartSink.withBackoff(restartSettings){
() => Sink.fold("")((_, newVal) => {
if(newVal == "3") {
println(newVal " Exception")
throw new RuntimeException("Kabooom!!!") // TRIGGERRING A FAILURE expecting the steam to restart just the sink.
} else {
println(newVal " sink")
}
newVal
})
}
restartableSource.runWith(restartableSink)
}
我在不同的場景下分別打破源和匯。我首先打破接收器,期望接收器重新啟動并一遍又一遍地重新處理newVal == 3訊息。但似乎接收器中的錯誤只是被丟棄了,只重試了源故障,因此源最終被重新啟動并重新處理從 0 開始的事件。
我正在模仿一個場景,我想從源中讀取(比如說從檔案中)并有一個 HTTP 接收器,它獨立地重試失敗的 HTTP 請求,而無需重新啟動整個流的管道。
我使用上述共享代碼得到的輸出如下。
0 sink
1 sink
2 sink
3 Exception
4 sink
[WARN] [01/10/2022 09:13:14.647] [akka-streams-system-akka.actor.default-dispatcher-6] [RestartWithBackoffSource(akka://akka-streams-system)] Restarting stream due to failure [1]: java.lang.RuntimeException: Boom!
java.lang.RuntimeException: Boom!
at Main$.$anonfun$restartableSource$2(Main.scala:18)
at Main$.$anonfun$restartableSource$2$adapted(Main.scala:16)
at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:52)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:542)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:496)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:787)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:819)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
對于推理為什么會發生這種情況以及如何獨立于源重新啟動接收器的任何幫助,我將不勝感激。
uj5u.com熱心網友回復:
您RestartSink 正在重新啟動(而不是在重新啟動其他任何東西的程序中):如果不是,您將永遠不會4 sink在3 Exception. 出于某種原因,它沒有記錄,但這可能是由于流屬性(最近幾個月重新啟動流中的記錄也發生了一些行為變化,因此記錄可能會因您正在運行的版本而異)。
從檔案中RestartSink:
重新啟動程序本質上是有損的,因為取消和發送訊息之間沒有協調。當被包裹的 Sink 確實取消時,這個 Sink 將背壓,但是任何已經發送的元素可能已經丟失。
這基本上是因為在一般情況下流階段是無記憶的。在您的Sink.fold示例中,它將以干凈狀態(即"")重新啟動。根據我的經驗,這確實使RestartSink和RestartFlow比RestartSource.
對于您描述的用例,我傾向于使用一個mapAsync階段 withakka.pattern.RetrySupport通過Future基于 - 的 API 發送 HTTP 請求并在失敗時重試請求:
val restartingSource: Source[Element, _] = ???
restartingSource.mapAsync(1) { elem =>
import akka.pattern.RetrySupport._
// will need an implicit ExecutionContext and an implicit Scheduler (both are probably best obtained from the ActorSystem)
val sendRequest = () => {
// Future-based HTTP call
???
}
retry(
attempt = sendRequest,
attempts = Int.MaxValue,
minBackoff = 1.seconds,
maxBackoff = 10.seconds,
randomFactor = 0.2
)
}.runWith(Sink.ignore)
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/412162.html
標籤:
