什么是Actor模式
Actors 為最低級別的“計算單元”
以上解釋來自官方檔案,看起來“晦澀難懂”,大白話就是說Actors模式是一段需要單執行緒執行的代碼塊,
實際開發中我們經常會有一些邏輯不能并發執行,我們常用的做法就是加鎖,例如:
lock(obj) { //dosomething... }
或者用Redis等中間件,為分布式應用加一些分布式鎖,遺憾的是,使用顯式鎖定機制容易出錯, 它們很容易導致死鎖,并可能對性能產生嚴重影響,Actors模式為單執行緒邏輯提供了一種更好的選擇,
什么時候用Actors
- 需要單執行緒執行,比如需要加lock
- 邏輯可以被劃分為小的執行單元
作業原理

Dapr啟動app時,Sidecar呼叫Actors獲取配置資訊,之后Sidecar將Actors的資訊發送到安置服務(Placement Service),安置服務會將不同的Actor型別根據其Id和Actor型別磁區,并將Actor資訊廣播到所有dapr實體,

在客戶端呼叫某個Actor時,安置服務會根據其Id和Actor型別,找到其所在的dapr實體,并執行其方法,
呼叫Actor方法
POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/method/<method>
<actorType>:執行組件型別,<actorId>:要呼叫的特定參與者的 ID,<method>:要呼叫的方法
計時器Timers和提醒器Reminders
Actor可以設定timer和reminder設定執行Actor的時間,有點像我們常用的定時任務,但是timer和reminder也存在不同,
- timer只作用于激活狀態的Actor,一個Actor長期不被呼叫,其自己的空閑計時器會逐漸累積,到一定時間后會被Dapr銷毀,timer沒法作用于已銷毀的Actor,
- reminder則可以作用于所有狀態的Actor,主要方式是重置空閑計時器,使其處于活躍狀態
操作timer
POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>
到期時間(due time)表示注冊后 timer 將首次觸發的時間, period 表示timer在此之后觸發的頻率, 到期時間為0表示立即執行, 負 due times 和負 periods 都是無效,
下面的請求體配置了一個 timer, dueTime 9秒, period 3秒, 這意味著它將在9秒后首次觸發,然后每3秒觸發一次,
{ "dueTime":"0h0m9s0ms", "period":"0h0m3s0ms" }
下面的請求體配置了一個 timer, dueTime 0秒, period 3秒, 這意味著它將在注冊之后立即觸發,然后每3秒觸發一次,
{ "dueTime":"0h0m0s0ms", "period":"0h0m3s0ms" }
操作reminder
POST/PUT/GET/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>
到期時間(due time)表示注冊后 reminders將首次觸發的時間, period 表示在此之后 reminders 將觸發的頻率, 到期時間為0表示立即執行, 負 due times 和負 periods 都是無效, 若要注冊僅觸發一次的 reminders ,請將 period 設定為空字串,
下面的請求體配置了一個 reminders, dueTime 9秒, period 3秒, 這意味著它將在9秒后首次觸發,然后每3秒觸發一次,
{ "dueTime":"0h0m9s0ms", "period":"0h0m3s0ms" }
下面的請求體配置了一個 reminders, dueTime 0秒, period 3秒, 這意味著它將在注冊之后立即觸發,然后每3秒觸發一次,
{ "dueTime":"0h0m0s0ms", "period":"0h0m3s0ms" }
下面的請求體配置了一個 reminders, dueTime 15秒, period 空字串, 這意味著它將在15秒后首次觸發,之后就不再被觸發,
{ "dueTime":"0h0m15s0ms", "period":"" }
資料持久化
使用 Dapr 狀態管理構建塊保存執行組件狀態, 由于執行組件可以一輪執行多個狀態操作,因此狀態存盤組件必須支持多項事務, 撰寫本文時,以下狀態存盤支持多項事務:
- Azure Cosmos DB
- MongoDB
- MySQL
- PostgreSQL
- Redis
- RethinkDB
- SQL Server
若要配置要與執行組件一起使用的狀態存盤組件,需要將以下元資料附加到狀態存盤配置:
- name: actorStateStore value: "true"
win10自承載模式下已默認設定此項 C:\Users\<username>\.dapr\components\statestore.yaml
專案實體
Actor操作
下面將通過一個審核流程的例子來演示,
還是用前面的FrontEnd專案,引入nuget包Dapr.Actors和Dapr.Actors.AspNetCore,

定義IOrderStatusActor介面,需要繼承自IActor
using Dapr.Actors; using System.Threading.Tasks; namespace FrontEnd.ActorDefine { public interface IOrderStatusActor : IActor { Task<string> Paid(string orderId); Task<string> GetStatus(string orderId); } }
定義OrderStatusActor實作IOrderStatusActor,并繼承自Actor
using Dapr.Actors.Runtime; using System.Threading.Tasks; namespace FrontEnd.ActorDefine { public class OrderStatusActor : Actor, IOrderStatusActor { public OrderStatusActor(ActorHost host) : base(host) { } public async Task<string> Paid(string orderId) { // change order status to paid await StateManager.AddOrUpdateStateAsync(orderId, "init", (key, currentStatus) => "paid"); return orderId; } public async Task<string> GetStatus(string orderId) { return await StateManager.GetStateAsync<string>(orderId); } } }
需要注意的是,執行組件方法的回傳型別必須為 Task 或 Task<T> , 此外,執行組件方法最多只能有一個引數, 回傳型別和引數都必須可 System.Text.Json 序列化,
Actor的api是必需的,因為 Dapr 挎斗呼叫應用程式來承載和與執行組件實體進行互動,所以在Startup的Configure中配置
app.UseEndpoints(endpoints => { endpoints.MapActorsHandlers(); // ....... });
Startup類也是用于注冊特定執行組件型別的位置, 在ConfigureServices ScoreActor 使用注冊 services.AddActors :
services.AddActors(options => { options.Actors.RegisterActor<OrderStatusActor>(); });
為測驗這個Actor,需要定義一個介面呼叫,新增ActorController
using Dapr.Actors; using Dapr.Actors.Client; using FrontEnd.ActorDefine; using Microsoft.AspNetCore.Mvc; using System.Threading.Tasks; namespace FrontEnd.Controllers { [Route("[controller]")] [ApiController] public class ActorController : ControllerBase { [HttpGet("paid/{orderId}")] public async Task<ActionResult> PaidAsync(string orderId) { var actorId = new ActorId(orderId); var proxy = ActorProxy.Create<IOrderStatusActor>(actorId, "OrderStatusActor"); var result = await proxy.Paid(orderId); return Ok(result); } } }
ActorProxy.Create 為創建代理實體, Create方法采用兩個引數:標識特定執行組件和執行組件 ActorId 型別, 它還具有一個泛型型別引數,用于指定執行組件型別所實作的執行組件介面, 由于服務器和客戶端應用程式都需要使用執行組件介面,它們通常存盤在單獨的共享專案中,
下面通過postman測驗下,呼叫成功

查看redis中的資料
127.0.0.1:6379> keys * 1) "test_topic" 2) "frontend||guid" 3) "frontend||name" 5) "newOrder" 6) "frontend||OrderStatusActor||myid-123||123" 7) "myapp2||key2" 8) "myapp2||key1" 9) "deathStarStatus" 10) "myapp||name" 127.0.0.1:6379> hgetall frontend||OrderStatusActor||myid-123||123 1) "data" 2) "\"init\"" 3) "version" 4) "1"
可以發現actor資料的命名規則是appName||ActorName||ActorId||key
同樣可以使用注入的方式創建proxy,ActorController中注入IActorProxyFactory
private readonly IActorProxyFactory _actorProxyFactory; public ActorController(IActorProxyFactory actorProxyFactory) { _actorProxyFactory = actorProxyFactory; }
新增獲取資料介面
[HttpGet("get/{orderId}")] public async Task<ActionResult> GetAsync(string orderId) { var proxy = _actorProxyFactory.CreateActorProxy<IOrderStatusActor>( new ActorId("myid-" + orderId), "OrderStatusActor"); return Ok(await proxy.GetStatus(orderId)); }
postman測驗

Timer操作
使用Actor基類的 RegisterTimerAsync 方法計劃計時器,在OrderStatusActor類中新增方法
public Task StartTimerAsync(string name, string text) { return RegisterTimerAsync( name, nameof(TimerCallbackAsync), Encoding.UTF8.GetBytes(text), TimeSpan.Zero, TimeSpan.FromSeconds(3)); } public Task TimerCallbackAsync(byte[] state) { var text = Encoding.UTF8.GetString(state); _logger.LogInformation($"Timer fired: {text}"); return Task.CompletedTask; }
StartTimerAsync方法呼叫 RegisterTimerAsync 來計劃計時器, RegisterTimerAsync 采用五個引數:
- 計時器的名稱,
- 觸發計時器時要呼叫的方法的名稱,
- 要傳遞給回呼方法的狀態,
- 首次呼叫回呼方法之前要等待的時間,
- 回呼方法呼叫之間的時間間隔, 可以指定 以
TimeSpan.FromMilliseconds(-1)禁用定期信號,
在OrderStatusActor構造方法中呼叫StartTimerAsync
StartTimerAsync("test-timer", "this is a test timer").ConfigureAwait(false).GetAwaiter().GetResult();
通過呼叫paid介面實體化一個Actor,即可開啟timer

查看控制臺,timer觸發成功
== APP == info: FrontEnd.ActorDefine.OrderStatusActor[0] == APP == Timer fired: this is a test timer
TimerCallbackAsync方法以二進制形式接收用戶狀態, 在示例中,回呼在將狀態寫入日志之前將狀態 string 解碼回 ,
可以通過呼叫 來停止計時器 UnregisterTimerAsync :
public Task StopTimerAsync(string name) { return UnregisterTimerAsync(name); }
Reminder操作
使用Actor基類的 RegisterReminderAsync 方法計劃計時器,在OrderStatusActor類中新增方法
public Task SetReminderAsync(string text) { return RegisterReminderAsync( "test-reminder", Encoding.UTF8.GetBytes(text), TimeSpan.Zero, TimeSpan.FromSeconds(1)); } public Task ReceiveReminderAsync( string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period) { if (reminderName == "test-reminder") { var text = Encoding.UTF8.GetString(state); Logger.LogWarning($"reminder fired: {text}"); } return Task.CompletedTask; }
RegisterReminderAsync方法類似于 RegisterTimerAsync ,但不必顯式指定回呼方法, 如上面的示例所示,實作 IRemindable.ReceiveReminderAsync 以處理觸發的提醒,
public class OrderStatusActor : Actor, IOrderStatusActor, IRemindable
ReceiveReminderAsync觸發提醒時呼叫 方法, 它采用 4 個引數:
- 提醒的名稱,
- 注冊期間提供的用戶狀態,
- 注冊期間提供的呼叫到期時間,
- 注冊期間提供的呼叫周期,
在OrderStatusActor構造方法中呼叫SetReminderAsync
SetReminderAsync("this is a test reminder").ConfigureAwait(false).GetAwaiter().GetResult();
通過呼叫paid介面實體化一個Actor,即可開啟reminder

查看控制臺,reminder觸發成功
== APP == warn: FrontEnd.ActorDefine.OrderStatusActor[0] == APP == reminder fired: this is a test reminder
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/302909.html
標籤:.NET技术
