主頁 > .NET開發 > System.IO.Pipelines——高性能IO(一)

System.IO.Pipelines——高性能IO(一)

2020-09-23 03:04:58 .NET開發

轉自https://docs.microsoft.com/en-us/dotnet/standard/io/pipelines

System.IO.Pipelines 是一個新庫,旨在使在 .NET 中執行高性能 I/O 更加容易, 該庫的目標為適用于所有 .NET 實作的 .NET Standard,

System.IO.Pipelines 解決什么問題

分析流資料的應用由樣板代碼組成,后者由許多專門且不尋常的代碼流組成, 樣板代碼和特殊情況代碼很復雜且難以進行維護,

System.IO.Pipelines 已構建為:

  • 具有高性能的流資料分析功能,
  • 減少代碼復雜性,

下面的代碼是典型的 TCP 服務器,它從客戶機接收行分隔的訊息(由 '\n' 分隔):

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

前面的代碼有幾個問題:

  • 單次呼叫 ReadAsync 可能無法接收整條訊息(行尾),
  • 忽略了 stream.ReadAsync 的結果, stream.ReadAsync 回傳讀取的資料量,
  • 它不能處理在單個 ReadAsync 呼叫中讀取多行的情況,
  • 它為每次讀取分配一個 byte 陣列,

要解決上述問題,需要進行以下更改:

  • 緩沖傳入的資料,直到找到新行,

  • 分析緩沖區中回傳的所有行,

  • 該行可能大于 1KB(1024 位元組), 此代碼需要調整輸入緩沖區的大小,直到找到分隔符后,才能在緩沖區內容納完整行,

    • 如果調整緩沖區的大小,當輸入中出現較長的行時,將生成更多緩沖區副本,
    • 壓縮用于讀取行的緩沖區,以減少空余,
  • 請考慮使用緩沖池來避免重復分配記憶體,

  • 下面的代碼解決了其中一些問題:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

前面的代碼很復雜,不能解決所識別的所有問題, 高性能網路通常意味著撰寫非常復雜的代碼以使性能最大化, System.IO.Pipelines 的設計目的是使撰寫此類代碼更容易,

若要查看翻譯為非英語語言的代碼注釋,請在 此 GitHub 討論問題中告訴我們,

管道

Pipe 類可用于創建 PipeWriter/PipeReader 對, 寫入 PipeWriter 的所有資料都可用于 PipeReader

1 var pipe = new Pipe();
2 PipeReader reader = pipe.Reader;
3 PipeWriter writer = pipe.Writer;

 

管道基本用法

 1 async Task ProcessLinesAsync(Socket socket)
 2 {
 3     var pipe = new Pipe();
 4     Task writing = FillPipeAsync(socket, pipe.Writer);
 5     Task reading = ReadPipeAsync(pipe.Reader);
 6 
 7     await Task.WhenAll(reading, writing);
 8 }
 9 
10 async Task FillPipeAsync(Socket socket, PipeWriter writer)
11 {
12     const int minimumBufferSize = 512;
13 
14     while (true)
15     {
16         // Allocate at least 512 bytes from the PipeWriter.
17         Memory<byte> memory = writer.GetMemory(minimumBufferSize);
18         try
19         {
20             int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
21             if (bytesRead == 0)
22             {
23                 break;
24             }
25             // Tell the PipeWriter how much was read from the Socket.
26             writer.Advance(bytesRead);
27         }
28         catch (Exception ex)
29         {
30             LogError(ex);
31             break;
32         }
33 
34         // Make the data available to the PipeReader.
35         FlushResult result = await writer.FlushAsync();
36 
37         if (result.IsCompleted)
38         {
39             break;
40         }
41     }
42 
43      // By completing PipeWriter, tell the PipeReader that there's no more data coming.
44     await writer.CompleteAsync();
45 }
46 
47 async Task ReadPipeAsync(PipeReader reader)
48 {
49     while (true)
50     {
51         ReadResult result = await reader.ReadAsync();
52         ReadOnlySequence<byte> buffer = result.Buffer;
53 
54         while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
55         {
56             // Process the line.
57             ProcessLine(line);
58         }
59 
60         // Tell the PipeReader how much of the buffer has been consumed.
61         reader.AdvanceTo(buffer.Start, buffer.End);
62 
63         // Stop reading if there's no more data coming.
64         if (result.IsCompleted)
65         {
66             break;
67         }
68     }
69 
70     // Mark the PipeReader as complete.
71     await reader.CompleteAsync();
72 }
73 
74 bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
75 {
76     // Look for a EOL in the buffer.
77     SequencePosition? position = buffer.PositionOf((byte)'\n');
78 
79     if (position == null)
80     {
81         line = default;
82         return false;
83     }
84 
85     // Skip the line + the \n.
86     line = buffer.Slice(0, position.Value);
87     buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
88     return true;
89 }

有兩個回圈:

  • FillPipeAsync 從 Socket 讀取并寫入 PipeWriter
  • ReadPipeAsync 從 PipeReader 讀取并分析傳入的行,

沒有分配顯式緩沖區, 所有緩沖區管理都委托給 PipeReader 和 PipeWriter 實作, 委派緩沖區管理使使用代碼更容易集中關注業務邏輯,

在第一個回圈中:

  • 呼叫 PipeWriter.GetMemory(Int32) 從基礎撰寫器獲取記憶體,
  • 呼叫 PipeWriter.Advance(Int32) 以告知 PipeWriter 有多少資料已寫入緩沖區,
  • 呼叫 PipeWriter.FlushAsync 以使資料可用于 PipeReader

在第二個回圈中,PipeReader 使用由 PipeWriter 寫入的緩沖區, 緩沖區來自套接字, 對 PipeReader.ReadAsync 的呼叫:

  • 回傳包含兩條重要資訊的 ReadResult:

    • 以 ReadOnlySequence<byte> 形式讀取的資料,
    • 布林值 IsCompleted,指示是否已到達資料結尾 (EOF),

找到行尾 (EOL) 分隔符并分析該行后:

  • 該邏輯處理緩沖區以跳過已處理的內容,
  • 呼叫 PipeReader.AdvanceTo 以告知 PipeReader 已消耗和檢查了多少資料,

讀取器和撰寫器回圈通過呼叫 Complete 結束, Complete 使基礎管道釋放其分配的記憶體,

反壓和流量控制

理想情況下,讀取和分析可協同作業:

  • 寫入執行緒使用來自網路的資料并將其放入緩沖區,
  • 分析執行緒負責構造適當的資料結構,

通常,分析所花費的時間比僅從網路復制資料塊所用時間更長:

  • 讀取執行緒領先于分析執行緒,
  • 讀取執行緒必須級訓或分配更多記憶體來存盤用于分析執行緒的資料,

為了獲得最佳性能,需要在頻繁暫停和分配更多記憶體之間取得平衡,

為解決上述問題,Pipe 提供了兩個設定來控制資料流:

  • PauseWriterThreshold:確定在呼叫 FlushAsync 暫停之前應緩沖多少資料,
  • ResumeWriterThreshold:確定在恢復對 PipeWriter.FlushAsync 的呼叫之前,讀取器必須觀察多少資料,

具有 ResumeWriterThreshold 和 PauseWriterThreshold 的圖

PipeWriter.FlushAsync:

  • 當 Pipe 中的資料量超過 PauseWriterThreshold 時,回傳不完整的 ValueTask<FlushResult>
  • 低于 ResumeWriterThreshold 時,回傳完整的 ValueTask<FlushResult>

使用兩個值可防止快速回圈,如果只使用一個值,則可能發生這種回圈,

示例

1 // The Pipe will start returning incomplete tasks from FlushAsync until
2 // the reader examines at least 5 bytes.
3 var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
4 var pipe = new Pipe(options);

PipeScheduler

通常在使用 async 和 await 時,異步代碼會在 TaskScheduler 或當前 SynchronizationContext 上恢復,

在執行 I/O 時,對執行 I/O 的位置進行細粒度控制非常重要, 此控制元件允許高效利用 CPU 快取, 高效的快取對于 Web 服務器等高性能應用至關重要, PipeScheduler 提供對異步回呼運行位置的控制, 默認情況下:

  • 使用當前的 SynchronizationContext,
  • 如果沒有 SynchronizationContext,它將使用執行緒池運行回呼,
 1 public static void Main(string[] args)
 2 {
 3     var writeScheduler = new SingleThreadPipeScheduler();
 4     var readScheduler = new SingleThreadPipeScheduler();
 5 
 6     // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
 7     var options = new PipeOptions(readerScheduler: readScheduler,
 8                                   writerScheduler: writeScheduler,
 9                                   useSynchronizationContext: false);
10     var pipe = new Pipe(options);
11 }
12 
13 // This is a sample scheduler that async callbacks on a single dedicated thread.
14 public class SingleThreadPipeScheduler : PipeScheduler
15 {
16     private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
17      new BlockingCollection<(Action<object> Action, object State)>();
18     private readonly Thread _thread;
19 
20     public SingleThreadPipeScheduler()
21     {
22         _thread = new Thread(DoWork);
23         _thread.Start();
24     }
25 
26     private void DoWork()
27     {
28         foreach (var item in _queue.GetConsumingEnumerable())
29         {
30             item.Action(item.State);
31         }
32     }
33 
34     public override void Schedule(Action<object> action, object state)
35     {
36         _queue.Add((action, state));
37     }
38 }

PipeScheduler.ThreadPool 是 PipeScheduler 實作,用于對執行緒池的回呼進行排隊, PipeScheduler.ThreadPool 是默認選項,通常也是最佳選項, PipeScheduler.Inline 可能會導致意外后果,如死鎖,

管道重置

通常重用 Pipe 物件即可重置, 若要重置管道,請在 PipeReader 和 PipeWriter 完成時呼叫 PipeReader Reset,

PipeReader

PipeReader 代表呼叫方管理記憶體, 在呼叫 PipeReader.ReadAsync 之后始終呼叫 PipeReader.AdvanceTo , 這使 PipeReader 知道呼叫方何時用完記憶體,以便可以對其進行跟蹤, 從 PipeReader.ReadAsync 回傳的 ReadOnlySequence<byte> 僅在呼叫 PipeReader.AdvanceTo 之前有效, 呼叫 PipeReader.AdvanceTo 后,不能使用 ReadOnlySequence<byte>

PipeReader.AdvanceTo 采用兩個 SequencePosition 引數:

  • 第一個引數確定消耗的記憶體量,
  • 第二個引數確定觀察到的緩沖區數,

將資料標記為“已使用”意味著管道可以將記憶體回傳到底層緩沖池, 將資料標記為“已觀察”可控制對 PipeReader.ReadAsync 的下一個呼叫的操作, 將所有內容都標記為“已觀察”意味著下次對 PipeReader.ReadAsync 的呼叫將不會回傳,直到有更多資料寫入管道, 任何其他值都將使對 PipeReader.ReadAsync 的下一次呼叫立即回傳并包含已觀察到的和未觀察到的資料,但不是已被使用的資料 ,

讀取流資料方案

嘗試讀取流資料時會出現以下幾種典型模式:

  • 給定資料流時,分析單條訊息,
  • 給定資料流時,分析所有可用訊息,

以下示例使用 TryParseMessage 方法分析來自 ReadOnlySequence<byte> 的訊息, TryParseMessage 分析單條訊息并更新輸入緩沖區,以從緩沖區中剪裁已分析的訊息, TryParseMessage 不是 .NET 的一部分,它是在以下部分中使用的用戶撰寫的方法,

1 bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out Message message);

讀取單條訊息

下面的代碼從 PipeReader 讀取一條訊息并將其回傳給呼叫方,

 1 async ValueTask<Message> ReadSingleMessageAsync(PipeReader reader,
 2  CancellationToken cancellationToken = default)
 3 {
 4     while (true)
 5     {
 6         ReadResult result = await reader.ReadAsync(cancellationToken);
 7         ReadOnlySequence<byte> buffer = result.Buffer;
 8 
 9         // In the event that no message is parsed successfully, mark consumed
10         // as nothing and examined as the entire buffer.
11         SequencePosition consumed = buffer.Start;
12         SequencePosition examined = buffer.End;
13 
14         try
15         {
16             if (TryParseMessage(ref buffer, out Message message))
17             {
18                 // A single message was successfully parsed so mark the start as the
19                 // parsed buffer as consumed. TryParseMessage trims the buffer to
20                 // point to the data after the message was parsed.
21                 consumed = buffer.Start;
22 
23                 // Examined is marked the same as consumed here, so the next call
24                 // to ReadSingleMessageAsync will process the next message if there's
25                 // one.
26                 examined = consumed;
27 
28                 return message;
29             }
30 
31             // There's no more data to be processed.
32             if (result.IsCompleted)
33             {
34                 if (buffer.Length > 0)
35                 {
36                     // The message is incomplete and there's no more data to process.
37                     throw new InvalidDataException("Incomplete message.");
38                 }
39 
40                 break;
41             }
42         }
43         finally
44         {
45             reader.AdvanceTo(consumed, examined);
46         }
47     }
48 
49     return null;
50 }

前面的代碼:

  • 分析單條訊息,
  • 更新已使用的 SequencePosition 并檢查 SequencePosition 以指向已剪裁的輸入緩沖區的開始,

因為 TryParseMessage 從輸入緩沖區中洗掉了已分析的訊息,所以更新了兩個 SequencePosition 引數, 通常,分析來自緩沖區的單條訊息時,檢查的位置應為以下位置之一:

  • 訊息的結尾,
  • 如果未找到訊息,則回傳接識訓沖區的結尾,

單條訊息案例最有可能出現錯誤, 將錯誤的值傳遞給“已檢查”可能會導致記憶體不足例外或無限回圈 , 有關詳細資訊,請參閱本文中的 PipeReader 常見問題部分,

讀取多條訊息

以下代碼從 PipeReader 讀取所有訊息,并在每條訊息上呼叫 ProcessMessageAsync

 1 async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
 2 {
 3     try
 4     {
 5         while (true)
 6         {
 7             ReadResult result = await reader.ReadAsync(cancellationToken);
 8             ReadOnlySequence<byte> buffer = result.Buffer;
 9 
10             try
11             {
12                 // Process all messages from the buffer, modifying the input buffer on each
13                 // iteration.
14                 while (TryParseMessage(ref buffer, out Message message))
15                 {
16                     await ProcessMessageAsync(message);
17                 }
18 
19                 // There's no more data to be processed.
20                 if (result.IsCompleted)
21                 {
22                     if (buffer.Length > 0)
23                     {
24                         // The message is incomplete and there's no more data to process.
25                         throw new InvalidDataException("Incomplete message.");
26                     }
27                     break;
28                 }
29             }
30             finally
31             {
32                 // Since all messages in the buffer are being processed, you can use the
33                 // remaining buffer's Start and End position to determine consumed and examined.
34                 reader.AdvanceTo(buffer.Start, buffer.End);
35             }
36         }
37     }
38     finally
39     {
40         await reader.CompleteAsync();
41     }
42 }

取消

PipeReader.ReadAsync

  • 支持傳遞 CancellationToken,
  • 如果在讀取掛起期間取消了 CancellationToken,則會引發 OperationCanceledException,
  • 支持通過 PipeReader.CancelPendingRead 取消當前讀取操作的方法,這樣可以避免引發例外, 呼叫 PipeReader.CancelPendingRead 將導致對 PipeReader.ReadAsync 的當前或下次呼叫回傳 ReadResult,并將 IsCanceled 設定為 true, 這對于以非破壞性和非例外的方式停止現有的讀取回圈非常有用,
 1 private PipeReader reader;
 2 
 3 public MyConnection(PipeReader reader)
 4 {
 5     this.reader = reader;
 6 }
 7 
 8 public void Abort()
 9 {
10     // Cancel the pending read so the process loop ends without an exception.
11     reader.CancelPendingRead();
12 }
13 
14 public async Task ProcessMessagesAsync()
15 {
16     try
17     {
18         while (true)
19         {
20             ReadResult result = await reader.ReadAsync();
21             ReadOnlySequence<byte> buffer = result.Buffer;
22 
23             try
24             {
25                 if (result.IsCanceled)
26                 {
27                     // The read was canceled. You can quit without reading the existing data.
28                     break;
29                 }
30 
31                 // Process all messages from the buffer, modifying the input buffer on each
32                 // iteration.
33                 while (TryParseMessage(ref buffer, out Message message))
34                 {
35                     await ProcessMessageAsync(message);
36                 }
37 
38                 // There's no more data to be processed.
39                 if (result.IsCompleted)
40                 {
41                     break;
42                 }
43             }
44             finally
45             {
46                 // Since all messages in the buffer are being processed, you can use the
47                 // remaining buffer's Start and End position to determine consumed and examined.
48                 reader.AdvanceTo(buffer.Start, buffer.End);
49             }
50         }
51     }
52     finally
53     {
54         await reader.CompleteAsync();
55     }
56 }

PipeReader 常見問題

  • 將錯誤的值傳遞給 consumed 或 examined 可能會導致讀取已讀取的資料,

  • 傳遞 buffer.End 作為檢查物件可能會導致以下問題:

    • 資料停止
    • 如果資料未使用,可能最侄訓出現記憶體不足 (OOM) 例外, 例如,當一次處理來自緩沖區的單條訊息時,可能會出現 PipeReader.AdvanceTo(position, buffer.End)
  • 將錯誤的值傳遞給 consumed 或 examined 可能會導致無限回圈, 例如,如果 buffer.Start 沒有更改,則 PipeReader.AdvanceTo(buffer.Start) 將導致在下一個對 PipeReader.ReadAsync 的呼叫在新資料到來之前立即回傳,

  • 將錯誤的值傳遞給 consumed 或 examined 可能會導致無限緩沖(最終導致 OOM),

  • 在呼叫 PipeReader.AdvanceTo 之后使用 ReadOnlySequence<byte> 可能會導致記憶體損壞(在釋放之后使用),

  • 未能呼叫 PipeReader.Complete/CompleteAsync 可能會導致記憶體泄漏,

  • 在處理緩沖區之前檢查 ReadResult.IsCompleted 并退出讀取邏輯會導致資料丟失, 回圈退出條件應基于 ReadResult.Buffer.IsEmpty 和 ReadResult.IsCompleted, 如果錯誤執行此操作,可能會導致無限回圈,

有問題的代碼

? 資料丟失

當 IsCompleted 被設定為 true 時,ReadResult 可能會回傳最后一段資料, 在退出讀回圈之前不讀取該資料將導致資料丟失,

 警告

不要使用以下代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 以下示例用于解釋 PipeReader 常見問題,

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
 6 
 7     if (result.IsCompleted)
 8     {
 9         break;
10     }
11 
12     Process(ref dataLossBuffer, out Message message);
13 
14     reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
15 }

 警告

不要使用上述代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 前面的示例用于解釋 PipeReader 常見問題,

? 無限回圈

如果 Result.IsCompleted 是 true,則以下邏輯可能會導致無限回圈,但緩沖區中永遠不會有完整的訊息,

 警告

不要使用以下代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 以下示例用于解釋 PipeReader 常見問題,

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
 6     if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
 7     {
 8         break;
 9     }
10 
11     Process(ref infiniteLoopBuffer, out Message message);
12 
13     reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
14 }

下面是另一段具有相同問題的代碼, 該代碼在檢查 ReadResult.IsCompleted 之前檢查非慷訓沖區, 由于該代碼位于 else if 中,如果緩沖區中沒有完整的訊息,它將永遠回圈,

 警告

不要使用以下代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 以下示例用于解釋 PipeReader 常見問題,

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
 6 
 7     if (!infiniteLoopBuffer.IsEmpty)
 8     {
 9         Process(ref infiniteLoopBuffer, out Message message);
10     }
11     else if (result.IsCompleted)
12     {
13         break;
14     }
15 
16     reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
17 }

? 意外掛起

在分析單條訊息時,如果無條件呼叫 PipeReader.AdvanceTo 而 buffer.End 位于 examined 位置,則可能導致掛起, 對 PipeReader.AdvanceTo 的下次呼叫將在以下情況下回傳:

  • 有更多資料寫入管道,
  • 以及之前未檢查過新資料,

 警告

不要使用以下代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 以下示例用于解釋 PipeReader 常見問題,

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> hangBuffer = result.Buffer;
 6 
 7     Process(ref hangBuffer, out Message message);
 8 
 9     if (result.IsCompleted)
10     {
11         break;
12     }
13 
14     reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
15 
16     if (message != null)
17     {
18         return message;
19     }
20 }

? 記憶體不足 (OOM)

在滿足以下條件的情況下,以下代碼將保持緩沖,直到發生 OutOfMemoryException:

  • 沒有最大訊息大小,
  • 從 PipeReader 回傳的資料不會生成完整的訊息, 例如,它不會生成完整的訊息,因為另一端正在撰寫一條大訊息(例如,一條為 4GB 的訊息),

 警告

不要使用以下代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 以下示例用于解釋 PipeReader 常見問題,

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
 6 
 7     Process(ref thisCouldOutOfMemory, out Message message);
 8 
 9     if (result.IsCompleted)
10     {
11         break;
12     }
13 
14     reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
15 
16     if (message != null)
17     {
18         return message;
19     }
20 }

? 記憶體損壞

當寫入讀取緩沖區的幫助程式時,應在呼叫 Advance 之前復制任何回傳的有效負載, 下面的示例將回傳 Pipe 已丟棄的記憶體,并可能將其重新用于下一個操作(讀/寫),

 警告

不要使用以下代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 以下示例用于解釋 PipeReader 常見問題,

1 public class Message
2 {
3     public ReadOnlySequence<byte> CorruptedPayload { get; set; }
4 }
 1 Environment.FailFast("This code is terrible, don't use it!");
 2     Message message = null;
 3 
 4     while (true)
 5     {
 6         ReadResult result = await reader.ReadAsync(cancellationToken);
 7         ReadOnlySequence<byte> buffer = result.Buffer;
 8 
 9         ReadHeader(ref buffer, out int length);
10 
11         if (length <= buffer.Length)
12         {
13             message = new Message
14             {
15                 // Slice the payload from the existing buffer
16                 CorruptedPayload = buffer.Slice(0, length)
17             };
18 
19             buffer = buffer.Slice(length);
20         }
21 
22         if (result.IsCompleted)
23         {
24             break;
25         }
26 
27         reader.AdvanceTo(buffer.Start, buffer.End);
28 
29         if (message != null)
30         {
31             // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
32             // was captured.
33             break;
34         }
35     }
36 
37     return message;
38 }

PipeWriter

PipeWriter 管理用于代表呼叫方寫入的緩沖區, PipeWriter 實作IBufferWriter<byte>, IBufferWriter<byte> 使得無需額外的緩沖區副本就可以訪問緩沖區來執行寫入操作,

 1 async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
 2 {
 3     // Request at least 5 bytes from the PipeWriter.
 4     Memory<byte> memory = writer.GetMemory(5);
 5 
 6     // Write directly into the buffer.
 7     int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
 8 
 9     // Tell the writer how many bytes were written.
10     writer.Advance(written);
11 
12     await writer.FlushAsync(cancellationToken);
13 }

之前的代碼:

  • 使用 GetMemory 從 PipeWriter 請求至少 5 個位元組的緩沖區,
  • 將 ASCII 字串 "Hello" 的位元組寫入回傳的 Memory<byte>
  • 呼叫 Advance 以指示寫入緩沖區的位元組數,
  • 重繪 PipeWriter,以便將位元組發送到基礎設備,

以前的寫入方法使用 PipeWriter 提供的緩沖區, 或者,PipeWriter.WriteAsync:

  • 將現有緩沖區復制到 PipeWriter
  • 根據需要呼叫 GetSpan``Advance,然后呼叫 FlushAsync,
1 async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
2 {
3     byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
4 
5     // Write helloBytes to the writer, there's no need to call Advance here
6     // (Write does that).
7     await writer.WriteAsync(helloBytes, cancellationToken);
8 }

取消

FlushAsync 支持傳遞 CancellationToken, 如果令牌在重繪掛起時被取消,則傳遞 CancellationToken 將導致 OperationCanceledException, PipeWriter.FlushAsync 支持通過 PipeWriter.CancelPendingFlush 取消當前重繪操作而不引發例外的方法, 呼叫 PipeWriter.CancelPendingFlush 將導致對 PipeWriter.FlushAsync 或 PipeWriter.WriteAsync 的當前或下次呼叫回傳 FlushResult,并將 IsCanceled 設定為 true, 這對于以非破壞性和非例外的方式停止暫停重繪非常有用,

PipeWriter 常見問題

  • GetSpan 和 GetMemory 回傳至少具有請求記憶體量的緩沖區, 請勿假設確切的緩沖區大小 ,
  • 無法保證連續的呼叫將回傳相同的緩沖區或相同大小的緩沖區,
  • 在呼叫 Advance 之后,必須請求一個新的緩沖區來繼續寫入更多資料, 不能寫入先前獲得的緩沖區,
  • 如果未完成對 FlushAsync 的呼叫,則呼叫 GetMemory 或 GetSpan 將不安全,
  • 如果未重繪資料,則呼叫 Complete 或 CompleteAsync 可能導致記憶體損壞,

IDuplexPipe

IDuplexPipe 是支持讀寫的型別的協定, 例如,網路連接將由 IDuplexPipe 表示,

與包含 PipeReader 和 PipeWriter 的 Pipe 不同,IDuplexPipe 代表全雙工連接的單側, 這意味著寫入 PipeWriter 的內容不會從 PipeReader 中讀取,

在讀取或寫入流資料時,通常使用反序列化程式讀取資料,并使用序列化程式寫入資料, 大多數讀取和寫入流 API 都有一個 Stream 引數, 為了更輕松地與這些現有 API 集成,PipeReader 和 PipeWriter 公開了一個 AsStream, AsStream 回傳圍繞 PipeReader 或 PipeWriter 的 Stream 實作,

 

System.IO.Pipelines——高性能IO(一) 

System.IO.Pipelines——高性能IO(二)   

System.IO.Pipelines——高性能IO(三)  

 


轉載請標明本文來源:https://www.cnblogs.com/yswenli/p/11810317.html
更多內容歡迎Star、Fork我的的github:https://github.com/yswenli/
如果發現本文有什么問題和任何建議,也隨時歡迎交流~

 

 

轉載請註明出處,本文鏈接:https://www.uj5u.com/net/108746.html

標籤:C#

上一篇:C# Moq

下一篇:轉:C# String為值型別還是參考型別

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • WebAPI簡介

    Web體系結構: 有三個核心:資源(resource),URL(統一資源識別符號)和表示 他們的關系是這樣的:一個資源由一個URL進行標識,HTTP客戶端使用URL定位資源,表示是從資源回傳資料,媒體型別是資源回傳的資料格式。 接下來我們說下HTTP. HTTP協議的系統是一種無狀態的方式,使用請求/ ......

    uj5u.com 2020-09-09 22:07:47 more
  • asp.net core 3.1 入口:Program.cs中的Main函式

    本文分析Program.cs 中Main()函式中代碼的運行順序分析asp.net core程式的啟動,重點不是剖析原始碼,而是理清程式開始時執行的順序。到呼叫了哪些實體,哪些法方。asp.net core 3.1 的程式入口在專案Program.cs檔案里,如下。ususing System; us ......

    uj5u.com 2020-09-09 22:07:49 more
  • asp.net網站作為websocket服務端的應用該如何寫

    最近被websocket的一個問題困擾了很久,有一個需求是在web網站中搭建websocket服務。客戶端通過網頁與服務器建立連接,然后服務器根據ip給客戶端網頁發送資訊。 其實,這個需求并不難,只是剛開始對websocket的內容不太了解。上網搜索了一下,有通過asp.net core 實作的、有 ......

    uj5u.com 2020-09-09 22:08:02 more
  • ASP.NET 開源匯入匯出庫Magicodes.IE Docker中使用

    Magicodes.IE在Docker中使用 更新歷史 2019.02.13 【Nuget】版本更新到2.0.2 【匯入】修復單列匯入的Bug,單元測驗“OneColumnImporter_Test”。問題見(https://github.com/dotnetcore/Magicodes.IE/is ......

    uj5u.com 2020-09-09 22:08:05 more
  • 在webform中使用ajax

    如果你用過Asp.net webform, 說明你也算是.NET 開發的老兵了。WEBform應該是2011 2013左右,當時還用visual studio 2005、 visual studio 2008。后來基本都用的是MVC。 如果是新開發的專案,估計沒人會用webform技術。但是有些舊版 ......

    uj5u.com 2020-09-09 22:08:50 more
  • iis添加asp.net網站,訪問提示:由于擴展配置問題而無法提供您請求的

    今天在iis服務器配置asp.net網站,遇到一個問題,記錄一下: 問題:由于擴展配置問題而無法提供您請求的頁面。如果該頁面是腳本,請添加處理程式。如果應下載檔案,請添加 MIME 映射。 WindowServer2012服務器,添加角色安裝完.netframework和iis之后,運行aspx頁面 ......

    uj5u.com 2020-09-09 22:10:00 more
  • WebAPI-處理架構

    帶著問題去思考,大家好! 問題1:HTTP請求和回傳相應的HTTP回應資訊之間發生了什么? 1:首先是最底層,托管層,位于WebAPI和底層HTTP堆疊之間 2:其次是 訊息處理程式管道層,這里比如日志和快取。OWIN的參考是將訊息處理程式管道的一些功能下移到堆疊下端的OWIN中間件了。 3:控制器處理 ......

    uj5u.com 2020-09-09 22:11:13 more
  • 微信門戶開發框架-使用指導說明書

    微信門戶應用管理系統,采用基于 MVC + Bootstrap + Ajax + Enterprise Library的技術路線,界面層采用Boostrap + Metronic組合的前端框架,資料訪問層支持Oracle、SQLServer、MySQL、PostgreSQL等資料庫。框架以MVC5,... ......

    uj5u.com 2020-09-09 22:15:18 more
  • WebAPI-HTTP編程模型

    帶著問題去思考,大家好!它是什么?它包含什么?它能干什么? 訊息 HTTP編程模型的核心就是訊息抽象,表示為:HttPRequestMessage,HttpResponseMessage.用于客戶端和服務端之間交換請求和回應訊息。 HttpMethod類包含了一組靜態屬性: private stat ......

    uj5u.com 2020-09-09 22:15:23 more
  • 部署WebApi隨筆

    一、跨域 NuGet參考Microsoft.AspNet.WebApi.Cors WebApiConfig.cs中配置: // Web API 配置和服務 config.EnableCors(new EnableCorsAttribute("*", "*", "*")); 二、清除默認回傳XML格式 ......

    uj5u.com 2020-09-09 22:15:48 more
最新发布
  • C#多執行緒學習(二) 如何操縱一個執行緒

    <a href="https://www.cnblogs.com/x-zhi/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2943582/20220801082530.png" alt="" /></...

    uj5u.com 2023-04-19 09:17:20 more
  • C#多執行緒學習(二) 如何操縱一個執行緒

    C#多執行緒學習(二) 如何操縱一個執行緒 執行緒學習第一篇:C#多執行緒學習(一) 多執行緒的相關概念 下面我們就動手來創建一個執行緒,使用Thread類創建執行緒時,只需提供執行緒入口即可。(執行緒入口使程式知道該讓這個執行緒干什么事) 在C#中,執行緒入口是通過ThreadStart代理(delegate)來提供的 ......

    uj5u.com 2023-04-19 09:16:49 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    <a href="https://www.cnblogs.com/huangxincheng/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/214741/20200614104537.png" alt="" /&g...

    uj5u.com 2023-04-18 08:39:04 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    一:背景 1. 講故事 前段時間協助訓練營里的一位朋友分析了一個程式卡死的問題,回過頭來看這個案例比較經典,這篇稍微整理一下供后來者少踩坑吧。 二:WinDbg 分析 1. 為什么會卡死 因為是表單程式,理所當然就是看主執行緒此時正在做什么? 可以用 ~0s ; k 看一下便知。 0:000> k # ......

    uj5u.com 2023-04-18 08:33:10 more
  • SignalR, No Connection with that ID,IIS

    <a href="https://www.cnblogs.com/smartstar/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/u36196.jpg" alt="" /></a>...

    uj5u.com 2023-03-30 17:21:52 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:15:33 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:13:31 more
  • C#遍歷指定檔案夾中所有檔案的3種方法

    <a href="https://www.cnblogs.com/xbhp/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/957602/20230310105611.png" alt="" /></a&...

    uj5u.com 2023-03-27 14:46:55 more
  • C#/VB.NET:如何將PDF轉為PDF/A

    <a href="https://www.cnblogs.com/Carina-baby/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2859233/20220427162558.png" alt="" />...

    uj5u.com 2023-03-27 14:46:35 more
  • 武裝你的WEBAPI-OData聚合查詢

    <a href="https://www.cnblogs.com/podolski/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/616093/20140323000327.png" alt="" /><...

    uj5u.com 2023-03-27 14:46:16 more