轉自https://docs.microsoft.com/en-us/dotnet/standard/io/pipelines
System.IO.Pipelines 是一個新庫,旨在使在 .NET 中執行高性能 I/O 更加容易, 該庫的目標為適用于所有 .NET 實作的 .NET Standard,
System.IO.Pipelines 解決什么問題
分析流資料的應用由樣板代碼組成,后者由許多專門且不尋常的代碼流組成, 樣板代碼和特殊情況代碼很復雜且難以進行維護,
System.IO.Pipelines 已構建為:
- 具有高性能的流資料分析功能,
- 減少代碼復雜性,
下面的代碼是典型的 TCP 服務器,它從客戶機接收行分隔的訊息(由 '\n' 分隔):
async Task ProcessLinesAsync(NetworkStream stream) { var buffer = new byte[1024]; await stream.ReadAsync(buffer, 0, buffer.Length); // Process a single line from the buffer ProcessLine(buffer); }
前面的代碼有幾個問題:
- 單次呼叫
ReadAsync可能無法接收整條訊息(行尾), - 忽略了
stream.ReadAsync的結果,stream.ReadAsync回傳讀取的資料量, - 它不能處理在單個
ReadAsync呼叫中讀取多行的情況, - 它為每次讀取分配一個
byte陣列,
要解決上述問題,需要進行以下更改:
-
緩沖傳入的資料,直到找到新行,
-
分析緩沖區中回傳的所有行,
-
該行可能大于 1KB(1024 位元組), 此代碼需要調整輸入緩沖區的大小,直到找到分隔符后,才能在緩沖區內容納完整行,
- 如果調整緩沖區的大小,當輸入中出現較長的行時,將生成更多緩沖區副本,
- 壓縮用于讀取行的緩沖區,以減少空余,
-
請考慮使用緩沖池來避免重復分配記憶體,
-
下面的代碼解決了其中一些問題:
async Task ProcessLinesAsync(NetworkStream stream) { byte[] buffer = ArrayPool<byte>.Shared.Rent(1024); var bytesBuffered = 0; var bytesConsumed = 0; while (true) { // Calculate the amount of bytes remaining in the buffer. var bytesRemaining = buffer.Length - bytesBuffered; if (bytesRemaining == 0) { // Double the buffer size and copy the previously buffered data into the new buffer. var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2); Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length); // Return the old buffer to the pool. ArrayPool<byte>.Shared.Return(buffer); buffer = newBuffer; bytesRemaining = buffer.Length - bytesBuffered; } var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining); if (bytesRead == 0) { // EOF break; } // Keep track of the amount of buffered bytes. bytesBuffered += bytesRead; var linePosition = -1; do { // Look for a EOL in the buffered data. linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed); if (linePosition >= 0) { // Calculate the length of the line based on the offset. var lineLength = linePosition - bytesConsumed; // Process the line. ProcessLine(buffer, bytesConsumed, lineLength); // Move the bytesConsumed to skip past the line consumed (including \n). bytesConsumed += lineLength + 1; } } while (linePosition >= 0); } }
前面的代碼很復雜,不能解決所識別的所有問題, 高性能網路通常意味著撰寫非常復雜的代碼以使性能最大化, System.IO.Pipelines 的設計目的是使撰寫此類代碼更容易,
若要查看翻譯為非英語語言的代碼注釋,請在 此 GitHub 討論問題中告訴我們,
管道
Pipe 類可用于創建 PipeWriter/PipeReader 對, 寫入 PipeWriter 的所有資料都可用于 PipeReader:
1 var pipe = new Pipe(); 2 PipeReader reader = pipe.Reader; 3 PipeWriter writer = pipe.Writer;
管道基本用法
1 async Task ProcessLinesAsync(Socket socket) 2 { 3 var pipe = new Pipe(); 4 Task writing = FillPipeAsync(socket, pipe.Writer); 5 Task reading = ReadPipeAsync(pipe.Reader); 6 7 await Task.WhenAll(reading, writing); 8 } 9 10 async Task FillPipeAsync(Socket socket, PipeWriter writer) 11 { 12 const int minimumBufferSize = 512; 13 14 while (true) 15 { 16 // Allocate at least 512 bytes from the PipeWriter. 17 Memory<byte> memory = writer.GetMemory(minimumBufferSize); 18 try 19 { 20 int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None); 21 if (bytesRead == 0) 22 { 23 break; 24 } 25 // Tell the PipeWriter how much was read from the Socket. 26 writer.Advance(bytesRead); 27 } 28 catch (Exception ex) 29 { 30 LogError(ex); 31 break; 32 } 33 34 // Make the data available to the PipeReader. 35 FlushResult result = await writer.FlushAsync(); 36 37 if (result.IsCompleted) 38 { 39 break; 40 } 41 } 42 43 // By completing PipeWriter, tell the PipeReader that there's no more data coming. 44 await writer.CompleteAsync(); 45 } 46 47 async Task ReadPipeAsync(PipeReader reader) 48 { 49 while (true) 50 { 51 ReadResult result = await reader.ReadAsync(); 52 ReadOnlySequence<byte> buffer = result.Buffer; 53 54 while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line)) 55 { 56 // Process the line. 57 ProcessLine(line); 58 } 59 60 // Tell the PipeReader how much of the buffer has been consumed. 61 reader.AdvanceTo(buffer.Start, buffer.End); 62 63 // Stop reading if there's no more data coming. 64 if (result.IsCompleted) 65 { 66 break; 67 } 68 } 69 70 // Mark the PipeReader as complete. 71 await reader.CompleteAsync(); 72 } 73 74 bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line) 75 { 76 // Look for a EOL in the buffer. 77 SequencePosition? position = buffer.PositionOf((byte)'\n'); 78 79 if (position == null) 80 { 81 line = default; 82 return false; 83 } 84 85 // Skip the line + the \n. 86 line = buffer.Slice(0, position.Value); 87 buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); 88 return true; 89 }
有兩個回圈:
FillPipeAsync從Socket讀取并寫入PipeWriter,ReadPipeAsync從PipeReader讀取并分析傳入的行,
沒有分配顯式緩沖區, 所有緩沖區管理都委托給 PipeReader 和 PipeWriter 實作, 委派緩沖區管理使使用代碼更容易集中關注業務邏輯,
在第一個回圈中:
- 呼叫 PipeWriter.GetMemory(Int32) 從基礎撰寫器獲取記憶體,
- 呼叫 PipeWriter.Advance(Int32) 以告知
PipeWriter有多少資料已寫入緩沖區, - 呼叫 PipeWriter.FlushAsync 以使資料可用于
PipeReader,
在第二個回圈中,PipeReader 使用由 PipeWriter 寫入的緩沖區, 緩沖區來自套接字, 對 PipeReader.ReadAsync 的呼叫:
-
回傳包含兩條重要資訊的 ReadResult:
- 以
ReadOnlySequence<byte>形式讀取的資料, - 布林值
IsCompleted,指示是否已到達資料結尾 (EOF),
- 以
找到行尾 (EOL) 分隔符并分析該行后:
- 該邏輯處理緩沖區以跳過已處理的內容,
- 呼叫
PipeReader.AdvanceTo以告知PipeReader已消耗和檢查了多少資料,
讀取器和撰寫器回圈通過呼叫 Complete 結束, Complete 使基礎管道釋放其分配的記憶體,
反壓和流量控制
理想情況下,讀取和分析可協同作業:
- 寫入執行緒使用來自網路的資料并將其放入緩沖區,
- 分析執行緒負責構造適當的資料結構,
通常,分析所花費的時間比僅從網路復制資料塊所用時間更長:
- 讀取執行緒領先于分析執行緒,
- 讀取執行緒必須級訓或分配更多記憶體來存盤用于分析執行緒的資料,
為了獲得最佳性能,需要在頻繁暫停和分配更多記憶體之間取得平衡,
為解決上述問題,Pipe 提供了兩個設定來控制資料流:
- PauseWriterThreshold:確定在呼叫 FlushAsync 暫停之前應緩沖多少資料,
- ResumeWriterThreshold:確定在恢復對
PipeWriter.FlushAsync的呼叫之前,讀取器必須觀察多少資料,

PipeWriter.FlushAsync:
- 當
Pipe中的資料量超過PauseWriterThreshold時,回傳不完整的ValueTask<FlushResult>, - 低于
ResumeWriterThreshold時,回傳完整的ValueTask<FlushResult>,
使用兩個值可防止快速回圈,如果只使用一個值,則可能發生這種回圈,
示例
1 // The Pipe will start returning incomplete tasks from FlushAsync until 2 // the reader examines at least 5 bytes. 3 var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5); 4 var pipe = new Pipe(options);
PipeScheduler
通常在使用 async 和 await 時,異步代碼會在 TaskScheduler 或當前 SynchronizationContext 上恢復,
在執行 I/O 時,對執行 I/O 的位置進行細粒度控制非常重要, 此控制元件允許高效利用 CPU 快取, 高效的快取對于 Web 服務器等高性能應用至關重要, PipeScheduler 提供對異步回呼運行位置的控制, 默認情況下:
- 使用當前的 SynchronizationContext,
- 如果沒有
SynchronizationContext,它將使用執行緒池運行回呼,
1 public static void Main(string[] args) 2 { 3 var writeScheduler = new SingleThreadPipeScheduler(); 4 var readScheduler = new SingleThreadPipeScheduler(); 5 6 // Tell the Pipe what schedulers to use and disable the SynchronizationContext. 7 var options = new PipeOptions(readerScheduler: readScheduler, 8 writerScheduler: writeScheduler, 9 useSynchronizationContext: false); 10 var pipe = new Pipe(options); 11 } 12 13 // This is a sample scheduler that async callbacks on a single dedicated thread. 14 public class SingleThreadPipeScheduler : PipeScheduler 15 { 16 private readonly BlockingCollection<(Action<object> Action, object State)> _queue = 17 new BlockingCollection<(Action<object> Action, object State)>(); 18 private readonly Thread _thread; 19 20 public SingleThreadPipeScheduler() 21 { 22 _thread = new Thread(DoWork); 23 _thread.Start(); 24 } 25 26 private void DoWork() 27 { 28 foreach (var item in _queue.GetConsumingEnumerable()) 29 { 30 item.Action(item.State); 31 } 32 } 33 34 public override void Schedule(Action<object> action, object state) 35 { 36 _queue.Add((action, state)); 37 } 38 }
PipeScheduler.ThreadPool 是 PipeScheduler 實作,用于對執行緒池的回呼進行排隊, PipeScheduler.ThreadPool 是默認選項,通常也是最佳選項, PipeScheduler.Inline 可能會導致意外后果,如死鎖,
管道重置
通常重用 Pipe 物件即可重置, 若要重置管道,請在 PipeReader 和 PipeWriter 完成時呼叫 PipeReader Reset,
PipeReader
PipeReader 代表呼叫方管理記憶體, 在呼叫 PipeReader.ReadAsync 之后始終呼叫 PipeReader.AdvanceTo , 這使 PipeReader 知道呼叫方何時用完記憶體,以便可以對其進行跟蹤, 從 PipeReader.ReadAsync 回傳的 ReadOnlySequence<byte> 僅在呼叫 PipeReader.AdvanceTo 之前有效, 呼叫 PipeReader.AdvanceTo 后,不能使用 ReadOnlySequence<byte>,
PipeReader.AdvanceTo 采用兩個 SequencePosition 引數:
- 第一個引數確定消耗的記憶體量,
- 第二個引數確定觀察到的緩沖區數,
將資料標記為“已使用”意味著管道可以將記憶體回傳到底層緩沖池, 將資料標記為“已觀察”可控制對 PipeReader.ReadAsync 的下一個呼叫的操作, 將所有內容都標記為“已觀察”意味著下次對 PipeReader.ReadAsync 的呼叫將不會回傳,直到有更多資料寫入管道, 任何其他值都將使對 PipeReader.ReadAsync 的下一次呼叫立即回傳并包含已觀察到的和未觀察到的資料,但不是已被使用的資料 ,
讀取流資料方案
嘗試讀取流資料時會出現以下幾種典型模式:
- 給定資料流時,分析單條訊息,
- 給定資料流時,分析所有可用訊息,
以下示例使用 TryParseMessage 方法分析來自 ReadOnlySequence<byte> 的訊息, TryParseMessage 分析單條訊息并更新輸入緩沖區,以從緩沖區中剪裁已分析的訊息, TryParseMessage 不是 .NET 的一部分,它是在以下部分中使用的用戶撰寫的方法,
1 bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out Message message);
讀取單條訊息
下面的代碼從 PipeReader 讀取一條訊息并將其回傳給呼叫方,
1 async ValueTask<Message> ReadSingleMessageAsync(PipeReader reader, 2 CancellationToken cancellationToken = default) 3 { 4 while (true) 5 { 6 ReadResult result = await reader.ReadAsync(cancellationToken); 7 ReadOnlySequence<byte> buffer = result.Buffer; 8 9 // In the event that no message is parsed successfully, mark consumed 10 // as nothing and examined as the entire buffer. 11 SequencePosition consumed = buffer.Start; 12 SequencePosition examined = buffer.End; 13 14 try 15 { 16 if (TryParseMessage(ref buffer, out Message message)) 17 { 18 // A single message was successfully parsed so mark the start as the 19 // parsed buffer as consumed. TryParseMessage trims the buffer to 20 // point to the data after the message was parsed. 21 consumed = buffer.Start; 22 23 // Examined is marked the same as consumed here, so the next call 24 // to ReadSingleMessageAsync will process the next message if there's 25 // one. 26 examined = consumed; 27 28 return message; 29 } 30 31 // There's no more data to be processed. 32 if (result.IsCompleted) 33 { 34 if (buffer.Length > 0) 35 { 36 // The message is incomplete and there's no more data to process. 37 throw new InvalidDataException("Incomplete message."); 38 } 39 40 break; 41 } 42 } 43 finally 44 { 45 reader.AdvanceTo(consumed, examined); 46 } 47 } 48 49 return null; 50 }
前面的代碼:
- 分析單條訊息,
- 更新已使用的
SequencePosition并檢查SequencePosition以指向已剪裁的輸入緩沖區的開始,
因為 TryParseMessage 從輸入緩沖區中洗掉了已分析的訊息,所以更新了兩個 SequencePosition 引數, 通常,分析來自緩沖區的單條訊息時,檢查的位置應為以下位置之一:
- 訊息的結尾,
- 如果未找到訊息,則回傳接識訓沖區的結尾,
單條訊息案例最有可能出現錯誤, 將錯誤的值傳遞給“已檢查”可能會導致記憶體不足例外或無限回圈 , 有關詳細資訊,請參閱本文中的 PipeReader 常見問題部分,
讀取多條訊息
以下代碼從 PipeReader 讀取所有訊息,并在每條訊息上呼叫 ProcessMessageAsync,
1 async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default) 2 { 3 try 4 { 5 while (true) 6 { 7 ReadResult result = await reader.ReadAsync(cancellationToken); 8 ReadOnlySequence<byte> buffer = result.Buffer; 9 10 try 11 { 12 // Process all messages from the buffer, modifying the input buffer on each 13 // iteration. 14 while (TryParseMessage(ref buffer, out Message message)) 15 { 16 await ProcessMessageAsync(message); 17 } 18 19 // There's no more data to be processed. 20 if (result.IsCompleted) 21 { 22 if (buffer.Length > 0) 23 { 24 // The message is incomplete and there's no more data to process. 25 throw new InvalidDataException("Incomplete message."); 26 } 27 break; 28 } 29 } 30 finally 31 { 32 // Since all messages in the buffer are being processed, you can use the 33 // remaining buffer's Start and End position to determine consumed and examined. 34 reader.AdvanceTo(buffer.Start, buffer.End); 35 } 36 } 37 } 38 finally 39 { 40 await reader.CompleteAsync(); 41 } 42 }
取消
PipeReader.ReadAsync:
- 支持傳遞 CancellationToken,
- 如果在讀取掛起期間取消了
CancellationToken,則會引發 OperationCanceledException, - 支持通過 PipeReader.CancelPendingRead 取消當前讀取操作的方法,這樣可以避免引發例外, 呼叫
PipeReader.CancelPendingRead將導致對PipeReader.ReadAsync的當前或下次呼叫回傳 ReadResult,并將IsCanceled設定為true, 這對于以非破壞性和非例外的方式停止現有的讀取回圈非常有用,
1 private PipeReader reader; 2 3 public MyConnection(PipeReader reader) 4 { 5 this.reader = reader; 6 } 7 8 public void Abort() 9 { 10 // Cancel the pending read so the process loop ends without an exception. 11 reader.CancelPendingRead(); 12 } 13 14 public async Task ProcessMessagesAsync() 15 { 16 try 17 { 18 while (true) 19 { 20 ReadResult result = await reader.ReadAsync(); 21 ReadOnlySequence<byte> buffer = result.Buffer; 22 23 try 24 { 25 if (result.IsCanceled) 26 { 27 // The read was canceled. You can quit without reading the existing data. 28 break; 29 } 30 31 // Process all messages from the buffer, modifying the input buffer on each 32 // iteration. 33 while (TryParseMessage(ref buffer, out Message message)) 34 { 35 await ProcessMessageAsync(message); 36 } 37 38 // There's no more data to be processed. 39 if (result.IsCompleted) 40 { 41 break; 42 } 43 } 44 finally 45 { 46 // Since all messages in the buffer are being processed, you can use the 47 // remaining buffer's Start and End position to determine consumed and examined. 48 reader.AdvanceTo(buffer.Start, buffer.End); 49 } 50 } 51 } 52 finally 53 { 54 await reader.CompleteAsync(); 55 } 56 }
PipeReader 常見問題
-
將錯誤的值傳遞給
consumed或examined可能會導致讀取已讀取的資料, -
傳遞
buffer.End作為檢查物件可能會導致以下問題:- 資料停止
- 如果資料未使用,可能最侄訓出現記憶體不足 (OOM) 例外, 例如,當一次處理來自緩沖區的單條訊息時,可能會出現
PipeReader.AdvanceTo(position, buffer.End),
-
將錯誤的值傳遞給
consumed或examined可能會導致無限回圈, 例如,如果buffer.Start沒有更改,則PipeReader.AdvanceTo(buffer.Start)將導致在下一個對PipeReader.ReadAsync的呼叫在新資料到來之前立即回傳, -
將錯誤的值傳遞給
consumed或examined可能會導致無限緩沖(最終導致 OOM), -
在呼叫
PipeReader.AdvanceTo之后使用ReadOnlySequence<byte>可能會導致記憶體損壞(在釋放之后使用), -
未能呼叫
PipeReader.Complete/CompleteAsync可能會導致記憶體泄漏, -
在處理緩沖區之前檢查 ReadResult.IsCompleted 并退出讀取邏輯會導致資料丟失, 回圈退出條件應基于
ReadResult.Buffer.IsEmpty和ReadResult.IsCompleted, 如果錯誤執行此操作,可能會導致無限回圈,
有問題的代碼
? 資料丟失
當 IsCompleted 被設定為 true 時,ReadResult 可能會回傳最后一段資料, 在退出讀回圈之前不讀取該資料將導致資料丟失,
警告
不要使用以下代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 以下示例用于解釋 PipeReader 常見問題,
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> dataLossBuffer = result.Buffer; 6 7 if (result.IsCompleted) 8 { 9 break; 10 } 11 12 Process(ref dataLossBuffer, out Message message); 13 14 reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End); 15 }
警告
不要使用上述代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 前面的示例用于解釋 PipeReader 常見問題,
? 無限回圈
如果 Result.IsCompleted 是 true,則以下邏輯可能會導致無限回圈,但緩沖區中永遠不會有完整的訊息,
警告
不要使用以下代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 以下示例用于解釋 PipeReader 常見問題,
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer; 6 if (result.IsCompleted && infiniteLoopBuffer.IsEmpty) 7 { 8 break; 9 } 10 11 Process(ref infiniteLoopBuffer, out Message message); 12 13 reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End); 14 }
下面是另一段具有相同問題的代碼, 該代碼在檢查 ReadResult.IsCompleted 之前檢查非慷訓沖區, 由于該代碼位于 else if 中,如果緩沖區中沒有完整的訊息,它將永遠回圈,
警告
不要使用以下代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 以下示例用于解釋 PipeReader 常見問題,
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer; 6 7 if (!infiniteLoopBuffer.IsEmpty) 8 { 9 Process(ref infiniteLoopBuffer, out Message message); 10 } 11 else if (result.IsCompleted) 12 { 13 break; 14 } 15 16 reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End); 17 }
? 意外掛起
在分析單條訊息時,如果無條件呼叫 PipeReader.AdvanceTo 而 buffer.End 位于 examined 位置,則可能導致掛起, 對 PipeReader.AdvanceTo 的下次呼叫將在以下情況下回傳:
- 有更多資料寫入管道,
- 以及之前未檢查過新資料,
警告
不要使用以下代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 以下示例用于解釋 PipeReader 常見問題,
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> hangBuffer = result.Buffer; 6 7 Process(ref hangBuffer, out Message message); 8 9 if (result.IsCompleted) 10 { 11 break; 12 } 13 14 reader.AdvanceTo(hangBuffer.Start, hangBuffer.End); 15 16 if (message != null) 17 { 18 return message; 19 } 20 }
? 記憶體不足 (OOM)
在滿足以下條件的情況下,以下代碼將保持緩沖,直到發生 OutOfMemoryException:
- 沒有最大訊息大小,
- 從
PipeReader回傳的資料不會生成完整的訊息, 例如,它不會生成完整的訊息,因為另一端正在撰寫一條大訊息(例如,一條為 4GB 的訊息),
警告
不要使用以下代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 以下示例用于解釋 PipeReader 常見問題,
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer; 6 7 Process(ref thisCouldOutOfMemory, out Message message); 8 9 if (result.IsCompleted) 10 { 11 break; 12 } 13 14 reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End); 15 16 if (message != null) 17 { 18 return message; 19 } 20 }
? 記憶體損壞
當寫入讀取緩沖區的幫助程式時,應在呼叫 Advance 之前復制任何回傳的有效負載, 下面的示例將回傳 Pipe 已丟棄的記憶體,并可能將其重新用于下一個操作(讀/寫),
警告
不要使用以下代碼 , 使用此示例將導致資料丟失、掛起和安全問題,并且不應復制 , 以下示例用于解釋 PipeReader 常見問題,
1 public class Message 2 { 3 public ReadOnlySequence<byte> CorruptedPayload { get; set; } 4 }
1 Environment.FailFast("This code is terrible, don't use it!"); 2 Message message = null; 3 4 while (true) 5 { 6 ReadResult result = await reader.ReadAsync(cancellationToken); 7 ReadOnlySequence<byte> buffer = result.Buffer; 8 9 ReadHeader(ref buffer, out int length); 10 11 if (length <= buffer.Length) 12 { 13 message = new Message 14 { 15 // Slice the payload from the existing buffer 16 CorruptedPayload = buffer.Slice(0, length) 17 }; 18 19 buffer = buffer.Slice(length); 20 } 21 22 if (result.IsCompleted) 23 { 24 break; 25 } 26 27 reader.AdvanceTo(buffer.Start, buffer.End); 28 29 if (message != null) 30 { 31 // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer 32 // was captured. 33 break; 34 } 35 } 36 37 return message; 38 }
PipeWriter
PipeWriter 管理用于代表呼叫方寫入的緩沖區, PipeWriter 實作IBufferWriter<byte>, IBufferWriter<byte> 使得無需額外的緩沖區副本就可以訪問緩沖區來執行寫入操作,
1 async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default) 2 { 3 // Request at least 5 bytes from the PipeWriter. 4 Memory<byte> memory = writer.GetMemory(5); 5 6 // Write directly into the buffer. 7 int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span); 8 9 // Tell the writer how many bytes were written. 10 writer.Advance(written); 11 12 await writer.FlushAsync(cancellationToken); 13 }
之前的代碼:
- 使用 GetMemory 從
PipeWriter請求至少 5 個位元組的緩沖區, - 將 ASCII 字串
"Hello"的位元組寫入回傳的Memory<byte>, - 呼叫 Advance 以指示寫入緩沖區的位元組數,
- 重繪
PipeWriter,以便將位元組發送到基礎設備,
以前的寫入方法使用 PipeWriter 提供的緩沖區, 或者,PipeWriter.WriteAsync:
- 將現有緩沖區復制到
PipeWriter, - 根據需要呼叫
GetSpan``Advance,然后呼叫 FlushAsync,
1 async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default) 2 { 3 byte[] helloBytes = Encoding.ASCII.GetBytes("Hello"); 4 5 // Write helloBytes to the writer, there's no need to call Advance here 6 // (Write does that). 7 await writer.WriteAsync(helloBytes, cancellationToken); 8 }
取消
FlushAsync 支持傳遞 CancellationToken, 如果令牌在重繪掛起時被取消,則傳遞 CancellationToken 將導致 OperationCanceledException, PipeWriter.FlushAsync 支持通過 PipeWriter.CancelPendingFlush 取消當前重繪操作而不引發例外的方法, 呼叫 PipeWriter.CancelPendingFlush 將導致對 PipeWriter.FlushAsync 或 PipeWriter.WriteAsync 的當前或下次呼叫回傳 FlushResult,并將 IsCanceled 設定為 true, 這對于以非破壞性和非例外的方式停止暫停重繪非常有用,
PipeWriter 常見問題
- GetSpan 和 GetMemory 回傳至少具有請求記憶體量的緩沖區, 請勿假設確切的緩沖區大小 ,
- 無法保證連續的呼叫將回傳相同的緩沖區或相同大小的緩沖區,
- 在呼叫 Advance 之后,必須請求一個新的緩沖區來繼續寫入更多資料, 不能寫入先前獲得的緩沖區,
- 如果未完成對
FlushAsync的呼叫,則呼叫GetMemory或GetSpan將不安全, - 如果未重繪資料,則呼叫
Complete或CompleteAsync可能導致記憶體損壞,
IDuplexPipe
IDuplexPipe 是支持讀寫的型別的協定, 例如,網路連接將由 IDuplexPipe 表示,
與包含 PipeReader 和 PipeWriter 的 Pipe 不同,IDuplexPipe 代表全雙工連接的單側, 這意味著寫入 PipeWriter 的內容不會從 PipeReader 中讀取,
流
在讀取或寫入流資料時,通常使用反序列化程式讀取資料,并使用序列化程式寫入資料, 大多數讀取和寫入流 API 都有一個 Stream 引數, 為了更輕松地與這些現有 API 集成,PipeReader 和 PipeWriter 公開了一個 AsStream, AsStream 回傳圍繞 PipeReader 或 PipeWriter 的 Stream 實作,
System.IO.Pipelines——高性能IO(一)
System.IO.Pipelines——高性能IO(二)
System.IO.Pipelines——高性能IO(三)
轉載請標明本文來源:https://www.cnblogs.com/yswenli/p/11810317.html
更多內容歡迎Star、Fork我的的github:https://github.com/yswenli/
如果發現本文有什么問題和任何建議,也隨時歡迎交流~
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/108746.html
標籤:C#
上一篇:C# Moq
