我正在嘗試構建一個有關Stream.concurrently在 fs2 中使用該方法的示例。我正在開發生產者/消費者模式,使用 aQueue作為共享狀態:
import cats.effect.std.{Queue, Random}
object Fs2Tutorial extends IOApp {
val random: IO[Random[IO]] = Random.scalaUtilRandom[IO]
val queue: IO[Queue[IO, Int]] = Queue.bounded[IO, Int](10)
val producer: IO[Nothing] = for {
r <- random
q <- queue
p <-
r.betweenInt(1, 11)
.flatMap(q.offer)
.flatTap(_ => IO.sleep(1.second))
.foreverM
} yield p
val consumer: IO[Nothing] = for {
q <- queue
c <- q.take.flatMap { n =>
IO.println(s"Consumed $n")
}.foreverM
} yield c
val concurrently: Stream[IO, Nothing] = Stream.eval(producer).concurrently(Stream.eval(consumer))
override def run(args: List[String]): IO[ExitCode] = {
concurrently.compile.drain.as(ExitCode.Success)
}
}
我希望程式列印一些"Consumed n",一些n。但是,該程式不會向控制臺列印任何內容。
上面的代碼有什么問題?
uj5u.com熱心網友回復:
上面的代碼有什么問題?
您Queue在消費者和生產者中使用的不是相同的,而是他們每個人都在創建自己的新獨立Queue (Random順便說一句,同樣的情況)
這是新手常犯的錯誤,他們尚未掌握資料型別背后的主要原則,例如 IO
當你這樣做時,val queue: IO[Queue[IO, Int]] = Queue.bounded[IO, Int](10)你說的queue是一個程式,在評估時會產生一個 type 的值Queue[IO, Unit],這就是所有這一切的重點。
該程式成為一個值,并且作為任何值,您可以以任何方式對其進行操作以產生新值,例如flatMap,當兩者都使用 soconsumer并producer通過flatMapping queue它們創建新的獨立程式/值來創建新程式時。
您可以像這樣修復該代碼:
import cats.effect.{IO, IOApp}
import cats.effect.std.{Queue, Random}
import cats.syntax.all._
import fs2.Stream
import scala.concurrent.duration._
object Fs2Tutorial extends IOApp.Simple {
override final val run: IO[Unit] = {
val resources =
(
Random.scalaUtilRandom[IO],
Queue.bounded[IO, Int](10)
).tupled
val concurrently =
Stream.eval(resources).flatMap {
case (random, queue) =>
val producer =
Stream
.fixedDelay[IO](1.second)
.evalMap(_ => random.betweenInt(1, 11))
.evalMap(queue.offer)
val consumer =
Stream.fromQueueUnterminated(queue).evalMap(n => IO.println(s"Consumed $n"))
producer.concurrently(consumer)
}
concurrently.interruptAfter(10.seconds).compile.drain >> IO.println("Finished!")
}
}
(你可以看到它在這里運行)。
PS:我建議您查看Fabio Labella的“程式即價值”系列: https ://systemfw.org/archive.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/422311.html
標籤:
