我有一個事件源物體 (C),它需要更改其狀態以回應另一個不同型別 (P) 物體的狀態更改。C 的狀態是否應該實際改變的邏輯非常復雜,要計算的資料存在于 C 中;此外,C 的許多實體應該監聽 P 的一個實體,并且實體集會隨著時間的推移而增加,所以我寧愿他們從知道 P 的 ID 的流中拉出,而不是讓 P 跟蹤所有的 ID Cs 并推給他們。
我正在考慮做一些事情,例如:
- 標記 P 事件的投影
- 有一個發送到 C 的訂閱(P.id)命令
- 如果 C 尚未訂閱 P(它只能訂閱一個,并且不應更改),則觸發事件 Subscribed(P.id)
- 回應事件,使用 Akka-persistent-query 實作標記為 1 的事件流,將它們映射到命令,并使用同步異步運行,將它們發送到我的 ES 物體參考
這似乎有點像在事件處理程式中運行流的反模式。我想知道是否有更好/更受支持的方式來做到這一點,而上游不必知道下游。我決定不使用 Akka pub-sub,因為它最多只進行一次交付,并且我想盡可能避免使用 Kafka。
uj5u.com熱心網友回復:
您絕對不想在事件處理程式中運行流:事件處理程式永遠不會產生副作用。
假設您希望 a從未運行的時間(包括曾經運行過的時間)C獲取事件,這表明應該為每個. 由于訂閱將針對一個特定的 ,我會認真考慮不標記,而是使用流來獲取 a 的所有事件并忽略不感興趣的事件。在流中,您將這些轉換為API 中的命令,包括帶有命令的事件流中的偏移量,并將其發送到(對于至少一次交付,帶有詢問的 a 很有用;將持久化一個事件記錄它處理偏移量:這允許命令是冪等的,因為CCCPeventsByPersistenceIdPCPCmapAsyncCC 如果偏移量小于或等于其狀態下的高水位偏移量,則可以確認該命令)。
此流在成功持久化Subscribed(P.id)事件后由命令處理程式啟動(在本例中從偏移量 0 開始),然后在持久性參與者重新水合后如果狀態顯示它已訂閱(在本例中從一加高水補)。
此處不使用標記的理由是假設C不感興趣的事件數量小于未訂閱的帶有來自Ps的標記的事件數量(請注意,對于大多數持久性插件,C標簽越多,開銷就越大:僅由物體的一個特定實體使用的標簽通常不是一個好主意)。如果所討論的標簽很少見,則此假設可能不成立,eventsByTag并且按 id 過濾可能很有用。
當然,這確實有為每個 運行離散流的缺點C:取決于C訂閱給定的 s 的數量P,這樣做的開銷可能很大,并且被趕上的訂閱者的流將特別浪費。C在這種情況下,將命令傳遞給給定的訂閱 s 的責任P可以轉移到參與者身上。在這種情況下,唯一真正的變化是在哪里C運行流,而是通過詢問參與者從P. 因為這種方法顯著提高了復雜性(尤其是在管理何時Cs 加入和退出共享的“追趕”流),我傾向于建議從每個流的C方法開始,然后轉到共享流(還值得注意的是,可以有多個共享流:實際上,我傾向于將共享流設定為 per- ActorSystem(例如,每個感興趣的“節點單例” P),以免涉及遠程處理),因為進行轉換并不困難(從C's 的角度來看,實際上并沒有適應的命令是來自它啟動的流還是來自其他參與者運行的流的區別)。
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/412164.html
標籤:
下一篇:加特林的自定義饋線
