今天來寫寫C#中的異步迭代器 - 機制、概念和一些好用的特性
?
迭代器的概念
迭代器的概念在C#中出現的比較早,很多人可能已經比較熟悉了,
通常迭代器會用在一些特定的場景中,
舉個例子:有一個foreach回圈:
foreach (var item in Sources)
{
Console.WriteLine(item);
}
這個回圈實作了一個簡單的功能:把Sources中的每一項在控制臺中列印出來,
有時候,Sources可能會是一組完全快取的資料,例如:List<string>:
IEnumerable<string> Sources(int x)
{
var list = new List<string>();
for (int i = 0; i < 5; i++)
list.Add($"result from Sources, x={x}, result {i}");
return list;
}
這里會有一個小問題:在我們列印Sources的第一個的資料之前,要先運行完整運行Sources()方法來準備資料,在實際應用中,這可能會花費大量時間和記憶體,更有甚者,Sources可能是一個無邊界的串列,或者不定長的開放式串列,比方一次只處理一個資料專案的佇列,或者本身沒有邏輯結束的佇列,
這種情況,C#給出了一個很好的迭代器解決:
IEnumerable<string> Sources(int x)
{
for (int i = 0; i < 5; i++)
yield return $"result from Sources, x={x}, result {i}";
}
這個方式的作業原理與上一段代碼很像,但有一些根本的區別 - 我們沒有用快取,而只是每次讓一個元素可用,
為了幫助理解,來看看foreach在編譯器中的解釋:
using (var iter = Sources.GetEnumerator())
{
while (iter.MoveNext())
{
var item = iter.Current;
Console.WriteLine(item);
}
}
當然,這個是省略掉很多東西后的概念解釋,我們不糾結這個細節,但大體的意思是這樣的:編譯器對傳遞給foreach的運算式呼叫GetEnumerator(),然后用一個回圈去檢查是否有下一個資料(MoveNext()),在得到肯定答案后,前進并訪問Current屬性,而這個屬性代表了前進到的元素,
為防止非授權轉發,這兒給出本文的原文鏈接:https://
?
上面這個例子,我們通過MoveNext()/Current方式訪問了一個沒有大小限制的向前的串列,我們還用到了yield迭代器這個很復雜的東西 - 至少我是這么認為的,
我們把上面的例子中的yield去掉,改寫一下看看:
IEnumerable<string> Sources(int x) => new GeneratedEnumerable(x);
class GeneratedEnumerable : IEnumerable<string>
{
private int x;
public GeneratedEnumerable(int x) => this.x = x;
public IEnumerator<string> GetEnumerator() => new GeneratedEnumerator(x);
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
class GeneratedEnumerator : IEnumerator<string>
{
private int x, i;
public GeneratedEnumerator(int x) => this.x = x;
public string Current { get; private set; }
object IEnumerator.Current => Current;
public void Dispose() { }
public bool MoveNext()
{
if (i < 5)
{
Current = $"result from Sources, x={x}, result {i}";
i++;
return true;
}
else
{
return false;
}
}
void IEnumerator.Reset() => throw new NotSupportedException();
}
這樣寫完,對照上面的yield迭代器,理解作業程序就比較容易了:
- 首先,我們給出一個物件
IEnumerable,注意,IEnumerable和IEnumerator是不同的, - 當我們呼叫
Sources時,就創建了GeneratedEnumerable,它存盤狀態引數x,并公開了需要的IEnumerable方法, - 后面,在需要
foreach迭代資料時,會呼叫GetEnumerator(),而它又呼叫GeneratedEnumerator以充當資料上的游標, MoveNext()方法邏輯上實作了for回圈,只不過,每次呼叫MoveNext()只執行一步,更多的資料會通過Current回傳過來,另外補充一點:MoveNext()方法中的return false對應于yield break關鍵字,用于終止迭代,
是不是好理解了?
?
下面說說異步中的迭代器,
異步中的迭代器
上面的迭代,是同步的程序,而現在Dotnet開發作業更傾向于異步,使用async/await來做,特別是在提高服務器的可伸縮性方面應用特別多,
上面的代碼最大的問題,在于MoveNext(),很明顯,這是個同步的方法,如果它運行需要一段時間,那執行緒就會被阻塞,這會讓代碼執行程序變得不可接受,
我們能做得最接近的方法是異步獲取資料:
async Task<List<string>> Sources(int x) {...}
但是,異步獲取資料并不能解決資料快取延遲的問題,
好在,C#為此特意增加了對異步迭代器的支持:
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
T Current { get; }
ValueTask<bool> MoveNextAsync();
}
public interface IAsyncDisposable
{
ValueTask DisposeAsync();
}
注意,從.NET Standard 2.1和.NET Core 3.0開始,異步迭代器已經包含在框架中了,而在早期版本中,需要手動引入:
# dotnet add package Microsoft.Bcl.AsyncInterfaces
目前這個包的版本號是5.0.0,
?
還是上面例子的邏輯:
IAsyncEnumerable<string> Source(int x) => throw new NotImplementedException();
看看foreach可以await后的樣子:
await foreach (var item in Sources)
{
Console.WriteLine(item);
}
編譯器會將它解釋為:
await using (var iter = Sources.GetAsyncEnumerator())
{
while (await iter.MoveNextAsync())
{
var item = iter.Current;
Console.WriteLine(item);
}
}
這兒有個新東西:await using,與using用法相同,但釋放時會呼叫DisposeAsync,而不是Dispose,包括回收清理也是異步的,
這段代碼其實跟前邊的同步版本非常相似,只是增加了await,但是,編譯器會分解并重寫異步狀態機,它就變成異步的了,原理不細說了,不是本文關注的內容,
那么,帶有yield的迭代器如何異步呢?看代碼:
async IAsyncEnumerable<string> Sources(int x)
{
for (int i = 0; i < 5; i++)
{
await Task.Delay(100); // 這兒模擬異步延遲
yield return $"result from Sources, x={x}, result {i}";
}
}
嗯,看著就舒服,
?
這就完了?圖樣圖森破,異步有一個很重要的特性:取消,
那么,怎么取消異步迭代?
異步迭代的取消
異步方法通過CancellationToken來支持取消,異步迭代也不例外,看看上面IAsyncEnumerator<T>的定義,取消標志也被傳遞到了GetAsyncEnumerator()方法中,
那么,如果是手工回圈呢?我們可以這樣寫:
await foreach (var item in Sources.WithCancellation(cancellationToken).ConfigureAwait(false))
{
Console.WriteLine(item);
}
這個寫法等同于:
var iter = Sources.GetAsyncEnumerator(cancellationToken);
await using (iter.ConfigureAwait(false))
{
while (await iter.MoveNextAsync().ConfigureAwait(false))
{
var item = iter.Current;
Console.WriteLine(item);
}
}
沒錯,ConfigureAwait也適用于DisposeAsync(),所以最后就變成了:
await iter.DisposeAsync().ConfigureAwait(false);
?
異步迭代的取消捕獲做完了,接下來怎么用呢?
看代碼:
IAsyncEnumerable<string> Sources(int x) => new SourcesEnumerable(x);
class SourcesEnumerable : IAsyncEnumerable<string>
{
private int x;
public SourcesEnumerable(int x) => this.x = x;
public async IAsyncEnumerator<string> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
for (int i = 0; i < 5; i++)
{
await Task.Delay(100, cancellationToken); // 模擬異步延遲
yield return $"result from Sources, x={x}, result {i}";
}
}
}
如果有CancellationToken通過WithCancellation傳過來,迭代器會在正確的時間被取消 - 包括異步獲取資料期間(例子中的Task.Delay期間),當然我們還可以在迭代器中任何一個位置檢查IsCancellationRequested或呼叫ThrowIfCancellationRequested(),
此外,編譯器也會通過[EnumeratorCancellation]來完成這個任務,所以我們還可以這樣寫:
async IAsyncEnumerable<string> Sources(int x, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
for (int i = 0; i < 5; i++)
{
await Task.Delay(100, cancellationToken); // 模擬異步延遲
yield return $"result from Sources, x={x}, result {i}";
}
}
這個寫法與上面的代碼其實是一樣的,區別在于加了一個引數,
實際應用中,我們有下面幾種寫法上的選擇:
// 不取消
await foreach (var item in Sources)
// 通過WithCancellation取消
await foreach (var item in Sources.WithCancellation(cancellationToken))
// 通過SourcesAsync取消
await foreach (var item in SourcesAsync(cancellationToken))
// 通過SourcesAsync和WithCancellation取消
await foreach (var item in SourcesAsync(cancellationToken).WithCancellation(cancellationToken))
// 通過不同的Token取消
await foreach (var item in SourcesAsync(tokenA).WithCancellation(tokenB))
幾種方式區別于應用場景,實質上沒有區別,對兩個Token的方式,任何一個Token被取消時,任務會被取消,
總結
同步迭代其實在各個代碼中用的都比較多,但異步迭代用得很好,一方面,這是個相對新的東西,另一方面,是會有點繞,所以很多人都不敢碰,
今天這個,也是個人的一些經驗總結,希望對大家理解迭代能有所幫助,
?
?
![]() |
微信公眾號:老王Plus 掃描二維碼,關注個人公眾號,可以第一時間得到最新的個人文章和內容推送 本文著作權歸作者所有,轉載請保留此宣告和原文鏈接 |
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/235596.html
標籤:.NET Core

