我想將檔案從 s3 流式傳輸到要決議和豐富的 actor 并將輸出寫入其他檔案。parserActor 的數量應該受到限制,例如
application.conf
akka{
actor{
deployment {
HereClient/router1 {
router = round-robin-pool
nr-of-instances = 28
}
}
}
}
code
val writerActor = actorSystem.actorOf(WriterActor.props())
val parser = actorSystem.actorOf(FromConfig.props(ParsingActor.props(writerActor)), "router1")
但是寫入檔案的actor應該限制為1(單例)
我嘗試做類似的事情
val reader: ParquetReader[GenericRecord] = AvroParquetReader.builder[GenericRecord](file).withConf(conf).build()
val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
source.map (record => record ! parser)
但我不確定是否正確處理了背壓。有什么建議嗎?
uj5u.com熱心網友回復:
我認為您應該使用“異步”操作之一
也許這個其他 q/a 給你一些 insperation異步處理 akka 流并寫入檔案接收器
uj5u.com熱心網友回復:
實際上,您的解決方案是無視背壓。
在保持背壓的同時讓流與 actor 互動的正確方法是使用akka-stream(參考)的詢問模式支持。
根據我對您示例的理解,您有 2 個單獨的演員互動點:
- 將記錄發送給決議參與者(通過路由器)
- 將決議后的記錄發送給單例寫入actor
我會做的是類似于以下內容:
val writerActor = actorSystem.actorOf(WriterActor.props())
val parserActor = actorSystem.actorOf(FromConfig.props(ParsingActor.props(writerActor)), "router1")
val reader: ParquetReader[GenericRecord] = AvroParquetReader.builder[GenericRecord](file).withConf(conf).build()
val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
source.ask[ParsedRecord](28)(parserActor)
.ask[WriteAck](writerActor)
.runWith(Sink.ignore)
這個想法是您將所有GenericRecord元素發送到parserActor,它將以ParsedRecord. 作為示例,我們將并行度指定為 28,因為這是您配置的實體數,但是只要您使用的值高于角色實體的實際數量,任何角色都不會遭受作業饑餓。
一旦得到parseActor決議結果的回復(這里由 表示ParsedRecord),我們就應用相同的模式與單例作者 actor 進行互動。請注意,這里我們沒有指定并行度,因為我們只有一個實體,因此一次發送超過 1 條訊息是沒有意義的(實際上,由于在異步邊界進行緩沖,無論如何都會發生這種情況,但這只是一個內置優化)。在這種情況下,我們希望 writer actor 回復 aWriteAck以通知我們寫入已成功,我們可以發送下一個元素。
使用這種方法,您可以在整個流中保持背壓。
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/312782.html
上一篇:chisel如何連接到這樣的埠?
下一篇:替換嵌套映射中的值
