PriorityBlockingQueue介紹
【1】PriorityBlockingQueue是一個無界的基于陣列的優先級阻塞佇列,陣列的默認長度是11,也可以指定陣列的長度,且可以無限的擴充,直到資源消耗盡為止,每次出隊都回傳優先級別最高的或者最低的元素,默認情況下元素采用自然順序升序排序,當然我們也可以通過建構式來指定Comparator來對元素進行排序,需要注意的是PriorityBlockingQueue不能保證同優先級元素的順序,
【2】優先級佇列PriorityQueue: 佇列中每個元素都有一個優先級,出隊的時候,優先級最高的先出,
PriorityBlockingQueue的原始碼分析
【1】屬性值
//默認容量 private static final int DEFAULT_INITIAL_CAPACITY = 11; //最大容量設定 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //存放資料的陣列 private transient Object[] queue; //元素個數 private transient int size; //排序規則(比較器) private transient Comparator<? super E> comparator; //獨占鎖 private final ReentrantLock lock; //佇列為空的時候的阻塞佇列 private final Condition notEmpty; //用于分配的CAS自旋鎖 private transient volatile int allocationSpinLock; //只用于序列化的普通優先佇列 private PriorityQueue<E> q;
【2】建構式
//沒有指定容量,則容量默認11 public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } //有指定容量則容量為指定數值 public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } //初始化所需要的屬性值 public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
【3】核心方法分析
1)核心擴容函式
//擴容函式 private void tryGrow(Object[] array, int oldCap) { lock.unlock(); //必須釋放然后重新獲得主鎖,這一步的意義在于所有操作共享一把鎖,在進行擴容時(因為寫已滿),釋放鎖(不能寫,要等待擴容完才能寫),提供給讀操作 Object[] newArray = null; // CAS 輕量級鎖加鎖,避免并發擴容 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 擴容步長,舊值小于64時,變為兩倍+2,大于64時,變為1.5倍, int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); // 超過最大容量,記憶體溢位 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } // 創建新陣列 if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } // 并發擴容時,執行緒讓出 cpu 執行時間,給其他執行緒執行,自己稍后執行,原因:加鎖不成功必然有其他執行緒也在擴容,在等待程序中讓出資源給其他執行緒利用 if (newArray == null) Thread.yield(); //重新加鎖 lock.lock(); //變更記憶體指向,利用記憶體拷貝復制舊資料 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
2)入隊方法
public void put(E e) { offer(e); // never need to block } public boolean add(E e) { return offer(e); } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) //自旋擴容 tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; //根據比較器采用填充的方式 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
4)佇列填入方式
代碼展示
private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; } //雷同上面的不過比較器采用傳入的 private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; }
圖解說明
1.利用了滿二叉樹的理念,(k - 1) >>> 1可以獲得存入節點的父節點下標,然后進行比較,若判斷應該上升,則將父節點置于k存盤的地方,
2.重復回圈,知道二叉樹root節點,或者已經找到了合適的位置

4)出隊方法
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } private E dequeue() { int n = size - 1; if (n < 0) return null; else { Object[] array = queue; //將頭節點取出回傳 E result = (E) array[0]; //將末尾節點當做向頭節點位置存入的節點 E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
5)佇列修正方式
代碼展示
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; //過濾掉最后一層的元素(以為滿二叉樹中,最后一層占據的元素就是n/2) int half = n >>> 1; // loop while a non-leaf while (k < half) { //獲取頭節點的左節點 int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; //進行左右節點的比較,取小的一邊作為比較,以為優先佇列要確保頭節點是最小的(換而言之,保證子樹下面的頭節點為子樹里面最小的即可) if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } } private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n,Comparator<? super T> cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } }
圖解說明
1.采用尾結點代替頭節點,然后利用下沉的方式來修正優先佇列里面的資料,
2.下沉限制在倒數第二層,以為倒數第二層就會與倒數第一層進行比較了,所以應該進行限制(下標位置超出倒數第二層的最大下標就應該停止了)
3.其次看的時候,可以把左右子樹都當做一個小的滿二叉樹,不斷逐層劃分,這樣條理更清晰,

PriorityBlockingQueue總結
【1】一個支持優先級排序的無界阻塞佇列,優先級高的先出隊,優先級低的后出隊
【2】資料結構:陣列+二叉堆(默認容量11,可指定初始容量,會自動擴容,最大容量是(Integer.MAX_VALUE - 8))
【3】鎖:ReentrantLock,存取是同一把鎖
【4】阻塞物件:NotEmpty,出隊,佇列為空時阻塞
【5】入隊,不阻塞,永遠回傳成功,無界;根據比較器進行堆化(排序)自下而上,如果比較器為 null,則按照自然順序排序,傳入比較器物件就按照比較器的順序排序,
【6】出隊,優先級最高的元素在堆頂(彈出堆頂元素),彈出前比較兩個子節點再進行堆化(自上而下),
【7】應用場景:1.業務辦理排隊叫號,VIP客戶插隊;2.電商搶購活動,會員級別高的用戶優先搶購到商品;
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/514060.html
標籤:Java
上一篇:java-多執行緒與并發
