DelayQueue介紹
【1】DelayQueue 是一個支持延時獲取元素的阻塞佇列, 內部采用優先佇列 PriorityQueue 存盤元素,同時元素必須實作 Delayed 介面;在創建元素時可以指定多久才可以從佇列中獲取當前元素,只有在延遲期滿時才能從佇列中提取元素,延遲佇列的特點是:不是先進先出,而是會按照延遲時間的長短來排序,下一個即將執行的任務會排到佇列的最前面,注意:不能將null元素放置到這種佇列中,
DelayQueue使用
DelayQueue<Delayed> delayeds = new DelayQueue<>();
DelayQueue的原始碼分析
【1】繼承關系分析
class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> //放入的元素必須實作 Delayed 介面,而 Delayed 介面又繼承了 Comparable 介面,所以自然就擁有了比較和排序的能力 public interface Delayed extends Comparable<Delayed> { //getDelay 方法回傳的是“還剩下多長的延遲時間才會被執行”, //如果回傳 0 或者負數則代表任務已過期, //元素會根據延遲時間的長短被放到佇列的不同位置,越靠近佇列頭代表越早過期, long getDelay(TimeUnit unit); } public interface Comparable<T> { //比較回傳結果(一般采用0或1) public int compareTo(T o); }
【2】屬性值
//用于保證佇列操作的執行緒安全 private final transient ReentrantLock lock = new ReentrantLock(); // 優先級佇列,存盤元素,用于保證延遲低的優先執行,其中比較的值是時間 private final PriorityQueue<E> q = new PriorityQueue<E>(); // 用于標記當前是否有執行緒在排隊(僅用于取元素時) leader 指向的是第一個從佇列獲取元素阻塞的執行緒 private Thread leader = null; // 條件,用于表示現在是否有可取的元素 當新元素到達,或新執行緒可能需要成為leader時被通知 private final Condition available = lock.newCondition();
【3】建構式
public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); } public boolean addAll(Collection<? extends E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); boolean modified = false; for (E e : c) if (add(e)) modified = true; return modified; } public boolean add(E e) { return offer(e); }
【4】核心方法分析
1)入隊put方法
public void put(E e) { offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 入隊 q.offer(e); if (q.peek() == e) { // 若入隊的元素位于佇列頭部,說明當前元素延遲最小,將 leader 置空 //為什么要置空,要結合take方法,leader有值說明它之前獲得了頭節點,但是頭節點時間還沒到期(故需要休眠一定的時間【距離頭節點到期的時間】) //此時頭結點更新了,所以之前持有頭節點的執行緒作廢了,不應該阻止其他執行緒繼續搶占,應該大家一起搶,從新定義頭節點的歸屬 leader = null; // available條件佇列轉同步佇列,準備喚醒阻塞在available上的執行緒 available.signal(); } return true; } finally { lock.unlock(); // 解鎖,真正喚醒阻塞的執行緒 } }
2)出隊take方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek();// 取出堆頂元素( 最早過期的元素,但是不彈出物件) if (first == null)// 如果堆頂元素為空,說明佇列中還沒有元素,直接阻塞等待 available.await();//當前執行緒無限期等待,直到被喚醒,并且釋放鎖, else { long delay = first.getDelay(NANOSECONDS);// 堆頂元素的到期時間 if (delay <= 0)// 如果小于0說明已到期,直接呼叫poll()方法彈出堆頂元素 return q.poll(); // 如果delay大于0 ,則下面要阻塞了 // 將first置為空方便gc first = null; // 如果有執行緒爭搶的Leader執行緒,則進行無限期等待, if (leader != null) available.await(); else { // 如果leader為null,把當前執行緒賦值給它 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待剩余等待時間 available.awaitNanos(delay); } finally { // 如果leader還是當前執行緒就把它置為空,讓其它執行緒有機會獲取元素 if (leader == thisThread) leader = null; } } } } } finally { // 成功出隊后,如果leader為空且堆頂還有元素,就喚醒下一個等待的執行緒 if (leader == null && q.peek() != null) // available條件佇列轉同步佇列,準備喚醒阻塞在available上的執行緒 available.signal(); // 解鎖,真正喚醒阻塞的執行緒 lock.unlock(); } }
DelayQueue總結
【1】一個使用優先級佇列實作的無界阻塞佇列
【2】資料結構:PriorityQueue(與PriorityBlockingQueue類似,不過沒有阻塞功能)
【3】阻塞物件:Condition available
【4】鎖:ReentrantLock
【5】入隊:不阻塞,無界佇列,與優先級佇列入隊相同,available
【6】出隊:1.為空時阻塞 ,2.檢查堆頂元素過期時間【小于等于0則出隊,大于0,說明沒過期,則阻塞(判斷leader執行緒是否為空【為了保證優先級】,不為空(已有執行緒阻塞),直接阻塞,為空,則將當前執行緒置為leader,并按照過期時間進行阻塞)】
【7】應用場景(只能說適用,但一般不會用這個):
1.商城訂單超時關閉:淘寶訂單業務:下單之后如果三十分鐘之內沒有付款就自動取消訂單
2.異步短信通知功能:餓了么訂餐通知:下單成功后60s之后給用戶發送短信通知,
3.關閉空閑連接,服務器中,有很多客戶端的連接,空閑一段時間之后需要關閉,
4.快取過期清除,快取中的物件,超過了存活時間,需要從快取中移出,
5.任務超時處理,在網路協議滑動視窗請求應答式互動時,處理超時未回應的請求等,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/513740.html
標籤:其他
上一篇:面試官:說說 String.intern() 和常量池?不同 JDK 版本有什么區別?
下一篇:java常用注解校驗引數
