我正在深化我在 fs2 方面的知識,并想嘗試使用 fs2-kafka 來替換 akka 流的用例。這個想法很簡單,從 kafka 讀取并通過 http 請求將資料發布到接收器,然后在成功時提交回 kafka。到目前為止,我無法真正弄清楚 http 部分。在 akka 流 / akka http 中,你有一個開箱即用的流https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-連接池
Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]
與 akka 流完美集成。
我試圖看看我是否可以用 http4s 和 fs2 做類似的事情。
有沒有人有任何參考資料、代碼示例、博客以及展示如何進行這種集成的內容。到目前為止,我唯一能想到的是,將流包裝到客戶端資源的使用方法中,即
BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { ..... run stream here ..... }
即使那樣我也不確定整個事情
uj5u.com熱心網友回復:
typelevel生態系統的事情是一切都只是一個庫,您不需要關于它們中有多少相互作用的示例,您只需要了解每個庫的作業原理和組合的基本規則。
def createClient(/** whatever arguments you need */): Resource[IO, Client[IO]] = {
// Fill this based on the documentation of the client of your choice:
// I would recommend the ember client from http4s:
// https://http4s.org/v0.23/api/org/http4s/ember/client/emberclientbuilder
}
def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] = {
// Fill this based on the documentation of your client:
// https://http4s.org/v0.23/client/
// https://http4s.org/v0.23/api/org/http4s/client/client
}
def getStreamOfRecords(/** whatever arguments you need */): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
// Fill this based on the documentation of fs2-kafka:
// https://fd4s.github.io/fs2-kafka/docs/consumers
}
def program(/** whatever arguments you need */): Stream[IO, Unit] = {
// Based on the documentation of fs2 and fs2-kafka I would guess something like this:
Stream.fromResource(createClient(...)).flatMap { client =>
getStreamOfRecords(...).evalMapFilter { committable =>
sendHttpRequest(client)(data = committable.record).map { result =>
if (result.isSuccess) Some(committable.offset)
else None
}
}.through(commitBatchWithin(...))
}
}
object Main extends IOApp.Simple {
override final val run: IO[Unit] =
program(...).compile.drain
}
請注意,我將所有這些都寫在了腦海中,只需快速瀏覽一下檔案,您就需要更改許多內容(尤其是型別,例如Data& Result)。以及調整諸如錯誤處理和何時提交回Kafka 之類的東西。
但是,我希望這可以幫助您了解如何構建代碼。
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/312784.html
上一篇:替換嵌套映射中的值
