前言
隨著分布式架構微服務的興起,DDD(領域驅動設計)、CQRS(命令查詢職責分離)、EDA(事件驅動架構)、ES(事件溯源)等概念也一并成為時下的火熱概念,我也在早些時候閱讀了一些大佬的分析文,學習相關概念,不過一直有種霧里看花、似懂非懂的感覺,經過一段時間的學習和研究大佬的代碼后,自己設計實作了一套我消化理解后的代碼,為了突出重點,避免受到大量實作細節的干擾,當然也是懶(這才是主要原因),其中的所有基礎設施都使用了現成的庫,所實作的研究成果也做成了傻瓜式一鍵體驗(我對對著黑框框敲命令沒什么興趣,能點兩下滑鼠搞定的事我絕不在鍵盤上敲又臭又長的命令,敲命令能敲出優越感的人我覺得應該是抖M),
正文
DDD(領域驅動設計)
這一定是最群魔亂舞的一個概念,每個大佬都能講出一大篇演講稿,但都或多或少存在差異或分歧,在我初看 DDD 時,我就被整懵了,這到底是咋回事?
現在回過頭來看,DDD 其實是一個高階思想概念,并不能指導開發者如何敲鍵盤,是指匯入如何思考領域問題,而不是指匯入思考出具體的領域的,正是因為中間隔了一層虛幻飄渺的概念,導致不同的人得出了不同的結論,還好 DDD 存在一些比較具體容易落實的概念,現在就來講下我對這些常見基礎概念的理解和我編碼時的基本原則,希望大家能在看大佬的文章時不用一臉懵逼,也進行下心得交流,
Entity(物體)
物體是一個存盤資料的類,如果類中包含自身的合法性驗證規則之類的方法,一般稱之為充血模型,相對的單純保存資料的則稱為貧血模型(有時也叫做 POCO 類),物體有一個重要性質,相等性是由標識屬性決定的,這個標識可以是一個簡單的 int 型的 Id,也可以是多個內部資料的某種組合(類似資料庫表的復合欄位主鍵),除標識外的其他東西均不對兩個物體物件的相等性產生影響,并且物體的資料屬性是可更改的,
有很多大佬認為物體應該是充血的,但在我看來,貧血的似乎更好,因為需求的不穩定性可能導致這些規則并不穩定,或規則本身并不唯一,在不同場合可能需要不同規則,這時候充血模型無論怎么辦都很別扭,如果把規則定義和校驗交給外部組件,這些需求就很容易滿足,比如使用 FluentValidate 為一種物體定義多套規則或對內部的規則條目按情況重新組合,
ValueObject(值物件)
值物件也是用來存盤資料的類,與物體相對,值物件沒有標識屬性,其相等性由所有內部屬性決定,當且僅當兩個值物件實體的所有屬性一一相等時,這兩個值物件相等,并且值物件的所有屬性為只讀,僅能在建構式中進行唯一一次設定,如果希望修改某個值物件的某一屬性,唯一的辦法是使用新的值物件替換舊的值物件,并且值物件經常作為物體的屬性存在,
這個概念看起來和物體特別相似,都是用來存盤資料的,但也有些性質上的根本不同,網上的大佬通常會為值物件撰寫基類,但我認為,值物件和物體在代碼實作上并沒有這么大的區別,可以看作整數和小數在計算機中表現為不同的資料型別,但在數學概念上他們沒有區別,僅僅只是因為離散的計算機系統無法完美表示連續的數學數字而產生的縫合怪,我傾向于根據類的代碼定義所表現出來的性質與誰相符就將其視為誰,而不是看實作的介面或繼承的基類,因為需求的不確定性會導致他們可能會發生轉換,根據代碼進行自我描述來判斷可以避免很多潛在的麻煩,
Aggregate,Aggregate Root(聚合及聚合根)
聚合根表示一個領域所操作的頂級物體型別,其他附屬資料都是聚合根的內部屬性,聚合根和其所屬的其他物體的組合稱為聚合,這是一個純概念性的東西,對領域物體的操作必須從聚合根開始,也就是說確保資料完整性的基本單位是聚合,大佬的代碼中經常會用一個空介面來表示聚合根,如果某個物體實作了這個介面,就表示這個物體可以是一個聚合根,請注意,聚合根不一定必須是頂級型別,也可以是其他物體的一個屬性,這表示一個物體在,某些情況下是聚合根,而其他情況下是另一個聚合根的內部屬性,也就是說物體之間并非嚴格的樹狀關系,而是一般有向圖狀關系,
我認為定義這樣的空介面實際意不大,反而可能造成一些誤會,如果某個物體由于需求變動導致不再會成為聚合根,那這個物體事實上將不再是聚合根,但人是會犯錯的,很可能忘記去掉聚合根介面,這時代碼與事實將產生矛盾,所以我認為聚合根應該基于事實而不是代碼,當一個物體不再會作為聚合根使用時,將相關代碼洗掉,就同時表示它不再是聚合根,閱讀代碼的人也因為看不到相關代碼而自動認為它不是聚合根,在代碼中的體現方式與下一個的概念有關,
Repository(倉儲)
倉儲表示對聚合根的持久化的抽象,在代碼上可表現為宣告了增刪查改的相關方法的介面,而倉儲的實作類負責具體解決如何對聚合根物體進行增刪查改,例如在倉儲內部使用資料庫完成具體作業,
如果一個倉儲負責管理一個聚合根物體的持久化或者說存取,那這個物體就是一個事實上的聚合根,那么在這里,就可以在代碼操作上將看到某個物體被倉儲管理等價為這個物體是聚合根,反之就不是,也就是說,如果將某個物體的倉儲的最后一個實際使用代碼洗掉,這個物體就在事實上不再是聚合根,此時代碼表現與事實將完美同步,不再會產生矛盾,至于由于沒看到某個物體的倉儲而將物體誤認為不是聚合根,這其實并沒有任何問題,這說明在你所關注的領域中這個物體確實不是聚合根,而這個物體可能作為聚合根使用的領域你根本不關心,所以看不到,那這個物體是否在其他領域作為聚合根使用對你而言其實是無所謂的,
Domain Service(領域服務)
這就涉及到業務代碼的撰寫了,如果一個業務需要由多個聚合根配合完成,也就是需要多個倉儲,那么就應該將這些對倉儲的呼叫封裝進一個服務,統一對外暴露提供服務,
如果這些倉儲操作需要具有事務性,也可以在這里進行協調管理,如果某個業務只需要一個倉儲參與,要不要專門封裝一個服務就看你高興了,
CQRS(命令查詢職責分離)
CQRS 本質上是一種指導思想,指導開發者如何設計一個低耦合高可擴展架構的思想,傳統的 CURD 將對資料的操作分為 讀、寫、改、刪,將他們封裝在一起導致他們將緊密耦合在相同的資料源中,不利于擴展,CQRS 則將對資料的操作分為會改變資料源的和不會改變資料源的,前者稱為命令,后者稱為查詢,將他們分別封裝能讓他們各自使用不同的資料源,提高可擴展性,
其中命令是一個會改變資料源,但不回傳任何值的方法;查詢是會回傳值,但絕不會改變資料源的方法,但是在我的編碼中,命令是可以回傳值的,至于要回傳什么,根據實際情況調整,比如最簡單的回傳一個 bool 表示操作是否成功以決定接下來的業務流程該走向何方,這是很常見的情況,所以在我的概念里,一個方法是命令還是查詢實際上只看這個方法是否會改變資料源,要封裝在一起還是分別封裝都無所謂,建議分開封裝到不同的倉儲中,通過倉儲關聯到具體的資料源,命令和查詢的倉儲關聯到不同的資料源的時候,自然就完成了讀寫分離,通過起名來明示方法的目的應該可以輕松分辨一個方法屬于命令還是查詢,只要腦子里有這個概念,要實作擴展辦法多的是,
事件驅動架構(EDA)
可以說所有圖形界面(Gui)編程都是清一色的事件驅動架構,這東西一點也不稀奇,說白了,EDA 就是一種被動架構,通過某些事情的發生來觸發某些操作的執行,否則系統就隨時待命,按兵不動,
EDA 的實作需要一個中介才能實作,在 Windows 中,這個東西叫做 Windows 訊息佇列(訊息回圈)和事件處理器,同樣的,在非 Gui 編程中也需要這倆東西,但通常被稱為訊息總線和訊息消費者,在分布式系統中,這個中介將不與系統在同一行程甚至不在同一設備中,稱為分布式訊息總線,這樣在開發時可以分成兩撥,一撥負責寫生產并發送事件的代碼,一撥負責寫接收事件資訊并進行處理的代碼,他們之間的溝通僅限于交流關心的事件叫什么以及事件攜帶了什么資訊,至于產生的訊息是如何送到正確的消費端并觸發消費處理器的,那是訊息總線的事,如果一個訊息總線需要這兩撥人了解中間的程序甚至需要自己去實作,那這個訊息總線是個廢品,也起不到什么解耦的效果,甚至是個拖后腿的東西,
EDA + CQRS
當他們結合在一起,就產生了命令或查詢的發起和實際處理實作可以分離的效果,命令的發起方向命令總線發送一條命令訊息并帶上必要引數,消費方收到訊息后獲取引數完成任務并回傳結果,命令可以看作一種特殊的事件,命令只由一個命令處理器處理,并可向發送方回傳一個處理結果;事件由所有對同種事件感興趣的事件處理器處理,不向事件發送方回傳任何結果,
事件處理器的執行順序是不確定的,所以任何事件處理器都必須獨立完成事件處理,如果兩個事件處理之間存在因果依賴,應該在前置事件處理后由事件處理器發布新事件,并由后置事件處理器去處理前置事件產生的新事件,而不是讓它們處理同一事件,
ES(事件溯源)
事件溯源表示能追查一個事件的源頭,甚至與之相關的其他事件的概念,說句大白話就是刨祖墳,ES 對歷史狀態回溯的需求有著天然的支持,最常見的如撤銷重做,而 ES 一般會配合 EDA 使用,ES 保存 EDA 產生的事件資訊,并且這些資訊有只讀性和因果連貫性,這順便能讓我們對系統中的物體究竟是如何一步一步變成現在這個樣子有一個清晰的了解,畢竟物體具有可變性,物體資訊一旦改變,舊的資訊就會丟失,ES 剛好彌補了這個缺陷,
代碼展示說明
此處的事件訊息中介使用 MediatR 實作,
介面
DDD 相關
物體
定義一個物體的基本要素,實作介面的類就是物體,值物件沒有介面或基類,只看代碼所展現的性質是否符合值物件的定義,聚合根沒有介面或基類,只看物體是否被倉儲使用,領域服務說白了就是個打包封裝,根據情況來決定,例如重構時提取方法即可視為封裝服務,在此處可簡單認為沒有實作物體介面的資料類是值物件:
1 /// <summary> 2 /// 物體介面 3 /// </summary> 4 public interface IEntity {} 5 6 /// <summary> 7 /// 泛型物體介面,約束Id屬性 8 /// </summary> 9 public interface IEntity<TKey> : IEntity 10 where TKey : IEquatable<TKey> 11 { 12 TKey Id { get; set; } 13 }
倉儲介面
倉儲介面細分為可讀倉儲和可寫倉儲,可寫倉儲有一個分支為可批量提交倉儲,表示修改操作會在呼叫提交保存方法后批量保存,也就是事務(就是用來替代操作單元的,這東西就有一個提交操作,名字也莫名其妙,我曾經一直無法理解這東西是干嘛的),介面宣告參考 EF Core,示例實作也基于 EF Core,由于已經公開了查詢介面型別的 Set 屬性,使用者可以任意自定義查詢,
1 public interface IBulkOperableVariableRepository<TResult, TVariableRepository, TEntity> 2 where TEntity : IEntity 3 where TVariableRepository : IVariableRepository<TEntity> 4 { 5 TResult SaveChanges(); 6 Task<TResult> SaveChangesAsync(CancellationToken cancellationToken); 7 } 8 9 public interface IBulkOperableVariableRepository<TVariableRepository, TEntity> 10 where TEntity : IEntity 11 where TVariableRepository : IVariableRepository<TEntity> 12 { 13 void SaveChanges(); 14 Task SaveChangesAsync(CancellationToken cancellationToken); 15 } 16 17 public interface IReadOnlyRepository<TEntity> 18 where TEntity : IEntity 19 { 20 IQueryable<TEntity> Set { get; } 21 TEntity Find(TEntity entity, bool ignoreNullValue); 22 Task<TEntity> FindAsync(TEntity entity, bool ignoreNullValue); 23 24 } 25 public interface IReadOnlyRepository<TEntity, TKey> : IReadOnlyRepository<TEntity> 26 where TEntity : IEntity<TKey> 27 where TKey : IEquatable<TKey> 28 { 29 TEntity Find(TKey key); 30 Task<TEntity> FindAsync(TKey key); 31 IQueryable<TEntity> Find(IEnumerable<TKey> keys); 32 } 33 34 public interface IVariableRepository<TEntity> 35 where TEntity : IEntity 36 { 37 void Add(TEntity entity); 38 Task AddAsync(TEntity entity, CancellationToken cancellationToken); 39 void Update(TEntity entity); 40 Task UpdateAsync(TEntity entity, CancellationToken cancellationToken); 41 void Delete(TEntity entity, bool isSoftDelete); 42 Task DeleteAsync(TEntity entity, bool isSoftDelete, CancellationToken cancellationToken); 43 void AddRange(IEnumerable<TEntity> entities); 44 Task AddRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken); 45 void UpdateRange(IEnumerable<TEntity> entities); 46 Task UpdateRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken); 47 void DeleteRange(IEnumerable<TEntity> entities, bool isSoftDelete); 48 Task DeleteRangeAsync(IEnumerable<TEntity> entities, bool isSoftDelete, CancellationToken cancellationToken); 49 } 50 public interface IVariableRepository<TEntity, TKey> : IVariableRepository<TEntity> 51 where TEntity : IEntity<TKey> 52 where TKey : IEquatable<TKey> 53 { 54 void Delete(TKey key, bool isSoftDelete); 55 Task DeleteAsync(TKey key, bool isSoftDelete, CancellationToken cancellationToken); 56 void DeleteRange(IEnumerable<TKey> keys, bool isSoftDelete); 57 Task DeleteRangeAsync(IEnumerable<TKey> keys, bool isSoftDelete, CancellationToken cancellationToken); 58 } 59 60 public interface IRepository<TEntity> : IVariableRepository<TEntity>, IReadOnlyRepository<TEntity> 61 where TEntity : IEntity 62 { 63 } 64 65 public interface IRepository<TEntity, TKey> : IRepository<TEntity>, IVariableRepository<TEntity, TKey>, IReadOnlyRepository<TEntity, TKey> 66 where TEntity : IEntity<TKey> 67 where TKey : IEquatable<TKey> 68 { 69 }
EF Core 專用特化版倉儲介面
1 public interface IEFCoreRepository<TEntity, TDbContext> : IReadOnlyRepository<TEntity>, IVariableRepository<TEntity>, IBulkOperableVariableRepository<int, IEFCoreRepository<TEntity, TDbContext>, TEntity> 2 where TEntity : class, IEntity 3 where TDbContext : DbContext 4 { } 5 6 public interface IEFCoreRepository<TEntity, TKey, TDbContext> : IEFCoreRepository<TEntity, TDbContext>, IReadOnlyRepository<TEntity, TKey>, IVariableRepository<TEntity, TKey> 7 where TEntity : class, IEntity<TKey> 8 where TKey : IEquatable<TKey> 9 where TDbContext : DbContext 10 { }
CQRS+EDA 相關:
命令介面
分為帶回傳值命令和無回傳值命令
1 public interface ICommand<out TResult> : ICommand 2 { 3 } 4 5 public interface ICommand : IMessage 6 { 7 }
命令總線介面
同樣分為帶回傳值和無回傳值
1 public interface ICommandBus<in TCommand> 2 where TCommand : ICommand 3 { 4 Task SendCommandAsync(TCommand command, CancellationToken cancellationToken); 5 } 6 7 public interface ICommandBus<in TCommand, TResult> : ICommandBus<TCommand> 8 where TCommand : ICommand<TResult> 9 { 10 new Task<TResult> SendCommandAsync(TCommand command, CancellationToken cancellationToken); 11 }
命令處理器介面
同上
1 public interface ICommandHandler<in TCommand> 2 where TCommand : ICommand 3 { 4 Task Handle(TCommand command, CancellationToken cancellationToken); 5 } 6 7 public interface ICommandHandler<in TCommand, TResult> : ICommandHandler<TCommand> 8 where TCommand : ICommand<TResult> 9 { 10 new Task<TResult> Handle(TCommand command, CancellationToken cancellationToken); 11 }
命令存盤介面
可用于歷史命令追溯,回傳值可用于回傳存盤是否成功或其他必要資訊
1 public interface ICommandStore 2 { 3 void Save(ICommand command); 4 5 Task SaveAsync(ICommand command, CancellationToken cancellationToken); 6 } 7 8 public interface ICommandStore<TResult> : ICommandStore 9 { 10 new TResult Save(ICommand command); 11 12 new Task<TResult> SaveAsync(ICommand command, CancellationToken cancellationToken); 13 }
事件介面
沒有回傳值
1 public interface IEvent : IMessage 2 { 3 }
事件總線介面
同上
1 public interface IEventBus 2 { 3 void PublishEvent(IEvent @event); 4 5 Task PublishEventAsync(IEvent @event, CancellationToken cancellationToken); 6 } 7 8 public interface IEventBus<TResult> : IEventBus 9 { 10 new TResult PublishEvent(IEvent @event); 11 12 new Task<TResult> PublishEventAsync(IEvent @event, CancellationToken cancellationToken); 13 }
事件處理器介面
同上
1 public interface IEventHandler<in TEvent> 2 where TEvent : IEvent 3 { 4 Task Handle(TEvent @event, CancellationToken cancellationToken); 5 }
事件存盤介面
同命令存盤介面
1 public interface IEventStore 2 { 3 void Save(IEvent @event); 4 5 Task SaveAsync(IEvent @event, CancellationToken cancellationToken = default); 6 } 7 8 public interface IEventStore<TResult> : IEventStore 9 { 10 new TResult Save(IEvent @event); 11 12 new Task<TResult> SaveAsync(IEvent @event, CancellationToken cancellationToken = default); 13 }
(命令、事件)訊息基礎介面
1 public interface IMessage 2 { 3 Guid Id { get; } 4 5 DateTimeOffset Timestamp { get; } 6 }
相關介面定義完畢,
實作
EF Core 泛型倉儲
未知主鍵的物體使用物體物件為條件查找時,使用動態生成運算式的方法
1 public class EFCoreRepository<TEntity, TKey, TDbContext> : EFCoreRepository<TEntity, TDbContext>, IEFCoreRepository<TEntity, TKey, TDbContext> 2 where TEntity : class, IEntity<TKey> 3 where TKey : IEquatable<TKey> 4 where TDbContext : DbContext 5 { 6 public EFCoreRepository(TDbContext dbContext) : base(dbContext) 7 { 8 } 9 10 public virtual void Delete(TKey key, bool isSoftDelete) 11 { 12 var entity = Find(key); 13 Delete(entity, isSoftDelete); 14 } 15 16 public virtual Task DeleteAsync(TKey key, bool isSoftDelete, CancellationToken cancellationToken = default) 17 { 18 Delete(key, isSoftDelete); 19 return Task.CompletedTask; 20 } 21 22 public virtual void DeleteRange(IEnumerable<TKey> keys, bool isSoftDelete) 23 { 24 var entities = Find(keys).ToArray(); 25 dbSet.AttachRange(entities); 26 DeleteRange(entities, isSoftDelete); 27 } 28 29 public virtual Task DeleteRangeAsync(IEnumerable<TKey> keys, bool isSoftDelete, CancellationToken cancellationToken = default) 30 { 31 DeleteRange(keys, isSoftDelete); 32 return Task.CompletedTask; 33 } 34 35 public virtual TEntity Find(TKey key) 36 { 37 return Set.SingleOrDefault(x => x.Id.Equals(key)); 38 } 39 40 public virtual IQueryable<TEntity> Find(IEnumerable<TKey> keys) 41 { 42 return Set.Where(x => keys.Contains(x.Id)); 43 } 44 45 public override TEntity Find(TEntity entity, bool ignoreNullValue) 46 { 47 return base.Find(entity, ignoreNullValue); 48 } 49 50 public virtual Task<TEntity> FindAsync(TKey key) 51 { 52 return Set.SingleOrDefaultAsync(x => x.Id.Equals(key)); 53 } 54 55 public override Task<TEntity> FindAsync(TEntity entity, bool ignoreNullValue) 56 { 57 return base.FindAsync(entity, ignoreNullValue); 58 } 59 } 60 61 public class EFCoreRepository<TEntity, TDbContext> : IEFCoreRepository<TEntity, TDbContext> 62 where TEntity : class, IEntity 63 where TDbContext : DbContext 64 { 65 protected readonly TDbContext dbContext; 66 protected readonly DbSet<TEntity> dbSet; 67 68 protected virtual void ProcessChangedEntity() 69 { 70 var changedEntities = dbContext.ChangeTracker.Entries() 71 .Where(x => x.State == EntityState.Added || x.State == EntityState.Modified); 72 foreach (var entity in changedEntities) 73 { 74 (entity as IOptimisticConcurrencySupported)?.GenerateNewConcurrencyStamp(); 75 } 76 77 var changedEntitiesGroups = changedEntities.GroupBy(x => x.State); 78 foreach (var group in changedEntitiesGroups) 79 { 80 switch (group) 81 { 82 case var entities when entities.Key == EntityState.Added: 83 foreach (var entity in entities) 84 { 85 if (entity is IActiveControllable) 86 { 87 (entity as IActiveControllable).Active ??= true; 88 } 89 } 90 break; 91 case var entities when entities.Key == EntityState.Modified: 92 foreach (var entity in entities) 93 { 94 (entity as IEntity)?.ProcessCreationInfoWhenModified(dbContext); 95 96 if (entity is IActiveControllable && (entity as IActiveControllable).Active == null) 97 { 98 entity.Property(nameof(IActiveControllable.Active)).IsModified = false; 99 } 100 } 101 break; 102 default: 103 break; 104 } 105 } 106 } 107 108 protected virtual void ResetDeletedMark(params TEntity[] entities) 109 { 110 foreach (var entity in entities) 111 { 112 if (entity is ILogicallyDeletable) 113 { 114 (entity as ILogicallyDeletable).IsDeleted = false; 115 } 116 } 117 } 118 119 public EFCoreRepository(TDbContext dbContext) 120 { 121 this.dbContext = dbContext; 122 dbSet = this.dbContext.Set<TEntity>(); 123 } 124 125 public virtual void Add(TEntity entity) 126 { 127 dbSet.Add(entity); 128 } 129 130 public virtual Task AddAsync(TEntity entity, CancellationToken cancellationToken = default) 131 { 132 return dbSet.AddAsync(entity, cancellationToken).AsTask(); 133 } 134 135 public virtual void AddRange(IEnumerable<TEntity> entities) 136 { 137 dbSet.AddRange(entities); 138 } 139 140 public virtual Task AddRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken = default) 141 { 142 return dbSet.AddRangeAsync(entities, cancellationToken); 143 } 144 145 public virtual void Delete(TEntity entity, bool isSoftDelete) 146 { 147 dbSet.Attach(entity); 148 if (isSoftDelete) 149 { 150 if (entity is ILogicallyDeletable) 151 { 152 (entity as ILogicallyDeletable).IsDeleted = true; 153 } 154 else 155 { 156 throw new InvalidOperationException($"要求軟洗掉的物體不實作{nameof(ILogicallyDeletable)}介面,"); 157 } 158 } 159 else 160 { 161 dbSet.Remove(entity); 162 } 163 } 164 165 public virtual Task DeleteAsync(TEntity entity, bool isSoftDelete, CancellationToken cancellationToken = default) 166 { 167 Delete(entity, isSoftDelete); 168 return Task.CompletedTask; 169 } 170 171 public virtual void DeleteRange(IEnumerable<TEntity> entities, bool isSoftDelete) 172 { 173 dbSet.AttachRange(entities); 174 foreach (var entity in entities) 175 { 176 Delete(entity, isSoftDelete); 177 } 178 } 179 180 public virtual Task DeleteRangeAsync(IEnumerable<TEntity> entities, bool isSoftDelete, CancellationToken cancellationToken = default) 181 { 182 DeleteRange(entities, isSoftDelete); 183 return Task.CompletedTask; 184 } 185 186 public virtual TEntity Find(TEntity entity, bool ignoreNullValue) 187 { 188 var exp = GenerateWhere(dbContext, entity, ignoreNullValue); 189 190 return Set.SingleOrDefault(exp); 191 } 192 193 public virtual Task<TEntity> FindAsync(TEntity entity, bool ignoreNullValue) 194 { 195 var exp = GenerateWhere(dbContext, entity, ignoreNullValue); 196 197 return Set.SingleOrDefaultAsync(exp); 198 } 199 200 public virtual int SaveChanges() 201 { 202 ProcessChangedEntity(); 203 return dbContext.SaveChanges(); 204 } 205 206 public virtual Task<int> SaveChangesAsync(CancellationToken cancellationToken = default) 207 { 208 ProcessChangedEntity(); 209 return dbContext.SaveChangesAsync(cancellationToken); 210 } 211 212 public virtual IQueryable<TEntity> Set => dbSet.AsNoTracking(); 213 214 public virtual void Update(TEntity entity) 215 { 216 ResetDeletedMark(entity); 217 dbSet.Update(entity); 218 } 219 220 public virtual Task UpdateAsync(TEntity entity, CancellationToken cancellationToken = default) 221 { 222 Update(entity); 223 return Task.CompletedTask; 224 } 225 226 public virtual void UpdateRange(IEnumerable<TEntity> entities) 227 { 228 ResetDeletedMark(entities.ToArray()); 229 dbSet.UpdateRange(entities); 230 } 231 232 public virtual Task UpdateRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken = default) 233 { 234 UpdateRange(entities); 235 return Task.CompletedTask; 236 } 237 238 static private Expression<Func<TEntity, bool>> GenerateWhere(TDbContext dbContext, TEntity entity, bool ignoreNullValue) 239 { 240 //查找物體型別主鍵 241 var model = dbContext.Model.FindEntityType(typeof(TEntity)); 242 var key = model.FindPrimaryKey(); 243 244 //查找所有主鍵屬性,如果沒有主鍵就使用所有物體屬性 245 IEnumerable<PropertyInfo> props; 246 if (key != null) 247 { 248 props = key.Properties.Select(x => x.PropertyInfo); 249 } 250 else 251 { 252 props = model.GetProperties().Select(x => x.PropertyInfo); 253 } 254 255 //生成運算式引數 256 ParameterExpression parameter = Expression.Parameter(typeof(TEntity), "x"); 257 258 //初始化提取物體型別所有屬性資訊生成屬性訪問運算式并包裝備用 259 var keyValues = props.Select(x => new { key = x, value = https://www.cnblogs.com/coredx/p/x.GetValue(entity), propExp = Expression.Property(parameter, x) }); 260 //初始化存盤由基礎型別組成的屬性資訊(只要個空集合,實際資料在后面的回圈中填充) 261 var primitiveKeyValues = keyValues.Take(0).Where(x => IsPrimitiveType(x.key.PropertyType)); 262 //初始化基礎型別屬性的相等比較運算式存盤集合(只要個空集合,實際資料在后面的回圈中填充) 263 var equals = primitiveKeyValues.Take(0).Select(x => Expression.Equal(x.propExp, Expression.Constant(x.value))); 264 //初始化復雜型別屬性存盤集合 265 var notPrimitiveKeyValues = primitiveKeyValues; 266 267 //如果還有元素,說明上次用于提取資訊的復雜屬性內部還存在復雜屬性,接下來用提取到的基礎型別屬性資訊生成相等比較運算式并合并到存盤集合然后繼續提取剩下的復雜型別屬性的內部屬性 268 while (keyValues.Count() > 0) 269 { 270 if (ignoreNullValue) 271 { 272 keyValues = keyValues.Where(x => x.value != null); 273 } 274 //提取由基礎型別組成的屬性資訊 275 primitiveKeyValues = keyValues.Where(x => IsPrimitiveType(x.key.PropertyType)); 276 //生成基礎型別屬性的相等比較運算式 277 equals = equals.Concat(primitiveKeyValues.Select(x => Expression.Equal(x.propExp, Expression.Constant(x.value)))); 278 //提取復雜型別屬性 279 notPrimitiveKeyValues = keyValues.Except(primitiveKeyValues); 280 //分別提取各個復雜型別屬性內部的屬性資訊繼續生成內部屬性訪問運算式 281 keyValues = 282 from kv in notPrimitiveKeyValues 283 from propInfo in kv.value.GetType().GetProperties() 284 select new { key = propInfo, value = https://www.cnblogs.com/coredx/p/propInfo.GetValue(kv.value), propExp = Expression.Property(kv.propExp, propInfo) }; 285 } 286 287 //如果相等比較運算式有多個,將所有相等比較運算式用 && 運算連接起來 288 var and = equals.First(); 289 foreach (var eq in equals.Skip(1)) 290 { 291 and = Expression.AndAlso(and, eq); 292 } 293 294 //生成完整的過濾條件運算式,形如: (TEntity x) => { return x.a == ? && x.b == ? && x.obj1.m == ? && x.obj1.n == ? && x.obj2.u.v == ?; } 295 var exp = Expression.Lambda<Func<TEntity, bool>>(and, parameter); 296 297 //判斷某個型別是否是基礎資料型別 298 static bool IsPrimitiveType(Type type) 299 { 300 var primitiveTypes = new[] { 301 typeof(sbyte) 302 ,typeof(byte) 303 ,typeof(short) 304 ,typeof(ushort) 305 ,typeof(int) 306 ,typeof(uint) 307 ,typeof(long) 308 ,typeof(ulong) 309 ,typeof(float) 310 ,typeof(double) 311 ,typeof(decimal) 312 ,typeof(char) 313 ,typeof(string) 314 ,typeof(bool) 315 ,typeof(DateTime) 316 ,typeof(DateTimeOffset) 317 //,typeof(Enum) 318 ,typeof(Guid)}; 319 320 var tmp = 321 type.IsDerivedFrom(typeof(Nullable<>)) 322 ? Nullable.GetUnderlyingType(type) 323 : type; 324 325 return tmp.IsEnum || primitiveTypes.Contains(tmp); 326 } 327 328 return exp; 329 } 330 }
命令
命令基類
1 public abstract class MediatRCommand : MediatRCommand<Unit>, ICommand, IRequest 2 { 3 } 4 5 public abstract class MediatRCommand<TResult> : ICommand<TResult>, IRequest<TResult> 6 { 7 public Guid Id { get; } 8 9 public DateTimeOffset Timestamp { get; } 10 11 public MediatRCommand() 12 { 13 Id = Guid.NewGuid(); 14 Timestamp = DateTimeOffset.Now; 15 } 16 }
示例具體命令,命令只包含引數資訊,如何使用引數資訊完成任務是命令處理器的事
1 public class ListUserCommand : MediatRCommand<IPagedList<ApplicationUser>> 2 { 3 public PageInfo PageInfo { get; } 4 public QueryFilter QueryFilter { get; } 5 public ListUserCommand(PageInfo pageInfo, QueryFilter queryFilter) 6 { 7 PageInfo = pageInfo; 8 QueryFilter = queryFilter; 9 } 10 }
命令總線
1 public class MediatRCommandBus<TCommand, TResult> : ICommandBus<TCommand, TResult> 2 where TCommand : MediatRCommand<TResult> 3 { 4 private readonly IMediator mediator; 5 private readonly ICommandStore commandStore; 6 7 public MediatRCommandBus(IMediator mediator, ICommandStore commandStore) 8 { 9 this.mediator = mediator; 10 this.commandStore = commandStore; 11 } 12 13 public virtual Task<TResult> SendCommandAsync(TCommand command, CancellationToken cancellationToken = default) 14 { 15 commandStore?.SaveAsync(command, cancellationToken); 16 return mediator.Send(command, cancellationToken); 17 } 18 19 Task ICommandBus<TCommand>.SendCommandAsync(TCommand command, CancellationToken cancellationToken) 20 { 21 return SendCommandAsync(command, cancellationToken); 22 } 23 } 24 25 public class MediatRCommandBus<TCommand> : MediatRCommandBus<MediatRCommand<Unit>, Unit> 26 where TCommand : MediatRCommand<Unit> 27 { 28 public MediatRCommandBus(IMediator mediator, ICommandStore commandStore) : base(mediator, commandStore) 29 { 30 } 31 }
命令處理器
命令處理器基類
1 public abstract class MediatRCommandHandler<TCommand, TResult> : ICommandHandler<TCommand, TResult>, IRequestHandler<TCommand, TResult> 2 where TCommand : MediatRCommand<TResult> 3 { 4 public abstract Task<TResult> Handle(TCommand command, CancellationToken cancellationToken = default); 5 6 Task ICommandHandler<TCommand>.Handle(TCommand command, CancellationToken cancellationToken) 7 { 8 return Handle(command, cancellationToken); 9 } 10 } 11 12 public abstract class MediatRCommandHandler<TCommand> : MediatRCommandHandler<TCommand, Unit> 13 where TCommand : MediatRCommand 14 { 15 }
具體命令處理器示例,使用注入的倉儲查詢資料,ApplicationUser 在這里就是事實上的聚合根物體
1 public class ListUserCommandHandler : MediatRCommandHandler<ListUserCommand, IPagedList<ApplicationUser>> 2 { 3 private IEFCoreRepository<ApplicationUser, int, ApplicationIdentityDbContext> repository; 4 5 public ListUserCommandHandler(IEFCoreRepository<ApplicationUser, int, ApplicationIdentityDbContext> repository) 6 { 7 this.repository = repository; 8 } 9 10 public override Task<IPagedList<ApplicationUser>> Handle(ListUserCommand command, CancellationToken cancellationToken = default) 11 { 12 return repository.Set 13 .OrderBy(x => x.Id) 14 .ToPagedListAsync(command.PageInfo.PageNumber, command.PageInfo.PageSize); 15 } 16 }
命令存盤
什么都沒干,實際使用時可以使用資料庫保存相關資訊
1 public class InProcessCommandStore : ICommandStore<bool> 2 { 3 public bool Save(ICommand command) 4 { 5 return SaveAsync(command).Result; 6 } 7 8 public Task<bool> SaveAsync(ICommand command, CancellationToken cancellationToken = default) 9 { 10 return Task.FromResult(true); 11 } 12 13 void ICommandStore.Save(ICommand command) 14 { 15 Save(command); 16 } 17 18 Task ICommandStore.SaveAsync(ICommand command, CancellationToken cancellationToken) 19 { 20 return SaveAsync(command, cancellationToken); 21 } 22 }
事件部分和命令基本相同,具體代碼可以到文章末尾下載專案代碼查看,
使用
在 Startup.ConfigureServices 方法中注冊相關服務,事件總線和命令總線都使用 MediatR 實作,.Net Core 內置 DI 支持注冊泛型服務,所以某個物體在實際使用時注入泛型倉儲就表示這個物體是聚合根,不用提前定義具體的聚合根物體倉儲,所以洗掉使用代碼相當于洗掉了倉儲定義,
1 services.AddScoped(typeof(ICommandBus<>), typeof(MediatRCommandBus<>)); 2 services.AddScoped(typeof(ICommandBus<,>), typeof(MediatRCommandBus<,>)); 3 services.AddScoped(typeof(ICommandStore), typeof(InProcessCommandStore)); 4 services.AddScoped(typeof(IEventBus), typeof(MediatREventBus)); 5 services.AddScoped(typeof(IEventBus<>), typeof(MediatREventBus<>)); 6 services.AddScoped(typeof(IEventStore), typeof(InProcessEventStore)); 7 services.AddScoped(typeof(IEFCoreRepository<,>), typeof(EFCoreRepository<,>)); 8 services.AddScoped(typeof(IEFCoreRepository<,,>), typeof(EFCoreRepository<,,>)); 9 services.AddMediatR(typeof(ListUserCommandHandler).GetTypeInfo().Assembly);
示例使用比較簡單,就不定義服務了,如果需要定義服務,那么使用服務的一般是命令處理器,倉儲由服務使用,這里命令處理器直接使用倉儲,在控制器中注入命令總線,向命令總線發送命令就可以獲取結果,MediatR 會自動根據發送的命令型別查找匹配的命令處理器去呼叫,
1 [ApiController] 2 [Route("api/[controller]")] 3 public class UsersController : ControllerBase 4 { 5 private readonly ICommandBus<ListUserCommand, IPagedList<ApplicationUser>> _commandBus; 6 private readonly IMapper _mapper; 7 8 public UsersController(ICommandBus<ListUserCommand, IPagedList<ApplicationUser>> commandBus, IMapper mapper) 9 { 10 _commandBus = commandBus; 11 _mapper = mapper; 12 } 13 14 /// <summary> 15 /// 獲取用戶串列 16 /// </summary> 17 /// <param name="page">頁碼</param> 18 /// <param name="size">每頁條目數</param> 19 /// <returns>用戶串列</returns> 20 [HttpGet] 21 [Produces("application/json")] //宣告介面回應 json 資料 22 public async Task<IActionResult> GetAsync(int? page, int? size) 23 { 24 var cmd = new ListUserCommand(new PageInfo(page ?? 1, size ?? 10), new QueryFilter()); 25 var users = await _commandBus.SendCommandAsync(cmd, default); 26 27 return new JsonResult( 28 new 29 { 30 rows = users.Select(u => _mapper.Map<ApplicationUserDto>(u)), 31 total = users.PageCount, //總頁數 32 page = users.PageNumber, //當前頁碼 33 records = users.TotalItemCount //總記錄數 34 } 35 ); 36 } 37 }
使用就是這么簡單,使用者根本不需要知道命令處理器的存在,把命令發送到總線,等著接收結果就可以了,
事件一般由命令處理器引發,可以改造命令處理器用 DI 注入事件總線,然后在命令處理器中向事件總線發送事件,事件總線就會自動觸發相應的事件處理器,
結語
完整的流程大概就是:控制器使用注入的服務執行業務流程,業務服務向命令總線發送命令,命令總線觸發處理器處理命令,命令處理器向事件總線發送事件,事件總線觸發事件處理器處理事件,事件處理器在處理事件后向事件總線發送新的事件觸發后續事件處理器繼續處理新的事件(如果需要),直到最后不發送事件的事件處理器完成處理,整個流程完結,在此程序中總線會自動呼叫注入的總線訊息存盤來持久化命令和事件,至此,一個環環相扣的極簡 DDD+CQRS+EDA+ES 架構搭建完成!
想要實際體驗的朋友可以到文章末尾下載專案并運行體驗,啟動除錯后訪問 /swagger 然后嘗試體驗呼叫 api/users 介面,
轉載請完整保留以下內容并在顯眼位置標注,未經授權洗掉以下內容進行轉載盜用的,保留追究法律責任的權利!
本文地址:https://www.cnblogs.com/coredx/p/12364960.html
完整源代碼:Github
里面有各種小東西,這只是其中之一,不嫌棄的話可以Star一下,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/220698.html
標籤:領域驅動設計
上一篇:Pr:視頻防抖效果
