一、簡要說明
文章資訊:
基于的 ABP vNext 版本:1.0.0
創作日期:2019 年 10 月 24 日晚
更新日期:暫無
ABP vNext 提供了后臺作業者和后臺作業的支持,基本實作與原來的 ABP 框架類似,并且 ABP vNext 還提供了對 HangFire 和 RabbitMQ 的后臺作業集成,開發人員在使用這些第三方庫的時候,基本就是開箱即用,不需要做其他復雜的配置,
后臺作業在系統開發的程序當中,是比較常用的功能,因為總是有一些長耗時的任務,而這些任務我們不是立即回應的,例如 Excel 檔案匯入、批量發送短信通知等,
后臺作業者 的話,ABP vNext 的實作就是在 CLR 的 Timer 之上封裝了一層,周期性地執行用戶邏輯,ABP vNext 默認提供的 后臺任務管理器,就是在后臺作業者基礎之上進行的封裝,
涉及到后臺任務、后臺作業者的模塊一共有 6 個,它們分別是:
-
Volo.Abp.Threading :提供了一些常用的執行緒組件,其中
AbpTimer就是在里面實作的, -
Volo.Abp.BackgroundWorkers :后臺作業者的定義和實作,
-
Volo.Abp.BackgroundJobs.Abstractions :后臺任務的一些共有定義,
-
Volo.Abp.BackgroundJobs :默認的后臺任務管理器實作,
-
Volo.Abp.BackgroundJobs.HangFire :基于 Hangfire 庫實作的后臺任務管理器,
-
Volo.Abp.BackgroundJobs.RabbitMQ : 基于 RabbitMQ 實作的后臺任務管理器,
二、原始碼分析
執行緒組件
健壯的計時器
CLR 為我們提供了多種計時器,我們一般使用的是 System.Threading.Timer ,它是基于 CLR 執行緒池的一個周期計時器,會根據我們配置的 Period (周期) 定時執行,在 CLR 執行緒池中,所有的 Timer 只有 1 個執行緒為其服務,這個執行緒直到下一個計時器的觸發時間,當下一個 Timer 物件到期時,這個執行緒就會將 Timer 的回呼方法通過 ThreadPool.QueueUserWorkItem() 扔到執行緒池去執行,
不過這帶來了一個問題,即你的回呼方法執行時間超過了計時器的周期,那么就會造成上一個任務還沒執行完成又開始執行新的任務,
解決這個方法其實很簡單,即啟動之后,將周期設定為 Timeout.Infinite ,這樣只會執行一次,當回呼方法執行完成之后,就設定 dueTime 引數說明下次執行要等待多久,并且周期還是 Timeout.Infinite,
ABP vNext 已經為我們提供了健壯的計時器,該型別的定義是 AbpTimer ,在內部用到了 volatile 關鍵字和 Monitor 實作 條件變數模式 解決多執行緒環境下的問題,
public class AbpTimer : ITransientDependency
{
// 回呼事件,
public event EventHandler Elapsed;
// 執行周期,
public int Period { get; set; }
// 定時器啟動之后就開始運行,默認為 Fasle,
public bool RunOnStart { get; set; }
// 日志記錄器,
public ILogger<AbpTimer> Logger { get; set; }
private readonly Timer _taskTimer;
// 定時器是否在執行任務,默認為 false,
private volatile bool _performingTasks;
// 定時器的運行狀態,默認為 false,
private volatile bool _isRunning;
public AbpTimer()
{
Logger = NullLogger<AbpTimer>.Instance;
// 回呼函式是 TimerCallBack,執行周期為永不執行,
_taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite);
}
public void Start(CancellationToken cancellationToken = default)
{
// 如果傳遞的周期小于等于 0 ,則拋出例外,
if (Period <= 0)
{
throw new AbpException("Period should be set before starting the timer!");
}
// 使用互斥鎖,保證執行緒安全,
lock (_taskTimer)
{
// 如果啟動之后就需要馬上執行,則設定為 0,馬上執行任務,否則會等待 Period 毫秒之后再執行(1 個周期),
_taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);
// 定時器成功運行了,
_isRunning = true;
}
// 釋放 _taskTimer 的互斥鎖,
}
public void Stop(CancellationToken cancellationToken = default)
{
// 使用互斥鎖,
lock (_taskTimer)
{
// 將內部定時器設定為永不執行的狀態,
_taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
// 檢測當前是否還有正在執行的任務,如果有則等待任務執行完成,
while (_performingTasks)
{
// 臨時釋放鎖,阻塞當前執行緒,但是其他執行緒可以獲取 _timer 的互斥鎖,
Monitor.Wait(_taskTimer);
}
// 需要表示停止狀態,所以標記狀態為 false,
_isRunning = false;
}
}
private void TimerCallBack(object state)
{
lock (_taskTimer)
{
// 如果有任務正在運行,或者內部定時器已經停止了,則不做任何事情,
if (!_isRunning || _performingTasks)
{
return;
}
// 臨時停止內部定時器,
_taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
// 表明馬上需要執行任務了,
_performingTasks = true;
}
try
{
// 呼叫系結的事件,
Elapsed.InvokeSafely(this, new EventArgs());
}
catch
{
// 注意,這里將會吞噬例外,
}
finally
{
lock (_taskTimer)
{
// 任務執行完成,更改狀態,
_performingTasks = false;
// 如果定時器還在運行,沒有被停止,則啟動下一個 Period 周期,
if (_isRunning)
{
_taskTimer.Change(Period, Timeout.Infinite);
}
// 解除因為釋放鎖而阻塞的執行緒,
// 如果已經呼叫了 Stop,則會喚醒那個因為 Wait 阻塞的執行緒,就會使 _isRunning 置為 false,
Monitor.Pulse(_taskTimer);
}
}
}
}
這里對 _performingTasks 和 _isRunning 欄位設定為 volatile 防止指令重排和暫存器快取,這是因為在 Stop 方法內部使用到的 _performingTasks 可能會被優化,所以將該欄位設定為了易失的,
IRunnable 介面
ABP vNext 為任務的啟動和停止,抽象了一個 IRunnable 介面,雖然描述說的是對執行緒的行為進行抽象,但千萬千萬不要手動呼叫 Thread.Abort() ,關于 Thread.Abort() 的壞處,這里不再多加贅述,可以參考 這篇文章 的描述,或者搜索其他的相關文章,
public interface IRunnable
{
// 啟動這個服務,
Task StartAsync(CancellationToken cancellationToken = default);
/// <summary>
/// 停止這個服務,
/// </summary>
Task StopAsync(CancellationToken cancellationToken = default);
}
后臺作業者
模塊的構造
后臺作業者的模塊行為比較簡單,它定義了在應用程式初始化和銷毀時的行為,在初始化時,后臺作業者管理器 獲得所有 后臺作業者,并開始啟動它們,在銷毀時,后臺作業者管理器獲得所有后臺作業者,并開始停止他們,這樣才能夠做到優雅退出,
[DependsOn(
typeof(AbpThreadingModule)
)]
public class AbpBackgroundWorkersModule : AbpModule
{
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
// 如果啟用了后臺作業者,那么獲得后臺作業者管理器的實體,并呼叫 StartAsync 啟動所有后臺作業者,
if (options.IsEnabled)
{
AsyncHelper.RunSync(
() => context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.StartAsync()
);
}
}
public override void OnApplicationShutdown(ApplicationShutdownContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
// 如果啟用了后臺作業者,那么獲得后臺作業者管理器的實體,并呼叫 StopAsync 停止所有后臺作業者,
if (options.IsEnabled)
{
AsyncHelper.RunSync(
() => context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.StopAsync()
);
}
}
}
后臺作業者的定義
首先看看 IBackgroundWorker 介面的定義,是空的,不過繼承了 ISingletonDependency 介面,說明我們的每個后臺作業者都是 單例 的,
/// <summary>
/// 在后臺運行,執行某些任務的作業程式(執行緒)的介面定義,
/// </summary>
public interface IBackgroundWorker : IRunnable, ISingletonDependency
{
}
ABP vNext 為我們定義了一個抽象的后臺作業者型別 BackgroundWorkerBase,這個基類的設計目的是提供一些常用組件(和 ApplicationService 一樣),
public abstract class BackgroundWorkerBase : IBackgroundWorker
{
//TODO: Add UOW, Localization and other useful properties..?
//TODO: 是否應該提供作業單元、本地化以及其他常用的屬性?
public ILogger<BackgroundWorkerBase> Logger { protected get; set; }
protected BackgroundWorkerBase()
{
Logger = NullLogger<BackgroundWorkerBase>.Instance;
}
public virtual Task StartAsync(CancellationToken cancellationToken = default)
{
Logger.LogDebug("Started background worker: " + ToString());
return Task.CompletedTask;
}
public virtual Task StopAsync(CancellationToken cancellationToken = default)
{
Logger.LogDebug("Stopped background worker: " + ToString());
return Task.CompletedTask;
}
public override string ToString()
{
return GetType().FullName;
}
}
ABP vNext 內部只有一個默認的后臺作業者實作 PeriodicBackgroundWorkerBase,從名字上來看,意思是就是周期執行的后臺作業者,內部就是用的 AbpTimer 來實作,ABP vNext 將其包裝起來是為了實作統一的模式(后臺作業者),
public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
protected readonly AbpTimer Timer;
// 也就意味著子類必須在其建構式,指定 timer 的執行周期,
protected PeriodicBackgroundWorkerBase(AbpTimer timer)
{
Timer = timer;
Timer.Elapsed += Timer_Elapsed;
}
// 啟動后臺作業者,
public override async Task StartAsync(CancellationToken cancellationToken = default)
{
await base.StartAsync(cancellationToken);
Timer.Start(cancellationToken);
}
// 停止后臺作業者,
public override async Task StopAsync(CancellationToken cancellationToken = default)
{
Timer.Stop(cancellationToken);
await base.StopAsync(cancellationToken);
}
// Timer 關聯的周期事件,之所以不直接掛載 DoWork,是為了捕獲例外,
private void Timer_Elapsed(object sender, System.EventArgs e)
{
try
{
DoWork();
}
catch (Exception ex)
{
Logger.LogException(ex);
}
}
// 你要周期執行的任務,
protected abstract void DoWork();
}
我們如果要實作自己的后臺作業者,只需要繼承該類,實作 DoWork() 方法即可,
public class TestWorker : PeriodicBackgroundWorkerBase
{
public TestWorker(AbpTimer timer) : base(timer)
{
// 每五分鐘執行一次,
timer.Period = (int)TimeSpan.FromMinutes(5).TotalMilliseconds;
}
protected override void DoWork()
{
Console.WriteLine("后臺作業者被執行了,");
}
}
然后在我們自己模塊的 OnPreApplicationInitialization() 方法內決議出后臺作業管理器(IBackgroundWorkerManager),呼叫它的 Add() 方法,將我們定義的 TestWorker 添加到管理器當中即可,
后臺作業者管理器
所有的后臺作業者都是通過 IBackgroundWorkerManager 進行管理的,它提供了 StartAsync()、StopAsync()、Add() 方法,前面兩個方法就是 IRunnable 介面定義的,后臺作業者管理器直接集成了該介面,后面的 Add() 方法就是用來動態添加我們的后臺作業者,
后臺作業者管理器的默認實作是 BackgroundWorkerManager 型別,它內部做的事情很簡單,就是維護一個后臺作業者集合,每當呼叫 StartAsync() 或 StopAsync() 方法的時候,都從這個集合遍歷后臺作業者,執行他們的啟動和停止方法,
這里值得注意的一點是,當我們呼叫 Add() 方法添加了一個后臺作業者之后,后臺作業者管理器就會啟動這個后臺作業者,
public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable
{
protected bool IsRunning { get; private set; }
private bool _isDisposed;
private readonly List<IBackgroundWorker> _backgroundWorkers;
public BackgroundWorkerManager()
{
_backgroundWorkers = new List<IBackgroundWorker>();
}
public virtual void Add(IBackgroundWorker worker)
{
_backgroundWorkers.Add(worker);
// 如果當前后臺作業者管理器還處于運行狀態,則呼叫作業者的 StartAsync() 方法啟動,
if (IsRunning)
{
AsyncHelper.RunSync(
() => worker.StartAsync()
);
}
}
public virtual void Dispose()
{
if (_isDisposed)
{
return;
}
_isDisposed = true;
//TODO: ???
}
// 啟動,則遍歷集合啟動,
public virtual async Task StartAsync(CancellationToken cancellationToken = default)
{
IsRunning = true;
foreach (var worker in _backgroundWorkers)
{
await worker.StartAsync(cancellationToken);
}
}
// 停止, 則遍歷集合停止,
public virtual async Task StopAsync(CancellationToken cancellationToken = default)
{
IsRunning = false;
foreach (var worker in _backgroundWorkers)
{
await worker.StopAsync(cancellationToken);
}
}
}
上述代碼其實存在一個問題,即后臺作業者被釋放以后,是否還能執行 Add() 操作,參考我 之前的文章 ,其實當物件被釋放之后,就應該拋出 ObjectDisposedException 例外,
后臺作業
比起后臺作業者,我們執行一次性任務的時候,一般會使用后臺作業進行處理,比起只能設定固定周期的 PeriodicBackgroundWorkerBase ,集成了 Hangfire 的后臺作業管理器,能夠讓我們使用 Cron 運算式,更加靈活地設定任務的執行周期,
模塊的構造
關于后臺作業的模塊,我們需要說道兩處,第一處是位于 Volo.Abp.BackgroundJobs.Abstractions 專案的 AbpBackgroundJobsAbstractionsModule ,第二出則是位于 Volo.Abp.BackgroundJobs 專案的 AbpBackgroundJobsModule ,
AbpBackgroundJobsAbstractionsModule 的主要行為是將符合條件的后臺作業,添加到 AbpBackgroundJobOptions 配置當中,以便后續進行使用,
public override void PreConfigureServices(ServiceConfigurationContext context)
{
RegisterJobs(context.Services);
}
private static void RegisterJobs(IServiceCollection services)
{
var jobTypes = new List<Type>();
// 如果注冊的型別符合 IBackgroundJob<> 泛型,則添加到集合當中,
services.OnRegistred(context =>
{
if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>)))
{
jobTypes.Add(context.ImplementationType);
}
});
services.Configure<AbpBackgroundJobOptions>(options =>
{
// 將資料賦值給配置類,
foreach (var jobType in jobTypes)
{
options.AddJob(jobType);
}
});
}
Volo.Abp.BackgroundJobs 內部是 ABP vNext 為我們提供的 默認后臺作業管理器,這個后臺作業管理器 本質上是一個后臺作業者,
這個后臺作業者會周期性(取決于 AbpBackgroundJobWorkerOptions.JobPollPeriod 值,默認為 5 秒種)地從 IBackgroundJobStore 撈出一堆后臺任務,并且在后臺執行,至于每次執行多少個后臺任務,這也取決于 AbpBackgroundJobWorkerOptions.MaxJobFetchCount 的值,默認值是 1000 個,
注意:
這里的 Options 類是
AbpBackgroundJobWorkerOptions,別和AbpBackgroundWorkerOptions混淆了,
所以在 AbpBackgroundJobsModule 模塊里面,只做了一件事情,就是將負責后臺作業的后臺作業者,添加到后臺作業者管理器種,并開始周期性地執行,
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
if (options.IsJobExecutionEnabled)
{
// 獲得后臺作業者管理器,并將負責后臺作業的作業者添加進去,
context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>()
);
}
}
后臺作業的定義
在上一節里面看到,只要是實作 IBackgroundJob<TArgs> 型別的都視為一個后臺作業,這個后臺作業介面,只定義了一個行為,那就是執行(Execute(TArgs)),這里的 TArgs 泛型作為執行后臺作業時,需要傳遞的引數型別,
// 因為是傳入的引數,所以泛型引數是逆變的,
public interface IBackgroundJob<in TArgs>
{
void Execute(TArgs args);
}
檢查原始碼,發現 ABP vNext 的郵箱模塊定義了一個郵件發送任務 BackgroundEmailSendingJob,它的實作大概如下,
public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency
{
// ...
public override void Execute(BackgroundEmailSendingJobArgs args)
{
AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml));
}
}
后臺作業管理器
后臺作業都是通過一個后臺作業管理器(IBackgroundJobManager)進行管理的,這個介面定義了一個入隊方法(EnqueueAsync()),注意,我們的后臺作業在入隊后,不是馬上執行的,
說一下這個入隊處理邏輯:
- 首先我們會通過引數的型別,獲取到任務的名稱,(假設任務上面沒有標注
BackgroundJobNameAttribute特性,那么任務的名稱就是引數型別的FullName,) - 構造一個
BackgroundJobInfo物件, - 通過
IBackgroundJobStore持久化任務資訊,
public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
// 獲取任務名稱,
var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
var jobId = await EnqueueAsync(jobName, args, priority, delay);
return jobId.ToString();
}
protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
var jobInfo = new BackgroundJobInfo
{
Id = GuidGenerator.Create(),
JobName = jobName,
// 通過序列化器,序列化引數值,方便存盤,這里內部其實使用的是 JSON.NET 進行序列化,
JobArgs = Serializer.Serialize(args),
Priority = priority,
CreationTime = Clock.Now,
NextTryTime = Clock.Now
};
// 如果任務有執行延遲,則任務的初始執行時間要加上這個延遲,
if (delay.HasValue)
{
jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
}
// 持久化任務資訊,方便后面執行后臺作業的作業者能夠取到,
await Store.InsertAsync(jobInfo);
return jobInfo.Id;
}
BackgroundJobNameAttribute 相關的方法:
public static string GetName<TJobArgs>()
{
return GetName(typeof(TJobArgs));
}
public static string GetName([NotNull] Type jobArgsType)
{
Check.NotNull(jobArgsType, nameof(jobArgsType));
// 判斷引數型別上面是否標注了特性,并且特性實作了 IBackgroundJobNameProvider 介面,
return jobArgsType
.GetCustomAttributes(true)
.OfType<IBackgroundJobNameProvider>()
.FirstOrDefault()
?.Name
// 拿不到名字,則使用型別的 FullName,
?? jobArgsType.FullName;
}
后臺作業的存盤
后臺作業的存盤默認是放在記憶體的,這點可以從 InMemoryBackgroundJobStore 型別實作看出來,在它的內部使用了一個并行字典,通過作業的 Guid 與作業進行關聯系結,
除了記憶體實作,在 Volo.Abp.BackgroundJobs.Domain 模塊還有一個 BackgroundJobStore 實作,基本套路與 SettingStore 一樣,都是存盤到資料庫里面,
public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency
{
protected IBackgroundJobRepository BackgroundJobRepository { get; }
// ...
public BackgroundJobInfo Find(Guid jobId)
{
return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>(
BackgroundJobRepository.Find(jobId)
);
}
// ...
public void Insert(BackgroundJobInfo jobInfo)
{
BackgroundJobRepository.Insert(
ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)
);
}
// ...
}
后臺作業的執行
默認的后臺作業管理器是通過一個后臺作業者來執行后臺任務的,這個實作叫做 BackgroundJobWorker,這個后臺作業者的生命周期也是單例的,后臺作業的具體執行邏輯里面,涉及到了以下幾個型別的互動,
| 型別 | 作用 |
|---|---|
AbpBackgroundJobOptions |
提供每個后臺任務的配置資訊,包括任務的型別、引數型別、任務名稱資料, |
AbpBackgroundJobWorkerOptions |
提供后臺作業作業者的配置資訊,例如每個周期 最大執行的作業數量、后臺 作業者的 執行周期、作業執行 超時時間 等, |
BackgroundJobConfiguration |
后臺任務的配置資訊,作用是將持久化存盤的作業資訊與運行時型別進行系結 和實體化,以便 ABP vNext 來執行具體的任務, |
IBackgroundJobExecuter |
后臺作業的執行器,當我們從持久化存盤獲取到后臺作業資訊時,將會通過 這個執行器來執行具體的后臺作業, |
IBackgroundJobSerializer |
后臺作業序列化器,用于后臺作業持久化時進行序列化的工具,默認采用的 是 JSON.NET 進行實作, |
JobExecutionContext |
執行器在執行后臺作業時,是通過這個背景關系引數進行執行的,在這個上下 文內部,包含了后臺作業的具體型別、后臺作業的引數值, |
IBackgroundJobStore |
前面已經講過了,這個是用于后臺作業的持久化存盤,默認實作是存盤在記憶體, |
BackgroundJobPriority |
后臺作業的執行優先級定義,ABP vNext 在執行后臺任務時,會根據任務的優 先級進行排序,以便在后面執行的時候優先級高的任務先執行, |
我們來按照邏輯順序走一遍它的實作,首先后臺作業的執行作業者會從持久化存盤內,獲取 MaxJobFetchCount 個任務用于執行,從持久化存盤獲取后臺作業資訊(BackgroundJobInfo),是由 IBackgroundJobStore 提供的,
var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();
var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount);
// 不存在任何后臺作業,則直接結束本次呼叫,
if (!waitingJobs.Any())
{
return;
}
InMemoryBackgroundJobStore 的相關實作:
public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount)
{
return _jobs.Values
.Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)
.OrderByDescending(t => t.Priority)
.ThenBy(t => t.TryCount)
.ThenBy(t => t.NextTryTime)
.Take(maxResultCount)
.ToList();
}
上面的代碼可以看出來,首先排除 被放棄的任務 ,包含達到執行時間的任務,然后根據任務的優先級從高到低進行排序,重試次數少的優先執行,預計執行時間越早的越先執行,最后從這些資料中,篩選出 maxResultCount 結果并回傳,
說到這里,我們來看一下這個 NextTryTime 是如何被計算出來的?回想起最開始的后臺作業管理器,我們在添加一個后臺任務的時候,就會設定這個后臺任務的 預計執行時間,第一個任務被添加到執行佇列中時,它的值一般是 Clock.Now ,也就是它被添加到佇列的時間,
不過 ABP vNext 為了讓那些經常執行失敗的任務,有比較低的優先級再執行,就在每次任務執行失敗之后,會將 NextTryTime 的值指數級進行增加,這塊代碼可以在 CalculateNextTryTime 里面看到,也就是說某個任務的執行 失敗次數越高,那么它下一次的預期執行時間就會越遠,
protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock)
{
// 一般來說,這個 DefaultWaitFactor 因子的值是 2.0 ,
var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); // 同執行失敗的次數進行掛鉤,
var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ??
clock.Now.AddSeconds(nextWaitDuration);
if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout)
{
return null;
}
return nextTryDate;
}
當預期的執行時間都超過 DefaultTimeout 的超時時間時(默認為 2 天),說明這個任務確實沒救了,就不要再執行了,
我們之前說到,從 IBackgroundJobStore 拿到了需要執行的后臺任務資訊集合,接下來我們就要開始執行后臺任務了,
foreach (var jobInfo in waitingJobs)
{
jobInfo.TryCount++;
jobInfo.LastTryTime = clock.Now;
try
{
// 根據任務名稱獲取任務的配置引數,
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
// 根據配置里面存盤的任務型別,將引數值進行反序列化,
var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
// 構造一個新的執行背景關系,讓執行器執行任務,
var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs);
try
{
jobExecuter.Execute(context);
// 如果任務執行成功則洗掉該任務,
store.Delete(jobInfo.Id);
}
catch (BackgroundJobExecutionException)
{
// 發生任務執行失敗例外時,根據指定的公式計算下一次的執行時間,
var nextTryTime = CalculateNextTryTime(jobInfo, clock);
if (nextTryTime.HasValue)
{
jobInfo.NextTryTime = nextTryTime.Value;
}
else
{
// 超過超時時間的時候,公式計算函式回傳 null,該任務置為廢棄任務,
jobInfo.IsAbandoned = true;
}
TryUpdate(store, jobInfo);
}
}
catch (Exception ex)
{
// 執行程序中,產生了未知例外,設定為廢棄任務,并列印日志,
Logger.LogException(ex);
jobInfo.IsAbandoned = true;
TryUpdate(store, jobInfo);
}
}
執行后臺任務的時候基本分為 5 步,它們分別是:
- 獲得任務關聯的配置引數,默認不用提供,因為在之前模塊初始化的時候就已經配置了(你也可以顯式指定),
- 通過之前存盤的配置引數,將引數值反序列化出來,構造具體實體,
- 構造一個執行背景關系,
- 后臺任務執行器執行具體的后臺任務,
- 成功則洗掉任務,失敗則更新任務下次的執行狀態,
至于執行器里面的真正執行操作,你都拿到了引數值和任務型別了,就可以通過型別用 IoC 獲取后臺任務物件的實體,然后通過反射匹配方法簽名,在實體上呼叫這個方法傳入引數即可,
public virtual void Execute(JobExecutionContext context)
{
// 構造具體的后臺作業實體物件,
var job = context.ServiceProvider.GetService(context.JobType);
if (job == null)
{
throw new AbpException("The job type is not registered to DI: " + context.JobType);
}
// 獲得需要執行的方法簽名,
var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute));
if (jobExecuteMethod == null)
{
throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType);
}
try
{
// 直接通過 MethodInfo 的 Invoke 方法呼叫,傳入具體的實體物件和引數值即可,
jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
}
catch (Exception ex)
{
Logger.LogException(ex);
// 如果是執行方法內的例外,則包裝進行處理,然后拋出,
throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)
{
JobType = context.JobType.AssemblyQualifiedName,
JobArgs = context.JobArgs
};
}
}
集成 Hangfire
ABP vNext 對于 Hangfire 的集成代碼分布在 Volo.Abp.HangFire 和 Volo.Abp.BackgroundJobs.HangFire 模塊內部,前者是在模塊配置里面,呼叫 Hangfire 庫的相關方法,注入組件到 IoC 容器當中,后者則是對后臺作業進行了適配處理,替換了默認的 IBackgroundJobManager 實作,
在 AbpHangfireModule 模塊內部,通過工廠創建出來一個 BackgroudJobServer 實體,并將它的生命周期與應用程式的生命周期進行系結,以便進行銷毀處理,
public class AbpHangfireModule : AbpModule
{
private BackgroundJobServer _backgroundJobServer;
public override void ConfigureServices(ServiceConfigurationContext context)
{
context.Services.AddHangfire(configuration =>
{
context.Services.ExecutePreConfiguredActions(configuration);
});
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
_backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider);
}
public override void OnApplicationShutdown(ApplicationShutdownContext context)
{
//TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown
_backgroundJobServer.SendStop();
_backgroundJobServer.Dispose();
}
}
我們直奔主題,看一下基于 Hangfire 的后臺作業管理器是怎么實作的,
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,
TimeSpan? delay = null)
{
// 如果沒有延遲引數,則直接通過 Enqueue() 方法扔進執行對了,
if (!delay.HasValue)
{
return Task.FromResult(
BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
adapter => adapter.Execute(args)
)
);
}
else
{
return Task.FromResult(
BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
adapter => adapter.Execute(args),
delay.Value
)
);
}
}
上述代碼中使用 HangfireJobExecutionAdapter 進行了一個適配操作,因為 Hangfire 要將一個后臺任務扔進佇列執行,不是用 TArgs 就能解決的,
轉到這個配接器定義,提供了一個 Execute(TArgs) 方法,當被添加到 Hangfire 佇列執行的時候,實際 Hangfire 會呼叫配接器的 Excetue(TArgs) 方法,然后內部還是使用的 IBackgroundJobExecuter 來執行具體定義的任務,
public class HangfireJobExecutionAdapter<TArgs>
{
protected AbpBackgroundJobOptions Options { get; }
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected IBackgroundJobExecuter JobExecuter { get; }
public HangfireJobExecutionAdapter(
IOptions<AbpBackgroundJobOptions> options,
IBackgroundJobExecuter jobExecuter,
IServiceScopeFactory serviceScopeFactory)
{
JobExecuter = jobExecuter;
ServiceScopeFactory = serviceScopeFactory;
Options = options.Value;
}
public void Execute(TArgs args)
{
using (var scope = ServiceScopeFactory.CreateScope())
{
var jobType = Options.GetJob(typeof(TArgs)).JobType;
var context = new JobExecutionContext(scope.ServiceProvider, jobType, args);
JobExecuter.Execute(context);
}
}
}
集成 RabbitMQ
基于 RabbitMQ 的后臺作業實作,我想放在分布式事件總線里面,對其一起進行講解,
三、總結
ABP vNext 為我們提供了多種后臺作業管理器的實作,你可以根據自己的需求選用不同的后臺作業管理器,又或者是自己動手造輪子,
需要看其他的 ABP vNext 相關文章?點擊我 即可跳轉到總目錄,
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/101981.html
標籤:.NET Core
