一天沒有出過家門,實屬無聊,沒事瞎寫寫
1. 基本概念
1.1 多行程和多執行緒的概念
程式是由指令和資料組成,指令要運行,資料要加載,指令被 CPU 加載運行,資料被加載到記憶體,指令運行時可由 CPU 調度硬碟、網路等設備,一個執行緒就是一個指令,CPU 調度的最小單位,一個行程就是一系列的指令流,由 CPU 一條一條執行
-
行程是程式在計算機上的一次執行活動,當你運行一個程式,你就啟動了一個行程,一系列指令
-
執行緒是行程中的實際運行單位,是獨立運行與行程之中的子任務,一條單獨的指令
1.2 并行與并發
并發和并行都是同時處理多路請求,目的是最大化 CPU 的利用率,并行是指兩個或者多個事件在同一時刻發生,并發是指多個事件在同一事件間隔內發生
-
并發是指單核 CPU 運行多執行緒時,時間片進行很快的切換,執行緒輪流執行 CPU
-
并行是指多核 CPU 運行多執行緒時,真正的在同一時刻運行
1.3 計算機存盤體系
在很早之前,CPU 的頻率與記憶體的頻率在一個層面上,上世紀 90 年代,CPU 的頻率大大提升,但記憶體的頻率沒有得到提升,導致 CPU 的運行速度比記憶體讀寫速度快很多,使 CPU 花費很長的時間等待資料的到來或把資料寫入到記憶體中,為了解決 CPU 運算速度與記憶體讀寫速度不匹配的矛盾,就出現了 CPU 快取,CPU 快取分為三個級別,分別是 L1、L2、L3,級別越小越接近 CPU,速度也越來越快,容量也越來越小

多核 CPU 的情況下有多個一級快取,如何保證快取內部資料一致性,不讓系統資料混亂,解決方案就是快取一致性協議(Modified Exclusive Shared Or Invalid,MESI)或者鎖住總線,其中鎖住總線,效率非常低下CPU 串行,所以實際使用 MESI,MESI 通過四種狀態來進行標記
| 狀態 | 描述 | 監聽任務 | 狀態轉換 |
|---|---|---|---|
| M 修改(Modified) | 該Cache line有效,資料被修改了,和記憶體中的資料不一致,資料只存在于本Cache中, | 快取行必須時刻監聽所有試圖讀該快取行相對就主存的操作,這種操作必須在快取將該快取行寫回主存并將狀態變成S(共享)狀態之前被延遲執行, | 當被寫回主存之后,該快取行的狀態會變成獨享(exclusive)狀態, |
| E 獨享、互斥(Exclusive) | 該Cache line有效,資料和記憶體中的資料一致,資料只存在于本Cache中, | 快取行也必須監聽其它快取讀主存中該快取行的操作,一旦有這種操作,該快取行需要變成S(共享)狀態, | 當CPU修改該快取行中內容時,該狀態可以變成Modified狀態 |
| S 共享(Shared) | 該Cache line有效,資料和記憶體中的資料一致,資料存在于很多Cache中, | 快取行也必須監聽其它快取使該快取行無效或者獨享該快取行的請求,并將該快取行變成無效(Invalid), | 當有一個CPU修改該快取行時,其它CPU中該快取行可以被作廢(變成無效狀態 Invalid), |
| I 無效(Invalid) | 該Cache line無效, | 無 | 無 |
對于 M 和 E 狀態而言總是精確的,他們在和該快取行的真正狀態是一致的,而 S 狀態可能是非一致的
1.4 執行緒的狀態
sleep、yield 和 join 區別:
-
sleep 執行后執行緒進入阻塞狀態,當前執行緒休眠一段時間
-
yield 執行后執行緒進入就緒狀態,使當前執行緒和所有等待的執行緒一起進行競爭 CPU 資源
-
join執行執行緒進入阻塞狀態,t.join 表示阻塞呼叫此方法的執行緒,直到執行緒 t 完成,方可繼續執行,底層實際呼叫 wait 方法

-
新建狀態(New):執行緒物件被創建后,就進入了新建狀態,例如:Thread thread = new Thread()
-
就緒狀態(Runnable):也被稱為"可執行狀態",執行緒物件唄創建后,其它執行緒呼叫了該物件的 start() 方法,從而就啟動該執行緒,例如T.stat(),處于就緒狀態的執行緒,隨時可能被CPU調度執行
-
運行狀態(Running):執行緒獲取 CPU 權限進行執行,需要注意的是,執行緒只能從就緒狀態進入到運行狀態
-
阻塞狀態(Blocked):阻塞狀態是執行緒放棄CPU使用權,暫時停止運行,直到執行緒進入就緒狀態,才有機會轉到運行狀態,阻塞的情況分三種:
-
等待阻塞:通過呼叫執行緒的wait() 方法,讓執行緒等待某作業的完成
-
同步阻塞:執行緒在獲取 synchronized 同步鎖失敗,它會進入同步阻塞狀態
-
其它阻塞:通過呼叫執行緒的 sleep() 或 join() 或發出 I/O 請求時,執行緒會進入到阻塞狀態,當 sleep() 狀態超時,join() 等待執行緒終止或者超時、或者 I/O 處理完畢時,執行緒重新轉入到就緒狀態
-
-
死亡狀態(Dead):執行緒執行完了或者因例外退出了 run() 方法,該執行緒結束生命周期
2.多執行緒的實作方式
2.1繼承 Thread 類創建執行緒
Thead 類本質上是實作了 Runnable 介面的一個實體,代表一個執行緒的實體
1 /** 2 * @description: 多執行緒實作方法1:集成Thread類 3 * @author: DZ 4 **/ 5 @Slf4j 6 public class MyThread1 extends Thread { 7 @Override 8 public void run() { 9 log.info("MyThread1"); 10 log.info("MyThread2"); 11 } 12 ? 13 public static void main(String[] args) { 14 MyThread1 t1 = new MyThread1(); 15 MyThread1 t2 = new MyThread1(); 16 t1.start(); 17 t2.start(); 18 } 19 }
2.2實作 Runnable 介面創建執行緒
如果自己的類已經 extends 另一個類,就無法直接 extends Thread,此時可以通過實作 Runnable 介面
避免單繼承的局限性、適合多個相同的執行緒去處理同一個資源
1 /** 2 * @description: 多執行緒實作方法2:實作Runnable介面 3 **/ 4 @Slf4j 5 public class MyThread2 implements Runnable { 6 ? 7 @Override 8 public void run() { 9 log.info("MyThread1"); 10 log.info("MyThread2"); 11 } 12 ? 13 public static void main(String[] args) { 14 MyThread2 m = new MyThread2(); 15 //1.呼叫run方法 16 Thread t1 = new Thread(m); 17 Thread t2 = new Thread(m); 18 t1.start(); 19 t2.start(); 20 } 21 }
2.3實作 Callable 介面,通過 Future Task 包裝器來創建 Thread 執行緒
可以獲取執行緒的回傳值
1 /** 2 * @description: 多執行緒實作方法2:實作Callable介面 3 * @author: DZ 4 **/ 5 @Slf4j 6 public class MyThread3 implements Callable { 7 @Override 8 public String call() throws Exception { 9 log.info("MyThread1"); 10 log.info("MyThread2"); 11 return "MyThread3"; 12 } 13 ? 14 public static void main(String[] args) throws ExecutionException, InterruptedException { 15 MyThread3 m = new MyThread3(); 16 //存盤回傳值,其中泛型為回傳值的型別 17 FutureTask<String> futureTask = new FutureTask<>(m); 18 new Thread(futureTask).start(); 19 System.out.println(futureTask.get()); 20 } 21 ? 22 }
2.4通過執行緒池
2.4.1 執行緒池的主要引數
1 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 2 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 3 Executors.defaultThreadFactory(), defaultHandler); 4 }
-
corePoolSize
當向執行緒池提交一個任務時,若執行緒池已創建的執行緒數小于corePoolSize,即便此時存在空閑執行緒,也會通過創建一個新執行緒來執行該任務,直到已創建的執行緒數大于或等于corePoolSize時,(除了利用提交新任務來創建和啟動執行緒(按需構造),也可以通過 prestartCoreThread() 或 prestartAllCoreThreads() 方法來提前啟動執行緒池中的基本執行緒,)
-
maximumPoolSize
執行緒池所允許的最大執行緒個數,當佇列滿了,且已創建的執行緒數小于maximumPoolSize,則執行緒池會創建新的執行緒來執行任務,另外,對于無界佇列,可忽略該引數
-
keepAliveTime
當執行緒池中執行緒數大于核心執行緒數時,執行緒的空閑時間如果超過執行緒存活時間,那么這個執行緒就會被銷毀,直到執行緒池中的執行緒數小于等于核心執行緒數
-
workQueue
用于傳輸和保存等待執行任務的阻塞佇列
-
ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列
-
LinkedBlockingQueue 一個由鏈表結構組成的有界阻塞佇列
-
PriorityBlockingQueue 一個支持優先級排序的無界阻塞佇列
-
DelayQueue 一個使用優先級佇列實作的無界阻塞佇列
-
SynchronousQueue 一個不存盤元素的阻塞佇列
-
LinkedTransferQueue 一個由鏈表結構組成的無界阻塞佇列
-
LinkedBlockingDeque 一個由鏈表結構組成的雙向阻塞佇列
作用:阻塞佇列可以保證任務佇列中沒有任務時阻塞獲取任務的執行緒,使得執行緒進入wait狀態,釋放cpu資源,當佇列中有任務時才喚醒對應執行緒從佇列中取出訊息進行執行,使得在執行緒不至于一直占用cpu資源,
-
threadFactory
用于創建新執行緒,threadFactory創建的執行緒也是采用new Thread()方式,threadFactory創建的執行緒名都具有統一的風格:pool-m-thread-n(m為執行緒池的編號,n為執行緒池內的執行緒編號)
-
handler
當執行緒池和佇列都滿了,再加入執行緒會執行此策略
AbortPolicy: 直接拋出例外,阻止執行緒正常運行
1 public static class AbortPolicy implements RejectedExecutionHandler { 2 public AbortPolicy() {} 3 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 4 throw new RejectedExecutionException("Task " + r.toString() +" rejected from " + e.toString()); 5 } 6 }
CallerRunsPolicy: 直接在方法的呼叫執行緒中執行,除非執行緒池已關閉
1 public static class CallerRunsPolicy implements RejectedExecutionHandler { 2 public CallerRunsPolicy() {} 3 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 4 if (!e.isShutdown()) { 5 r.run(); 6 } 7 } 8 }
DiscardPolica: 丟棄當前的執行緒任務而不做任何處理,如果系統允許在資源不足的情況下棄部分任務,則這將是保障系統安全、穩定的一種很好的方案
1 public static class DiscardPolicy implements RejectedExecutionHandler { 2 public DiscardPolicy() {} 3 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 4 } 5 }
DiscardOlderPolicy: 移除執行緒佇列中最早(老)的一個執行緒任務,并嘗試提交當前任務
1 public static class DiscardOldestPolicy implements RejectedExecutionHandler { 2 public DiscardOldestPolicy() { } 3 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 4 if (!e.isShutdown()) { 5 e.getQueue().poll();// 最早(老)的任務出佇列 6 e.execute(r); 7 } 8 } 9 }
2.4.2如何設定執行緒池
-
CPU密集型
盡量使用較小的執行緒池,一般為CPU核心數+1, 因為CPU密集型任務使得CPU使用率很高,若開過多的執行緒數,會造成CPU過度切換,
-
IO密集型任務
可以使用稍大的執行緒池,一般為2*CPU核心數, IO密集型任務CPU使用率并不高,因此可以讓CPU在等待IO的時候有其他執行緒去處理別的任務,充分利用CPU時間
-
混合型任務
執行緒數 = CPU核心數 * (1+平均等待時間 / 平均作業時間)
2.4.3 代碼示例
1 import lombok.extern.slf4j.Slf4j; 2 import org.junit.Test; 3 ? 4 import java.util.concurrent.*; 5 ? 6 /** 7 * @description: 通過執行緒池實作多執行緒 8 * @author: DZ 9 **/ 10 @Slf4j 11 public class MyThread4 { 12 //通常使用方式,定義前5個引數即可,其余默認 13 private ThreadPoolExecutor threadPoolExecutor0 = new ThreadPoolExecutor(5, 10, 60, 14 TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10)); 15 ? 16 //所有引數均自定義(增加工廠ThreadFactory和拒絕方式Handle) 17 private ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(5, 10, 60, 18 TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10), new ThreadFactory() { 19 @Override 20 public Thread newThread(Runnable r) { 21 Thread thread = new Thread(r); 22 log.info("我是執行緒{}", thread.getName()); 23 return thread; 24 } 25 }, new RejectedExecutionHandler() { 26 @Override 27 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 28 log.info("執行緒被去掉{}", new Thread(r).getName()); 29 } 30 }); 31 ? 32 //所有引數均自定義,拒絕方式使用默認new ThreadPoolExecutor.AbortPolicy(),new ThreadPoolExecutor.DiscardOldestPolicy(),new ThreadPoolExecutor.CallerRunsPolicy(),new ThreadPoolExecutor.DiscardPolicy() 33 private ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(5, 10, 60, 34 TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10), new ThreadFactory() { 35 @Override 36 public Thread newThread(Runnable r) { 37 Thread thread = new Thread(r); 38 log.info("我是執行緒{}", thread.getName()); 39 return thread; 40 } 41 }, new ThreadPoolExecutor.DiscardPolicy()); 42 ? 43 @Test 44 public void testRunnable() { 45 threadPoolExecutor0.execute(new Runnable() { 46 @Override 47 public void run() { 48 log.info("MyThread1"); 49 log.info("MyThread2"); 50 } 51 }); 52 } 53 ? 54 @Test 55 public void testCallable() throws ExecutionException, InterruptedException { 56 Future<String> submit = threadPoolExecutor1.submit(new Callable<String>() { 57 @Override 58 public String call() throws Exception { 59 log.info("MyThread1"); 60 log.info("MyThread2"); 61 return "MyThread4"; 62 } 63 }); 64 System.out.println(submit.get()); 65 } 66 }
3 常見的執行緒池
3.1 FixedThreadPool
適用于任務數量已知,且相對耗時的任務
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 3 }
3.2 SingleThreadExecutor
這種執行緒池非常適合所有任務都需要按被提交的順序來執行的場景,是個單執行緒的串行,
1 public static ExecutorService newSingleThreadExecutor() { 2 return new FinalizableDelegatedExecutorService 3 (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); 4 } 5 ? 6 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { 7 return new FinalizableDelegatedExecutorService 8 (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory)); 9 }
3.3 CachedThreadPool
核心執行緒池為0,存活時間為60s,適合小而快的任務
1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); 3 } 4 ? 5 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { 6 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory); 7 }
3.4 ScheduledThreadPool
支持定時或者周期執行的任務
1 public ScheduledThreadPoolExecutor(int corePoolSize) { 2 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue()); 3 } 4 public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) { 5 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory); 6 } 7 public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) { 8 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), handler); 9 } 10 public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) { 11 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler); 12 }
eg
1 public static void main(String[] args) { 2 ScheduledExecutorService service = Executors.newScheduledThreadPool(5); 3 // 1. 延遲一定時間執行一次 4 service.schedule(() ->{ 5 System.out.println("schedule ==> 延遲一定時間執行一次"); 6 },2, TimeUnit.SECONDS); 7 // 2. 按照固定頻率周期執行 8 service.scheduleAtFixedRate(() ->{ 9 System.out.println("scheduleAtFixedRate ==> 按照固定頻率周期執行"); 10 },2,3,TimeUnit.SECONDS); 11 //3. 按照固定頻率周期執行 12 service.scheduleWithFixedDelay(() -> { 13 System.out.println("scheduleWithFixedDelay ==> 按照固定頻率周期執行"); 14 },2,5,TimeUnit.SECONDS); 15 }
-
首先我們看第一個方法 schedule , 它有三個引數,第一個引數是執行緒任務,第二個delay 表示任務執行延遲時長,第三個unit 表示延遲時間的單位,如上面代碼所示就是延遲兩秒后執行任務
1 public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
-
第二個方法是 scheduleAtFixedRate 如下, 它有四個引數,command 引數表示執行的執行緒任務 ,initialDelay 引數表示第一次執行的延遲時間,period 引數表示第一次執行之后按照多久一次的頻率來執行,最后一個引數是時間單位,如上面案例代碼所示,表示兩秒后執行第一次,之后按每隔三秒執行一次
1 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
-
第三個方法是 scheduleWithFixedDelay 如下,它與上面方法是非常類似的,也是周期性定時執行, 引數含義和上面方法一致,這個方法和 scheduleAtFixedRate 的區別主要在于時間的起點計時不同,scheduleAtFixedRate 是以任務開始的時間為時間起點來計時,時間到就執行第二次任務,與任務執行所花費的時間無關;而 scheduleWithFixedDelay 是以任務執行結束的時間點作為計時的開始,如下所示
1 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
3.5 SingleThreadEcheduledExecutor
它實際和 ScheduledThreadPool,執行緒池非常相似,它只是 ScheduledThreadPool的一個特例,內部只有一個執行緒,它只是將 ScheduledThreadPool 的核心執行緒數設定為了 1,如原始碼所示:
1 public static ScheduledExecutorService newSingleThreadScheduledExecutor() { 2 return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); 3 }
3.6 ForkJoinPool
這是一個在 JDK7引入的新新執行緒池,它的主要特點是可以充分利用多核CPU,可以把一個任務拆分為多個子任務,這些子任務放在不同的處理器上并行執行,當這些子任務執行結束后再把這些結果合并起來,這是一種分治思想,
3.7 newWorkStealingPool
WorkStealingPool背后是使用ForkJoinPool實作的(JDK8)
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/379383.html
標籤:其他
