簡介
到此為止,我們學到的基本上都是有關順序編程的知識,即程式中所有事物在任意時刻都只能執行一個步驟,
編程問題中相當大的一部分都可以通過使用順序編程來解決,然而,對于某些問題,如果能夠并發地執行程式中的多個部分,則會變得非常方便,并發編程可以使得程式的處理速度得到極大的提高,但是在得到提高的同時,并發也會帶來一些問題,當并行執行的任務彼此開始互相干涉時,時機的并發問題就會接踵而至,
了解并發可以使我們意識到明顯正確的程式可能會展現出不正確的行為,
并發的多面性
更快的執行
現在的計算機基本上都是多核處理器,這意味我們的程式可以并行處理,而并發通常是提高運行在單個處理器上程式的性能,這聽起來好像并發并沒有什么用,事實上,我們考慮一下,在單核處理器上運行的并發程式開銷確實應該比該程式的所有部分都順序執行的開銷要大,因為執行緒之間存在背景關系切換的開銷,從表面上看,讓每個程式去占據一個處理器進行順序處理會開銷小一些,并且可以節省背景關系切換的代價,
但是我們如果考慮到阻塞問題,我們就會發現,如果我們的程式都是順序執行,一旦遇到了某個阻塞情況我們的程式就會無法執行下去,并且占據處理器,直到執行緒可以繼續執行下去,但是如果使用并發來撰寫,即使某個程式發生了阻塞,仍然不會影響其他程式的繼續執行,
實作并發最直接的方式是在作業系統級別使用行程,行程是運行在它自己的地址空間內的程式,多任務作業系統通過周期性地將CPU從一個行程切換到另一個行程,來實作同時運行多個進行,因為作業系統通常會將行程互相隔離開,使得它們不會互相干涉,但是Java所使用的并發系統會共享記憶體和I/O這樣的資源,以使得這些資源不會被多個任務同時訪問,
基本的執行緒機制
并發編程使我們可以將一個程式分割成多個分離的獨立的任務,通過多執行緒機制,這些任務中的每一個都將由執行執行緒來驅動,一個行程可以有多個執行緒,多個執行緒共享一個行程的資源,因此,單個行程可以擁有多個并發執行的任務,
定義任務
執行緒可以驅動任務,因此我們需要一種任務的抽象,這可以由Runable介面來提供,如果需要定義任務,只需要實作Runable介面并撰寫run()方法,使得該任務可以執行我們得命令,
下面例子中的LiftOff任務將顯示發射的倒計時
class LiftOff implements Runnable{
private Integer countDown = 10;
@Override
public void run() {
while(countDown-->0){
System.out.println("倒計時:"+countDown);
}
System.out.println("發射~");
}
}
當從Runable匯出一個類時,它必須具有run()方法,但是注意,這個方法與常規方法并沒有差異,也就是說直接執行run()方法依舊是單執行緒執行,若要實作執行緒行為,必須顯式地將一個任務附著到執行緒上
Thread類
將Runable物件轉變為作業任務的方式就是將它提交給一個Thread構造器,下面例子中展示了如何使用Thread來驅動上面的任務
public static void main(String[] args) {
new Thread(new LiftOff()).run();
/**
* 倒計時:9
* 倒計時:8
* 倒計時:7
* 倒計時:6
* 倒計時:5
* 倒計時:4
* 倒計時:3
* 倒計時:2
* 倒計時:1
* 倒計時:0
* 發射~
*/
}
Thread構造器只需要一個Runable物件,呼叫Thread物件的start方法為該執行緒執行必須的初始化操作,然后呼叫Runable的run方法,以便在這個新執行緒中啟動任務,注意,由于我們開辟了一條新的執行緒去執行任務,而不是在原有執行緒的基礎上去順序執行任務,所以在執行run方法時,main中的代碼也將繼續執行下去,
如果存在多個執行緒任務,不同的任務的執行將在執行緒被換進交換出時混在一起,這種交換是由執行緒調度器自動控制的,如果存在多處理器,執行緒調度器將會在這些處理器之間默默地分發執行緒,
在main中創建Thread物件時,它并沒有補獲任何對這些物件的參考,在使用普通物件時,如果沒有參考,將會被垃圾回收,但是在使用Thread時,就不會這個樣子了,每個Thread在任務執行完畢之前,都會一直存著,
Executor
雖然我們可以創建執行緒來完成我們的任務,但是在多執行緒形況下,如果頻繁地創建執行緒,會因為執行緒過多而導致堆疊記憶體溢位,因為每個執行緒,虛擬機都會為其分配單獨地堆疊記憶體,同時,除了記憶體溢位的風險,由于創建執行緒需要耗費大量的資源,所以我們需要某種方式來為我們控制執行緒的數量,同時在常規狀態下,保持執行緒而不需要在使用時重新創建,
JavaSE5的java.util.concurrent包中的執行器Executor將為我們管理Thread物件,從而簡化并發編程,Executor在客戶端和任務執行之間提供了一個間接層;與客戶端直接執行任務不同,這個中介物件將執行任務,Executor允許我們管理異步任務的執行,而無需顯式地管理執行緒的宣告周期,
我們重新構建上面的發射任務,增加了id標識來標識任務的編號
class LiftOff implements Runnable{
private Integer countDown = 10;
private static int count = 0;
private final int id = count++;
@Override
public void run() {
while(countDown-->0){
System.out.println(id+"----倒計時:"+countDown);
}
System.out.println("發射~");
}
}
我們使用Executor來代替顯式創建執行緒的方式,ExecutorService(具有服務宣告周期的Executor,例如關閉)直到如何構建恰當的背景關系來執行Runable物件,下面示例中,CachedThreadPool將為每個任務都創建一個執行緒,注意ExecutorService物件是使用靜態的Executor方法創建的
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i=0;i<5;i++){
exec.execute(new LiftOff());
}
/**
* 0----倒計時:9
* 1----倒計時:9
* 4----倒計時:9
* 0----倒計時:8
* 0----倒計時:7
* 2----倒計時:9
* 2----倒計時:8
* 2----倒計時:7
* ....
*/
}
newCachedThreadPool
前面例子中使用的newCachedThreadPool是一種沒有執行緒數量上限的執行緒池,這意味著一旦并發數過大,很容易引起記憶體溢位,在沒有任務時,該執行緒池并不會保持任何執行緒,它會在創建的所有執行緒達到生產時間上限60s時,沒有新任務的情況下銷毀執行緒,如果有新任務,執行緒池同時還有執行緒未過期的空閑執行緒則復用執行緒,過期或者沒有空閑則創建新執行緒,采用SynchronousQueue裝等待的任務,這個阻塞佇列沒有存盤空間,這意味著只要有請求到來,就必須要找到一條作業執行緒處理他,如果當前沒有空閑的執行緒,那么就會再創建一條新的執行緒,
FixedThreadPool
與newCachedThreadPool不同,FixedThreadPool創建的是一種定長執行緒池,可控制執行緒最大并發數,超出的執行緒會在佇列中等待,也就是說,執行緒池中的執行緒數量是固定的,通過構造時使用corePoolSize和maximunPoolSize來控制其長期保持的執行緒數量和一個最大執行緒數量,keepAliveTime為0,意味著一旦有多余的空閑執行緒,就會被立即停止掉,由于阻塞佇列是一個無界佇列,因此永遠不可能拒絕任務,
SingleThreadExecutor
SingleThreadExecutor可以看作是執行緒數量為一的FixedThreadPool,如果向SingleThreadExecutor提交了多個任務,那么這些任務將進行排隊,每個任務都會在下個任務開始前結束運行,所有任務使用同一條執行緒,SingleThreadExecutor執行執行緒是有序的,采用的阻塞佇列為LinkedBlockingQueue,
注意:雖然java為我們封裝了三種直接可以使用的執行緒池,但在阿里的程式員手冊中并不支持著三種執行緒池,因為它們認為程式員需要根據實際情況去對執行緒的數量使用的拒絕策略以及執行緒存活時間等引數進行設計,而不是簡單地將java庫中的這三個執行緒池作為首要使用選項,
從任務中生產回傳值(Callable)
Runable是執行作業的獨立任務,但是它不回傳任何值,如果你希望任務在完成時能夠回傳一個值,那么可以實作Callable介面而不是Runable介面,Callable是一種具有型別引數的泛型,它的型別引數的表示是從方法call()(而不是run())中回傳的值,并且必須使用ExecutorService()方法呼叫它,
public class SynchronizedDemo3 {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
List<Future<String>> results = new ArrayList<>();
for(int i=0 ;i<10;i++){
results.add(service.submit(new TaskWithResult(i)));
}
for(Future<String> fs: results){
try {
System.out.println(fs.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
service.shutdown();
/**
* result of TaskWithResult 0
* result of TaskWithResult 1
* result of TaskWithResult 2
* result of TaskWithResult 3
* result of TaskWithResult 4
* result of TaskWithResult 5
* result of TaskWithResult 6
* result of TaskWithResult 7
* result of TaskWithResult 8
* result of TaskWithResult 9
*/
}
}
class TaskWithResult implements Callable<String>{
private int id;
public TaskWithResult(int id){
this.id = id;
}
@Override
public String call() throws Exception {
return "result of TaskWithResult "+id;
}
}
submit()會產生Future物件,通過Future物件我們就可以獲取到我們的任務的執行結果,isDone()方法查詢Future是否已經完成,當任務完成時,它具有一個結果,我們可以呼叫get()來獲取該結果,當然,如果不適應isDone()直接get()也沒有問題,get()將阻塞直到結果準備就緒,
優先級
執行緒的優先級將該執行緒的重要性傳遞給了執行緒調度器,調度器將傾向于讓優先權最高的執行緒先執行,但是這并不意味著低優先級的執行緒永遠得不到執行,優先級只代表獲取到cpu時間片的概率,
在絕大多數時間里,所有執行緒有應該以默認的優先級運行,
我們可以通過getPriority()和setPriority()來讀取和修改執行緒的優先級
Thread.currentThread().setPriority(priority);
后臺執行緒
所謂后臺執行緒,是指程式運行時在后臺提供一種通用服務的執行緒,并且這種執行緒并不屬于程式中不可或缺的部分,因此,當所有非后臺執行緒結束時,程式就會終止,同時會殺死行程中所有的后臺程式,也就是說,只要有任何非后臺程式還在運行,程式就不會終止,main函式就是一個非后臺執行緒
public class SynchronizedDemo6 {
public static void main(String[] args) throws InterruptedException {
for (int i=0 ;i<10;i++){
Thread daemon = new Thread(new SimpleDaemons());
daemon.setDaemon(true);
daemon.start();
}
System.out.println("All daemons started");
TimeUnit.MILLISECONDS.sleep(175);
}
}
class SimpleDaemons implements Runnable{
@Override
public void run() {
try {
while(true){
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread()+" "+this);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
必須在執行緒啟動之前呼叫setDaemon()方法,才能把它設定為后臺執行緒,
一旦main完成了作業,所有的后臺程式都會被結束掉
編碼的變體
到目前為止,所有的任務類都是通過實作Runable介面來完成實作的,但是除了這種方式我們還可以直接繼承Thread,
class MyThread extends Thread{
@Override
public void run() {
for (int i=0;i<10;i++){
System.out.println(i);
}
}
}
除此之外我們還能夠通過呼叫適當的Thread構造器為Thread物件賦予具體的名稱,這個名稱可以通過getName()獲取到,
public static void main(String[] args) {
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"開始啟動~");
}
},"1號執行緒");
thread1.start();
/**
* 1號執行緒開始啟動~
*/
}
注意,start()如果在構造器中被呼叫將可能會引發問題,因為另一個任務可能會在構造器結束前開始執行,這意味著有些初始化作業還未完成,而任務卻能夠訪問那些處于不穩定的物件(未完成初始化的物件),因此我們建議使用Executor而不是顯式地創建Thread物件去執行任務,
捕獲例外
由于執行緒的本質特性,我們無法通過常規的try-catch捕獲從執行緒中逃逸的例外,一旦例外逃出任務的run方法,它就會向外傳播到控制臺,除非我們為執行緒提供一個例外處理器
public class SynchronizedDemo9 {
public static void main(String[] args) {
Thread thread = new Thread(new SimpleTask9());
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("catch "+e);
}
});
thread.start();
/**
* catch java.lang.RuntimeException: Thread-0發生的例外
*/
}
}
class SimpleTask9 implements Runnable{
@Override
public void run() {
throw new RuntimeException(Thread.currentThread().getName()+"發生的例外");
}
}
但是注意,當我們使用執行緒池時我們無法直接接觸到執行緒,所以沒辦法為每個執行緒設定例外處理器,因此,我們需要為執行緒池提供我們重寫的執行緒工廠,并在工廠方法中為每個執行緒設定例外處理器,
下面例子中,在執行緒工廠中為每個執行緒提供了實作Thread.UncaughtExceptionHandler介面的例外處理器,
public class SynchronizedDemo8 {
public static void main(String[] args) {
Executor executor = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("catch "+e.getMessage());
}
});
return thread;
}
});
executor.execute(new SimpleTask());
}
static class SimpleTask implements Runnable{
@Override
public void run() {
throw new RuntimeException("發生例外~~");
}
}
}
臨界資源
可以把單執行緒程式當作在問題域求解的單一物體,每次只能做一件事,因為只有一個物體,所以永遠不用擔心諸如“兩個物體視圖使用同一個資源”這樣的問題,
有了并發就可以同時做很多事情了,但是,兩個或多個執行緒彼此相互干涉的問題也就出現了,如果不防范這種沖突,就可能發生兩個執行緒同時視圖訪問同一個銀行賬戶等諸多問題,
不正確地訪問資源
下面代碼中,以多個售票員賣票為例展示不正確的訪問資源
//臨界資源演示
Runnable r4 = ()->{
while(Ticket.tickets>0) {
System.out.println(Thread.currentThread().getName() + "賣出一張票,剩余:" + Ticket.tickets--);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Thread s1 = new Thread(r4,"售票員1");
Thread s2 = new Thread(r4,"售票員2");
Thread s3 = new Thread(r4,"售票員3");
Thread s4 = new Thread(r4,"售票員4");
s1.start();
s2.start();
s3.start();
s4.start();
class Ticket{
public static int tickets= 100;
}
運行結果:

上述買票問題我們會發現多個執行緒之間在賣票時余票出現了問題,
解決共享資源競爭
前面的示例展示了使用執行緒時的一個基本問題:你永遠都不知道一個執行緒何時在運行,想象一下,你坐在桌邊拿著叉子,正準備去叉盤中的最后一片食物時,這片事物突然消失了,因為你的執行緒被掛起了,而另一個用餐者進入并吃掉了它,但是你還不知情,這正是在偏斜并發程式時需要注意的問題,我們需要某種方式來防止兩個任務訪問相同的資源,
防止這種沖突的方法就是當資源被一個任務使用時,為其加上一把鎖,這樣在使用者解開鎖之前,其他任務將無法訪問它了,而在鎖解開時,另一個任務就可以鎖定并使用它,
基本上所有的并發模式在解決沖突問題時,都是從采用序列化訪問共享資源的方式,這意味著在給定時刻只允許一個任務訪問共享資源,通常這是通過在代碼前加上一條鎖陳述句來實作的,這就使得在一段時間內只有一個任務可以運行這段代碼,因為鎖陳述句產生了一種互相排斥的效果,因此這種機制常被稱為互斥量(mutex)
Java以提供關鍵字synchronized的形式,為防止資源沖突提供了內置支持,當任務要執行被syncgronized關鍵字保護的代碼片段的時候,它將檢查鎖是否可用,然后獲取鎖,執行代碼,釋放鎖,
共享資源一般四以物件形式存在的記憶體片段,但也可以是檔案、輸入輸出埠,或者是列印機,要控制對共享資源的訪問呢,得先把他包裝進一個物件,然后把所有要訪問這個資源的方法標記為synchronized,如果某個任務處于一個對同步方法的訪問中,那么在這個執行緒從該方法回傳前,其他所有想訪問該方法的執行緒都會被阻塞,
對于前面的售票問題,我們可以通過增加synchronized關鍵字來解決
public class SynchronizedDemo10 {
public static void main(String[] args) {
//臨界資源演示
Runnable r4 = ()->{
while(Ticket.tickets>0) {
synchronized ("2"){
if(Ticket.tickets>0){
System.out.println(Thread.currentThread().getName() + "賣出一張票,剩余:" + --Ticket.tickets);
}
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Thread s1 = new Thread(r4,"售票員1");
Thread s2 = new Thread(r4,"售票員2");
Thread s3 = new Thread(r4,"售票員3");
Thread s4 = new Thread(r4,"售票員4");
s1.start();
s2.start();
s3.start();
s4.start();
/**
* ...
* 售票員1賣出一張票,剩余:6
* 售票員2賣出一張票,剩余:5
* 售票員3賣出一張票,剩余:4
* 售票員1賣出一張票,剩余:3
* 售票員2賣出一張票,剩余:2
* 售票員4賣出一張票,剩余:1
* 售票員3賣出一張票,剩余:0
*
*/
}
}
class Ticket{
public static int tickets= 100;
}
在上面例子中,我們在增加synchronized之后,售票的剩余票數開始正常,并沒有多售的現象,
所有物件都含有一個單一的鎖(也成為監視器),當在物件上呼叫任意synchronized方法的時候,此物件都會被加鎖,這時該物件上的其他synchronized方法只有等到前一個方法被呼叫完畢并釋放鎖之后才能被呼叫,對于一個物件來說,其所有的同步方法都共享同一個鎖,者可以被用來防止多個任務同時訪問同一個方法,
注意,在使用并發時,將域設定為私有是非常重要的,否則synchronized就不能阻止其他任務直接訪問域,這樣會產生沖突
一個任務可以多次獲得物件的鎖,如果一個方法在同一個物件上呼叫了第二個同步方法,JVM會跟蹤物件被加鎖的次數,如果一個物件被解鎖,其計數為0.在任務第一次給物件加鎖時,計數變為1,每當這個相同的任務在這個物件上獲得鎖時,計數都會增加,顯然只有首先獲得了鎖的時候才被允許獲取多個鎖,
針對每個類也有一個鎖(它是類的Class的一部分),所以synchronized static方法可以在類的范圍內防止對static資料的并發訪問,
前面例子中我們沒有在方法上加synchronized關鍵字,而是直接在方法內部增加了synchronized代碼塊,我們可以看到前面代碼的synchronized后面跟著一個引數,那就是鎖,但這個鎖我們可以自定義,我們可以認為常規的synchronized方法就是默認將物件鎖作為synchronized的鎖引數,
使用顯式的Lock物件
除了使用synchronized關鍵字進行同步加鎖之外,我們還能夠通過Lock物件進行顯式加鎖,Lock物件必須被顯式地創建、鎖定和釋放,因此它與內建的鎖形式相比,代碼缺乏優雅性,但是對于解決某些型別的問題來說,它更加靈活,下面使用lock來重寫售票互斥
public static void main(String[] args) {
Lock lock = new ReentrantLock();
//臨界資源演示
Runnable r4 = ()->{
while(Ticket.tickets>0) {
lock.lock();
if(Ticket.tickets>0){
System.out.println(Thread.currentThread().getName() + "賣出一張票,剩余:" + --Ticket.tickets);
}
lock.unlock();
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Thread s1 = new Thread(r4,"售票員1");
Thread s2 = new Thread(r4,"售票員2");
Thread s3 = new Thread(r4,"售票員3");
Thread s4 = new Thread(r4,"售票員4");
s1.start();
s2.start();
s3.start();
s4.start();
注意,如果我們使用互斥鎖的形式去替代同步方法,我們最好是將unlock()放在finally子句中,并且在try中嘗試回傳,因為這樣可以確保unlock()動作不會提前發生,過早地將資料暴露給第二個任務,
雖然使用lock物件看似更加地繁瑣,但事實上,這種形式可以為我們提供一些便利,如果在synchronized代碼中出現例外,那我們沒有辦法去做一些清理收尾作業,但是如果是使用lock物件,出翔例外時,我們還有機會在finally代碼中做一些清理作業,
大體上,當沃恩使用synchronized關鍵字時,需要寫的代碼量更少,并且用戶錯誤出現的可能性也會降低,因此只有在解決特殊問題時,才使用顯式的lock物件,比如我們可以通過lock物件實作一個自旋鎖,要實作這些,我們必須使用concurrent類別庫
下面代碼中,我們使用了tryLock實作了如果執行緒沒有獲得鎖,可以去執行其他任務,而不是堵塞在那兒,
public class SynchronizedDemo11 {
public static void main(String[] args) {
Executor executor = Executors.newCachedThreadPool();
executor.execute(new Task11());
executor.execute(new Task11());
/**
* 1號執行緒搶到吃飯資格,開始吃飯
* 2號執行緒沒搶到吃飯資格,只能先去打太極
*/
}
}
class Task11 implements Runnable{
private static int idCount=0;
private final int id = ++idCount;
private static Lock lock = new ReentrantLock();
@Override
public void run() {
if(lock.tryLock()){
System.out.println(id+"號執行緒搶到吃飯資格,開始吃飯");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
}else {
System.out.println(id+"號執行緒沒搶到吃飯資格,只能先去打太極");
}
}
public int getId() {
return id;
}
}
下面例中通過tryLock我們實作了自旋鎖,我們的執行緒在未獲得鎖的情況下,會再次嘗試獲取鎖,只要五次都獲取失敗才會放棄獲取鎖
public class SynchronizedDemo11 {
public static void main(String[] args) {
Executor executor = Executors.newCachedThreadPool();
executor.execute(new Task11());
executor.execute(new Task11());
/**
* 1號執行緒搶到飯,開始吃飯
* 2號執行緒第1次搶飯失敗
* 2號執行緒第2次搶飯失敗
* 2號執行緒第3次搶飯失敗
* 2號執行緒第4次搶飯失敗
* 2號執行緒第5次搶飯失敗
*/
}
}
class Task11 implements Runnable{
private static int idCount=0;
private final int id = ++idCount;
private static Lock lock = new ReentrantLock();
@Override
public void run() {
int count=5;
while (count-- > 0){
if(lock.tryLock()){
System.out.println(id+"號執行緒搶到飯,開始吃飯");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
break;
}
System.out.println(id+"號執行緒第"+(5-count)+"次搶飯失敗");
}
}
public int getId() {
return id;
}
}
顯式的Lock物件在加鎖和釋放鎖方面,相對于內建的synchronized鎖來說,還賦予了更細粒度的控制力,
原子性和易變性
原子操作是不能被執行緒調度機制中斷的操作,一旦操作開始,那么它一定可以在發生背景關系呢切換之前執行完畢,
原子性可以應用于除了long和double之外所有基本型別之上的簡單操作,對于讀取和寫入除long和double之外的基本型別這樣的操作,可保證它們會被當作原子操作,但是JVM將64位的讀取和寫入當作兩個分離的32位操作來執行,這就產生了一個在讀寫操作中間發生背景關系切換,從而導致不同的任務可以看到不正確結果的可能性,但是當你定義long或double變數時,使用了volatile關鍵字,就會獲得原子性,
Volatile是如何保證域的同步性的
我們現在使用的大多數機器都是多核處理器,由于多個處理器擁有多個CPU,而每個CPU會擁有自己的本地記憶體(高速快取),每個CPU在執行操作前會預先將多條操作命令和資料讀取到本地記憶體中,然后進行操作,當操作完成后再一次性重繪到主存中,這就會引起一個問題,當多個CPU同時讀取一個變數時,也就是一個變數同時出現在了不同的本地記憶體中,一旦某個cpu執行的是一個修改操作,就會導致資料的不一致性問題,即其他cpu讀取的是一個不正確的資料值,這時候我們就需要引入volatile關鍵字,把目標變數宣告為volatile(不穩定的,這就指示JVM,這個變數是不穩定的,每次使用它都到主存中進行讀取,一般說來,多任務環境下各任務間共享的標志都應該加volatile修飾,
Volatile修飾的成員變數在每次被執行緒訪問時,都強迫從共享記憶體中重讀該成員變數的值,而且,當成員變數發生變化時,強迫執行緒將變化值回寫到共享記憶體,這樣在任何時刻,兩個不同的執行緒總是看到某個成員變數的同一個值,注意,被synchronized修飾的方法和代碼塊也是被直接重繪到主存中的,

volatile為什么對自增操作無效
我們前面提到,對于用volatile修飾的域的讀寫操作是直接面向主存的,因此,通過這種方式可以避免臟讀,但是這條法則在面對自增操作時會失效,首先,自增操作分成三步:
- 從主存讀取變數值到cpu暫存器
- 暫存器里的值+1
- 暫存器的值寫回主存
假設N個執行緒同一時候運行到了第一步,這就意味著現在這個變數處于N個執行緒的暫存器中,所以當某個執行緒將新值重繪到主存中時,其他的執行緒也都完成了修改,它們并不會重新去讀這個變數,
原子類
對多執行緒訪問同一變數,我們需要加鎖,而鎖是比較消耗性能的,jdk1.5之后,新增的原子操作類提供了一種簡單、性能高效、執行緒安全地更新一個變數的方式,這些類同樣位于juc包下的atomic包下,發展到jdk1.8,該報共有17個類,囊括了原子更新基本型別、原子更新陣列、原子更新新屬性、原子更新參考,
下面寫一個個原子類使用的簡單例子
AtomicIndteger實作一個執行緒安全的自增
/**
* AtomicInteger demo
*/
public class AtomicIntegerDemo {
private static AtomicInteger sum = new AtomicInteger(0);
public static void increase(){
sum.incrementAndGet();
}
public static void main(String[] args) throws InterruptedException{
for (int i=0;i<10;i++){
new Thread(()->{
for (int j=0;j<10;j++){
increase();
System.out.println(sum);
}
}).start();
}
}
}
執行緒本地存盤
防止任務在共享資源上產生沖突的第二種方式是根除對變數的共享,執行緒本地存盤是一種自動化機制,可以為使用相同變數的每個不同執行緒都創建不同的存盤
我們防止任務在共享資源上產生沖突的第一種方式是通過對共享資源進行加鎖,確保同一時間只有一個執行緒能夠對共享資源進行操作,其實除了加鎖以外,我們還能夠將共享資源變成區域變數的形式,這樣對于每個執行緒都會擁有自己的變數,而執行緒之間是互相隔離的,不會對其他執行緒上的變數造成影響,但這種方式只會造成過多的資源浪費,就像一個公司里每個人都需要使用列印機,但是公司不可能為每個人都配備一臺列印機,
因此,我們本章引入概念叫做執行緒本地存盤,那什么是執行緒本地存盤,我們可以這么認為,將執行緒作為鍵,并將我們需要的共享資源作為值,這樣,當每個執行緒去取變數時,都能夠取到屬于自己的變數,
舉個例子,現在有一場考試,考試為了公平起見需要所有人上交手機,所以大家就都把手機放到同一個箱子里,但是我們要確保我們去拿的時候能夠拿回自己的手機
public class SynchronizedDemo13 {
public static void main(String[] args) {
Executor executor = Executors.newCachedThreadPool();
for(int i=0;i<5;i++){
executor.execute(new Runnable() {
@Override
public void run() {
Box.putPhone(new Phone(Thread.currentThread().getName()));
Thread.yield();
System.out.println("Thread "+Thread.currentThread().getName()+"拿到了"+Box.getPhone().getHost()+"的手機");
}
});
}
/**
* Thread pool-1-thread-1拿到了pool-1-thread-1的手機
* Thread pool-1-thread-2拿到了pool-1-thread-2的手機
* Thread pool-1-thread-3拿到了pool-1-thread-3的手機
* Thread pool-1-thread-5拿到了pool-1-thread-5的手機
* Thread pool-1-thread-4拿到了pool-1-thread-4的手機
*/
}
}
class Box{
private static ThreadLocal<Phone> phones = new ThreadLocal<>();
public static void putPhone(Phone phone){
phones.set(phone);
}
public static Phone getPhone(){
return phones.get();
}
}
class Phone{
private String host;
public Phone(String host){
this.host = host;
}
public String getHost(){
return host;
}
}
另外還在其他書上看到過,如果執行緒自然死亡,那么ThreadLcoal的資料也會跟著消失,如果是執行緒池,在執行緒會被復用的情況下,要手動清除,可以把map中的值set成null,讓GC可以作業,也可以呼叫ThreadLocal的remove方法,
并發編程實戰這本書里也提到ThreadLocal會降低代碼的可重用性,并在類之間引入隱含的耦合性,因此在使用時要格外小心,
執行緒狀態
借用Java 并發編程的藝術》圖一張
??
執行緒的狀態會影響任務的執行,一個執行緒可以具有多種狀態,而每種狀態都會因為不同的原因而產生,
一個執行緒可以處于以下四種狀態之一
- 初始態:執行緒被創建完成,但還未被呼叫start()
- 就緒態:在這種狀態下,只要被分配到了CPU時間片,執行緒就可以運行,
- 阻塞態:執行緒能夠運行,但是由于某種條件阻止了它的運行,在阻塞期間,調度器將忽略執行緒,不會為其分配任何時間片,直到執行緒重新進入就緒狀態,
- 死亡:執行緒執行完畢,通常是任務結束或者被中斷,
進入阻塞態的原因
一個任務進入阻塞狀態,可能有如下原因:
- 通過sleep(milliseconds)使任務進入休眠狀態,這種情況下,任務會在指定時間內不會運行,一旦時間結束,重新自動進入就緒態,
- 通過wait()使執行緒掛起,直到執行緒得到了notify()或notifyAll()訊息,執行緒才會進入就緒態,
- 任務正在等待某個輸入輸出完成
- 任務試圖在某個物件上呼叫其同步方法,但是為獲得鎖,
中斷
很多時候我們在執行緒中的任務會因為一些狀況使得我們不希望讓其繼續執行下去,這時候我們就需要通過某種方式來控制器流程,讓其提前結束任務執行,
這里有三種方式,并且我們會為其講解其優劣:
1.cancel標記
我們可以為我們的自定義現線提供一個取消標記,執行緒中的任務可以不斷去檢查這個標記來判斷是否需要提前終止任務,
public class SynchronizedDemo15 {
public static void main(String[] args) throws InterruptedException {
SimpleThread15 thread = new SimpleThread15();
thread.start();
Thread.sleep(3000);
thread.cancel();
/**
* thread executing...
* thread executing...
* thread executing...
* thread executing...
* thread executing...
* thread executing...
* thread has been cancelled...
*/
}
}
class SimpleThread15 extends Thread{
private boolean cancelFlag = false;
@Override
public void run() {
try {
while (!cancelFlag){
System.out.println("thread executing...");
sleep(500);
}
System.out.println("thread has been cancelled...");
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
public void cancel(){
cancelFlag = true;
}
}
根據結果可以看到任務在執行3秒后,被主執行緒通過cancel方法取消掉了,在打開取消標記的同時,執行緒也就結束了運行,
這種方式可以實作任務的中斷,但是缺點有兩點:
- 我們需要自定義執行緒增加canel標記,同時由于需要使用取消標記,所以我們的run方法需要與Thread進行系結,也就是只能通過重寫Thread的run方法來撰寫任務,而不同將任務與執行緒分離去實作Runable介面,
- 由于我們通常使用執行緒組去管理執行緒,我們一般無法直接與執行緒進行互動,并且由于執行緒需要與任務系結,那么執行緒池中的執行緒將無法執行來自于其他外來的任務,因為執行緒池中的執行緒全部被系結了任務
2.拋出例外
我們通過拋出例外也可以實作執行緒的中斷,
public class SynchronizedDemo16 {
public static void main(String[] args) throws InterruptedException {
SimpleThread15 thread = new SimpleThread15();
thread.start();
Thread.sleep(3000);
thread.cancel();
/**
* thread executing...
* thread executing...
* thread executing...
* thread executing...
* thread executing...
* thread executing...
* thread has been cancelled...
*/
}
}
class SimpleThread16 extends Thread{
private boolean cancelFlag = false;
@Override
public void run() {
try {
while (true){
System.out.println("thread executing...");
sleep(500);
if(cancelFlag){
throw new InterruptedException();
}
}
} catch (InterruptedException e) {
System.out.println("thread has been cancelled...");
System.out.println(e.getMessage());
}
}
public void cancel(){
cancelFlag = true;
}
}
但是通過拋出例外也會面臨一些問題:
- 例外處理問題:我們拋出例外后就意味著我們需要為執行緒設定例外處理程式來處理例外,并實作一些收尾作業,但是我們一般使用的是執行緒池,這意味著我們無法為每個任務都設定一個例外處理程式,因為執行緒池中的執行緒是通用的,隨即處理任務的,因此我們需要在run方法中手動處理所有的例外來確保任務能夠完美地做一些清理作業,
- 由于執行緒池分離執行緒和任務,我們沒有辦法動態地去控制任務的中斷標記來拋出例外,
3.interrupt方法
事實上,java中的Thread已經為我們提供的執行緒中斷的方法了,通過interrupt方法可實作任務的動態中斷,
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true){
System.out.println("executing...");
Thread.sleep(300);
}
} catch (InterruptedException e) {
System.out.println("Thread has been interrupted...");
}
}
});
thread.start();
Thread.sleep(1000);
thread.interrupt();
/**
* executing...
* executing...
* executing...
* executing...
* Thread has been interrupted...
*/
}
上面是單個執行緒的實作方式,我們可以看到在執行緒啟動后,執行緒會在回圈內不斷列印陳述句,但是在一秒后,我們的主執行緒通過呼叫interrupt()方法中斷了執行緒的執行,讓他出現了例外,
注意:雖然我們上面通過interrupt()方法中斷了執行緒的運行,但事實上,該方法除了sleep()狀態以外無法中斷執行緒的任何操作,在執行緒中存在一個中斷標記,我們通過interrupt()方法將其設定為true,而我們的sleep方法會檢查該標記,如果為true則拋出中斷例外,因此我們知道處理sleep以外,執行緒是不會因為該標記發生中斷的(包括鎖池中與阻塞時都不會去檢查中斷標記),但是中斷標記為我們動態中斷提供了幫助,我們只需要在任務中不斷檢查執行緒的中斷標記就可以實作由外部執行緒控制目標執行緒的動態中斷,
下面提供執行緒池版本的中斷方式
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
Future future = executor.submit(new Runnable() {
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()){
System.out.println("executing...");
Thread.sleep(300);
}
System.out.println("Thread has been interrupted...");
} catch (InterruptedException e) {
System.out.println("Thread has been interrupted...");
}
}
});
Thread.sleep(1000);
future.cancel(true);
/**
* executing...
* executing...
* executing...
* executing...
* Thread has been interrupted...
*/
}
執行緒池中無法直接與執行緒直接互動,因此我們無法直接呼叫執行緒的中斷方法,但是我們可以通過執行緒池的submit來啟動任務,這樣就可以通過回傳的Future物件來持有任務的背景關系,然后通過cancel發法來控制執行緒的中斷情況,
補充
Thread.interrupted():判斷當前執行緒中斷標記,并將其改回false,
Thread.currentThread().isInterrupted():判斷當前執行緒中斷標記,不修改中斷標記,
執行緒之間的互動協作
當我們使用執行緒同時運行多個任務時,我們需要借助鎖來同步兩個任務的行為,從而使得一個任務不會干涉另一個任務的資源,但是這種方式只能解決資源競爭的問題,我們還需要一些方式使得多個執行緒可以互相協調來完成同一個任務,比如有個工地,甲方聘請了兩個施工隊,施工隊A負責鋪設鋼結構,而施工隊B則負責澆筑水泥,但我們知道,水泥的澆筑需要建立在鋼結構已經鋪設完成的情況下,這時先決條件,不能夠交換順序,因此我們需要一種方式來使得人物之間能夠有一些互動來共同完成任務,
wait()和notify()
wait()方法可以使我們的執行緒等待某個條件發生變化,而改變這個條件超出當前方法的控制能力,因此執行緒只能等待,但是我們一定不希望在任務檢查這個條件的同時,不斷進行慷訓圈,這被稱為忙等待,通常是一種不良的CPU周期使用方式,只會不斷浪費CU的性能,因此我們需要wait()在等待外部條件變化前將執行緒掛起,并且只有notify()和notifyAll()發生時,任務才會被喚醒去檢查所產生的變化,
呼叫sleep()時鎖并沒有被釋放,呼叫yield()也是這樣的情況,但是放一個任務執行到wait()時,執行緒的執行會被掛起,物件上的鎖被釋放,這就意味著另一個鎖可以獲得這個鎖,我們可以這么認為,wait()就是在告訴外界:我剛剛已經完成了所能完成的事,因此我要在這里等待,但是在我等待期間并不會阻礙其他執行緒執行同步方法,
wait有兩個版本:
- 接受毫秒數,基本上與sleep相似,但是wait()會在等待期間釋放鎖,但是也可以被提前換醒或者等到時間截止自動蘇醒,
- 無引數,表示會無限期等待直到被喚醒,
wait()、notify()和notifyAll()有一個特點,它們并不是Thread的一部分,而是作為基類Object的一部分,這看起來很奇怪,因為這三個方法是針對執行緒的功能現在卻作為基類的一部分而實作,事實上因為這些方法是對鎖的操作,而這些方法操作的鎖也是所有物件的一部分,所以我們可以將wait()放在同步控制方法中,而不用考慮這個類是實作Thread還是Runable介面,其實這三個方法只能在同步方法和同步塊中被呼叫,而sleep()可以在非同步方法中呼叫,因為sleep不需要操作鎖,如果在非同步方法中呼叫這三個方法,可以通過編譯,畢竟這個是屬于Object的一部分方法,但是在運行期間會出現IllegalMonitorStateException例外,
總而言之,當我們需要呼叫者三個方法我們需要針對鎖物件去呼叫,也就是說我們必須在拿到鎖的情況下才能呼叫這三個方法,
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
Future task1 = executor.submit(new Runnable() {
@Override
public void run() {
try {
synchronized ("1"){
while (true){
System.out.println("任務1正在執行...");
// Thread.sleep(100);
if(Thread.currentThread().isInterrupted()){
System.out.println("任務1暫停執行...");
"1".wait();
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Future task2 = executor.submit(new Runnable() {
@Override
public void run() {
try {
synchronized ("1"){
while (true){
System.out.println("任務2正在執行...");
//Thread.sleep(100);
if(Thread.currentThread().isInterrupted()){
System.out.println("任務2暫停執行...");
"1".wait();
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread.sleep(100);
task1.cancel(true);
Thread.sleep(100);
executor.shutdownNow();
/**
* ...
* 任務1正在執行...
* 任務1正在執行...
* 任務1正在執行...
* 任務1暫停執行...
* 任務2正在執行...
* 任務2正在執行...
* 任務2正在執行...
* ...
*/
}
上面例子中我們發布了兩個任務,然后任務1會在執行0.1秒后呼叫wait(),由于任務1和任務2需要的鎖物件都是"1"物件,因此,任務二會在任務1釋放鎖之前被阻塞,我們通過運行結果看出,在執行wait()之后,被阻塞的執行緒2也開始運行,而任務1則停止運行,事實上,如果任務2再次呼叫notify()或者notifyAll(),任務1將被再次喚醒,進入鎖池等待鎖的釋放,當競爭到鎖之后會用上次wait()的地方往后繼續執行,
notify()和notifyAll()是對鎖物件進行操作的,正常情況,當有多個執行緒爭奪鎖時,第一個拿到鎖的任務執行完畢之后就會有第二個執行緒搶到鎖進行執行,但是如果是因為wait()進入等待,那么即使它需要的鎖被釋放它也不會去搶占鎖,因為它已經被掛起,需要notify()或notifyAll()進行喚醒,前者喚醒等待該鎖的wait等待佇列中隨機某個任務進入鎖池搶占鎖,而后者會喚醒所有等待該鎖的掛起佇列進入鎖池搶占鎖,
生產者和消費者問題
執行緒協作中生產者消費者問題是最為經典的案例之一,我們通過這個問題來對執行緒協作已經上面三個方法進行進一步理解,
首先,生產者負責生產產品,消費者負責消費產品,但是消費者一定是在生產者生產出物品之后才能進行消費,而生產者也不會無限生產產品,它一定會等到生產到目標產品數后等到產品銷售一空之后再繼續生產,畢竟誰也不能保證產品一定就能賣完,
首先我們創建一個產品類,產品上有一個id條碼,是在生產時有生產者寫入的
class Product{
private int id;
public Product(int id){
this.id = id;
}
@Override
public String toString() {
return "product"+id;
}
}
我們再創建一個商店類,生產者生產的產品需要放到商店銷售,同時消費者也會再商店消費產品,同時,商店由于規模原因只能放置三十件商品,
class Shop{
private LinkedList<Product> products = new LinkedList<>();
public void addProduct(Product product){
products.add(product);
}
public Product getProduct(){
return products.pop();
}
public boolean isEmpty(){
return products.isEmpty();
}
public boolean isFull(){
return products.size()>=30;
}
}
有了商店之后我們就需要供貨商來生產產品了,生產者會不停地制造商品直到產滿商品后不再繼續生產,當商品銷售一空時再被喚醒繼續開始生產,
class Producer implements Runnable{
private Shop shop;
Random random = new Random();
public Producer(Shop shop){
this.shop = shop;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(300);
synchronized (shop) {
if (shop.isFull()) {
System.out.println("完成產品生產...");
shop.wait();
}else {
if(!shop.isEmpty()){
shop.notifyAll();
}
Product product = new Product(random.nextInt(100));
shop.addProduct(product);
System.out.println("生產者生產了一件產品:" + product);
}
}
}
} catch(InterruptedException ex){
ex.printStackTrace();
}
}
}
對于客戶來說,只要商店的產品不為空,我就可以消費
class Consumer implements Runnable{
private Shop shop;
private static int count = 0;
private final int id = ++count;
public Consumer(Shop shop){
this.shop = shop;
}
@Override
public void run() {
try{
while (true){
Thread.sleep(350);
synchronized (shop){
if(shop.isEmpty()){
System.out.println("商品賣完了...");
shop.notifyAll();
shop.wait();
}else {
System.out.println(id+"號顧客消費商品:"+shop.getProduct().toString());
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
場景
public static void main(String[] args) {
Shop shop = new Shop();
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Producer(shop));
executor.execute(new Consumer(shop));
executor.execute(new Consumer(shop));
/**
* ...
* 生產者生產了一件產品:product58
* 生產者生產了一件產品:product78
* 2號顧客消費商品:product58
* 生產者生產了一件產品:product86
* 1號顧客消費商品:product78
* 2號顧客消費商品:product86
* 生產者生產了一件產品:product48
* 1號顧客消費商品:product48
* 商品賣完了...
* ...
*/
}
上面例子中我們制造了一個生產者和兩個消費者,消費者不斷消費生產者生產出的產品,這就是我們通過wait()、notify()以及notify()可以實作的執行緒間的互動案例,
使用顯式的Lock和Condition
事實上,除了同步方法和同步塊,我們還經常會使用Lock物件進行執行緒同步控制,我們可以在同步塊中通過對鎖物件通過wait()登封昂發進行執行緒控制,那么如果是使用Lock物件該如何,其實,Lock物件也為我們提供了允許執行緒掛起的Condition,我們可以通過再Condition上呼叫await()來掛起執行緒,然后當條件發生變化時通過signal()和signalAll()來喚醒在這個condition上被掛起的執行緒,
當Lock鎖使用公平模式的時候,可以使用Condition的signal(),執行緒會按照FIFO的順序沖await()中喚醒,當每個鎖上有多個等待條件時,可以優先使用Condition,這樣可以具體一個Condition控制一個條件等待,
https://www.jianshu.com/p/be2dc7c878dc
同步佇列
wait()和notify()方法以一種非常低級的方式解決了任務互動的問題,但在許多情況下我們可以瞄向更高的抽象級別,使用同步佇列來解決任務協作問題,同步佇列在任何時刻都只允許一個任務插入或移除元素,在java.util.concurrent.BlockingQueue介面中提供了這個佇列,這個介面有大量的實作,我們可以使用LinkedBlockingQueue,它是一個無界佇列,還可以使用ArrayBlockingQueue,它具有固定的尺寸,因此我們可以在它被阻塞前向其中放置有限數量的元素,
如果消費者任務視圖從佇列中獲取物件,而此時為空,那么這些佇列還可以掛起消費者任務,并當有更多元素可用時恢復消費者任務,相較于notify等,它更加簡單可靠,
下面例子中我們通過對生產者消費者佇列進行重構
首先將同步佇列作為產品容器,并設定長度為10,
class Shop2{
private BlockingQueue<Product> products = new ArrayBlockingQueue<Product>(10);
public void addProduct(Product product){
products.add(product);
}
public Product getProduct() throws InterruptedException {
return products.take();
}
}
產品物件保持不變
class Product{
private int id;
public Product(int id){
this.id = id;
}
@Override
public String toString() {
return "product"+id;
}
}
在生產者消費者中我們去除了所有的同步控制
class Producer implements Runnable{
private Shop shop;
Random random = new Random();
public Producer(Shop shop){
this.shop = shop;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(300);
synchronized (shop) {
if (shop.isFull()) {
System.out.println("完成產品生產...");
shop.wait();
}else {
if(!shop.isEmpty()){
shop.notifyAll();
}
Product product = new Product(random.nextInt(100));
shop.addProduct(product);
System.out.println("生產者生產了一件產品:" + product);
}
}
}
} catch(InterruptedException ex){
ex.printStackTrace();
}
}
}
class Consumer implements Runnable{
private Shop shop;
private static int count = 0;
private final int id = ++count;
public Consumer(Shop shop){
this.shop = shop;
}
@Override
public void run() {
try{
while (true){
Thread.sleep(350);
synchronized (shop){
if(shop.isEmpty()){
System.out.println("商品賣完了...");
shop.notifyAll();
shop.wait();
}else {
System.out.println(id+"號顧客消費商品:"+shop.getProduct().toString());
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
最后看看運行場景,可以發現加入阻塞佇列后即便不需要任何同步控制,生產者和消費者之間也可以很好地協作下去,
public static void main(String[] args) {
Shop shop = new Shop();
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Producer(shop));
executor.execute(new Consumer(shop));
executor.execute(new Consumer(shop));
/**
* ...
* 生產者生產了一件產品:product58
* 生產者生產了一件產品:product78
* 2號顧客消費商品:product58
* 生產者生產了一件產品:product86
* 1號顧客消費商品:product78
* 2號顧客消費商品:product86
* 生產者生產了一件產品:product48
* 1號顧客消費商品:product48
* 商品賣完了...
* ...
*/
}
suspend()和resume()
不建議使用,基本舍棄了,雖然它可以做到所謂的執行緒暫停,
原因如下:
- 容易造成同步物件被獨占:當我有一個執行緒占用同步資源時發生暫停,那么在被resume()之前該執行緒所占用的資源將沒有執行緒可以訪問,
- 導致資料不同步:當我有一個執行緒在修改臨界資源時發生暫停,我們無法保證整個修改已經完成,如果只修改了一般卻發生了暫停,那么其他執行緒在訪問時拿到的資料就會與暫停執行緒恢復后修改的資料出現差異,
詳細可以訪問:
獨占:https://www.jianshu.com/p/a075800838e8
資料不同步:https://www.jianshu.com/p/03f9b7cf8c07
stop()
不建議使用,基本舍棄了,雖然它可以做到所謂的執行緒暫停,
原因:
執行緒不安全:對使用stop(),是因為它不安全,它會解除由執行緒獲取的所有鎖定,當在一個執行緒物件上呼叫stop()方法時,這個執行緒物件所運行的執行緒就會立即停止,假如一個執行緒正在執行:synchronized void { x = 3; y = 4;} 由于方法是同步的,多個執行緒訪問時總能保證x,y被同時賦值,而如果一個執行緒正在執行到x = 3;時,被呼叫了 stop()方法,即使在同步塊中,它也干脆地stop了,這樣就產生了不完整的殘廢資料,而多執行緒編程中最最基礎的條件要保證資料的完整性,所以請忘記執行緒的stop方法,以后我們再也不要說“停止執行緒”了,而且如果物件處于一種不連貫狀態,那么其他執行緒能在那種狀態下檢查和修改它們,結果很難檢查出真正的問題所在,
死鎖
死鎖感覺沒什么好講的,最為典型的案例就是哲學家就餐問題,對于這個案例我們可以參考http://c.biancheng.net/view/1233.html,
死鎖的形成主要有四個條件:
- 請求等待條件
- 互斥條件
- 不可剝奪條件
- 回圈等待條件
新類別庫中的構件
javaSE5中引入了大量的新類來解決并發問題,
CountDownLatch
他被用來同步一個或多個任務,強制它們等待由其他任務執行的一組操作完成,
我們可以向CountDownLaunch物件設定一個初始計數值,任何通過該物件呼叫await()的方法都將被阻塞,直到這個計數值到0,而減小這個計數值的方式就是呼叫它的counDown(),
假設我們有一組操作是準備操作,同時準備操作的數量是固定的,我們可以通過CountDownLatch來設計一個需要等待準備操作完成才能正式進行作業的案例,
假設我們現在有一個游戲,叫做123木頭人,這個游戲必須在喊完“123木頭人”的情況下才能睜開眼睛,因此我們通過CountDownLatch來模擬這個程序,
public class SynchronizedDemo23 {
public static void main(String[] args) {
Executor executor = Executors.newCachedThreadPool();
CountDownLatch countDownLatch = new CountDownLatch(1);
executor.execute(new OpenEyes(countDownLatch));
executor.execute(new Ready(countDownLatch));
/**
* 1
* 2
* 3
* 木頭人
* 睜開眼睛
*/
}
}
class Ready implements Runnable{
private CountDownLatch countDownLatch;
public Ready(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
System.out.println("1");
Thread.sleep(1000);
System.out.println("2");
Thread.sleep(1000);
System.out.println("3");
Thread.sleep(1000);
System.out.println("木頭人");
Thread.sleep(1000);
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class OpenEyes implements Runnable{
private CountDownLatch countDownLatch;
public OpenEyes(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try{
countDownLatch.await();
System.out.println("睜開眼睛");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
我們可以發現我們是先執行睜眼的執行緒,但是由于CountDownLatch還沒歸零,因此該執行緒會阻塞到CountDownLatch歸零為止,
注意: CountDownLatch是一個一次性的,一旦歸零就不可恢復不可二次使用,如果希望有一個可復用的CountDownLatch可以使用CyclicBarrier
DelayQueue
這是一個誤解的BlockingQueue,用于放置實作了Delayed介面物件,其中的物件只有才事件到期后才能從佇列中取走,這種佇列是有序的,即對頭物件一定是第一個到期的,如果沒有到期的物件,通過poll()將回傳null值,
在現實生活中,就存在很多需要用到延時的情況,拿我們在銀行的定期存款作為一個例子,我們都知道,每個銀行都會有一些定期存的業務,因為定期存的利息會比活期高,但是一旦定期存后就不能隨意取出,必須等到期限滿后才能取出,
public class SynchronizedDemo24 {
public static void main(String[] args) throws InterruptedException {
Executor executor = Executors.newCachedThreadPool();
//銀行開業
Bank bank = new Bank();
executor.execute(bank);
//路人甲存入一筆存款
bank.deposit("111111",3000d,3);
//路人乙存入一筆存款
bank.deposit("222222",10000d,6);
//路人甲是否取到錢
boolean flag1 = false;
//路人乙是否取到錢
boolean flag2 = false;
while (true){
Thread.sleep(1000);
if(!flag1){
flag1 = bank.getDeposition("111111")>0?true:false;
}
if(!flag2){
flag2 = bank.getDeposition("222222")>0?true:false;
}
if(flag1 && flag2){
break;
}
}
/**
* 111111存入了一筆3000.0RMB的3月的定期存款
* 222222存入了一筆10000.0RMB的6月的定期存款
* 當前111111賬戶的存款還有1月到期
* 111111賬戶取出0.0RMB
* ...
* 當前111111賬戶的存款還有0月到期
* 111111賬戶取出0.0RMB
* 當前222222賬戶的存款還有2月到期
* 222222賬戶取出0.0RMB
* 111111的定期到期可以取出......
* 111111賬戶取出3012.5RMB
* 當前222222賬戶的存款還有1月到期
* ...
* 當前222222賬戶的存款還有0月到期
* 222222賬戶取出0.0RMB
* 222222的定期到期可以取出......
* 222222賬戶取出10041.666666666666RMB
*/
}
}
class Bank implements Runnable{
private DelayQueue<Deposition> depositionsInTime = new DelayQueue<>();
private LinkedList<Deposition> depositionsOutTime = new LinkedList<>();
public void deposit(String account,Double money,Integer months){
depositionsInTime.add(new Deposition(account, money, months));
System.out.println(account+"存入了一筆"+money+"RMB的"+months+"月的定期存款");
}
public Double getDeposition(String account){
Double money = 0d;
synchronized (depositionsOutTime){
for(Deposition d:depositionsOutTime){
if(d.getAccount().equals(account)){
money+=d.getMoney();
depositionsOutTime.remove(d);
}
}
}
if(money == 0){
for (Deposition d:depositionsInTime) {
if(d.getAccount().equals(account)){
System.out.println("當前"+account+"賬戶的存款還有"+d.getDelay(TimeUnit.SECONDS)+"月到期");
}
}
}
System.out.println(account+"賬戶取出"+money+"RMB");
return money;
}
@Override
public void run() {
try {
while (true){
Thread.sleep(500);
Deposition deposition;
while ((deposition = depositionsInTime.poll()) != null){
deposition.setMoney(deposition.getMoney()*(1+(0.05/12.0)));
System.out.println(deposition.getAccount()+"的定期到期可以取出......");
depositionsOutTime.add(deposition);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Deposition implements Delayed {
private final String account;
private Double money;
private long trigger;
public Deposition(String account,Double money,Integer months){
this.account = account;
this.money = money;
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.SECOND,months);
this.trigger = calendar.getTimeInMillis();
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
Deposition deposit = (Deposition) o;
return this.trigger> deposit.trigger?1:-1;
}
public String getAccount() {
return account;
}
public Double getMoney() {
return money;
}
public void setMoney(Double money) {
this.money = money;
}
}
上面例子中我們將銀行作為延遲物件的持有者,每筆定期存款都作為一個延時物件,然后甲乙兩個路人會在存款后不斷嘗試取款,但是只有在到期后他們才會取出,同時一旦到期銀行也會列印到期資訊,相當于模擬了我們的短信通知,
PriorityBlockingQueue
PriorityBlockingQueue是一個支持優先級的無界阻塞佇列,直到系統資源耗盡,默認情況下元素采用自然順序升序排列,也可以自定義類實作compareTo()方法來指定元素排序規則,或者初始化PriorityBlockingQueue時,指定構造引數Comparator來對元素進行排序,但需要注意的是不能保證同優先級元素的順序,PriorityBlockingQueue也是基于最小二叉堆實作,使用基于CAS實作的自旋鎖來控制佇列的動態擴容,保證了擴容操作不會阻塞take操作的執行,
詳細的原理可以參考這篇博文https://www.cnblogs.com/yaowen/p/10708249.html
總結
并發相關的知識點遠遠不止這些,我們還需要更多地學習,其實每個構件的內在框架與使用邏輯都是值得我們去學習的,所以關于構件和鎖的機制會在后面不斷去更新新的博客,盡量做到每個章節只講一種構件,剖析其思想與優劣,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/227921.html
標籤:java
