我想寫一個完全基于ZIO堆疊設計的應用程式。 我是這個框架的新手,所以也許解決方案是微不足道的,我誤解了一些重要的東西。 我面臨以下問題。 我需要用REST接收的命令來取消對kafka主題的訂閱。 同時,我也需要通過REST來訂閱主題。 我使用zio-kafka寫了以下代碼,描述了一個訂閱話題并將事件列印到控制臺的效果:
<private val consumerSettings = ConsumerSettings(List("localhost: 9092"))。) withGroupId("MyConsumerGroup")
.withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest)
private val managedConsumer = Consumer.make(consumerSettings)
val consumer: ZLayer[Clock with Blocking。Throwable, Has[Consumer] ] = ZLayer. fromManaged(managedConsumer)
def startStream。ZIO[Console with Any with Has[Consumer] with Clock。Throwable, Unit] =
Consumer.subscribeAnd(Subscription.texts("myTopic"))
.plainStream(Serde.string, Serde.string)
.tap(cr => zio.console.putStrLn(cr.value))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.run(ZSink.forach(_.commit))
然后我使用zhttp描述了REST端點:
private val app = HttpApp.fromEffectFunction{
case Method. POST -> Root / "stop"/span> => for {
_ <- ZIO.serviceWith[Consumer] (_.unsubscribe)
_ <- zio.console.putStrLn("stoped")
} yield Response.ok
case Method. POST -> Root / "start" => for {
_ <- startStream.fork
_ <- zio.console.putStrLn("started")
} yield Response.ok
}
private val server = Server.port(8080) Server.app(應用)。
最后我用main方法來運行我的簡單程式:
override def run(args。List[String])。) URIO[zio.ZEnv, ExitCode] = (for {
_ <- startStream.provideSomeLayer(consumer Console.live).fork
_ <- server.make.use(_ => console.putStrLn("server started"/span>) *> ZIO.never)
.provideCustomLayer(ServerChannelFactory.auto EventLoopGroup.auto() consumer)
} yield()).exitCode
它運行得很好,但問題是當我運行程式時,它對/stop請求作出反應,但Consumer仍然被訂閱,并且訊息仍然從主題讀取。
如果我在運行我的程式時,只有服務器的效果,就像下面這樣:
override def run(args。List[String])。) URIO[zio.ZEnv, ExitCode] =
server.make.use(_ => console.putStrLn("server started") *> ZIO.ever)
.provideCustomLayer(ServerChannelFactory.auto EventLoopGroup.auto() consumer)
.exitCode
而在我呼叫/start端點后,在控制臺我可以看到消費者是活的,我可以看到一些關于kafka集群的資訊,但沒有從主題中讀取訊息。
請告訴我哪里錯了,哪里是我的誤解。
謝謝。
uj5u.com熱心網友回復:
你正在傳遞一個consumer層兩次,這是有效的。這意味著取消訂閱的消費者是不一樣的。
那么
你是怎么做到的?那怎么辦呢
override def run(args: List[String])。) URIO[zio.ZEnv, ExitCode] = (for {
_ <- startStream.fork
_ <- server.make.use(_ => console.putStrLn("server started") *> ZIO.ever)
} yield ()).providSomeLayer(ServerChannelFactory.auto EventLoopGroup.auto() consumer)
.exitCode
注意:可能無法編譯,沒有經過測驗
。轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/326319.html
標籤:
