BlockingQueue - 阻塞佇列
定義
BlockingQueue 一般用于生產者-消費者模式,生產者是往佇列里添加元素的執行緒,消費者是從佇列里拿元素的執行緒,BlockingQueue 就是存放元素的容器,
1. 常用方法

-
拋出例外:如果操作無法立即執行,則會拋出例外,當阻塞佇列滿的時候,再往隊里里插入元素,會拋出 Queue full 例外,當佇列為空時,從佇列里獲取元素會拋出 NoSuchElementException 例外,
-
回傳特殊值:如果插入、洗掉操作成功,那么會回傳結果:true、false,
-
一直阻塞:當佇列滿時,執行 put 增加操作那么會阻塞;當佇列為空時,執行take 移除操作,那么也會阻塞,
-
超時退出:在 offer、poll 操作時,可以設定超時的屬性,例如超時的時間、時間單位,如果在給定的時間內沒有能夠執行完成,那么就會回傳true、false標識操作執行的狀態,
不能往阻塞佇列中插入 null,會拋出空指標例外,
可以訪問阻塞佇列中的任意元素,呼叫 remove(o)可以將佇列之中的特定物件移除,但并不高效,
2. 具體實作類
-
ArrayBlockingQueue
由陣列結構組成的有界阻塞佇列,初始化時必須指定容量大小,默認采用的是unfair 鎖,
-
LinkedBlockingQueue
由鏈表結構組成的有界阻塞佇列,具有鏈表的特性,默認的佇列的大小是
Integer.MAX_VALUE,也可以指定大小, -
DelayQueue
延時佇列,該佇列中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素,其中的元素必須實作
Delayed介面,內部是一個優先級佇列,沒有大小限制的佇列,所以 take 和 put 操作永遠不會阻塞,以為內部都是呼叫 poll 和 offer, -
PriorityBlockingQueue
基于優先級的無界佇列,優先級通過判斷傳入構造引數中的
Compator物件決定,因為是無界的,所以不會阻塞生產者put,但是當佇列中容量為空時,則會阻塞消費者 take 操作, -
SynchronousQueue
內部沒有任何容量,每個put操作必須等待一個take操作,內部的實作其實是
TransferQueue(公平模式下),TransferStack(非公平模式下), 直接使用CAS實作執行緒的安全訪問,當它生產產品(即put的時候),如果當前沒有人想要消費產品(即當前沒有執行緒執行take),此生產執行緒必須阻塞,等待一個消費執行緒呼叫take操作,take操作將會喚醒該生產執行緒,同時消費執行緒會獲取生產執行緒的產品(即資料傳遞),這樣的一個程序稱為一次配對程序(當然也可以先take后put,原理是一樣的),公平模式下,總結下來就是:隊尾匹配隊頭出隊,先進先出,體現公平原則,
非公平模式下,總結下來就是:堆疊頂匹配堆疊頂出堆疊,先進后出,體現非公平原則,
生產者生產資料的速度絕對不能快于消費者消費資料的速度,否則時間一長,則會耗盡所有的可用堆記憶體空間,
實作原理
下面就詳細講解下ArrayBlockingQueue的實作原理,主要是利用了 Lock 鎖的多條件(Condition)阻塞控制,
-
構造方法
// 陣列初始容量大小, // 公平鎖的方法,默認是false 非公平鎖 // notEmpty:標記為消費者條件 // notFull: 標記為生產者條件 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } -
put
/** * 插入元素到佇列中,如果佇列滿了,那么就會等待佇列可用時 * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { // 檢查元素是否為空,如果為空則會拋出空指標例外 checkNotNull(e); // 全域鎖 final ReentrantLock lock = this.lock; // 獲取全域鎖,直到獲取或者中斷則結束 lock.lockInterruptibly(); try { // 佇列滿時,生產者釋放鎖,阻塞當前執行緒不進行入隊操作 while (count == items.length) notFull.await(); // 入隊 enqueue(e); } finally { lock.unlock(); } } ? /** * 入隊操作,然后喚醒一個等待的消費者執行緒 */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }總結 put 流程:
-
所有執行 put 操作的執行緒競爭 lock 鎖,拿到了 lock 鎖的執行緒進入下一步,沒有拿到 lock 鎖的執行緒自旋競爭鎖,
-
判斷阻塞佇列是否滿了,如果滿了,則阻塞生產者執行緒繼續生產,同時釋放 lock 鎖,等待被消費者執行緒喚醒,
-
如果沒有滿,呼叫
enqueue(E x)使得 x 元素進行入隊操作,喚醒一個消費者執行緒,
-
-
take
// 獲取佇列元素 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 獲取全域鎖 lock.lockInterruptibly(); try { // 當佇列為空,阻塞消費者執行緒獲取元素 while (count == 0) notEmpty.await(); // 佇列不為空,回傳出隊元素 return dequeue(); } finally { lock.unlock(); } } ? // 出隊,先進先出 private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 喚醒一個生產者執行緒 notFull.signal(); return x; }總結 take 操作:
-
所有執行 take 操作的執行緒競爭 lock 鎖,沒有拿到全域鎖,則會自旋競爭鎖,
-
判斷佇列是否為空,如果為空,則阻塞消費者執行緒獲取元素,
-
如果不為空,呼叫
dequeue()回傳出隊元素,最后喚醒一個生產者執行緒,
-
put 和 take 操作都是需要先獲取鎖,
-
拿到鎖后,還要查看佇列狀態,是否滿足入隊、出隊條件,
-
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/274854.html
標籤:其他
上一篇:pwnable_hacknote
