主頁 > .NET開發 > [Abp vNext 原始碼分析] - 12. 后臺作業與后臺作業者

[Abp vNext 原始碼分析] - 12. 后臺作業與后臺作業者

2020-09-22 05:01:06 .NET開發

一、簡要說明

文章資訊:

基于的 ABP vNext 版本:1.0.0

創作日期:2019 年 10 月 24 日晚

更新日期:暫無

ABP vNext 提供了后臺作業者和后臺作業的支持,基本實作與原來的 ABP 框架類似,并且 ABP vNext 還提供了對 HangFire 和 RabbitMQ 的后臺作業集成,開發人員在使用這些第三方庫的時候,基本就是開箱即用,不需要做其他復雜的配置,

后臺作業在系統開發的程序當中,是比較常用的功能,因為總是有一些長耗時的任務,而這些任務我們不是立即回應的,例如 Excel 檔案匯入、批量發送短信通知等,

后臺作業者 的話,ABP vNext 的實作就是在 CLR 的 Timer 之上封裝了一層,周期性地執行用戶邏輯,ABP vNext 默認提供的 后臺任務管理器,就是在后臺作業者基礎之上進行的封裝,

涉及到后臺任務、后臺作業者的模塊一共有 6 個,它們分別是:

  • Volo.Abp.Threading :提供了一些常用的執行緒組件,其中 AbpTimer 就是在里面實作的,

  • Volo.Abp.BackgroundWorkers :后臺作業者的定義和實作,

  • Volo.Abp.BackgroundJobs.Abstractions :后臺任務的一些共有定義,

  • Volo.Abp.BackgroundJobs :默認的后臺任務管理器實作,

  • Volo.Abp.BackgroundJobs.HangFire :基于 Hangfire 庫實作的后臺任務管理器,

  • Volo.Abp.BackgroundJobs.RabbitMQ : 基于 RabbitMQ 實作的后臺任務管理器,

二、原始碼分析

執行緒組件

健壯的計時器

CLR 為我們提供了多種計時器,我們一般使用的是 System.Threading.Timer ,它是基于 CLR 執行緒池的一個周期計時器,會根據我們配置的 Period (周期) 定時執行,在 CLR 執行緒池中,所有的 Timer 只有 1 個執行緒為其服務,這個執行緒直到下一個計時器的觸發時間,當下一個 Timer 物件到期時,這個執行緒就會將 Timer 的回呼方法通過 ThreadPool.QueueUserWorkItem() 扔到執行緒池去執行,

不過這帶來了一個問題,即你的回呼方法執行時間超過了計時器的周期,那么就會造成上一個任務還沒執行完成又開始執行新的任務,

解決這個方法其實很簡單,即啟動之后,將周期設定為 Timeout.Infinite ,這樣只會執行一次,當回呼方法執行完成之后,就設定 dueTime 引數說明下次執行要等待多久,并且周期還是 Timeout.Infinite

ABP vNext 已經為我們提供了健壯的計時器,該型別的定義是 AbpTimer ,在內部用到了 volatile 關鍵字和 Monitor 實作 條件變數模式 解決多執行緒環境下的問題,

public class AbpTimer : ITransientDependency
{
    // 回呼事件,
    public event EventHandler Elapsed;

    // 執行周期,
    public int Period { get; set; }

    // 定時器啟動之后就開始運行,默認為 Fasle,
    public bool RunOnStart { get; set; }

    // 日志記錄器,
    public ILogger<AbpTimer> Logger { get; set; }

    private readonly Timer _taskTimer;
    // 定時器是否在執行任務,默認為 false,
    private volatile bool _performingTasks;
    // 定時器的運行狀態,默認為 false,
    private volatile bool _isRunning;

    public AbpTimer()
    {
        Logger = NullLogger<AbpTimer>.Instance;

        // 回呼函式是 TimerCallBack,執行周期為永不執行,
        _taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite);
    }

    public void Start(CancellationToken cancellationToken = default)
    {
        // 如果傳遞的周期小于等于 0 ,則拋出例外,
        if (Period <= 0)
        {
            throw new AbpException("Period should be set before starting the timer!");
        }

        // 使用互斥鎖,保證執行緒安全,
        lock (_taskTimer)
        {
            // 如果啟動之后就需要馬上執行,則設定為 0,馬上執行任務,否則會等待 Period 毫秒之后再執行(1 個周期),
            _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);
            // 定時器成功運行了,
            _isRunning = true;
        }
        // 釋放 _taskTimer 的互斥鎖,
    }

    public void Stop(CancellationToken cancellationToken = default)
    {
        // 使用互斥鎖,
        lock (_taskTimer)
        {
            // 將內部定時器設定為永不執行的狀態,
            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);

            // 檢測當前是否還有正在執行的任務,如果有則等待任務執行完成,
            while (_performingTasks)
            {
                // 臨時釋放鎖,阻塞當前執行緒,但是其他執行緒可以獲取 _timer 的互斥鎖,
                Monitor.Wait(_taskTimer);
            }

            // 需要表示停止狀態,所以標記狀態為 false,
            _isRunning = false;
        }
    }

    private void TimerCallBack(object state)
    {
        lock (_taskTimer)
        {
            // 如果有任務正在運行,或者內部定時器已經停止了,則不做任何事情,
            if (!_isRunning || _performingTasks)
            {
                return;
            }

            // 臨時停止內部定時器,
            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
            // 表明馬上需要執行任務了,
            _performingTasks = true;
        }

        try
        {
            // 呼叫系結的事件,
            Elapsed.InvokeSafely(this, new EventArgs());
        }
        catch
        {
            // 注意,這里將會吞噬例外,
        }
        finally
        {
            lock (_taskTimer)
            {
                // 任務執行完成,更改狀態,
                _performingTasks = false;

                // 如果定時器還在運行,沒有被停止,則啟動下一個 Period 周期,
                if (_isRunning)
                {
                    _taskTimer.Change(Period, Timeout.Infinite);
                }

                // 解除因為釋放鎖而阻塞的執行緒,
                // 如果已經呼叫了 Stop,則會喚醒那個因為 Wait 阻塞的執行緒,就會使 _isRunning 置為 false,
                Monitor.Pulse(_taskTimer);
            }
        }
    }
}

這里對 _performingTasks_isRunning 欄位設定為 volatile 防止指令重排和暫存器快取,這是因為在 Stop 方法內部使用到的 _performingTasks 可能會被優化,所以將該欄位設定為了易失的,

IRunnable 介面

ABP vNext 為任務的啟動和停止,抽象了一個 IRunnable 介面,雖然描述說的是對執行緒的行為進行抽象,但千萬千萬不要手動呼叫 Thread.Abort() ,關于 Thread.Abort() 的壞處,這里不再多加贅述,可以參考 這篇文章 的描述,或者搜索其他的相關文章,

public interface IRunnable
{
    // 啟動這個服務,
    Task StartAsync(CancellationToken cancellationToken = default);

    /// <summary>
    /// 停止這個服務,
    /// </summary>
    Task StopAsync(CancellationToken cancellationToken = default);
}

后臺作業者

模塊的構造

后臺作業者的模塊行為比較簡單,它定義了在應用程式初始化和銷毀時的行為,在初始化時,后臺作業者管理器 獲得所有 后臺作業者,并開始啟動它們,在銷毀時,后臺作業者管理器獲得所有后臺作業者,并開始停止他們,這樣才能夠做到優雅退出,

[DependsOn(
    typeof(AbpThreadingModule)
    )]
public class AbpBackgroundWorkersModule : AbpModule
{
    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        // 如果啟用了后臺作業者,那么獲得后臺作業者管理器的實體,并呼叫 StartAsync 啟動所有后臺作業者,
        if (options.IsEnabled)
        {
            AsyncHelper.RunSync(
                () => context.ServiceProvider
                    .GetRequiredService<IBackgroundWorkerManager>()
                    .StartAsync()
            );
        }
    }

    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        // 如果啟用了后臺作業者,那么獲得后臺作業者管理器的實體,并呼叫 StopAsync 停止所有后臺作業者,
        if (options.IsEnabled)
        {
            AsyncHelper.RunSync(
                () => context.ServiceProvider
                    .GetRequiredService<IBackgroundWorkerManager>()
                    .StopAsync()
            );
        }
    }
}

后臺作業者的定義

首先看看 IBackgroundWorker 介面的定義,是空的,不過繼承了 ISingletonDependency 介面,說明我們的每個后臺作業者都是 單例 的,

/// <summary>
/// 在后臺運行,執行某些任務的作業程式(執行緒)的介面定義,
/// </summary>
public interface IBackgroundWorker : IRunnable, ISingletonDependency
{

}

ABP vNext 為我們定義了一個抽象的后臺作業者型別 BackgroundWorkerBase,這個基類的設計目的是提供一些常用組件(和 ApplicationService 一樣),

public abstract class BackgroundWorkerBase : IBackgroundWorker
{
    //TODO: Add UOW, Localization and other useful properties..?
    //TODO: 是否應該提供作業單元、本地化以及其他常用的屬性?

    public ILogger<BackgroundWorkerBase> Logger { protected get; set; }

    protected BackgroundWorkerBase()
    {
        Logger = NullLogger<BackgroundWorkerBase>.Instance;
    }
    
    public virtual Task StartAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Started background worker: " + ToString());
        return Task.CompletedTask;
    }

    public virtual Task StopAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Stopped background worker: " + ToString());
        return Task.CompletedTask;
    }

    public override string ToString()
    {
        return GetType().FullName;
    }
}

ABP vNext 內部只有一個默認的后臺作業者實作 PeriodicBackgroundWorkerBase,從名字上來看,意思是就是周期執行的后臺作業者,內部就是用的 AbpTimer 來實作,ABP vNext 將其包裝起來是為了實作統一的模式(后臺作業者),

public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
    protected readonly AbpTimer Timer;

    // 也就意味著子類必須在其建構式,指定 timer 的執行周期,
    protected PeriodicBackgroundWorkerBase(AbpTimer timer)
    {
        Timer = timer;
        Timer.Elapsed += Timer_Elapsed;
    }

    // 啟動后臺作業者,
    public override async Task StartAsync(CancellationToken cancellationToken = default)
    {
        await base.StartAsync(cancellationToken);
        Timer.Start(cancellationToken);
    }

    // 停止后臺作業者,
    public override async Task StopAsync(CancellationToken cancellationToken = default)
    {
        Timer.Stop(cancellationToken);
        await base.StopAsync(cancellationToken);
    }
    
    // Timer 關聯的周期事件,之所以不直接掛載 DoWork,是為了捕獲例外,
    private void Timer_Elapsed(object sender, System.EventArgs e)
    {
        try
        {
            DoWork();
        }
        catch (Exception ex)
        {
            Logger.LogException(ex);
        }
    }

    // 你要周期執行的任務,
    protected abstract void DoWork();
}

我們如果要實作自己的后臺作業者,只需要繼承該類,實作 DoWork() 方法即可,

public class TestWorker : PeriodicBackgroundWorkerBase
{
    public TestWorker(AbpTimer timer) : base(timer)
    {
        // 每五分鐘執行一次,
        timer.Period = (int)TimeSpan.FromMinutes(5).TotalMilliseconds;
    }

    protected override void DoWork()
    {
        Console.WriteLine("后臺作業者被執行了,");
    }
}

然后在我們自己模塊的 OnPreApplicationInitialization() 方法內決議出后臺作業管理器(IBackgroundWorkerManager),呼叫它的 Add() 方法,將我們定義的 TestWorker 添加到管理器當中即可,

后臺作業者管理器

所有的后臺作業者都是通過 IBackgroundWorkerManager 進行管理的,它提供了 StartAsync()StopAsync()Add() 方法,前面兩個方法就是 IRunnable 介面定義的,后臺作業者管理器直接集成了該介面,后面的 Add() 方法就是用來動態添加我們的后臺作業者,

后臺作業者管理器的默認實作是 BackgroundWorkerManager 型別,它內部做的事情很簡單,就是維護一個后臺作業者集合,每當呼叫 StartAsync()StopAsync() 方法的時候,都從這個集合遍歷后臺作業者,執行他們的啟動和停止方法,

這里值得注意的一點是,當我們呼叫 Add() 方法添加了一個后臺作業者之后,后臺作業者管理器就會啟動這個后臺作業者,

public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable
{
    protected bool IsRunning { get; private set; }

    private bool _isDisposed;

    private readonly List<IBackgroundWorker> _backgroundWorkers;

    public BackgroundWorkerManager()
    {
        _backgroundWorkers = new List<IBackgroundWorker>();
    }

    public virtual void Add(IBackgroundWorker worker)
    {
        _backgroundWorkers.Add(worker);

        // 如果當前后臺作業者管理器還處于運行狀態,則呼叫作業者的 StartAsync() 方法啟動,
        if (IsRunning)
        {
            AsyncHelper.RunSync(
                () => worker.StartAsync()
            );
        }
    }

    public virtual void Dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        //TODO: ???
    }

    // 啟動,則遍歷集合啟動,
    public virtual async Task StartAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = true;

        foreach (var worker in _backgroundWorkers)
        {
            await worker.StartAsync(cancellationToken);
        }
    }

    // 停止, 則遍歷集合停止,
    public virtual async Task StopAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = false;

        foreach (var worker in _backgroundWorkers)
        {
            await worker.StopAsync(cancellationToken);
        }
    }
}

上述代碼其實存在一個問題,即后臺作業者被釋放以后,是否還能執行 Add() 操作,參考我 之前的文章 ,其實當物件被釋放之后,就應該拋出 ObjectDisposedException 例外,

后臺作業

比起后臺作業者,我們執行一次性任務的時候,一般會使用后臺作業進行處理,比起只能設定固定周期的 PeriodicBackgroundWorkerBase ,集成了 Hangfire 的后臺作業管理器,能夠讓我們使用 Cron 運算式,更加靈活地設定任務的執行周期,

模塊的構造

關于后臺作業的模塊,我們需要說道兩處,第一處是位于 Volo.Abp.BackgroundJobs.Abstractions 專案的 AbpBackgroundJobsAbstractionsModule ,第二出則是位于 Volo.Abp.BackgroundJobs 專案的 AbpBackgroundJobsModule

AbpBackgroundJobsAbstractionsModule 的主要行為是將符合條件的后臺作業,添加到 AbpBackgroundJobOptions 配置當中,以便后續進行使用,

public override void PreConfigureServices(ServiceConfigurationContext context)
{
    RegisterJobs(context.Services);
}

private static void RegisterJobs(IServiceCollection services)
{
    var jobTypes = new List<Type>();

    // 如果注冊的型別符合 IBackgroundJob<> 泛型,則添加到集合當中,
    services.OnRegistred(context =>
    {
        if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>)))
        {
            jobTypes.Add(context.ImplementationType);
        }
    });

    services.Configure<AbpBackgroundJobOptions>(options =>
    {
        // 將資料賦值給配置類,
        foreach (var jobType in jobTypes)
        {
            options.AddJob(jobType);
        }
    });
}

Volo.Abp.BackgroundJobs 內部是 ABP vNext 為我們提供的 默認后臺作業管理器,這個后臺作業管理器 本質上是一個后臺作業者

這個后臺作業者會周期性(取決于 AbpBackgroundJobWorkerOptions.JobPollPeriod 值,默認為 5 秒種)地從 IBackgroundJobStore 撈出一堆后臺任務,并且在后臺執行,至于每次執行多少個后臺任務,這也取決于 AbpBackgroundJobWorkerOptions.MaxJobFetchCount 的值,默認值是 1000 個,

注意:

這里的 Options 類是 AbpBackgroundJobWorkerOptions,別和 AbpBackgroundWorkerOptions 混淆了,

所以在 AbpBackgroundJobsModule 模塊里面,只做了一件事情,就是將負責后臺作業的后臺作業者,添加到后臺作業者管理器種,并開始周期性地執行,

public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
    var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
    if (options.IsJobExecutionEnabled)
    {
        // 獲得后臺作業者管理器,并將負責后臺作業的作業者添加進去,
        context.ServiceProvider
            .GetRequiredService<IBackgroundWorkerManager>()
            .Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>()
            );
    }
}

后臺作業的定義

在上一節里面看到,只要是實作 IBackgroundJob<TArgs> 型別的都視為一個后臺作業,這個后臺作業介面,只定義了一個行為,那就是執行(Execute(TArgs)),這里的 TArgs 泛型作為執行后臺作業時,需要傳遞的引數型別,

// 因為是傳入的引數,所以泛型引數是逆變的,
public interface IBackgroundJob<in TArgs>
{
    void Execute(TArgs args);
}

檢查原始碼,發現 ABP vNext 的郵箱模塊定義了一個郵件發送任務 BackgroundEmailSendingJob,它的實作大概如下,

public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency
{
    // ...
    
    public override void Execute(BackgroundEmailSendingJobArgs args)
    {
        AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml));
    }
}

后臺作業管理器

后臺作業都是通過一個后臺作業管理器(IBackgroundJobManager)進行管理的,這個介面定義了一個入隊方法(EnqueueAsync()),注意,我們的后臺作業在入隊后,不是馬上執行的,

說一下這個入隊處理邏輯:

  1. 首先我們會通過引數的型別,獲取到任務的名稱,(假設任務上面沒有標注 BackgroundJobNameAttribute 特性,那么任務的名稱就是引數型別的 FullName ,)
  2. 構造一個 BackgroundJobInfo 物件,
  3. 通過 IBackgroundJobStore 持久化任務資訊,
public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
    // 獲取任務名稱,
    var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
    var jobId = await EnqueueAsync(jobName, args, priority, delay);
    return jobId.ToString();
}

protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
    var jobInfo = new BackgroundJobInfo
    {
        Id = GuidGenerator.Create(),
        JobName = jobName,
        // 通過序列化器,序列化引數值,方便存盤,這里內部其實使用的是 JSON.NET 進行序列化,
        JobArgs = Serializer.Serialize(args),
        Priority = priority,
        CreationTime = Clock.Now,
        NextTryTime = Clock.Now
    };

    // 如果任務有執行延遲,則任務的初始執行時間要加上這個延遲,
    if (delay.HasValue)
    {
        jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
    }

    // 持久化任務資訊,方便后面執行后臺作業的作業者能夠取到,
    await Store.InsertAsync(jobInfo);

    return jobInfo.Id;
}

BackgroundJobNameAttribute 相關的方法:

public static string GetName<TJobArgs>()
{
    return GetName(typeof(TJobArgs));
}

public static string GetName([NotNull] Type jobArgsType)
{
    Check.NotNull(jobArgsType, nameof(jobArgsType));

    // 判斷引數型別上面是否標注了特性,并且特性實作了 IBackgroundJobNameProvider 介面,
    return jobArgsType
                .GetCustomAttributes(true)
                .OfType<IBackgroundJobNameProvider>()
                .FirstOrDefault()
                ?.Name
        // 拿不到名字,則使用型別的 FullName,
            ?? jobArgsType.FullName;
}

后臺作業的存盤

后臺作業的存盤默認是放在記憶體的,這點可以從 InMemoryBackgroundJobStore 型別實作看出來,在它的內部使用了一個并行字典,通過作業的 Guid 與作業進行關聯系結,

除了記憶體實作,在 Volo.Abp.BackgroundJobs.Domain 模塊還有一個 BackgroundJobStore 實作,基本套路與 SettingStore 一樣,都是存盤到資料庫里面,

public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency
{
    protected IBackgroundJobRepository BackgroundJobRepository { get; }

    // ... 
    
    public BackgroundJobInfo Find(Guid jobId)
    {
        return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>(
            BackgroundJobRepository.Find(jobId)
        );
    }
    
    // ...

    public void Insert(BackgroundJobInfo jobInfo)
    {
        BackgroundJobRepository.Insert(
            ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)
        );
    }

    // ...
}

后臺作業的執行

默認的后臺作業管理器是通過一個后臺作業者來執行后臺任務的,這個實作叫做 BackgroundJobWorker,這個后臺作業者的生命周期也是單例的,后臺作業的具體執行邏輯里面,涉及到了以下幾個型別的互動,

型別 作用
AbpBackgroundJobOptions 提供每個后臺任務的配置資訊,包括任務的型別、引數型別、任務名稱資料,
AbpBackgroundJobWorkerOptions 提供后臺作業作業者的配置資訊,例如每個周期 最大執行的作業數量、后臺
作業者的 執行周期、作業執行 超時時間 等,
BackgroundJobConfiguration 后臺任務的配置資訊,作用是將持久化存盤的作業資訊與運行時型別進行系結
和實體化,以便 ABP vNext 來執行具體的任務,
IBackgroundJobExecuter 后臺作業的執行器,當我們從持久化存盤獲取到后臺作業資訊時,將會通過
這個執行器來執行具體的后臺作業,
IBackgroundJobSerializer 后臺作業序列化器,用于后臺作業持久化時進行序列化的工具,默認采用的
是 JSON.NET 進行實作,
JobExecutionContext 執行器在執行后臺作業時,是通過這個背景關系引數進行執行的,在這個上下
文內部,包含了后臺作業的具體型別、后臺作業的引數值,
IBackgroundJobStore 前面已經講過了,這個是用于后臺作業的持久化存盤,默認實作是存盤在記憶體,
BackgroundJobPriority 后臺作業的執行優先級定義,ABP vNext 在執行后臺任務時,會根據任務的優
先級進行排序,以便在后面執行的時候優先級高的任務先執行,

我們來按照邏輯順序走一遍它的實作,首先后臺作業的執行作業者會從持久化存盤內,獲取 MaxJobFetchCount 個任務用于執行,從持久化存盤獲取后臺作業資訊(BackgroundJobInfo),是由 IBackgroundJobStore 提供的,

var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();

var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount);

// 不存在任何后臺作業,則直接結束本次呼叫,
if (!waitingJobs.Any())
{
    return;
}

InMemoryBackgroundJobStore 的相關實作:

public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount)
{
    return _jobs.Values
        .Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)
        .OrderByDescending(t => t.Priority)
        .ThenBy(t => t.TryCount)
        .ThenBy(t => t.NextTryTime)
        .Take(maxResultCount)
        .ToList();
}

上面的代碼可以看出來,首先排除 被放棄的任務 ,包含達到執行時間的任務,然后根據任務的優先級從高到低進行排序,重試次數少的優先執行,預計執行時間越早的越先執行,最后從這些資料中,篩選出 maxResultCount 結果并回傳,

說到這里,我們來看一下這個 NextTryTime 是如何被計算出來的?回想起最開始的后臺作業管理器,我們在添加一個后臺任務的時候,就會設定這個后臺任務的 預計執行時間,第一個任務被添加到執行佇列中時,它的值一般是 Clock.Now ,也就是它被添加到佇列的時間,

不過 ABP vNext 為了讓那些經常執行失敗的任務,有比較低的優先級再執行,就在每次任務執行失敗之后,會將 NextTryTime 的值指數級進行增加,這塊代碼可以在 CalculateNextTryTime 里面看到,也就是說某個任務的執行 失敗次數越高,那么它下一次的預期執行時間就會越遠,

protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock)
{
    // 一般來說,這個 DefaultWaitFactor 因子的值是 2.0 ,
    var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); // 同執行失敗的次數進行掛鉤,
    var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ??
                        clock.Now.AddSeconds(nextWaitDuration);

    if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout)
    {
        return null;
    }

    return nextTryDate;
}

當預期的執行時間都超過 DefaultTimeout 的超時時間時(默認為 2 天),說明這個任務確實沒救了,就不要再執行了,

我們之前說到,從 IBackgroundJobStore 拿到了需要執行的后臺任務資訊集合,接下來我們就要開始執行后臺任務了,

foreach (var jobInfo in waitingJobs)
{
    jobInfo.TryCount++;
    jobInfo.LastTryTime = clock.Now;

    try
    {
        // 根據任務名稱獲取任務的配置引數,
        var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
        // 根據配置里面存盤的任務型別,將引數值進行反序列化,
        var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
        // 構造一個新的執行背景關系,讓執行器執行任務,
        var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs);

        try
        {
            jobExecuter.Execute(context);

            // 如果任務執行成功則洗掉該任務,            
            store.Delete(jobInfo.Id);
        }
        catch (BackgroundJobExecutionException)
        {
            // 發生任務執行失敗例外時,根據指定的公式計算下一次的執行時間,
            var nextTryTime = CalculateNextTryTime(jobInfo, clock);

            if (nextTryTime.HasValue)
            {
                jobInfo.NextTryTime = nextTryTime.Value;
            }
            else
            {
                // 超過超時時間的時候,公式計算函式回傳 null,該任務置為廢棄任務,
                jobInfo.IsAbandoned = true;
            }

            TryUpdate(store, jobInfo);
        }
    }
    catch (Exception ex)
    {
        // 執行程序中,產生了未知例外,設定為廢棄任務,并列印日志,
        Logger.LogException(ex);
        jobInfo.IsAbandoned = true;
        TryUpdate(store, jobInfo);
    }
}

執行后臺任務的時候基本分為 5 步,它們分別是:

  1. 獲得任務關聯的配置引數,默認不用提供,因為在之前模塊初始化的時候就已經配置了(你也可以顯式指定),
  2. 通過之前存盤的配置引數,將引數值反序列化出來,構造具體實體,
  3. 構造一個執行背景關系,
  4. 后臺任務執行器執行具體的后臺任務,
  5. 成功則洗掉任務,失敗則更新任務下次的執行狀態,

至于執行器里面的真正執行操作,你都拿到了引數值和任務型別了,就可以通過型別用 IoC 獲取后臺任務物件的實體,然后通過反射匹配方法簽名,在實體上呼叫這個方法傳入引數即可,

public virtual void Execute(JobExecutionContext context)
{
    // 構造具體的后臺作業實體物件,
    var job = context.ServiceProvider.GetService(context.JobType);
    if (job == null)
    {
        throw new AbpException("The job type is not registered to DI: " + context.JobType);
    }

    // 獲得需要執行的方法簽名,
    var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute));
    if (jobExecuteMethod == null)
    {
        throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType);
    }

    try
    {
        // 直接通過 MethodInfo 的 Invoke 方法呼叫,傳入具體的實體物件和引數值即可,
        jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
    }
    catch (Exception ex)
    {
        Logger.LogException(ex);

        // 如果是執行方法內的例外,則包裝進行處理,然后拋出,
        throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)
        {
            JobType = context.JobType.AssemblyQualifiedName,
            JobArgs = context.JobArgs
        };
    }
}

集成 Hangfire

ABP vNext 對于 Hangfire 的集成代碼分布在 Volo.Abp.HangFireVolo.Abp.BackgroundJobs.HangFire 模塊內部,前者是在模塊配置里面,呼叫 Hangfire 庫的相關方法,注入組件到 IoC 容器當中,后者則是對后臺作業進行了適配處理,替換了默認的 IBackgroundJobManager 實作,

AbpHangfireModule 模塊內部,通過工廠創建出來一個 BackgroudJobServer 實體,并將它的生命周期與應用程式的生命周期進行系結,以便進行銷毀處理,

public class AbpHangfireModule : AbpModule
{
    private BackgroundJobServer _backgroundJobServer;

    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        context.Services.AddHangfire(configuration =>
        {
            context.Services.ExecutePreConfiguredActions(configuration);
        });
    }

    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
        _backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider);
    }

    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        //TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown
        _backgroundJobServer.SendStop();
        _backgroundJobServer.Dispose();
    }
}

我們直奔主題,看一下基于 Hangfire 的后臺作業管理器是怎么實作的,

public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,
        TimeSpan? delay = null)
    {
        // 如果沒有延遲引數,則直接通過 Enqueue() 方法扔進執行對了,
        if (!delay.HasValue)
        {
            return Task.FromResult(
                BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
                    adapter => adapter.Execute(args)
                )
            );
        }
        else
        {
            return Task.FromResult(
                BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
                    adapter => adapter.Execute(args),
                    delay.Value
                )
            );
        }
    }

上述代碼中使用 HangfireJobExecutionAdapter 進行了一個適配操作,因為 Hangfire 要將一個后臺任務扔進佇列執行,不是用 TArgs 就能解決的,

轉到這個配接器定義,提供了一個 Execute(TArgs) 方法,當被添加到 Hangfire 佇列執行的時候,實際 Hangfire 會呼叫配接器的 Excetue(TArgs) 方法,然后內部還是使用的 IBackgroundJobExecuter 來執行具體定義的任務,

public class HangfireJobExecutionAdapter<TArgs>
{
    protected AbpBackgroundJobOptions Options { get; }
    protected IServiceScopeFactory ServiceScopeFactory { get; }
    protected IBackgroundJobExecuter JobExecuter { get; }

    public HangfireJobExecutionAdapter(
        IOptions<AbpBackgroundJobOptions> options, 
        IBackgroundJobExecuter jobExecuter, 
        IServiceScopeFactory serviceScopeFactory)
    {
        JobExecuter = jobExecuter;
        ServiceScopeFactory = serviceScopeFactory;
        Options = options.Value;
    }

    public void Execute(TArgs args)
    {
        using (var scope = ServiceScopeFactory.CreateScope())
        {
            var jobType = Options.GetJob(typeof(TArgs)).JobType;
            var context = new JobExecutionContext(scope.ServiceProvider, jobType, args);
            JobExecuter.Execute(context);
        }
    }
}

集成 RabbitMQ

基于 RabbitMQ 的后臺作業實作,我想放在分布式事件總線里面,對其一起進行講解,

三、總結

ABP vNext 為我們提供了多種后臺作業管理器的實作,你可以根據自己的需求選用不同的后臺作業管理器,又或者是自己動手造輪子,

需要看其他的 ABP vNext 相關文章?點擊我 即可跳轉到總目錄,

轉載請註明出處,本文鏈接:https://www.uj5u.com/net/101981.html

標籤:.NET Core

上一篇:輕量級ORM《sqlcommon》第一個版本發布了!!!

下一篇:造輪子了!NETCore跨平臺UI框架,CPF

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • WebAPI簡介

    Web體系結構: 有三個核心:資源(resource),URL(統一資源識別符號)和表示 他們的關系是這樣的:一個資源由一個URL進行標識,HTTP客戶端使用URL定位資源,表示是從資源回傳資料,媒體型別是資源回傳的資料格式。 接下來我們說下HTTP. HTTP協議的系統是一種無狀態的方式,使用請求/ ......

    uj5u.com 2020-09-09 22:07:47 more
  • asp.net core 3.1 入口:Program.cs中的Main函式

    本文分析Program.cs 中Main()函式中代碼的運行順序分析asp.net core程式的啟動,重點不是剖析原始碼,而是理清程式開始時執行的順序。到呼叫了哪些實體,哪些法方。asp.net core 3.1 的程式入口在專案Program.cs檔案里,如下。ususing System; us ......

    uj5u.com 2020-09-09 22:07:49 more
  • asp.net網站作為websocket服務端的應用該如何寫

    最近被websocket的一個問題困擾了很久,有一個需求是在web網站中搭建websocket服務。客戶端通過網頁與服務器建立連接,然后服務器根據ip給客戶端網頁發送資訊。 其實,這個需求并不難,只是剛開始對websocket的內容不太了解。上網搜索了一下,有通過asp.net core 實作的、有 ......

    uj5u.com 2020-09-09 22:08:02 more
  • ASP.NET 開源匯入匯出庫Magicodes.IE Docker中使用

    Magicodes.IE在Docker中使用 更新歷史 2019.02.13 【Nuget】版本更新到2.0.2 【匯入】修復單列匯入的Bug,單元測驗“OneColumnImporter_Test”。問題見(https://github.com/dotnetcore/Magicodes.IE/is ......

    uj5u.com 2020-09-09 22:08:05 more
  • 在webform中使用ajax

    如果你用過Asp.net webform, 說明你也算是.NET 開發的老兵了。WEBform應該是2011 2013左右,當時還用visual studio 2005、 visual studio 2008。后來基本都用的是MVC。 如果是新開發的專案,估計沒人會用webform技術。但是有些舊版 ......

    uj5u.com 2020-09-09 22:08:50 more
  • iis添加asp.net網站,訪問提示:由于擴展配置問題而無法提供您請求的

    今天在iis服務器配置asp.net網站,遇到一個問題,記錄一下: 問題:由于擴展配置問題而無法提供您請求的頁面。如果該頁面是腳本,請添加處理程式。如果應下載檔案,請添加 MIME 映射。 WindowServer2012服務器,添加角色安裝完.netframework和iis之后,運行aspx頁面 ......

    uj5u.com 2020-09-09 22:10:00 more
  • WebAPI-處理架構

    帶著問題去思考,大家好! 問題1:HTTP請求和回傳相應的HTTP回應資訊之間發生了什么? 1:首先是最底層,托管層,位于WebAPI和底層HTTP堆疊之間 2:其次是 訊息處理程式管道層,這里比如日志和快取。OWIN的參考是將訊息處理程式管道的一些功能下移到堆疊下端的OWIN中間件了。 3:控制器處理 ......

    uj5u.com 2020-09-09 22:11:13 more
  • 微信門戶開發框架-使用指導說明書

    微信門戶應用管理系統,采用基于 MVC + Bootstrap + Ajax + Enterprise Library的技術路線,界面層采用Boostrap + Metronic組合的前端框架,資料訪問層支持Oracle、SQLServer、MySQL、PostgreSQL等資料庫。框架以MVC5,... ......

    uj5u.com 2020-09-09 22:15:18 more
  • WebAPI-HTTP編程模型

    帶著問題去思考,大家好!它是什么?它包含什么?它能干什么? 訊息 HTTP編程模型的核心就是訊息抽象,表示為:HttPRequestMessage,HttpResponseMessage.用于客戶端和服務端之間交換請求和回應訊息。 HttpMethod類包含了一組靜態屬性: private stat ......

    uj5u.com 2020-09-09 22:15:23 more
  • 部署WebApi隨筆

    一、跨域 NuGet參考Microsoft.AspNet.WebApi.Cors WebApiConfig.cs中配置: // Web API 配置和服務 config.EnableCors(new EnableCorsAttribute("*", "*", "*")); 二、清除默認回傳XML格式 ......

    uj5u.com 2020-09-09 22:15:48 more
最新发布
  • C#多執行緒學習(二) 如何操縱一個執行緒

    <a href="https://www.cnblogs.com/x-zhi/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2943582/20220801082530.png" alt="" /></...

    uj5u.com 2023-04-19 09:17:20 more
  • C#多執行緒學習(二) 如何操縱一個執行緒

    C#多執行緒學習(二) 如何操縱一個執行緒 執行緒學習第一篇:C#多執行緒學習(一) 多執行緒的相關概念 下面我們就動手來創建一個執行緒,使用Thread類創建執行緒時,只需提供執行緒入口即可。(執行緒入口使程式知道該讓這個執行緒干什么事) 在C#中,執行緒入口是通過ThreadStart代理(delegate)來提供的 ......

    uj5u.com 2023-04-19 09:16:49 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    <a href="https://www.cnblogs.com/huangxincheng/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/214741/20200614104537.png" alt="" /&g...

    uj5u.com 2023-04-18 08:39:04 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    一:背景 1. 講故事 前段時間協助訓練營里的一位朋友分析了一個程式卡死的問題,回過頭來看這個案例比較經典,這篇稍微整理一下供后來者少踩坑吧。 二:WinDbg 分析 1. 為什么會卡死 因為是表單程式,理所當然就是看主執行緒此時正在做什么? 可以用 ~0s ; k 看一下便知。 0:000> k # ......

    uj5u.com 2023-04-18 08:33:10 more
  • SignalR, No Connection with that ID,IIS

    <a href="https://www.cnblogs.com/smartstar/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/u36196.jpg" alt="" /></a>...

    uj5u.com 2023-03-30 17:21:52 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:15:33 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:13:31 more
  • C#遍歷指定檔案夾中所有檔案的3種方法

    <a href="https://www.cnblogs.com/xbhp/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/957602/20230310105611.png" alt="" /></a&...

    uj5u.com 2023-03-27 14:46:55 more
  • C#/VB.NET:如何將PDF轉為PDF/A

    <a href="https://www.cnblogs.com/Carina-baby/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2859233/20220427162558.png" alt="" />...

    uj5u.com 2023-03-27 14:46:35 more
  • 武裝你的WEBAPI-OData聚合查詢

    <a href="https://www.cnblogs.com/podolski/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/616093/20140323000327.png" alt="" /><...

    uj5u.com 2023-03-27 14:46:16 more