>>回傳《C# 并發編程》
- 1. 簡介
- 2. 不可變堆疊和佇列
- 3. 不可變串列
- 4. 不可變Set集合
- 5. 不可變字典
- 6. 執行緒安全字典
- 7. 阻塞佇列
- 8. 阻塞堆疊和包
- 9. 異步佇列
- 10. 異步堆疊和包
- 11. 阻塞/異步佇列
1. 簡介
- 不可變集合
- 不可變集合之間通常共享了大部分存盤空間,因此其實浪費并不大
- 因為是無法修改的,所以是執行緒安全的
- 執行緒安全集合
- 可同時被多個執行緒修改的可變集合
- 執行緒安全集合混合使用了細粒度鎖定和無鎖技術,以確保執行緒被阻塞的時間最短
- 通常情況下是根本不阻塞
- 執行緒安全集合混合使用了細粒度鎖定和無鎖技術,以確保執行緒被阻塞的時間最短
- 對很多執行緒安全集合進行列舉操作時,內部創建了該集合的一個快照(snapshot),并對這個快照進行列舉操作,
- 執行緒安全集合的主要優點是多個執行緒可以安全地對其進行訪問,而代碼只會被阻塞很短的時間,或根本不阻塞,
- 可同時被多個執行緒修改的可變集合
下面對常用的屬于不可變集合和執行緒安全集合型別的,特定資料結構的集合進行說明,
2. 不可變堆疊和佇列
不可變集合采用的模式是回傳一個修改過的集合,原始的集合參考是不變化的,
- 這意味著,如果參考了特定的不可變集合的實體,它是不會變化的,
var stack = ImmutableStack<int>.Empty;
stack = stack.Push(13);
var biggerStack = stack.Push(7);
// 先顯示“7”,接著顯示“13”,
foreach (var item in biggerStack)
Console.WriteLine($"biggerStack {item}");
// 只顯示“13”,
foreach (var item in stack)
Console.WriteLine($"stack {item}");
輸出:
biggerStack 7
biggerStack 13
stack 13
兩個堆疊實際上在內部共享了存盤專案 13 的記憶體,
- 這種實作方式的效率很高,并且可以很方便地創建當前狀態的快照
- 每個不可變集合的實體都是絕對執行緒安全的
ImmutableQueue 使用方法類似,
- 不可變集合的一個實體是永遠不改變的,
- 因為不會改變,所以是絕對執行緒安全的,
- 對不可變集合使用修改方法時,回傳修改后的集合,
- 不可變集合非常適用于共享狀態,但不適合用來做交換資料的通道,
3. 不可變串列
不可變串列的內部是用二叉樹組織資料的,這么做是為了讓不可變串列的實體之間共享的記憶體最大化,
- 這導致
ImmutableList<T>和List<T>在常用操作上有性能上的差別(參見下表),
| 操 作 | List<T> |
ImmutableList<T> |
|---|---|---|
| Add | 平攤 O(1) | O(log N) |
| Insert | O(N) | O(log N) |
| RemoveAt | O(N) | O(log N) |
| Item[index] | O(1) | O(log N) |
不可變串列確實可以使用index獲取資料項,但需要注意性能問題,不能簡單地用它來替代 List<T>,
- 這意味著應該盡量使用
foreach而不是用for
4. 不可變Set集合
ImmutableHashSet<T>- 是一個不含重復元素的集合
ImmutableSortedSet<T>- 是一個已排序的不含重復元素的集合
- 都有相似的介面
//ImmutableHashSet
var hashSet = ImmutableHashSet<int>.Empty;
hashSet = hashSet.Add(13);
hashSet = hashSet.Add(7);
// 顯示“7”和“13”,次序不確定,
foreach (var item in hashSet)
Console.Write(item + " ");
System.Console.WriteLine();
hashSet = hashSet.Remove(7);
//ImmutableSortedSet
var sortedSet = ImmutableSortedSet<int>.Empty;
sortedSet = sortedSet.Add(13);
sortedSet = sortedSet.Add(7);
// 先顯示“7”,接著顯示“13”,
foreach (var item in sortedSet)
Console.Write(item + " ");
var smallestItem = sortedSet[0];
// smallestItem == 7
sortedSet = sortedSet.Remove(7);
輸出:
7 13
7 13
| 操 作 | ImmutableHashSet<T> |
ImmutableSortedSet<T> |
|---|---|---|
Add |
O(log N) | O(log N) |
Remove |
O(log N) | O(log N) |
Item[index] |
不可用 | O(log N) |
ImmutableSortedSet 索引操作的時間復雜度是 O(log N),而不是 O(1),這跟 上節中 ImmutableList<T> 的情況類似,
- 這意味著它們適用同樣的警告:使用
ImmutableSortedSet<T>時,應該盡量用 foreach 而不是用 for ,
可以先快速地以可變方式構建,然后轉換成不可變集合,
5. 不可變字典
ImmutableDictionary<TKey,TValue>ImmutableSortedDictionar y<TKey,TValue>
//ImmutableDictionary
var dictionary = ImmutableDictionary<int, string>.Empty;
dictionary = dictionary.Add(10, "Ten");
dictionary = dictionary.Add(21, "Twenty-One");
dictionary = dictionary.SetItem(10, "Diez");
// 顯示“10Diez”和“21Twenty-One”,次序不確定,
foreach (var item in dictionary)
Console.WriteLine(item.Key + ":" + item.Value);
var ten = dictionary[10]; // ten == "Diez"
dictionary = dictionary.Remove(21);
//ImmutableSortedDictionary
var sortedDictionary = ImmutableSortedDictionary<int, string>.Empty; sortedDictionary = sortedDictionary.Add(10, "Ten");
sortedDictionary = sortedDictionary.Add(21, "Twenty-One");
sortedDictionary = sortedDictionary.SetItem(10, "Diez");
// 先顯示“10Diez”,接著顯示“21Twenty-One”,
foreach (var item in sortedDictionary)
Console.WriteLine(item.Key + ":" + item.Value);
ten = sortedDictionary[10];
// ten == "Diez"
sortedDictionary = sortedDictionary.Remove(21);
輸出:
10:Diez
21:Twenty-One
10:Diez
21:Twenty-One
操 作 I
| 操 作 | ImmutableDictionary<TK,TV> |
ImmutableSortedDictionary<TK,TV> |
|---|---|---|
Add |
O(log N) | O(log N) |
SetItem |
O(log N) | O(log N) |
Item[key] |
O(log N) | O(log N) |
Remove |
O(log N) | O(log N) |
6. 執行緒安全字典
var dictionary = new ConcurrentDictionary<int, string>();
var newValue = https://www.cnblogs.com/BigBrotherStone/p/dictionary.AddOrUpdate(0,
key => "Zero",
(key, oldValue) => "Zero");
AddOrUpdate 方法有些復雜,這是因為這個方法必須執行多個步驟,具體步驟取決于并發字典的當前內容,
- 方法的第一個引數是鍵
- 第二個引數是一個委托,它把鍵(本例中為 0)轉換成添加到字典的值(本例中為“Zero”)
- 只有當字典中沒有這個鍵時,這個委托才會運行,
- 第三個引數也是一個委托,它把鍵(0)和原來的值轉換成字典中修改后的值
(“Zero”),- 只有當字典中已經存在這個鍵時,這個委托才會運行,
AddOrUpdatereturn 這個鍵對應的新值(與其中一個委托回傳的值相同),
AddOrUpdate 可能要多次呼叫其中一個(或兩個)委托,這種情況很少,但確實會發生,
- 因此這些委托必須簡單、快速,并且不能有副作用
- 這些委托只能創建新的值,不能修改程式中其他變數
- 這個原則適用于所有
ConcurrentDictionary<TKey,TValue>的方法所使用的委托
// 使用與前面一樣的“字典”,
string currentValue;
bool keyExists = dictionary.TryGetValue(0, out currentValue);
// 使用與前面一樣的“字典”,
string removedValue;
bool keyExisted = dictionary.TryRemove(0, out removedValue);
-
如果多個執行緒讀寫一個共享集合, 使用
ConcurrentDictrionary<TKey,TValue>是最合適的 -
如果不會頻繁修改(很少修改), 那更適合使用
ImmutableDictionary<TKey, TValue>, -
如果一些執行緒只添加元素,另一些執行緒只移除元素,那最好使用生產者/消費者集合,
7. 阻塞佇列
GetConsumingEnumerable會阻塞執行緒CommpleteAdding方法執行后所有被GetConsumingEnumerable阻塞的執行緒開始執行- 每個元素只會被消費一次
private static readonly BlockingCollection<int> _blockingQueue = new BlockingCollection<int>();
public static async Task BlockingCollectionSP()
{
Action consumerAction = () =>
{
Console.WriteLine($"started print({Thread.CurrentThread.ManagedThreadId}).");
// 先顯示“7”,后顯示“13”,
foreach (var item in _blockingQueue.GetConsumingEnumerable())
{
Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {item}");
}
Console.WriteLine($"ended print({Thread.CurrentThread.ManagedThreadId}).");
};
Task task1 = Task.Run(consumerAction);
Task task2 = Task.Run(consumerAction);
Task task3 = Task.Run(consumerAction);
_blockingQueue.Add(7);
System.Console.WriteLine($"added 7.");
_blockingQueue.Add(13);
System.Console.WriteLine($"added 13.");
_blockingQueue.CompleteAdding();
System.Console.WriteLine("CompleteAdding.");
try
{
_blockingQueue.Add(15);
}
catch (Exception ex)
{
System.Console.WriteLine($"{ex.GetType().Name}:{ex.Message}");
}
await Task.WhenAll(task1, task2, task3);
}
輸出:
started print(4).
started print(3).
started print(6).
added 7.
added 13.
CompleteAdding.
ended print(6).
InvalidOperationException:The collection has been marked as complete with regards to additions.
print(4) 7
ended print(4).
print(3) 13
ended print(3).
8. 阻塞堆疊和包
- 在默認情況下,.NET 中的
BlockingCollection<T>用作阻塞佇列,但它也可以作為任何型別的生產者/消費者集合, BlockingCollection<T>實際上是對執行緒安全集合進行了封裝, 實作了IProducerConsumerCollection<T>介面,- 因此可以在創建
BlockingCollection<T>實體時指明規則
- 因此可以在創建
BlockingCollection<int> _blockingStack = new BlockingCollection<int>( new ConcurrentStack<int>());
BlockingCollection<int> _blockingBag = new BlockingCollection<int>( new ConcurrentBag<int>());
替換到阻塞佇列示例代碼中試試,
9. 異步佇列
public static async Task BufferBlockPS()
{
BufferBlock<int> _asyncQueue = new BufferBlock<int>();
Func<Task> concurrentConsumerAction = async () =>
{
while (true)
{
int item;
try
{
item = await _asyncQueue.ReceiveAsync();
}
catch (InvalidOperationException)
{
System.Console.WriteLine($"exit({Thread.CurrentThread.ManagedThreadId}).");
break;
}
Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {item}");
}
};
Func<Task> consumerAction = async () =>
{
try
{
// 先顯示“7”,后顯示“13”, 單執行緒可用
while (await _asyncQueue.OutputAvailableAsync())
{
Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {await _asyncQueue.ReceiveAsync()}");
}
}
catch (Exception ex)
{
System.Console.WriteLine($"{ex.GetType().Name}({Thread.CurrentThread.ManagedThreadId}):{ex.Message}");
}
};
Task t1 = consumerAction();
Task t2 = consumerAction();
// Task t1 = concurrentConsumerAction();
// Task t2 = concurrentConsumerAction();
// 生產者代碼
await _asyncQueue.SendAsync(7);
await _asyncQueue.SendAsync(13);
await _asyncQueue.SendAsync(15);
System.Console.WriteLine("Added 7 13 15.");
_asyncQueue.Complete();
await Task.WhenAll(t1, t2);
}
輸出:
Added 7 13 15.
print(4) 7
print(6) 13
print(4) 15
InvalidOperationException(3):The source completed without providing data to receive.
10. 異步堆疊和包
Nito.AsyncEx 庫
AsyncCollection<int> _asyncStack = new AsyncCollection<int>( new ConcurrentStack<int>());
AsyncCollection<int> _asyncBag = new AsyncCollection<int>( new ConcurrentBag<int>());
11. 阻塞/異步佇列
在阻塞佇列中已經介紹了BufferBlock<T>
這里介紹 ActionBlock<int>
public static async Task ActionBlockPS()
{
ActionBlock<int> queue = new ActionBlock<int>(u => Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {u}"));
// 異步的生產者代碼
await queue.SendAsync(7);
await queue.SendAsync(13);
System.Console.WriteLine("Added async.");
// 同步的生產者代碼
queue.Post(15);
queue.Post(17);
System.Console.WriteLine("Added sync.");
queue.Complete();
System.Console.WriteLine($"Completed({Thread.CurrentThread.ManagedThreadId}).");
}
輸出:
Added async.
Added sync.
Completed(1).
print(3) 7
print(3) 13
print(3) 15
print(3) 17
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/75333.html
標籤:C#
上一篇:c# 匿名方法(函式) 匿名委托 內置泛型委托 lamada
下一篇:取消任務
