佇列實質就是一種存盤資料的結構 通常用鏈表或者陣列實作 一般而言佇列具備FIFO先進先出的特性,當然也有雙端佇列(Deque)優先級佇列 主要操作:入隊(EnQueue)與出隊(Dequeue)
BlockingQueue
佇列實質就是一種存盤資料的結構 通常用鏈表或者陣列實作 一般而言佇列具備FIFO先進先出的特性,當然也有雙端佇列(Deque)優先級佇列 主要操作:入隊(EnQueue)與出隊(Dequeue)
1、ArrayBlockingQueue 由陣列支持的有界佇列
2、LinkedBlockingQueue 由鏈接節點支持的可選有界佇列
3、PriorityBlockingQueue 由優先級堆支持的無界優先級佇列
4、DelayQueue 由優先級堆支持的、基于時間的調度佇列
BlockingQueue底層依賴ReenTrantLock 和Condition(條件佇列只能在獨占模式下使用,共享模式不能用)(AQS框架實作)
ArrayBlockingQueue
BlockingQueue<Ball> blockingQueue = new ArrayBlockingQueue<Ball>(1);初始化原始碼分析
public ArrayBlockingQueue(int capacity) {
this(capacity, false); -------------false默認創建時非公平鎖
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];-------初始化陣列
lock = new ReentrantLock(fair);------創建非公平鎖
notEmpty = lock.newCondition(); ---------------初始化了兩個條件佇列,對應兩個條件對列notEmpty ,notFull
notFull = lock.newCondition();
}
// 兩個條件佇列如下圖:

為什么會有連兩個條件佇列,因為有put(生產者),和take(消費者) 兩個操作 put的時候要看(Notfull)是不是滿了,take(Notempty)是看是不是空了,
put生產者操作原始碼解讀
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //----先上來獲取獨占鎖,才能進入下面的邏輯,
try {
while (count == items.length)
notFull.await(); // 判斷當前佇列是不是已經滿了,如果count == items.length,那么這個下執行緒就要入條件佇列等待
enqueue(e);
} finally {
lock.unlock();
}
}
/**
* 加入條件佇列等待,條件佇列入口
*/
public final void await() throws InterruptedException {
//如果當前執行緒被中斷則直接拋出例外
if (Thread.interrupted())
throw new InterruptedException();
//把當前節點加入條件佇列
Node node = addConditionWaiter();
//釋放掉已經獲取的獨占鎖資源
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果不在同步佇列中則不斷掛起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//這里被喚醒可能是正常的signal操作也可能是中斷
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
/**
* 走到這里說明節點已經條件滿足被加入到了同步佇列中或者中斷了
* 這個方法很熟悉吧?就跟獨占鎖呼叫同樣的獲取鎖方法,從這里可以看出條件佇列只能用于獨占鎖
* 在處理中斷之前首先要做的是從同步佇列中成功獲取鎖資源
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//走到這里說明已經成功獲取到了獨占鎖,接下來就做些收尾作業
//洗掉條件佇列中被取消的節點
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//根據不同模式處理中斷
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* 這里的判斷邏輯是:
* 1.如果現在不是中斷的,即正常被signal喚醒則回傳0
* 2.如果節點由中斷加入同步佇列則回傳THROW_IE,由signal加入同步佇列則回傳REINTERRUPT
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// 這個方法是把條件節點轉運到CLH同步佇列中并把信號量置為初始值0,回傳轉運成功與否標志,只有把條件節點轉運到同步佇列才又可能被喚醒,
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/139909.html
標籤:Java
上一篇:如何發布自己的專案到中央倉庫
