我正在嘗試從 S3 下載一個大檔案并將其資料發送到另一個執行 http 請求的參與者,然后保留回應。我想限制該參與者發送的請求數量,因此我需要處理背壓。
我嘗試做這樣的事情:
S3.download(bckt, bcktKey).map{
case Some((file, _)) =>
file
.via(CsvParsing.lineScanner())
.map(_.map(_.utf8String)).drop(1)//drop headers
.map(p => Foo(p.head, p(1)))
.mapAsync(30) { p =>
implicit val askTimeout: Timeout = Timeout(10 seconds)
(httpClientActor ? p).mapTo[Buzz]
}
.mapAsync(1){
case b@Buzz(_, _) =>
(persistActor ? b).mapTo[Done]
}.runWith(Sink.head)
問題是我看到它只從檔案中讀取 30 行作為為并行性設定的限制。我不確定這是實作我所尋找的正確方法
uj5u.com熱心網友回復:
如果原因不是Sink.head我在評論中提到的使用,您可以使用Sink.actorRefWithBackpressure.
示例代碼:
class PersistActor extends Actor {
override def receive: Receive = {
case "init" =>
println("Initialized")
case "complete" =>
context.stop(self)
case message =>
//Persist Buzz??
sender() ! Done
}
}
val sink = Sink
.actorRefWithBackpressure(persistActor, "init", Done, "complete", PartialFunction.empty)
S3.download(bckt, bcktKey).map{
case Some((file, _)) =>
file
.via(CsvParsing.lineScanner())
.map(_.map(_.utf8String)).drop(1)//drop headers
.map(p => Foo(p.head, p(1)))
//You could backpressure here too...
.mapAsync(30) { p =>
implicit val askTimeout: Timeout = Timeout(10 seconds)
(httpClientActor ? p).mapTo[Buzz]
}
.to(sink)
.run()
uj5u.com熱心網友回復:
正如 Johny 在他的評論中所指出的,這Sink.head就是導致流僅處理大約 30 個元素的原因。發生的事情大約是:
Sink.head表示對 1 個元素的需求- 這種需求通過第二個傳播
mapAsync - 當需求達到第一個時
mapAsync,因為那個并行度為 30,它表示對 30 個元素的需求 - CSV 決議階段發出 30 個元素
- 當收到來自客戶端actor的帶有第一個元素的ask的回應時,回應向下傳播到persistactor的ask
- 來自 CSV 決議階段的另外一個元素的需求被發出信號
- 當持久角色回應時,回應進入接收器
- 由于接收器
Sink.head一旦接收到元素就會取消流,因此流會被拆除 - 任何已發送但正在等待回應的客戶端參與者的請求仍將得到處理
持久參與者的回應與 CSV 決議并向客戶端參與者發送請求之間存在一些競爭:如果后者更快,客戶端參與者可能會處理 31 行。
如果您只想要Future[Done]在處理完每個元素后都Sink.last可以使用此代碼,則可以很好地使用此代碼。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/366266.html
下一篇:Scala選項和一些不匹配
