LinkedBlockingQueue介紹
【1】LinkedBlockingQueue是一個基于鏈表實作的阻塞佇列,默認情況下,該阻塞佇列的大小為Integer.MAX_VALUE,由于這個數值特別大,所以 LinkedBlockingQueue 也被稱作無界佇列,代表它幾乎沒有界限,佇列可以隨著元素的添加而動態增長,但是如果沒有剩余記憶體,則佇列將拋出OOM錯誤,所以為了避免佇列過大造成機器負載或者記憶體爆滿的情況出現,我們在使用的時候建議手動傳一個佇列的大小,
【2】LinkedBlockingQueue內部由單鏈表實作,只能從head取元素,從tail添加元素,LinkedBlockingQueue采用兩把鎖的鎖分離技術實作入隊出隊互不阻塞,添加元素和獲取元素都有獨立的鎖,也就是說LinkedBlockingQueue是讀寫分離的,讀寫操作可以并行執行,
LinkedBlockingQueue使用
//指定佇列的大小創建有界佇列 BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100); //無界佇列 BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();
LinkedBlockingQueue的原始碼分析
【1】屬性值
// 容量,指定容量就是有界佇列 private final int capacity; // 元素數量,用原子操作類的原因在于有兩個執行緒都會操作需要保證可見性 private final AtomicInteger count = new AtomicInteger(); // 鏈表頭 本身是不存盤任何元素的,初始化時item指向null transient Node<E> head; // 鏈表尾 private transient Node<E> last; // take鎖 鎖分離,提高效率 private final ReentrantLock takeLock = new ReentrantLock(); // notEmpty條件 // 當佇列無元素時,take鎖會阻塞在notEmpty條件上,等待其它執行緒喚醒 private final Condition notEmpty = takeLock.newCondition(); // put鎖 private final ReentrantLock putLock = new ReentrantLock(); // notFull條件 // 當佇列滿了時,put鎖會會阻塞在notFull上,等待其它執行緒喚醒 private final Condition notFull = putLock.newCondition(); //典型的單鏈表結構 static class Node<E> { E item; //存盤元素 Node<E> next; //后繼節點 單鏈表結構 Node(E x) { item = x; } }
【2】建構式
public LinkedBlockingQueue() { // 如果沒傳容量,就使用最大int值初始化其容量 this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; // 初始化head和last指標為空值節點 last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // 為保證可見性而加的鎖 try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
【3】核心方法分析
1)入隊put方法
public void put(E e) throws InterruptedException { // 不允許null元素 if (e == null) throw new NullPointerException(); int c = -1; // 新建一個節點 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 使用put鎖加鎖 putLock.lockInterruptibly(); try { // 如果佇列滿了,就阻塞在notFull上等待被其它執行緒喚醒(阻塞生產者執行緒) while (count.get() == capacity) { notFull.await(); } // 佇列不滿,就入隊 enqueue(node); c = count.getAndIncrement();// 佇列長度加1,回傳原值 // 如果現佇列長度小于容量,notFull條件佇列轉同步佇列,準備喚醒一個阻塞在notFull條件上的執行緒(可以繼續入隊) // 這里為啥要喚醒一下呢?因為存在情況是,沒人獲取時,佇列滿了而且還不斷有人塞資料,此時會一大批執行緒被阻塞,現在有空余位置了,應該被喚醒 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); // 真正喚醒生產者執行緒 } // 如果原佇列長度為0,現在加了一個元素后立即喚醒阻塞在notEmpty上的執行緒 if (c == 0) signalNotEmpty(); } private void enqueue(Node<E> node) { // 直接加到last后面,last指向入隊元素 last = last.next = node; } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock();// 加take鎖 try { notEmpty.signal();// notEmpty條件佇列轉同步佇列,準備喚醒阻塞在notEmpty上的執行緒 } finally { takeLock.unlock(); // 真正喚醒消費者執行緒 } }
2)出隊take方法
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 使用takeLock加鎖 takeLock.lockInterruptibly(); try { // 如果佇列無元素,則阻塞在notEmpty條件上(消費者執行緒阻塞) while (count.get() == 0) { notEmpty.await(); } // 否則,出隊 x = dequeue(); c = count.getAndDecrement();//長度-1,回傳原值 if (c > 1)// 如果取之前佇列長度大于1,notEmpty條件佇列轉同步佇列,準備喚醒阻塞在notEmpty上的執行緒,原因與入隊同理 notEmpty.signal(); } finally { takeLock.unlock(); // 真正喚醒消費者執行緒 } // 為什么佇列是滿的才喚醒阻塞在notFull上的執行緒呢? // 因為喚醒是需要加putLock的,這是為了減少鎖的次數,所以,這里索性在放完元素就檢測一下,未滿就喚醒其它notFull上的執行緒, // 這也是鎖分離帶來的代價 // 如果取之前佇列長度等于容量(已滿),則喚醒阻塞在notFull的執行緒 if (c == capacity) signalNotFull(); return x; } private E dequeue() { // head節點本身是不存盤任何元素的 // 這里把head洗掉,并把head下一個節點作為新的值 // 并把其值置空,回傳原來的值 Node<E> h = head; Node<E> first = h.next; h.next = h; // 方便GC head = first; E x = first.item; first.item = null; return x; } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal();// notFull條件佇列轉同步佇列,準備喚醒阻塞在notFull上的執行緒 } finally { putLock.unlock(); // 解鎖,這才會真正的喚醒生產者執行緒 } }
LinkedBlockingQueue總結
【1】無界阻塞佇列,可以指定容量,默認為 Integer.MAX_VALUE,先進先出,存取互不干擾
【2】資料結構:鏈表(可以指定容量,默認為 Integer.MAX_VALUE,內部類Node存盤元素)
【3】鎖分離:存取互不干擾,存取操作的是不同的Node物件(takeLock【取Node節點保證前驅后繼不亂】,putLock【存Node節點保證前驅后繼不亂】,洗掉時則兩個鎖一起加)【這是最大的亮點】
【4】阻塞物件(notEmpty【出隊:佇列count=0,無元素可取時,阻塞在該物件上】,notFull【入隊:佇列count=capacity,放不進元素時,阻塞在該物件上】)
【5】入隊,從隊尾入隊,由last指標記錄,
【6】出隊,從隊首出隊,由head指標記錄,
【7】執行緒池中采用LinkedBlockingQueue而不采用ArrayBlockingQueue的原因便是因為鎖分離帶來了性能的提升,大大提高佇列的吞吐量,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/513665.html
標籤:其他
上一篇:選擇排序演算法步驟分析
下一篇:羅馬數字轉整數
