1、AbstractQueue抽象佇列、BlockingQueue阻塞佇列、Deque雙端佇列
2、Queue FIFO先進先出,
寫入:佇列滿阻塞等待,取出:佇列滿阻塞等待生產
3、使用場景:多執行緒并發處理、執行緒池
4、阻塞佇列(BlockingQueue)——四組API(添加、移除、判斷佇列首部的場景)
==1、會拋出例外
==2、有回傳值,不拋出例外
==3、阻塞等待(一直阻塞等待)
==4、超時等待
=========拋出例外 add()\remove()\element()=========
//拋出例外
public static void atest(){
//陣列型別的阻塞佇列 要指定 佇列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//添加操作 回傳Boolean值
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
/*
java.lang.IllegalStateException: Queue full
佇列已滿 拋出例外
*/
// System.out.println(blockingQueue.add("d"));
//檢測隊首元素
System.out.println("隊首元素"+blockingQueue.element());
//移除操作 回傳移除的添加的元素
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
/*
java.util.NoSuchElementException
元素為空 不能再移除 拋出例外
*/
// System.out.println(blockingQueue.remove());
} public static void atest(){
//陣列型別的阻塞佇列 要指定 佇列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//添加操作 回傳Boolean值
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
/*
java.lang.IllegalStateException: Queue full
佇列已滿 拋出例外
*/
// System.out.println(blockingQueue.add("d"));
?
//移除操作 回傳移除的添加的元素
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
/*
java.util.NoSuchElementException
元素為空 不能再移除 拋出例外
*/
// System.out.println(blockingQueue.remove());
}
=========有回傳值,不拋出例外 add()\remove()\element()=========
// 不拋出例外
public static void btest(){
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//添加 回傳Boolean值 添加成功 回傳true
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
// 佇列大小為3 繼續添加元素 不能繼續添加 回傳false
System.out.println(blockingQueue.offer("d"));
?
//移除 回傳元素
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
// 佇列在元素全部移除后 繼續移除操作 不能繼續移除 回傳null
System.out.println(blockingQueue.poll());
}
=========阻塞等待(一直阻塞等待) add()\remove()\element()=========
// 一直阻塞等待 public static void ctest() throws InterruptedException { ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); //添加元素 blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); // 佇列已滿 如果繼續添加元素 則造成阻塞 一直阻塞等待 程式一直在等// blockingQueue.put("d"); //移除元素 System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); //佇列為空 如果繼續移除元素 則造成阻塞 一直在等待元素來移除 程式一直在等// System.out.println(blockingQueue.take());? }
=========超時等待 add()\remove()\element()=========
// 等待超時退出
public static void dtest() throws InterruptedException {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//添加元素 offer(元素,時間,單位) offer方法的多載方法
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//佇列已滿 繼續添加元素 等待超過2秒就結束程式
System.out.println(blockingQueue.offer("d", 2, TimeUnit.SECONDS));
?
//移除元素 poll(時間,單位) poll方法的多載方法
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//佇列為空 繼續移除元素 等待超過2秒就結束程式
System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
}
5、同步佇列
==沒有容量(即不長時間存盤元素)
進去一個元素,必須等待取出(take())后才能再往里邊繼續添加(put())
==例子:
BlockingQueue<String> blockingQueue = new SynchronousQueue<String>();//同步佇列
//添加元素 執行緒
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+"put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+"put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"執行緒1").start();
?
//移除元素 執行緒
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"==>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"==>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"==>"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"執行緒2").start();
}
AQS
1、一個用來構建鎖和同步器的框架,定義了鎖的實作機制,并開放出擴展的地方,讓子類去實作,(封裝得很好,但又有子類擴展的地方)
例如:lock(加鎖)時鎖的內部機制:
使用鎖Lock時,AQS開放state欄位,然子類可以根據state欄位來決定是否能夠獲得鎖,對于獲取不到鎖的執行緒,AQS會自動進行管理,無需子類鎖關心,
2、使用AQS能夠簡單且高效地構造出大量應用廣泛的同步器:ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等,都基于AQS,也可以自定義同步器,
3、AQS底層:
==(1)由 同步佇列 + 條件佇列 聯合實作(CLH佇列鎖)
====一般情況下有同步佇列(雙向鏈表)組成,條件佇列(單向鏈表)不是必須存在的,當程式中存在condition時,才會存在此串列,
====同步佇列管理獲取不到鎖的執行緒的排隊和釋放
====條件佇列是在一定場景下,對同步佇列的補充(非必須的),如,獲得鎖的執行緒從空佇列中拿資料(佇列是空的,拿不到資料的),此時,條件佇列會管理該執行緒,使執行緒阻塞,
==(2)核心思想:AQS內部維護一個CLH佇列來管理,
====執行緒請求共享資源時,
如果被請求的共享資源空閑,則將當前請求資源的執行緒設定為有效執行緒,并且將共享資源設定為鎖定狀態;
如果被請求的共享資源被占用,則需要CLH佇列鎖實作的機制來實作執行緒阻塞等待以及執行緒被喚醒使鎖的分配:即將暫時獲取不到鎖的執行緒加入到佇列中,(上邊的同步佇列)
==(3)CLH鎖佇列:
====是一個虛擬的雙向佇列(不存在佇列實體,僅存在節點間的關聯關系)
====AQS是將每條請求共享資源的執行緒封裝成一個CLH鎖佇列的一個個節點(Node)實作鎖的分配,
==(4)AQS中同步佇列的作業流程和資料結構:(結合兩圖理解)

====作業程序:
(1)當前執行緒獲取同步狀態失敗,同步器會將當執行緒及等待狀態等資訊構成一個Node節點,加入CLH佇列中,放在隊尾,同步器重新設定尾節點,
(2)加入佇列后,會阻塞當前執行緒
(3)同步狀態被釋放并且同步器重新設定首節點,同步器喚醒等待佇列中第一個節點,讓其再次獲取同步狀態,
====AQS使用一個int成員變數來表示同步狀態,通過內置的FIFO佇列來完成獲取資源執行緒的排隊作業,
====AQS使用CAS對該同步狀態進行原子性操作實作對其值的修改
private volatile int state;//共享變數,使用volatile修飾保證執行緒可見性
====狀態資訊通過protected型別的getState,setState,compareAndSetState進行操作
//回傳同步狀態的當前值
protected final int getState() {
return state;
}
//設定同步狀態的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)將同步狀態值設定為給定值update如果當前同步狀態的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
4、AQS對資源的共享方式
定義了兩種資源共享方式:
==(1)Exclusive(獨占):又分為公平鎖和非公平鎖
只有一個執行緒能執行,如ReentrantLock
====公平鎖:按照執行緒在佇列中的排隊順序, 先到者先拿到鎖
====非公平鎖:當前執行緒要獲取鎖時,無視佇列內的順序,直接去強鎖,誰搶到了就是誰的,
==(2)Share(共享):
多個執行緒可以同時執行,如Semaphore\CyclicBarrier\ReadWriteLock\CountDownLatch
==(3)ReentrantReadWriteLock:組合兩種資源共享方式的
====讀鎖運行多個執行緒同時執行對同一資源進行讀操作
====寫鎖僅允許一個執行緒能執行對同一資源進行寫操作
==(4)不同自定義的同步器有不同的共享資源方式,自定義同步器在實作時,只需要實作共享資源state的獲取與釋放方式即可,
5、AQS使用的設計模式
(1)基于模板設計模式的
(2)自定義同步器的方式:
==1、繼承AbstractQueuedSynchronizer并重寫指定的方法
重寫方法是指對于共享資源state的獲取和釋放
==2、將AQS組合在自定義同步組件的實作中,并呼叫其模板方法
這些模板方法會呼叫上邊繼承重寫的方法
====AQS提高的模板方法
protected boolean tryAcquire(int)//獨占方式,嘗試獲取資源,成功則回傳true,失敗則回傳false,
protected boolean tryRelease(int)//獨占方式,嘗試釋放資源,成功則回傳true,失敗則回傳false,
protected int tryAcquireShared(int)//共享方式,嘗試獲取資源,負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源,
protected boolean tryReleaseShared(int)//共享方式,嘗試釋放資源,成功則回傳true,失敗則回傳false,
protected boolean isHeldExclusively()//該執行緒是否正在獨占資源,只有用到condition才需要去實作它,
====除以上的方法外,AQS類中其它方法不能被重寫,都為final
====例:
=======獨占方式=======
ReentrantLock
(1)state初始化為0,表示未鎖定狀態;
(2)A執行緒lock()時,會呼叫tryAcquire()獨占該鎖并state+1;
(3)其它執行緒tryAcquire()時會失敗,直到A執行緒unlock()到state=0,即釋放,其它執行緒才能獲取該鎖,
(4)A執行緒釋放前,A自己時可以重復獲取此鎖的(state++),即可重入
(5)A執行緒釋放,一定要將state回歸為state=0
=======共享方式=======
CountDownLatch,任務分為N個子執行緒進行執行
(1)state初始化為N(與執行緒數一致)
(2)N個子執行緒并行每執行countDown()一次,state CAS減一
(3)所有子執行緒都執行完成后即state=0,會unpark()主呼叫執行緒
(4)主呼叫執行緒從await()函式回傳,繼續后續的操作
6、AQS組件
(1)Semaphore(信號量):允許多個執行緒同時訪問
====synchronized和ReentrantLock是一次只允許一個執行緒訪問某個資源
(2)CountDownLatch(倒計時器):同步工具類,用來協調多個執行緒間的同步,通常用來控制執行緒等待,可以讓某個執行緒等待直到倒計時結束,再開始執行,
(3)CyclicBarrier(回圈柵欄):可以實作執行緒間的技術等待,功能更強大復雜,
== Cyclic(可回圈使用)的Barrier(屏障):
====讓一組執行緒到達一個屏障(同步點)時被阻塞,直到最后一個執行緒到達屏障(同步點)時,才會打開屏障,所有被攔截的執行緒此時才會繼續執行,
====CyclicBarrier的默認構造方法:CyclicBarrier(int parties)
=======parties引數:表示屏障攔截的執行緒數量
=======每個執行緒呼叫await()方法告知CyclicBarrier已到達屏障,然后被阻塞,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/458448.html
標籤:其他
上一篇:Makefile學習筆記
下一篇:Dubbo 學習筆記
