我觀察到了一種我沒有預料到的行為。這里有一些代碼來說明:
class ReactiveService
{
private readonly ISubject<WorkPayload> observableWork
= new Subject<WorkPayload>();
void WorkBeingDone(WorkPayload workPayload)
{
this.observableWork.OnNext(workPayload);
}
IObservable<WorkPayload> WorkBeingDone() => this.observableWork;
}
class WorkService
{
private readonly ReactiveService reactiveService;
public WorkService(ReactiveService reactiveService)
=> this.reactiveService = reactiveService;
Task DoWork()
{
this.reactiveService.WorkBeingDone(new WorkPayload());
// ... lots of logic here ...
return Task.CompletedTask;
}
}
class WorkObservingService
{
private readonly ReactiveService reactiveService;
public WorkObservingService(ReactiveService reactiveService)
=> this.reactiveService = reactiveService;
async Task DoSomethingWhenWorkHappens()
{
// Now here what I want to do is that on each work being observed,
// some asynchronous operation is triggered, currently I'm doing it
// like this, but I'm open to any other suggestion.
this.reactiveService.WorkBeingDone().Select(
w => Observable.FromAsync(async () =>
{
// Do the asynchronous stuff with w...
})).Concat().Subscribe();
// Prevent the method from returning or something keep
// the subscription alive...
}
}
似乎發生的事情是,當我嘗試在 中做一些異步作業時Observable.FromAsync,它會拋出,這會一直回傳到WorkService并導致對DoWork. 但實際上,這件事不應該關心觀察者會發生什么observableWork。這是預期的嗎?我該如何防止這種情況發生?
uj5u.com熱心網友回復:
可觀察的序列是等待的,所以你可以簡單地這樣做:
async Task DoSomethingWhenWorkHappens()
{
await this.reactiveService.WorkBeingDone().Select(
w => Observable.FromAsync(async () =>
{
// Do the asynchronous stuff with w...
})).Concat().DefaultIfEmpty();
}
通過序列傳播的任何錯誤都將傳遞給結果,并在一個狀態下Task完成它。Faulted
DefaultIfEmpty鏈末尾的運算子的目的是防止出現InvalidOperationException, 以防序列發出零元素。旨在傳播序列中的await最后一個元素作為結果,因此當序列不包含任何元素時它會混淆。
另一種選擇是省略async 和 await,只需將序列轉換為Task( .ToTask()),然后直接回傳。
似乎發生的事情是,當我嘗試在 中做一些異步作業時
Observable.FromAsync,它會拋出,這會一直回傳到WorkService并導致對DoWork.
我沒有運行你的代碼,但我的理論是錯誤Observable.FromAsync升級為導致行程崩潰的未處理例外。我不希望您可以DoWork通過將其包裝在try/中來處理它catch。發生這種情況的原因是裸露的.Subscribe(),即缺少所有三個處理程式(onNext和onError)onCompleted。通過特別省略onError處理程式,您是在對 Rx 說“嘿!我不知道如何處理可能的錯誤!” . 你猜怎么著,Rx 也不知道如何處理這個錯誤,所以它做了可以說是最明智和最負責任的事情,并將其重新拋出ThreadPool,導致您的應用程式崩潰。這不是很友好,但另一方面,它可能比抑制錯誤并將其默默地扔進記憶體洞更好。
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/498349.html
標籤:C# 。网 异步 system.reactive
