LinkedBlockingDeque介紹
【1】LinkedBlockingDeque是一個基于鏈表實作的雙向阻塞佇列,默認情況下,該阻塞佇列的大小為Integer.MAX_VALUE,可以看做無界佇列,但也可以設定容量限制,作為有界佇列,
【2】相比于其他阻塞佇列,LinkedBlockingDeque 多了 addFirst、addLast、peekFirst、peekLast 等方法,以first結尾的方法,表示插入、獲取或移除雙端佇列的第一個元素,以 last 結尾的方法,表示插入、獲取或移除雙端佇列的最后一個元素,但本質上并沒有優化鎖的競爭情況,因為不管是從隊首還是隊尾,都是在競爭同一把鎖,只不過資料插入和獲取的方式多了,
LinkedBlockingDeque的原始碼分析
【1】屬性值
//典型的雙端鏈表結構 static final class Node<E> { E item; //存盤元素 Node<E> prev; //前驅節點 Node<E> next; //后繼節點 Node(E x) { item = x; } } // 鏈表頭 本身是不存盤任何元素的,初始化時item指向null transient Node<E> first; // 鏈表尾 transient Node<E> last; // 元素數量 private transient int count; // 容量,指定容量就是有界佇列 private final int capacity; //重入鎖 final ReentrantLock lock = new ReentrantLock(); // 當佇列無元素時,鎖會阻塞在notEmpty條件上,等待其它執行緒喚醒 private final Condition notEmpty = lock.newCondition(); // 當佇列滿了時,鎖會阻塞在notFull上,等待其它執行緒喚醒 private final Condition notFull = lock.newCondition();
【2】建構式
public LinkedBlockingDeque() { // 如果沒傳容量,就使用最大int值初始化其容量 this(Integer.MAX_VALUE); } public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; lock.lock(); // 為保證可見性而加的鎖 try { for (E e : c) { if (e == null) throw new NullPointerException(); //從尾部插入元素,插入失敗拋出例外 if (!linkLast(new Node<E>(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } }
【3】核心方法分析
1)入隊方法
//添加頭結點元素 public void addFirst(E e) { //如果添加失敗,拋出例外 if (!offerFirst(e)) throw new IllegalStateException("Deque full"); } //添加尾結點元素 public void addLast(E e) { //如果添加失敗,拋出例外 if (!offerLast(e)) throw new IllegalStateException("Deque full"); } //添加頭結點元素 public boolean offerFirst(E e) { //添加的元素為空 拋出空指標例外 if (e == null) throw new NullPointerException(); //將元素構造為結點 Node<E> node = new Node<E>(e); //這邊會加鎖,并呼叫添加頭結點插入的核心方法 final ReentrantLock lock = this.lock; lock.lock(); try { return linkFirst(node); } finally { //解鎖 lock.unlock(); } } //添加尾結點元素 public boolean offerLast(E e) { //添加的元素為空 拋出空指標例外 if (e == null) throw new NullPointerException(); //將元素構造為結點 Node<E> node = new Node<E>(e); //這邊會加鎖,并呼叫添加尾結點插入的核心方法 final ReentrantLock lock = this.lock; lock.lock(); try { return linkLast(node); } finally { lock.unlock(); } } //頭部插入 private boolean linkFirst(Node<E> node) { //當前容量大于佇列最大容量時,直接回傳插入失敗 if (count >= capacity) return false; //佇列中的頭結點 Node<E> f = first; //原來的頭結點作為 新插入結點的后一個結點 node.next = f; //替換頭結點 為新插入的結點 first = node; //尾結點不存在,將尾結點置為當前新插入的結點 if (last == null) last = node; else //原來的頭結點的上一個結點為當前新插入的結點 f.prev = node; //當前容量增加 ++count; //喚醒讀取時因佇列中無元素而導致阻塞的執行緒 notEmpty.signal(); return true; } //尾部插入 private boolean linkLast(Node<E> node) { //當前容量大于佇列最大容量時,直接回傳插入失敗 if (count >= capacity) return false; //獲取尾節點 Node<E> l = last; //將新插入的前一個節點指向原來的尾節點 node.prev = l; //尾結點設定為新插入的結點 last = node; //頭結點為空,新插入的結點作為頭節點 if (first == null) first = node; else //將原尾結點的下一個結點指向新插入的節點 l.next = node; //當前容量增加 ++count; //喚醒讀取時因佇列中無元素而導致阻塞的執行緒 notEmpty.signal(); return true; } //頭結點插入 public void putFirst(E e) throws InterruptedException { //元素不能為空 if (e == null) throw new NullPointerException(); //將元素構造為結點 Node<E> node = new Node<E>(e); //這邊會加鎖,并呼叫添加頭結點插入的核心方法 final ReentrantLock lock = this.lock; lock.lock(); try { //頭結點如果插入失敗,會阻塞該方法,直到取出結點或洗掉結點時被喚醒 while (!linkFirst(node)) notFull.await(); } finally { //解鎖 lock.unlock(); } } //尾結點插入 public void putLast(E e) throws InterruptedException { //元素不能為空 if (e == null) throw new NullPointerException(); //將元素構造為結點 Node<E> node = new Node<E>(e); //這邊會加鎖,并呼叫添加尾結點插入的核心方法 final ReentrantLock lock = this.lock; lock.lock(); try { //尾結點如果插入失敗,會阻塞該方法,直到取出結點或洗掉結點時被喚醒 while (!linkLast(node)) notFull.await(); } finally { lock.unlock(); } } //頭結點插入 可指定阻塞時間 public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException { //元素不能為空 if (e == null) throw new NullPointerException(); //將元素構造為結點 Node<E> node = new Node<E>(e); //計算剩余應阻塞時間 long nanos = unit.toNanos(timeout); //這邊會加鎖,并呼叫添加頭結點插入的核心方法 final ReentrantLock lock = this.lock; //獲取可回應中斷的鎖,保證阻塞時間到期后可重新獲得鎖 lock.lockInterruptibly(); try { //頭結點如果插入失敗,會阻塞該方法,直到取出結點或洗掉結點時被喚醒 //或者阻塞時間到期直接回傳失敗 while (!linkFirst(node)) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } return true; } finally { //解鎖 lock.unlock(); } } //頭結點插入 可指定阻塞時間 public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException { //元素不能為空 if (e == null) throw new NullPointerException(); //將元素構造為結點 Node<E> node = new Node<E>(e); //計算剩余應阻塞時間 long nanos = unit.toNanos(timeout); //這邊會加鎖,并呼叫添加尾結點插入的核心方法 final ReentrantLock lock = this.lock; //獲取可回應中斷的鎖,保證阻塞時間到期后可重新獲得鎖 try { //尾結點如果插入失敗,會阻塞該方法,直到取出結點或洗掉結點時被喚醒 //或者阻塞時間到期直接回傳失敗 while (!linkLast(node)) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } return true; } finally { //解鎖 lock.unlock(); } }
2)出隊方法
//洗掉頭結點 - 加鎖 直接回傳結果 public E pollFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { //呼叫洗掉頭結點核心方法 return unlinkFirst(); } finally { lock.unlock(); } } //洗掉尾結點 - 加鎖 直接回傳結果 public E pollLast() { final ReentrantLock lock = this.lock; lock.lock(); try { //呼叫洗掉尾結點核心方法 return unlinkLast(); } finally { lock.unlock(); } } //洗掉頭結點 - 加鎖,如果洗掉失敗則阻塞 public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; //呼叫洗掉頭結點核心方法 while ((x = unlinkFirst()) == null) //阻塞 notEmpty.await(); return x; } finally { lock.unlock(); } } //洗掉尾結點 - 加鎖,如果洗掉失敗則阻塞 public E takeLast() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; //呼叫洗掉尾結點核心方法 while ((x = unlinkLast()) == null) //阻塞 notEmpty.await(); return x; } finally { lock.unlock(); } } //洗掉頭結點 - 加鎖,如果洗掉失敗則阻塞 可以指定阻塞時間 public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { //計算應阻塞時間 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E x; //呼叫洗掉頭結點核心方法 while ((x = unlinkFirst()) == null) { if (nanos <= 0) //阻塞時間過期,回傳結果 return null; //阻塞 并指定阻塞時間 nanos = notEmpty.awaitNanos(nanos); } return x; } finally { lock.unlock(); } } //洗掉尾結點 - 加鎖,如果洗掉失敗則阻塞 可以指定阻塞時間 public E pollLast(long timeout, TimeUnit unit) throws InterruptedException { //計算應阻塞時間 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E x; //呼叫洗掉尾結點核心方法 while ((x = unlinkLast()) == null) { if (nanos <= 0) //阻塞時間過期,回傳結果 return null; //阻塞 并指定阻塞時間 nanos = notEmpty.awaitNanos(nanos); } return x; } finally { lock.unlock(); } } //洗掉頭結點 private E unlinkFirst() { //獲取當前頭結點 Node<E> f = first; //頭結點為空 回傳空 if (f == null) return null; //獲取頭結點的下一個結點 Node<E> n = f.next; //獲取頭結點元素(記錄return需要用到的洗掉了哪個元素) E item = f.item; //將頭結點元素置為null f.item = null; //將頭結點的下一個結點指向自己 方便gc f.next = f; //設定頭結點為原頭結點的下一個結點 first = n; //若原頭結點的下一個結點不存在(佇列中沒有了結點) if (n == null) //將尾結點也置為null last = null; else //新的頭結點的前一個結點指向null,因為他已經作為了頭結點 所以不需要指向上一個結點 n.prev = null; //當前數量減少 --count; //喚醒因添加元素時佇列容量滿導致阻塞的執行緒 notFull.signal(); //回傳原來的頭結點中的元素 return item; } //洗掉尾結點 private E unlinkLast() { //獲取當前尾結點 Node<E> l = last; //尾結點不存在 回傳null if (l == null) return null; //獲取當前尾結點的上一個結點 Node<E> p = l.prev; //獲取當前尾結點中的元素,需要回傳記錄 E item = l.item; //將當前尾結點的元素置為null l.item = null; //并將當前尾結點的上一個結點指向自己,方便gc l.prev = l; //設定新的尾結點,為原來尾結點的前一個結點 last = p; //若無新的尾結點,頭結點置為空(佇列中沒有了結點) if (p == null) first = null; else //將新的尾結點的下一個結點指向null,因為他已經為尾結點所以不需要指向下一個結點 p.next = null; //數量減少 --count; //喚醒因添加元素時佇列容量滿導致阻塞的執行緒 notFull.signal(); //回傳原來的尾結點元素 return item; }
LinkedBlockingDeque總結
【1】一個鏈表阻塞雙端無界佇列,可以指定容量,默認為 Integer.MAX_VALUE
【2】資料結構:鏈表(同LinkedBlockingQueue,內部類Node存盤元素)
【3】鎖:ReentrantLock(同ArrayBlockingQueue)存取是同一把鎖,操作的是同一個陣列物件
【4】阻塞物件(notEmpty【出隊:佇列count=0,無元素可取時,阻塞在該物件上】,notFull【入隊:佇列count=capacity,放不進元素時,阻塞在該物件上】)
【5】入隊,首尾都可以,均可以添加洗掉,
【6】出隊,首尾都可以,均可以添加洗掉,
【7】應用場景:常用于 “作業竊取演算法”,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/513667.html
標籤:其他
上一篇:羅馬數字轉整數
下一篇:阻塞佇列詳解
