>>回傳《C# 并發編程》
- 1. 用 async 代碼封裝異步方法與 Completed 事件
- 2. 用 async 代碼封裝 Begin/End 方法
- 3. 用 async 代碼封裝并行代碼
- 4. 用 async 代碼封裝 Rx Observable 物件
- 5. 用 Rx Observable 物件封裝 async 代碼
- 6. Rx Observable 物件和資料流網格
異步封裝
1. 用 async 代碼封裝異步方法與 Completed 事件
public static void MyDownloadStringTaskAsyncRun()
{
WebClient client = new WebClient();
string res = client.MyDownloadStringTaskAsync(new Uri("http://www.baidu.com")).Result;
System.Console.WriteLine(res);
}
public static Task<string> MyDownloadStringTaskAsync(this WebClient client, Uri address)
{
var tcs = new TaskCompletionSource<string>();
// 這個事件處理程式會完成 Task 物件,并自行注銷,
DownloadStringCompletedEventHandler handler = null;
handler = (_, e) =>
{
client.DownloadStringCompleted -= handler;
if (e.Cancelled)
tcs.TrySetCanceled();
else if (e.Error != null)
tcs.TrySetException(e.Error);
else
tcs.TrySetResult(e.Result);
};
// 登記事件,然后開始操作,
client.DownloadStringCompleted += handler;
client.DownloadStringAsync(address);
return tcs.Task;
}
輸出:
<!DOCTYPE html><!--STATUS OK-->
<html>
... ...
</html>
2. 用 async 代碼封裝 Begin/End 方法
public static void GetResponseAsyncRun()
{
WebRequest request = WebRequest.Create("http://www.baidu.com");
var response = request.MyGetResponseAsync().Result;
System.Console.WriteLine($"WebResponse.ContentLength:{response.ContentLength}");
}
public static Task<WebResponse> MyGetResponseAsync(this WebRequest client)
{
return Task<WebResponse>.Factory.FromAsync(client.BeginGetResponse, client.EndGetResponse, null);
}
輸出:
WebResponse.ContentLength:14615
- 建議: 要在呼叫
FromAsync之前呼叫BeginOperation, - 呼叫
FromAsync,并讓用BeginOperation方法回傳的IAsyncOperation作為引數,這樣也是可以的,但是FromAsync會采用效率較低的實作方式,
3. 用 async 代碼封裝并行代碼
await Task.Run(() => Parallel.ForEach(...));
通過使用 Task.Run ,所有的并行處理程序都推給了執行緒池,
Task.Run 回傳一個代表并行任務的 Task 物件
- UI 執行緒可以(異步地)等待它完成(非阻塞)
4. 用 async 代碼封裝 Rx Observable 物件
事件流中幾種可能關注的情況:
- 事件流結束前的最后一個事件;
- 下一個事件;
- 所有事件,
public delegate void HelloEventHandler(object sender, HelloEventArgs e);
public class HelloEventArgs : EventArgs
{
public string Name { get; set; }
public HelloEventArgs(string name)
{
Name = name;
}
public int SayHello()
{
System.Console.WriteLine(Name + " Hello.");
return DateTime.Now.Millisecond;
}
}
public static event HelloEventHandler HelloHandlerEvent;
public static void FirstLastRun()
{
var task = Task.Run(() =>
{
Thread.Sleep(500);
HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("lilei"));
HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("HanMeimei"));
HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Tom"));
HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Jerry"));
});
var observable = Observable.FromEventPattern<HelloEventHandler, HelloEventArgs>(
handler => (s, a) => handler.Invoke(s, a), handler => HelloHandlerEvent += handler, handler => HelloHandlerEvent -= handler)
.Select(evt => evt.EventArgs.SayHello()).ObserveOn(Scheduler.Default)
.Select(s =>
{
// 復雜的計算程序,
Thread.Sleep(100);
var result = s;
Console.WriteLine("Now Millisecond result " + result + " on thread " + Environment.CurrentManagedThreadId);
return result;
})
.Take(3)//這個標識3個就結束了
;
var res =
Task.Run(async () => await observable
// //4個hello,3個result,res為最后一個的結果
//.FirstAsync()//4個hello,1個result,res為第一個的結果
//.LastAsync()//4個hello,3個result,res為最后一個的結果
//.ToList()//4個hello,3個result,res為3個的結果
).Result;
System.Console.WriteLine($"Res:{string.Join(',', res)},ResType:{res.GetType().Name}");
task.Wait();
}
輸出:
lilei Hello.
HanMeimei Hello.
Tom Hello.
Jerry Hello.
Now Millisecond result 534 on thread 7
Now Millisecond result 544 on thread 7
Now Millisecond result 544 on thread 7
Res:544,ResType:Int32
在 await 呼叫 Observable 物件或 LastAsync 時,代碼(異步地)等待事件流完成,然后返 回最后一個元素,
- 在內部,await 實際是在訂閱事件流,完成后退訂
IObservable<int> observable = ...; int lastElement = await observable.LastAsync(); // 或者 int lastElement = await observable;
使用 FirstAsync 可捕獲事件流中, FirstAsync 方法執行后的下一個事件,
- 本例中
await訂閱事件流,然后在第一個事件到達后立即結束(并退訂):IObservable<int> observable = ...; int nextElement = await observable.FirstAsync();
使用 ToList 可捕獲事件流中的所有事件:
IObservable<int> observable = ...;
IList<int> allElements = await observable.ToList();
5. 用 Rx Observable 物件封裝 async 代碼
任何異步操作都可看作一個滿足以下條件之一的可觀察流:
- 生成一個元素后就完成;
- 發生錯誤,不生成任何元素,
ToObservable 和 StartAsync 都會立即啟動異步操作,而不會等待訂閱
- 但之后訂閱呢,或等待執行完再訂閱呢,能得到結果嗎
- 可以,后面例子中的“輸出”中有體現
如果要讓 observable 物件在接受訂閱后才啟動操作,可使用 FromAsync
- 跟
StartAsync一樣,它也支持使用CancellationToken取消
public static void AsyncObservableRun()
{
var client = new HttpClient();
IObservable<int> response1 = Task.Run(() => { System.Console.WriteLine("Run 1."); return 1; }).ToObservable();//直接執行
IObservable<int> response2 = Observable.StartAsync(token => Task.Run(() => { System.Console.WriteLine("Run 2."); return 2; }, token));//直接執行
IObservable<int> response3 = Observable.FromAsync(token => Task.Run(() => { System.Console.WriteLine("Run 3."); return 3; }, token));//訂閱后執行
var res = Task.Run(async () =>
await response1
//await response2
//await response3
).Result;
System.Console.WriteLine($"Res:{res}");
}
輸出(response1):
Run 1.
Run 2.
Res:1
輸出(response2):
Run 1.
Run 2.
Res:2
輸出(response1):
Run 1.
Run 2.
Run 3.
Res:3
ToObservable和StartAsync都回傳一個 observable 物件,表示一個已經啟動的異步操作FromAsync在每次被訂閱時都會啟動一個全新獨立的異步操作,
下面的例子使用一個已有的 URL 事件流,在每個 URL 到達時發出一個請求:
public static void SelectManyRun()
{
IObservable<int> nums = new int[] { 1, 2, 3 }.ToObservable();
IObservable<int> observable = nums.SelectMany((n, token) => Task.Run<int>(() => { System.Console.WriteLine($"Run {n}."); return n + 1; }, token));
var res = Task.Run(async () => await observable.LastAsync()).Result;
System.Console.WriteLine($"Res:{res}");
}
輸出:
Run 1.
Run 2.
Run 3.
Res:3
6. Rx Observable 物件和資料流網格
同一個專案中
- 一部分使用了 Rx Observable 物件
- 一部分使用了資料流網格
現在需要它們能互相溝通,
網格轉可觀察流
public static void BlockToObservableRun()
{
var buffer = new BufferBlock<int>();
IObservable<int> integers = buffer.AsObservable();
integers.Subscribe(
data =https://www.cnblogs.com/BigBrotherStone/p/> Console.WriteLine(data),
ex => Console.WriteLine(ex),
() => Console.WriteLine("Done"));
buffer.Post(1);
buffer.Post(2);
buffer.Complete();
buffer.Completion.Wait();
}
輸出:
1
2
AsObservable 方法會把資料流塊的完成資訊(或出錯資訊)轉化為可觀察流的完成資訊,
- 如果資料流塊出錯并拋出例外,這個例外資訊在傳遞給可觀察流時,會被封裝在
AggregateException物件中,
可觀察流轉網格
public static void ObservableToBlockRun()
{
IObservable<DateTimeOffset> ticks = Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Select(x => x.Timestamp)
.Take(5);
var display = new ActionBlock<DateTimeOffset>(x => Console.WriteLine(x));
ticks.Subscribe(display.AsObserver());
try
{
display.Completion.Wait();
Console.WriteLine("Done.");
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
輸出:
2020/2/1 上午1:42:24 +00:00
2020/2/1 上午1:42:25 +00:00
2020/2/1 上午1:42:26 +00:00
2020/2/1 上午1:42:27 +00:00
2020/2/1 上午1:42:28 +00:00
Done.
- 跟前面一樣,可觀察流的完成資訊會轉化為塊的完成資訊
- 可觀察流的錯誤資訊會轉化為 塊的錯誤資訊,
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/75331.html
標籤:C#
上一篇:Rx基礎
