一、原子操作
先看一段問題代碼
/// <summary> /// 獲取自增 /// </summary> public static void GetIncrement() { long result = 0; Console.WriteLine("開始計算"); //10個并發執行 Parallel.For(0, 10, (i) => { for (int j = 0; j < 10000; j++) { result++; } }); Console.WriteLine("結束計算"); Console.WriteLine($"result正確值應為:{10000 * 10}"); Console.WriteLine($"result 現值為:{result}"); Console.ReadLine(); }

這是多執行緒下,result的值不同步的原因,
1.基于Lock實作
平時大家用的最多的應該就是加鎖了,同一時間,只有一個執行緒進入代碼塊,
實作代碼:
private static Object _obj = new object();
/// <summary> /// 原子操作基于Lock實作 /// </summary> public static void AtomicityForLock() { long result = 0; Console.WriteLine("開始計算"); //10個并發執行 Parallel.For(0, 10, (i) => { //lock鎖 lock (_obj) { for (int j = 0; j < 10000; j++) { result++; } } }); Console.WriteLine("結束計算"); Console.WriteLine($"result正確值應為:{10000 * 10}"); Console.WriteLine($"result 現值為:{result}"); Console.ReadLine(); }
結果:

2.基于CAS實作
CAS是一種有名的無鎖演算法,無鎖編程,即不適用鎖的情況下實作多執行緒之間的變數同步,也就是在沒有執行緒被阻塞的情況下實作變數的同步,
CAS在.NET中的實作類是Interlocked,內部提供很多原子操作的方法,最終都是呼叫Interlocked.CompareExchange(ref out,更新值,期望值) //基于記憶體屏障的方式操作 (七個步驟)
說到執行緒安全,不要一下子就想到加鎖,尤其是可能會呼叫頻繁或者是要求高性能的場合,
- CAS(Compare And Swap)比較并替換,是執行緒并發運行時用到的一種技術
- CAS是原子操作,保證并發安全,而不能保證并發同步
- CAS是CPU的一個指令(需要JNI呼叫Native方法,才能呼叫CPU的指令)
- CAS是非阻塞的、輕量級的樂觀鎖
CAS的適用場景
讀多寫少:如果有大量的寫操作,CPU開銷可能會過大,因為沖突失敗后會不斷重試(自旋),這個程序中會消耗CPU
單個變數原子操作:CAS機制所保證的只是一個變數的原子操作,而不能保證整個代碼塊的原子性,比如需要保證三個變數共同進行原子性的更新,就不得不使用悲觀鎖了
Interlocked主要函式如下:
Interlocked.Increment 原子操作,遞增指定變數的值并存盤結果,
Interlocked.Decrement 原子操作,遞減指定變數的值并存盤結果,
Interlocked.Add 原子操作,添加兩個整數并用兩者的和替換第一個整數
Interlocked.Exchange 原子操作,賦值
Interlocked.CompareExchange(ref a, b, c); 原子操作,a引數和c引數比較, 相等b替換a,不相等不替換,方法回傳值始終是第一個引數的原值,也就是記憶體里的值
用Interlocked.Increment實作上面自增功能
代碼:
/// <summary> /// 自增CAS實作 /// </summary> public static void AtomicityForInterLock() { long result = 0; Console.WriteLine("開始計算"); Parallel.For(0, 10, (i) => { for (int j = 0; j < 10000; j++) { //自增 Interlocked.Increment(ref result); } }); Console.WriteLine($"結束計算"); Console.WriteLine($"result正確值應為:{10000 * 10}"); Console.WriteLine($"result 現值為:{result}"); Console.ReadLine(); }
結果:

Interlocked下原子操作的方法最終都是呼叫Interlocked.CompareExchange(ref a, b, c)實作的,現在我們利用CompareExchange自己實作一個原子操作功能
實作“一個變數自增到10000,然后又初始化到1開始自增的功能“
代碼:
/// <summary> /// 基于CAS原子操作自己寫 /// </summary> public static void AtomicityForMyCalc() { long result = 0; Console.WriteLine("開始計算"); Parallel.For(0, 10, (i) => { long init = 0; long incrementNum = 0; for (int j = 0; j < 10000; j++) { do { init = result; incrementNum = result + 1; incrementNum= incrementNum > 10000 ? 1 : incrementNum; //自增到10000后初始化成1 } //如果result=init,則result的值被incrementNum替換,否則result不變,回傳的是result的原始值 while (init != Interlocked.CompareExchange(ref result, incrementNum, init)); if(incrementNum==10000) { Console.WriteLine($"自增到達10000啦!值被初始化為1"); } } }); Console.WriteLine($"結束計算"); Console.WriteLine($"result正確值應為:{10000}"); Console.WriteLine($"result 現值為:{result}"); Console.ReadLine(); }
結果:

3.自旋鎖SpinLock
自旋鎖(spinlock):
是指當一個執行緒在獲取鎖的時候,如果鎖已經被其它執行緒獲取,那么該執行緒將回圈等待,然后不斷的判斷鎖是否能夠被成功獲取,直到獲取到鎖才會退出回圈,
什么情況下使用自旋鎖:
自旋鎖非常有助于避免阻塞,但是如果預期有大量阻塞,由于旋轉過多,您可能不應該使用自旋鎖,當鎖是細粒度的并且數量巨大(例如鏈接的串列中每個節點一個鎖)時以及鎖保持時間總是非常短時,旋轉可能非常有幫助,
短時間鎖定的情況下,自旋鎖(spinlock)更快,(因為自旋鎖本質上不會讓執行緒休眠,而是一直回圈嘗試對資源訪問,直到可用,所以自旋鎖執行緒被阻塞時,不進行執行緒背景關系切換,而是空轉等待,對于多核CPU而言,減少了切換執行緒背景關系的開銷,從而提高了性能,)如果機器單核或鎖定時間長的要避免使用,因為占有著邏輯核心會導致其他的執行緒也不可用,
SpinLock和Lock的區別:
SpinLock,自旋鎖,嘗試獲取該鎖的執行緒持續不斷的check是否可以獲得,此時執行緒仍然是激活狀態,只是在空轉,浪費cpu而已,但是spinlock避免了執行緒調度和背景關系切換,如果鎖的時間極短的話,使用該鎖反而效率會高,
而lock是執行緒被block了,這將引起執行緒調度和背景關系切換等行為,
示例:
//創建自旋鎖 private static SpinLock spin = new SpinLock(); public static void Spinklock() { Action action = () => { bool lockTaken = false; try { //申請獲取鎖 spin.Enter(ref lockTaken); //臨界區 for (int i = 0; i < 10; i++) { Console.WriteLine($"當前執行緒{Thread.CurrentThread.ManagedThreadId.ToString()},輸出:1"); } } finally { //作業完畢,或者產生例外時,檢測一下當前執行緒是否占有鎖,如果有了鎖釋放它 //避免出行死鎖 if(lockTaken) { spin.Exit(); } } };
Action action2 = () => { bool lockTaken = false; try { //申請獲取鎖 spin.Enter(ref lockTaken); //臨界區 for (int i = 0; i < 10; i++) { Console.WriteLine($"當前執行緒{Thread.CurrentThread.ManagedThreadId.ToString()},輸出:2"); } } finally { //作業完畢,或者產生例外時,檢測一下當前執行緒是否占有鎖,如果有了鎖釋放它 //避免出行死鎖 if (lockTaken) { spin.Exit(); } } }; //并行執行2個action Parallel.Invoke(action, action2); }
結果:

申請鎖下面的臨界區保證是順序執行的,不會因為多執行緒穿插輸出,
4.讀寫鎖ReaderWriterLockSlim
- 讀寫鎖是一個具有特殊用途的執行緒鎖,適用于頻繁讀取且讀取需要一定時間的場景,共享資源的讀取操作通常是可以同時執行的,
- 普通的互斥鎖不管是獲取還是修改操作無法同時執行,如果多個執行緒為了讀取操作而獲取互斥鎖,那么同一時間只有一個執行緒可以執行讀取操作,
- 頻繁讀取的場景下會對吞吐量造成影響
- 讀寫鎖把鎖分為讀取鎖和寫入鎖,執行緒可以根據對共享資源的操作型別獲取讀取鎖還是寫入鎖,讀取鎖可以被多個執行緒同時獲取,寫入鎖不可以被多個執行緒同時獲取,且讀取鎖和寫入鎖不可以被不同的線同時獲取,
| 操作 | 讀取鎖狀態 | 寫入鎖狀態 | 獲取鎖是否需要等待 |
|---|---|---|---|
| 獲取讀取鎖 | 未被獲取 | 未被獲取 | 無需等待 |
| 獲取讀取鎖 | 已被其他執行緒獲取 | 未獲取 | 無需等待 |
| 獲取讀取鎖 | 未被獲取 | 已被其他執行緒獲取 | 需要等待其他執行緒釋放 |
| 獲取寫入鎖 | 未被獲取 | 未被獲取 | 無需等待 |
| 獲取寫入鎖 | 已被其他執行緒獲取 | 未被獲取 | 需要等待其他執行緒釋放 |
| 獲取寫入鎖 | 未被獲取 | 已被其他執行緒獲取 | 需要等待其他執行緒釋放 |
代碼示例:
//讀寫鎖, //策略支持遞回 private static ReaderWriterLockSlim rwl = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); private static int index = 0; static void read() { try { //進入讀鎖 rwl.EnterReadLock(); for (int i = 0; i < 5; i++) { Console.WriteLine($"執行緒id:{Thread.CurrentThread.ManagedThreadId},讀資料,讀到index:{index}"); } } finally { //退出讀鎖 rwl.ExitReadLock(); } } static void write() { try { //嘗試獲寫鎖 while (!rwl.TryEnterWriteLock(50)) { Console.WriteLine($"執行緒id:{Thread.CurrentThread.ManagedThreadId},等待寫鎖"); } Console.WriteLine($"執行緒id:{Thread.CurrentThread.ManagedThreadId},獲取到寫鎖"); for (int i = 0; i < 5; i++) { index++; Thread.Sleep(50); } Console.WriteLine($"執行緒id:{Thread.CurrentThread.ManagedThreadId},寫操作完成"); } finally { //退出寫鎖 rwl.ExitWriteLock(); } } /// <summary> /// 執行多執行緒讀寫 /// </summary> public static void test() { var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None); Task[] task = new Task[6]; task[1] = taskFactory.StartNew(write); //寫 task[0] = taskFactory.StartNew(read); //讀 task[2] = taskFactory.StartNew(read); //讀 task[3] = taskFactory.StartNew(write); //寫 task[4] = taskFactory.StartNew(read); //讀 task[5] = taskFactory.StartNew(read); //讀 for (var i=0; i<6; i++) { task[i].Wait(); } }

可以看到,在執行緒4寫入期間,執行緒7是等待執行緒4寫萬再寫,執行緒7寫期間其它執行緒也沒有操場,等寫完后,讀的操作是多個執行緒交叉的,
適合的場景舉例:
- 多執行緒寫檔案,多執行緒并發寫檔案時,會報資源被占用錯誤,用這里的寫鎖就可以獨占資源寫完再到下一個執行緒寫,
- 本地快取的讀寫操作,幾個快取值寫完才能讀出來,防止讀到不完整資料,
二、執行緒安全
1.執行緒安全集合
BlockingCollection:一個支持界限和阻塞的容器(執行緒安全集合),與佇列結構相似,常用函式如下
Add :向容器中插入元素
TryTake:從容器中取出元素并洗掉
TryPeek:從容器中取出元素,但不洗掉,
CompleteAdding:告訴容器,添加元素完成,此時如果還想繼續添加會發生例外,
IsCompleted:告訴消費執行緒,生產者執行緒還在繼續運行中,任務還未完成,
普通用法示例:
/// <summary> /// 執行緒安全集合用法 /// </summary> public static void BC() { //執行緒安全集合 using (BlockingCollection<int> blocking = new BlockingCollection<int>()) { int NUMITEMS = 10000; for (int i = 1; i < NUMITEMS; i++) { blocking.Add(i); } //完成添加 blocking.CompleteAdding(); int outerSum = 0; // 定義一個委托方法取出集合元素 Action action = () => { int localItem; int localSum = 0; //取出并洗掉元素,先進先出 while (blocking.TryTake(out localItem)) { localSum += localItem; } //兩數相加替換第一個值 Interlocked.Add(ref outerSum, localSum); }; //并行3個執行緒執行,多個執行緒同時取集合的資料 Parallel.Invoke(action, action, action); Console.WriteLine($"0+...{NUMITEMS-1} = {((NUMITEMS * (NUMITEMS - 1)) / 2)},輸出結果:{outerSum}"); //此集合是否已標記為已完成添加且為空 Console.WriteLine($"執行緒安全集合.IsCompleted={blocking.IsCompleted}"); } }
結果:

限制集合長度示例:
/// <summary> /// 限制集合長度 /// </summary> public static void BCLimtLength() { //限制集合長度為5個,后面進的會阻塞等集合少于5個再進來 BlockingCollection<int> blocking = new BlockingCollection<int>(5); var task1= Task.Run(() => { for (int i = 0; i < 20; i++) { blocking.Add(i); Console.WriteLine($"集合添加:({i})"); } blocking.CompleteAdding(); Console.WriteLine("完成添加"); }); // 延遲500ms執行等待先生產資料 var task2 = Task.Delay(500).ContinueWith((t) => { while (!blocking.IsCompleted) { var n = 0; if (blocking.TryTake(out n)) { Console.WriteLine($"取出:({n})"); } } Console.WriteLine("IsCompleted = true"); }); Task.WaitAll(task1, task2); }
結果:

在BlockingCollection中使用Stack(堆疊,先進后出)示例:
/// <summary> /// 執行緒安全集合,先進后出 /// </summary> public static void BCStack() { //執行緒安全集合,引數表明堆疊標識,佇列長度為5 BlockingCollection<int> blocking = new BlockingCollection<int>(new ConcurrentStack<int>(), 5); var task1 = Task.Run(() => { for (int i = 0; i < 20; i++) { blocking.Add(i); Console.WriteLine($"集合添加:({i})"); } blocking.CompleteAdding(); Console.WriteLine("完成添加"); }); // 等待先生產資料 var task2 = Task.Delay(500).ContinueWith((t) => { while (!blocking.IsCompleted) { var n = 0; if (blocking.TryTake(out n)) { Console.WriteLine($"取出:({n})"); } } Console.WriteLine("IsCompleted = true"); }); Task.WaitAll(task1, task2); }

一開始入了0-4,從最后的4開始取,
2.執行緒安全字典
ConcurrentDictionary :這個比較好理解,普通字典多執行緒并發時添加時會報錯,而這個則是執行緒安全的,不會報錯,
普通字典示例:
//普通字典 private static IDictionary<string, string> Dictionaries { get; set; } = new Dictionary<string, string>(); /// <summary> /// 字典增加值 /// </summary> public static void AddDictionaries() { Stopwatch sw = new Stopwatch(); sw.Start(); //并發1000個執行緒寫 Parallel.For(0, 1000, (i) => { var key = $"key-{i}"; var value = https://www.cnblogs.com/wei325/p/$"value-{i}"; // 不加鎖會報錯 // lock (Dictionaries) // { Dictionaries.Add(key, value); // } }); sw.Stop(); Console.WriteLine("Dictionaries 當前資料量為: {0}", Dictionaries.Count); Console.WriteLine("Dictionaries 執行時間為: {0} ms", sw.ElapsedMilliseconds); }
不加鎖時結果:

加鎖后:

執行緒安全字典示例:
//執行緒安全字典 private static IDictionary<string, string> ConcurrentDictionaries { get; set; } = new ConcurrentDictionary<string, string>(); /// <summary> /// 執行緒安全字典添加值 /// </summary> public static void AddConcurrentDictionaries() { Stopwatch sw = new Stopwatch(); sw.Start(); //并發1000個執行緒寫 Parallel.For(0, 1000, (i) => { var key = $"key-{i}"; var value = https://www.cnblogs.com/wei325/p/$"value-{i}"; // 無須加鎖 ConcurrentDictionaries.Add(key, value); }); sw.Stop(); Console.WriteLine("ConcurrentDictionaries 當前資料量為: {0}", ConcurrentDictionaries.Count); Console.WriteLine("ConcurrentDictionaries 執行時間為: {0} ms", sw.ElapsedMilliseconds); }

可以看到,執行緒安全字典比普通字典性能略好,且執行緒安全字典無需加鎖,
三、執行緒池
1.通過QueueUserWorkItem啟動作業者執行緒
ThreadPool執行緒池中有兩個多載的靜態方法可以直接啟動作業者執行緒:
- ThreadPool.QueueUserWorkItem(waitCallback);
- ThreadPool.QueueUserWorkItem(waitCallback,Object);
先把WaitCallback委托指向一個帶有Object引數的無回傳值方法,再使用ThreadPool.QueueUserWorkItem(WaitCallback)就可以一步啟動此方法,此時異步方法的引數被視為null,
示例1:
public class ThreadLoopDemo { /// <summary> /// 回呼方法 /// </summary> /// <param name="obj"></param> static void CallMethod(object state) { Console.WriteLine("RunWorkerThread開始作業"); Console.WriteLine("作業者執行緒啟動成功!"); } public static void Test() { //作業者執行緒最大數目,I/O執行緒的最大數目 ThreadPool.SetMaxThreads(1000, 1000);
//啟動作業者執行緒 ThreadPool.QueueUserWorkItem(new WaitCallback(CallMethod!)); Console.ReadKey(); } }
執行Test方法,結果:

示例2:
使用第二個多載方法ThreadPool.QueueUserWorkItem(WaitCallback,object)方法可以把object物件作為引數傳送到回呼函式中,
public class ThreadLoopDemo { /// <summary> /// 回呼方法 /// </summary> /// <param name="obj"></param> static void CallMethod(object state) { Console.WriteLine("RunWorkerThread開始作業"); Order order=state as Order; Console.WriteLine($"orderName:{order.orderName},price:{order.price}"); Console.WriteLine("作業者執行緒啟動成功!"); } public static void Test() { //作業者執行緒最大數目,I/O執行緒的最大數目 ThreadPool.SetMaxThreads(1000, 1000); Order order = new Order() { orderName = "冰箱", price = 1888 }; //啟動作業者執行緒 ThreadPool.QueueUserWorkItem(new WaitCallback(CallMethod!),order); Console.ReadKey(); } } public class Order { public string orderName { get; set; } public decimal price { get; set; } }
執行Test方法,結果:

通過ThreadPool.QueueUserWork啟動作業者執行緒非常方便,但是WaitCallback委托指向的必須是一個帶有object引數的無回傳值方法,
執行緒池還可以重用執行緒,比喻可以吧執行緒池大小設為5個,去執行一批任務,防止大量創建新執行緒損耗大量cpu,
所以這個方法啟動的作業者執行緒僅僅適合于帶單個引數和無回傳值的情況,
2.執行緒池等待(信號量)
ThreadPool并沒有Thread的Join等待介面,那么想讓ThreadPool等待要這么做呢?
ManualResetEvent:通知一個或多個正在等待的執行緒已發生的事件,相當于發送了一個信號
mre.WaitOne:卡住當前主執行緒,一直等到信號修改為true的時候,才會接著往下跑
public class ThreadLoopDemo { /// <summary> /// 執行 /// </summary> public static void Test() { //用來控制執行緒等待,false默認為關閉狀態 ManualResetEvent mre = new ManualResetEvent(false); ThreadPool.QueueUserWorkItem(p => { Console.WriteLine("執行緒1開始"); Thread.Sleep(1000); Console.WriteLine($"執行緒1結束,帶引數,{Thread.CurrentThread.ManagedThreadId}"); //通知執行緒,修改信號為true mre.Set(); }); //阻止當前執行緒,直到等到信號為true在繼續執行 mre.WaitOne(); //關閉執行緒,相當于設定成false mre.Reset(); Console.WriteLine("信號被關閉了"); ThreadPool.QueueUserWorkItem(p => { Console.WriteLine("執行緒2開始"); Thread.Sleep(2000); Console.WriteLine("執行緒2結束"); mre.Set(); }); mre.WaitOne(); Console.WriteLine("主執行緒結束"); } }
執行Test,結果:

3.Task
Task.Factory.StartNew:創建一個新的執行緒,Task的執行緒也是從執行緒池中拿的(ThreadPool),官方建議用Task而不是直接使用ThreadPool,因為Task是對ThreadPool的封裝和優化,
Task.WaitAny:等待一群執行緒中的其中一個完成,這里是卡主執行緒,一直等到一群執行緒中的最快的一個完成,才能繼續往下執行,打個簡單的比方:從三個地方獲取配置資訊(資料庫,config,IO),同時開啟三個執行緒來訪問,誰快誰來執行,
Task.WaitAll:等待所有執行緒完成,這里也是卡主執行緒,一直等待所有子執行緒完成任務,才能繼續往下執行,
Task.WhenAll:等待所有執行緒完成,這里不卡主執行緒,一直等待所有子執行緒完成任務,才能繼續往下執行,
Task.ContinueWhenAny:回呼形式的,任意一個執行緒完成后執行的后續動作,這個就跟WaitAny差不多,只不是是回呼形式的,
Task.ContinueWhenAll:回呼形式的,所有執行緒完成后執行的后續動作,理解同上
代碼示例:
public class TaskDemo { /// <summary> /// 一個比較耗時的方法,回圈1000W次 /// </summary> /// <param name="name"></param> public static void DoSomething(string name) { int iResult = 0; for (int i = 0; i < 1000000000; i++) { iResult += i; } Console.WriteLine($"{name},執行緒Id:{Thread.CurrentThread.ManagedThreadId},{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}"+Environment.NewLine); } public static void Test() { //執行緒容器 List<Task> taskList = new List<Task>(); Stopwatch watch = new Stopwatch(); watch.Start(); Console.WriteLine("************任務開始**************"); //啟動5個執行緒 for (int i = 0; i < 5; i++) { string name = $"Task:{i}"; Task task = Task.Factory.StartNew(() => { DoSomething(name); }); taskList.Add(task); } //回呼形式的,任意一個完成后執行的后續動作 Task any = Task.Factory.ContinueWhenAny(taskList.ToArray(), task => { Console.WriteLine($"ContinueWhenAny,當前執行緒Id:{Thread.CurrentThread.ManagedThreadId},一個執行緒執行完的回呼,繼續執行后面動作,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}" + Environment.NewLine); }); //回呼形式的,全部任務完成后執行的后續動作 Task all = Task.Factory.ContinueWhenAll(taskList.ToArray(), tasks => { Console.WriteLine($"ContinueWhenAll,當前執行緒Id:{Thread.CurrentThread.ManagedThreadId},全部執行緒執行完的回呼,執行緒數:{tasks.Length},{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}" + Environment.NewLine); }); //把兩個回呼也放到容器里面,包含回呼一起等待 taskList.Add(any); taskList.Add(all); //WaitAny:執行緒等待,當前執行緒等待其中一個執行緒完成 Task.WaitAny(taskList.ToArray()); Console.WriteLine($"WaitAny,當前執行緒Id:{Thread.CurrentThread.ManagedThreadId},其中一個完成已完成,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}" + Environment.NewLine); //WaitAll:執行緒等待,當前執行緒等待所有執行緒的完成 Task.WaitAll(taskList.ToArray()); Console.WriteLine($"WaitAll,當前執行緒Id:{Thread.CurrentThread.ManagedThreadId},全部執行緒完成,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}" + Environment.NewLine); Console.WriteLine($"************任務結束**************耗時:{watch.ElapsedMilliseconds}ms,{Thread.CurrentThread.ManagedThreadId},{DateTime.Now}"); } }
結果:

4.執行緒池調度原理
根據 TaskCreationOptions 的不同,出現了三個分支
- LongRunning:獨立執行緒,和執行緒池無關
- 包含 PreferFairness時:preferLocal=false,進入全域佇列
- 不包含 PreferFairness時:preferLocal=ture,進入本地佇列
進入全域佇列的任務能夠公平地被各個執行緒池中的執行緒領取執行,
下圖中 Task1先進入全域佇列,隨后被 Thread1 領走,Thread3 通過 WorkStealing 機制竊取了 Thread2 中的 Task3,

- 1、執行緒任務1先去到執行緒池調度佇列,
- 2、執行緒池佇列根據引數去到公共佇列或執行緒的本地佇列,
- 3、執行緒池執行緒1從公共執行緒池中取Task1到本地佇列執行
- 4、Thead3發現自己的佇列為空,公共佇列也為空,會從其它的執行緒中竊取任務執行,
四、并行
Parallel:是并行編程,在Task的基礎上做了封裝,.NET FrameWork 4.5之后的版本可用,呼叫Parallel執行緒參與執行任務,
與Task區別: 使用Task開啟子執行緒的時候,主執行緒是屬于空閑狀態,并不參與執行;Parallel開啟子執行緒的時候,主執行緒也會參與計算
示例1:
/// <summary> /// 一個比較耗時的方法,回圈1000W次 /// </summary> /// <param name="name"></param> public static void DoSomething(string name) { int iResult = 0; for (int i = 0; i < 1000000000; i++) { iResult += i; } Console.WriteLine($"{name},執行緒Id:{Thread.CurrentThread.ManagedThreadId},{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}" + Environment.NewLine); } public static void Test1() { //并行編程 Console.WriteLine($"并行編程開始,主執行緒Id:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine("【示例1】"); //示例1: //一次性執行1個或多個執行緒,效果類似:Task WaitAll,只不過Parallel的主執行緒也參與了計算 Parallel.Invoke( () => { DoSomething("并行1-1"); }, () => { DoSomething("并行1-2"); }, () => { DoSomething("并行1-3"); }, () => { DoSomething("并行1-4"); }, () => { DoSomething("并行1-5"); }); Console.WriteLine("*************并行結束************"); Console.ReadLine(); }
執行Test1結果:

示例2:
public static void Test2() { //并行編程 Console.WriteLine($"并行編程開始,主執行緒Id:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine("【示例2】"); //示例2: //定義要執行的執行緒數量 Parallel.For(0, 5, t => { int a = t; DoSomething($"并行2-{a}"); }); Console.WriteLine("*************并行結束************"); Console.ReadLine(); }
結果:

示例3:
public static void Test3() { //并行編程 Console.WriteLine($"并行編程開始,主執行緒Id:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine("【示例3】"); ParallelOptions options = new ParallelOptions() { MaxDegreeOfParallelism = 3//執行執行緒的最大并發數量,執行完成一個,就接著開啟一個 }; //遍歷集合,根據集合數量執行執行緒數量 Parallel.ForEach(new List<string> { "a", "b", "c", "d", "e", "f", "g" }, options, (t, status) => { //status.Break();//這一次結束, //status.Stop();//整個Parallel結束掉,Break和Stop不可以共存 DoSomething($"并行4-{t}"); }); }
結果:分成3次并行

五、異步IO
1.異步IO于同步IO比較


異步IO在資料準備階段不會阻塞主執行緒,而同步IO則會阻塞主執行緒,
2.異步讀寫檔案
這里使用 FileStream 類,它帶有一個引數 useAsync,可以避免在許多情況下阻塞執行緒池的執行緒,可以通過 useAsync = true 來進行啟用或在建構式中進行引數呼叫,
但是我們不能對 StreamReader 和 StreamWriter 中的引數進行設定,但是,如果你想使用該引數 useAsync,則需要自己新建一個 FileStream 物件,
請注意,異步呼叫是在 UI 中的,即使執行緒池執行緒阻塞,在 await 期間,用戶界面執行緒也不會被阻塞,
異步寫入文本
/// <summary> /// 異步寫入檔案 /// </summary> /// <returns></returns> public async Task WriteTextAsync() { var path = "temp.txt"; //檔案名 var content = Guid.NewGuid().ToString(); //寫入內容 using (var fs = new FileStream(path, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.None, bufferSize: 4096, useAsync: true)) { var buffer = Encoding.UTF8.GetBytes(content); await fs.WriteAsync(buffer, 0, buffer.Length); } }
執行完查看根目錄檔案結果:

異步讀取檔案
/// <summary> /// 異步讀取文本 /// </summary> /// <returns></returns> public static async Task ReadTextAsync() { var fileName = "temp.txt"; //檔案名 using (var fs = new FileStream(fileName, FileMode.OpenOrCreate, FileAccess.Read, FileShare.None, bufferSize: 4096, useAsync: true)) { var sb = new StringBuilder(); var buffer = new byte[4096]; var readLength = 0; while ((readLength = await fs.ReadAsync(buffer, 0, buffer.Length)) != 0) { var text = Encoding.UTF8.GetString(buffer, 0, readLength); sb.Append(text); } Console.WriteLine("讀取檔案內容:"+sb.ToString()); } }
執行結果:

轉載請註明出處,本文鏈接:https://www.uj5u.com/net/456940.html
標籤:C#
下一篇:C# 將PDF轉為Excel
