介紹
Actor模式將Actor描述為最低級別的“計算單元”,換句話說,您在一個獨立的單元(稱為actor)中撰寫代碼,該單元接收訊息并一次處理一個訊息,沒有任何并發或執行緒,
再換句話說,根據
ActorId劃分獨立計算單元后,相同的ActorId重入要排隊,可以理解為lock(ActorId)
注:這里有個反例,就是重入性的引入,這個概念目前還是Preview,它允許同一個鏈內可以重復進入,判斷的標準不止是ActorId這么簡單,即自己調自己是被允許的,這個默認是關閉的,需要手動開啟,即默認不允許自己調自己,
當您的代碼處理一條訊息時,它可以向其他參與者發送一潭訓多條訊息,或者創建新的參與者,底層運行時管理每個參與者運行的方式、時間和地點,并在參與者之間路由訊息,
大量的Actor可以同時執行,Actor彼此獨立執行,
Dapr 包含一個運行時,它專門實作了 Virtual Actor 模式, 通過 Dapr 的實作,您可以根據 Actor 模型撰寫 Dapr Actor,而 Dapr 利用底層平臺提供的可擴展性和可靠性保證,
什么時候用Actors
Actor 設計模式非常適合許多分布式系統問題和場景,但您首先應該考慮的是該模式的約束,一般來說,如果出現以下情況,請考慮使用Actors模式來為您的問題或場景建模:
- 您的問題空間涉及大量(數千個或更多)
小的、獨立且孤立的狀態和邏輯單元, - 您希望使用
不需要與外部組件進行大量互動的單執行緒物件,包括跨一組Actors查詢狀態, - 您的 Actor 實體
不會通過發出 I/O 操作來阻塞具有不可預測延遲的呼叫者,
Dapr Actor
每個Actor都被定義為Actor型別的實體,就像物件是類的實體一樣, 例如,可能有一個執行計算器功能的Actor型別,并且可能有許多該型別的Actor分布在集群的各個節點上,每個這樣的Actor都由一個Acotr ID唯一標識,

生命周期
Dapr Actors是虛擬的,這意味著他們的生命周期與他們的記憶體表現無關,因此,它們不需要顯式創建或銷毀,Dapr Actors運行時在第一次收到對該Actor ID 的請求時會自動激活該Actor,如果一個Actor在一段時間內沒有被使用,Dapr Actors運行時就會對記憶體中的物件進行垃圾回收,如果稍后需要重新激活,它還將保持對參與者存在的了解,如果稍后需要重新激活,它還將保持對 Actor 的一切原有資料,
呼叫 Actor 方法和提醒會重置空閑時間,例如提醒觸發將使Actor保持活躍,無論Actor是活躍還是不活躍,Actor提醒都會觸發,如果為不活躍的Actor觸發,它將首先激活演員,Actor 計時器不會重置空閑時間,因此計時器觸發不會使 Actor 保持活動狀態,計時器僅在Actor處于活動狀態時觸發,
Reminders 和 Timers 最大的區別就是Reminders會保持Actor的活動狀態,而Timers不會
Dapr 運行時用來查看Actor是否可以被垃圾回收的空閑超時和掃描間隔是可配置的,當 Dapr 運行時呼叫 Actor 服務以獲取支持的 Actor 型別時,可以傳遞此資訊,
由于Virtual Actor模型的存在,這種Virtual Actor生命周期抽象帶來了一些注意事項,事實上,Dapr Actors實作有時會偏離這個模型,
第一次將訊息發送到Actor ID時,Actor被自動激活(導致構建Actor物件), 經過一段時間后,Actor物件將被垃圾回收,被回收后再次使用Actor ID將導致構造一個新的Actor物件, Actor 的狀態比物件的生命周期長,因為狀態存盤在 Dapr 運行時配置的狀態管理組件中,
注:Actor被垃圾回收之前,Actor物件是會復用的,這里會導致一個問題,在.Net Actor類中,建構式在Actor存活期間只會被呼叫一次,
分發和故障轉移
為了提供可擴展性和可靠性,Actor 實體分布在整個集群中,Dapr 根據需要自動將它們從故障節點遷移到健康節點,
Actors 分布在 Actor 服務的實體中,而這些實體分布在集群中的節點之間, 對于給定的Actor型別,每個服務實體都包含一組Actor,
Dapr安置服務(Placement Service)
Dapr Actor 運行時為您管理分發方案和密鑰范圍設定,這是由Actor Placement 服務完成的,創建服務的新實體時,相應的 Dapr 運行時會注冊它可以創建的Actor型別,并且安置服務會計算給定Actor型別的所有實體的磁區,每個Actor型別的磁區資訊表被更新并存盤在環境中運行的每個Dapr實體中,并且可以隨著Actor服務的新實體的創建和銷毀而動態變化,這如下圖所示:

當客戶端呼叫具有特定ID的Actor(例如,Actor ID 123)時,客戶端的 Dapr 實體會Hash Actor型別和 ID,并使用該資訊呼叫可以為特定Actor ID的請求提供服務的相應Dapr實體,因此,始終為任何給定的Actor ID 呼叫相同的磁區(或服務實體),這如下圖所示:

這簡化了一些選擇,但也帶來了一些考慮:
- 默認情況下,Actor 隨機放置到 pod 中,從而實作均勻分布,
- 因為Actor是隨機放置的,應該可以預料到Actor操作總是需要網路通信,包括方法呼叫資料的序列化和反序列化,產生延遲和開銷,
注:Dapr Actor 放置服務僅用于 Actor 放置,因此如果您的服務不使用 Dapr Actors,則不需要, 放置服務可以在所有托管環境中運行,包括自托管和 Kubernetes,
Actor通訊
您可以通過HTTP/gRPC呼叫Actor,當然也可以用SDK,
POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/<method/state/timers/reminders>
并發
Dapr Actor 運行時為訪問 Actor 方法提供了一個簡單的回合制(turn-basesd)的訪問模型,這意味著在任何時候,Actor 物件的代碼中都不能有超過一個執行緒處于活動狀態,
單個Actor實體一次不能處理多個請求,如果預期要處理并發請求,Actor 實體可能會導致吞吐量瓶頸,
單個Actor實體指每個Actor ID對應的Actor物件,單個Actor不并發就沒有問題
如果兩個 Actor 之間存在回圈請求,而同時向其中一個 Actor 發出外部請求,則 Actor 之間可能會陷入僵局,Dapr Actor運行時自動超時Actor呼叫并向呼叫者拋出例外以中斷可能的死鎖情況,

重入性(Preview)
作為對 dapr 中基礎 Actor 的增強,現在重入性為預覽功能,感興趣的小伙伴可以到看官方檔案,
回合制訪問(Turn-based access)
一個回合包括一個Actor方法的完整執行以回應來自其他Actor或客戶端的請求,或者一個計時器/提醒回呼的完整執行,即使這些方法和回呼是異步的,Dapr Actor運行時也不會將它們交叉,一個回合必須完全完成后,才允許進行新的回合,換句話說,當前正在執行的Actor方法或計時器/提醒回呼必須完全完成,才能允許對方法或回呼的新呼叫,
Dapr Actor運行時通過在回合開始時獲取每個Actor的鎖并在回合結束時釋放鎖來實作基于回合的并發性, 因此,基于回合的并發是在每個Actor的基礎上執行的,而不是跨Actor,Actor 方法和計時器/提醒回呼可以代表不同的 Actor 同時執行,
以下示例說明了上述概念,考慮實作兩個異步方法(例如 Method1 和 Method2)、計時器和提醒的Actor 型別,下圖顯示了代表屬于此Actor型別的兩個Actors(ActorId1 和 ActorId2)執行這些方法和回呼的時間線示例,

Actor狀態管理
Actor可以使用狀態管理功能可靠地保存狀態,您可以通過 HTTP/gRPC 端點與 Dapr 互動以進行狀態管理,
要使用 actor,您的狀態存盤必須支持事務,這意味著您的狀態存盤組件必須實作 TransactionalStore 介面,只有一個狀態存盤組件可以用作所有參與者的狀態存盤,
事務支持串列:https://docs.dapr.io/reference/components-reference/supported-state-stores/
注:建議學習的時候都用Redis,官方所有的示例也都是基于Redis,比較容易上手,且Dapr init默認集成
Actor計時器和提醒
Actor可以通過注冊計時器或提醒來安排自己的定期作業,
計時器和提醒的功能非常相似,主要區別在于,Dapr Actor 運行時在停用后不保留有關計時器的任何資訊,而使用 Dapr Actor 狀態提供程式保留有關提醒的資訊,
定時器和提醒的調度配置是相同的,總結如下:
DueTime 是一個可選引數,用于設定第一次呼叫回呼之前的時間或時間間隔,如果省略 DueTime,則在定時器/提醒注冊后立即呼叫回呼,
支持的格式:
- RFC3339 日期格式,例如
2020-10-02T15:00:00Z - time.Duration 格式,例如
2h30m - ISO 8601 持續時間格式,例如
PT2H30M
period 是一個可選引數,用于設定兩次連續回呼呼叫之間的時間間隔,當以 ISO 8601-1 持續時間格式指定時,您還可以配置重復次數以限制回呼呼叫的總數,如果省略 period,則回呼將僅被呼叫一次,
支持的格式:
- time.Duration 格式,例如
2h30m - ISO 8601 持續時間格式,例如
PT2H30M,R5/PT1M30S
ttl 是一個可選引數,用于設定計時器/提醒到期和洗掉的時間或時間間隔,如果省略 ttl,則不應用任何限制,
支持的格式:
- RFC3339 日期格式,例如
2020-10-02T15:00:00Z - time.Duration 格式,例如
2h30m - ISO 8601 持續時間格式,例如
PT2H30M
當您同時指定周期內的重復次數和 ttl 時,計時器/提醒將在滿足任一條件時停止,
Actor 運行時配置
-
actorIdleTimeout- 停用空閑 actor 之前的超時時間,每個 actorScanInterval 間隔都會檢查超時,默認值:60 分鐘 -
actorScanInterval- 指定掃描演員以停用空閑Actor的頻率的持續時間,閑置時間超過 actor_idle_timeout 的 Actor 將被停用,默認值:30 秒 -
drainOngoingCallTimeout- 在耗盡Rebalanced的Actor的程序中的持續時間,這指定了當前活動 Actor 方法完成的超時時間,如果當前沒有 Actor 方法呼叫,則忽略此項,默認值:60 秒 -
drainRebalancedActors- 如果為 true,Dapr 將等待 drainOngoingCallTimeout 持續時間以允許當前角色呼叫完成,然后再嘗試停用角色,默認值:truedrainRebalancedActors與上面的drainOngoingCallTimeout需搭配使用 -
reentrancy- (ActorReentrancyConfig) - 配置角色的重入行為,如果未提供,則禁用可重入,默認值:disabled, 0 -
remindersStoragePartitions- 配置Actor提醒的磁區數,如果未提供,則所有提醒都將保存為Actor狀態存盤中的單個記錄,默認值:0
// In Startup.cs
public void ConfigureServices(IServiceCollection services)
{
// Register actor runtime with DI
services.AddActors(options =>
{
// Register actor types and configure actor settings
options.Actors.RegisterActor<MyActor>();
// Configure default settings
options.ActorIdleTimeout = TimeSpan.FromMinutes(60);
options.ActorScanInterval = TimeSpan.FromSeconds(30);
options.DrainOngoingCallTimeout = TimeSpan.FromSeconds(60);
options.DrainRebalancedActors = true;
options.RemindersStoragePartitions = 7;
// reentrancy not implemented in the .NET SDK at this time
});
// Register additional services for use with actors
services.AddSingleton<BankService>();
}
磁區提醒(Preview)
在 sidecar 重新啟動后,Actor 提醒會保留并繼續觸發,在 Dapr 運行時版本 1.3 之前,提醒被保存在 actor 狀態存盤中的單個記錄上,
此為Preview功能,感興趣可以看官方檔案
.Net呼叫Dapr的Actor
與以往不同,Actor示例會多創建一個共享類別庫用于存放Server和Client共用的部分
創建Assignment.Shared
創建類別庫專案,并添加Dapr.ActorsNuGet包參考,最后添加以下幾個類:
AccountBalance.cs
namespace Assignment.Shared;
public class AccountBalance
{
public string AccountId { get; set; } = default!;
public decimal Balance { get; set; }
}
IBankActor.cs
注:這個是Actor介面,IActor是Dapr SDK提供的
using Dapr.Actors;
namespace Assignment.Shared;
public interface IBankActor : IActor
{
Task<AccountBalance> GetAccountBalance();
Task Withdraw(WithdrawRequest withdraw);
}
OverdraftException.cs
namespace Assignment.Shared;
public class OverdraftException : Exception
{
public OverdraftException(decimal balance, decimal amount)
: base($"Your current balance is {balance:c} - that's not enough to withdraw {amount:c}.")
{
}
}
WithdrawRequest.cs
namespace Assignment.Shared;
public class WithdrawRequest
{
public decimal Amount { get; set; }
}
創建Assignment.Server
創建類別庫專案,并添加Dapr.Actors.AspNetCoreNuGet包參考和Assignment.Shared專案參考,最后修改程式埠為5000,
注:Server與Shared和Client的NuGet包不一樣,Server是集成了服務端的一些功能
修改program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<BankService>();
builder.Services.AddActors(options =>
{
options.Actors.RegisterActor<DemoActor>();
});
var app = builder.Build();
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapActorsHandlers();
});
app.Run();
添加BankService.cs
using Assignment.Shared;
namespace Assignment.Server;
public class BankService
{
// Allow overdraft of up to 50 (of whatever currency).
private readonly decimal OverdraftThreshold = -50m;
public decimal Withdraw(decimal balance, decimal amount)
{
// Imagine putting some complex auditing logic here in addition to the basics.
var updated = balance - amount;
if (updated < OverdraftThreshold)
{
throw new OverdraftException(balance, amount);
}
return updated;
}
}
添加BankActor.cs
using Assignment.Shared;
using Dapr.Actors.Runtime;
using System;
namespace Assignment.Server;
public class BankActor : Actor, IBankActor, IRemindable // IRemindable is not required
{
private readonly BankService bank;
public BankActor(ActorHost host, BankService bank)
: base(host)
{
// BankService is provided by dependency injection.
// See Program.cs
this.bank = bank;
}
public async Task<AccountBalance> GetAccountBalance()
{
var starting = new AccountBalance()
{
AccountId = this.Id.GetId(),
Balance = 10m, // Start new accounts with 100, we're pretty generous.
};
var balance = await StateManager.GetOrAddStateAsync("balance", starting);
return balance;
}
public async Task Withdraw(WithdrawRequest withdraw)
{
var starting = new AccountBalance()
{
AccountId = this.Id.GetId(),
Balance = 10m, // Start new accounts with 100, we're pretty generous.
};
var balance = await StateManager.GetOrAddStateAsync("balance", starting)!;
if (balance.Balance <= 0)
{
// Simulated reminder deposit
if (Random.Shared.Next(100) > 90)
{
await RegisterReminderAsync("Deposit", null, TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1));
}
}
// Throws Overdraft exception if the account doesn't have enough money.
var updated = this.bank.Withdraw(balance.Balance, withdraw.Amount);
balance.Balance = updated;
await StateManager.SetStateAsync("balance", balance);
}
public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
{
if (reminderName == "Deposit")
{
var balance = await StateManager.GetStateAsync<AccountBalance>("balance")!;
if (balance.Balance <= 0)
{
balance.Balance += 60; // 50(Overdraft Threshold) + 10 = 60
Console.WriteLine("Deposit: 10");
}
else
{
Console.WriteLine("Deposit: ignore");
}
}
}
}
運行Assignment.Server
使用Dapr CLI來啟動,先使用命令列工具跳轉到目錄 dapr-study-room\Assignment07\Assignment.Server,然后執行下面命令
dapr run --app-id testactor --app-port 5000 --dapr-http-port 3500 --dapr-grpc-port 50001 dotnet run
創建Assignment.Client
創建控制臺專案,并添加Dapr.ActorsNuGet包參考和Assignment.Shared專案參考,
修改Program.cs
using Assignment.Shared;
using Dapr.Actors;
using Dapr.Actors.Client;
Console.WriteLine("Creating a Bank Actor");
var bank = ActorProxy.Create<IBankActor>(ActorId.CreateRandom(), "BankActor");
Parallel.ForEach(Enumerable.Range(1, 10), async i =>
{
while (true)
{
var balance = await bank.GetAccountBalance();
Console.WriteLine($"[Worker-{i}] Balance for account '{balance.AccountId}' is '{balance.Balance:c}'.");
Console.WriteLine($"[Worker-{i}] Withdrawing '{1m:c}'...");
try
{
await bank.Withdraw(new WithdrawRequest() { Amount = 1m });
}
catch (ActorMethodInvocationException ex)
{
Console.WriteLine("[Worker-{i}] Overdraft: " + ex.Message);
}
Task.Delay(1000).Wait();
}
});
Console.ReadKey();
運行Assignment.Client
使用Dapr CLI來啟動,先使用命令列工具跳轉到目錄 dapr-study-room\Assignment07\Assignment.Client,然后執行下面命令
dotnet run
本章原始碼
Assignment07
https://github.com/doddgu/dapr-study-room
我們正在行動,新的框架、新的生態
我們的目標是自由的、易用的、可塑性強的、功能豐富的、健壯的,
所以我們借鑒Building blocks的設計理念,正在做一個新的框架MASA Framework,它有哪些特點呢?
- 原生支持Dapr,且允許將Dapr替換成傳統通信方式
- 架構不限,單體應用、SOA、微服務都支持
- 支持.Net原生框架,降低學習負擔,除特定領域必須引入的概念,堅持不造新輪子
- 豐富的生態支持,除了框架以外還有組件庫、權限中心、配置中心、故障排查中心、報警中心等一系列產品
- 核心代碼庫的單元測驗覆寫率90%+
- 開源、免費、社區驅動
- 還有什么?我們在等你,一起來討論
經過幾個月的生產專案實踐,已完成POC,目前正在把之前的積累重構到新的開源專案中
目前原始碼已開始同步到Github(檔案站點在規劃中,會慢慢完善起來):
MASA.BuildingBlocks
MASA.Contrib
MASA.Utils
MASA.EShop
BlazorComponent
MASA.Blazor
QQ群:7424099
微信群:加技術運營微信(MasaStackTechOps),備注來意,邀請進群

轉載請註明出處,本文鏈接:https://www.uj5u.com/net/394888.html
標籤:C#
