從類注釋可以得到關于SynchronousQueue的資訊:
- 佇列不存盤資料,所以沒有大小,也無法迭代,沒有大小如何理解呢?即每次進行put值進去時, 必須等待相應的 consumer 拿走資料后才可以再次 put 資料,
- queue 對應 peek, contains, clear, isEmpty … 等方法其實是無效的,
- 佇列由兩種資料結構組成,分別是后入先出的堆疊和先入先出的佇列,堆疊是非公平的,佇列是公平的,
1.結構
SynchronousQueue 繼承關系,核心成員變數及主要建構式:
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// Transferer 定義了transfer方法,put,take都是用的同一個transfer方法
abstract static class Transferer<E>{
// e為空的,會直接回傳特殊值,不為慷訓傳遞給消費者
abstract E transfer(E e, boolean timed, long nanos);
}
// 堆疊實作,后入先出(非公平)
static final class TransferStack<E> extends Transferer<E>{...}
// 佇列實作,先入先出(公平)
static final class TransferQueue<E> extends Transferer<E>{...}
// tranfer變數
private transient volatile Transferer<E> transferer;
//----------------------------------建構式---------------------------------
// 默認非公平
public SynchronousQueue() {
this(false);
}
// 公平用TransferQueue,非公平就用TransferStack
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
}
這里需要著重強調的是,SynchronousQueue 沒有使用鎖(synchronized 與 reentrantlock) ,因為鎖底層的佇列無法實作匹配,所以 SynchronousQueue 必須自己保證執行緒安全,并實作執行緒的調度:
- 通過CAS與自旋實作執行緒安全
- 直接對執行緒進行阻塞(park)與喚醒,其中有兩種策略,堆疊實作的非公平配對與佇列的公平配對
1.1 TransferStack(非公平=>FILO)
static final class TransferStack<E> extends Transferer<E>{
// 堆疊中元素,這是一個鏈式堆疊
static final class SNode{...}
// 堆疊頭指標
volatile SNode head;
// SNode的三種狀態:
// 1.REQUEST:執行的是take方法,相當于消費者
static final int REQUEST = 0;
// 2.DATA:執行的是put方法,相當于生產者
static final int DATA = 1;
// 3.FULFILLING:堆疊頭正在阻塞等待其他執行緒進行 put 或 take
static final int FULFILLING = 2;
//...
}
SNode
雖然 SynchronousQueue 的特性說的是里面是沒有元素,但這句話實際的意義是 SynchronousQueue 是一個不能 peek,contains 等操作的節點,但是里面是有一條鏈表來保存競爭的執行緒和資料,
也就是說,整個SynchronousQueue的運行機制也還是通過維護一個鏈表來實作的,當有并發時,通過判斷鏈表插入一端節點的型別(mode),從而確定是否進行交換,
static final class SNode {
// 當前執行緒
// 注:不是在創建SNode時設定,而是在awaitFulfill方法中沒匹配到需要休眠時才會設定
volatile Thread waiter;
// 當前執行緒資料
// 注:只有put的執行緒item才會有值,take的執行緒item=null,若take的執行緒要取出資料只能通過match指標為中介
Object item;
// 節點型別:REQUEST(0)-消費者(take) ,DATA(1)-生產者(put), FULFILLING(2)-交換中
int mode;
volatile SNode next;
// 很重要的節點,表示和本節點配對(match)的節點,有兩個作用:
// 1.判斷阻塞堆疊元素能被喚醒的時機
// 比如執行緒A在take時由于佇列為空被阻塞了,然后執行緒B進行了put操作,那么就會將A的match設定為B,表示可以將A喚醒了
// 2.作為take執行緒獲取資料交換的中介
// 比如當執行緒A喚醒后要回傳資料,那么就可以通過match找到B,從而拿到put的資料,這個邏輯可以在下面的transfer方法可以看到
volatile SNode match;
------------------------------------------------------------------------------------------------------------------
// 建構式,傳入item
// 注:一般不直接構造,而是呼叫封裝好的snode方法:SNode snode(SNode s, Object e, SNode next, int mode)
SNode(Object item) {
this.item = item;
}
------------------------------------------------------------------------------------------------------------------
// 通過CAS把val節點連接到cmp后面
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// tryMatch 非常重要的方法,兩個作用:
// 1 嘗試將引數節點s,賦給當前執行緒的配對節點match
// 2.喚醒被阻塞的堆疊頭的執行緒,醒后就能從 match 中得到本次操作 s
// 其中 s.item 記錄著本次的操作節點,也就是記錄本次操作的資料
boolean tryMatch(SNode s) {
if (match == null &&
// CAS改變match來進行匹配,成功的條件是當前節點的match屬性為null
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) {
// 將waiter置為null
waiter = null;
// 喚醒當前node的執行緒
LockSupport.unpark(w);
}
return true;
}
// 回傳是否配對成功
return match == s;
}
// 嘗試取消,就是把match換為自己,
// 自己匹配自己的前提是match為null,也就是說,如果已經匹配了,那么這個方法不能取消
// 一般在設定超時且過期后,會將其設為cancel
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
// 判斷超時失效
// 和上一個方法對比,則可以知道,判斷match是不是自己
boolean isCancelled() {
return match == this;
}
// unsafe相關代碼...
}
transfer():進堆疊&出堆疊
transfer 將 take 和 put 兩個方法都揉在了一起,所以第一個問題是如何區分put 和 take?是通過引數 e 是否為null來區分,e不為 null 是 put,為null是 take,第二個問題是如何進行執行緒管理的,或者說如何使 put 和 take 的執行緒配對的?具體分為以下三種情況:
- 情況 1:佇列中還沒有資料 或 當前節點與堆疊頂節點同型別(同put或同take)
- 情況 1.1:要加入的e設定了超時時間,并且 e 進堆疊或者出堆疊要超時了
- 情況 1.1.1:堆疊頭不為null 且 堆疊頭已經超時失效,將堆疊頂置為第二個節點
- 情況 1.1.2:堆疊頭是空的,回傳null
- 情況 1.2:沒有設定新元素e的超時時間,或者設定了但未超時,
- 用e構造新節點s,使s.next=head,然后將s設為新的堆疊頭
- 阻塞等待與s匹配的節點m
- 若沒等到(s過期了),就呼叫clean洗掉s
- 若等到了,就讓s與m出堆疊,設定新的頭結點,并回傳
- 情況 1.1:要加入的e設定了超時時間,并且 e 進堆疊或者出堆疊要超時了
- 情況 2: 當前堆疊包含于給定節點模式互補的節點(比如堆疊頂是put時阻塞,而當前節點是take操作)
- 情況 2.1:堆疊頭已經被取消,將下一個節點置為堆疊頭
- 情況 2.2:可以將當前節點s打上"正在匹配"的標記,并設定為head
- 取s的下一個節點m,s與m不斷tryMatch
- 匹配成功,洗掉s與m,回傳item
- 匹配失敗,交換m與m.next
- 若堆疊完了都沒匹配到則退出自旋,進入3
- 情況 3:當動作2匹配失敗(可能同時執行緒3搶先完成了配對),幫助這個節點完成匹配和移除(出堆疊)的操作,然后繼續執行(主回圈),這部分代碼基本和動作2的代碼一樣,只是不會回傳節點的資料,
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
SNode s = null;
// 判斷是put還是take:e 為空是take方法(REQUEST),不為空是put方法(DATA)
int mode = (e == null) ? REQUEST : DATA;
// 自旋,保證一定能成功
for (;;) {
// 拿出頭節點,有幾種情況
SNode h = head;
-----------------------------------------------------------------------------------------------------------------
// 情況 1:佇列中還沒有資料 || 當前節點與堆疊頂節點同型別(同put或同take)
if (h == null || h.mode == mode) {
// 情況 1.1:要加入的e設定了超時時間,并且 e 進堆疊或者出堆疊要超時了
if (timed && nanos <= 0) {
// 情況 1.1.1:堆疊頭不為null && 堆疊頭已經超時失效
if (h != null && h.isCancelled())
casHead(h, h.next); // 丟棄堆疊頭,把堆疊頭后一個元素作為堆疊頭
// 情況 1.1.2:堆疊頭是空的
else
return null; // 直接回傳 null
// 情況 1.2:沒有設定新元素e的超時時間,或者設定了但未超時
} else if (casHead(h, s = snode(s, e, h, mode))) { // 用e構造新節點s,使s.next=head,然后將s設為新的堆疊頭
// 阻塞等待,目的是等到與s匹配的SNode
SNode m = awaitFulfill(s, timed, nanos);
// 回傳m==s代表當前節s點已經超時了
if (m == s) {
clean(s); // 堆疊中無法直接洗掉s,所以呼叫clean
return null;
}
// 只有真正匹配到值才能走到這一步
// 堆疊不為空 && 堆疊二是s(因為等到的m此時是新堆疊頂)
if ((h = head) != null && h.next == s)
// 將s.next設定為head,表示將s和他的配對m出堆疊
casHead(h, s.next);
// 回傳item
// 注:這里回傳的是put的節點的資料,若是take節點那么需要通過中介m來獲取到資料
return (E) ((mode == REQUEST) ? m.item : s.item);
}
-----------------------------------------------------------------------------------------------------------------
// 情況 2: 當前堆疊包含于給定節點模式互補的節點(比如堆疊頂是put時阻塞,而當前節點是take操作)
} else if (!isFulfilling(h.mode)) {
// 情況 2.1:堆疊頭已經被取消
if (h.isCancelled())
casHead(h, h.next); // 把下一個元素作為堆疊頭
// 情況 2.2:可以將當前節點s打上"正在匹配"的標記,并設定為head
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 自旋,直到配對
for (;;) {
// m和s是正在匹配兩個節點
// 注:此時m不一定是之前的堆疊頂,因為這段時間可能又有節點進入或者之前的堆疊頂已被先一步匹配走了
SNode m = s.next; // m is s's match
// 堆疊遍歷完了,都沒有找到
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
// 獲取m的next節點,因為如果s和m匹配成功,mn就得補上head的位置了
// 注:雖然后面m可能會與mn交換,但在每輪回圈中mn一定是堆疊中第三個節點
SNode mn = m.next;
// 呼叫 tryMatch 讓 m和s 配對
// 注:這里呼叫tryMatch給s配對時,會喚醒阻塞在awaitFulfill方法的執行緒m,若配對失敗m還會回到阻塞狀態
if (m.tryMatch(s)) {
// 配對成功,彈出s與m,將head置為mn
casHead(s, mn);
// 回傳put的資料,若是take節點那么需要通過中介m來獲取到資料
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
// 若配對失敗,將m與m.next交換,開始下一輪匹配
s.casNext(m, mn);
}
}
-----------------------------------------------------------------------------------------------------------------
// 情況3:上面匹配失敗,可能是同時又執行緒3提前完成了配對
} else {
SNode m = h.next;
// 堆疊里面沒有任何等待者了,其他節點把m匹配走了
if (m == null)
casHead(h, null); // pop fulfilling node
else {
// 如果m和h匹配成功,則mn就成為新head了,
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
awaitFulfill():等待匹配節點
awaitFulfill 作用是阻塞等待匹配的節點,但不是一上來就阻塞住,而是在自旋一定次數后,仍然沒有其它執行緒來滿足自己的要求時,才會真正的阻塞住,等待其他執行緒transfer后tryMatch
- 計算死亡時間 deadline(時間戳)與自旋次數 spains
- 自旋,每次回圈都判斷s是否獲得到match
- 達到自旋次數,park當前執行緒(定時)
- 執行緒被喚醒時機:
- tryMatch喚醒:其余執行緒在transfer中遍歷堆疊時呼叫 tryMatch喚醒,若匹配成功 return 相應 m,否則重新回到阻塞
- 超時喚醒:return s(自己)
Node awaitFulfill(SNode s, boolean timed, long nanos) {
// deadline 死亡時間,如果設定了超時時間的話,死亡時間等于當前時間 + 超時時間,否則就是 0
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 當前執行緒
Thread w = Thread.currentThread();
// 自旋的次數,如果設定了超時時間,會自旋 32 次,否則自旋 512 次
int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 當前執行緒有無被打斷,如果過了超時時間,當前執行緒就會被打斷
if (w.isInterrupted())
s.tryCancel();
// 嘗試獲取當前節點的match
SNode m = s.match;
// 該函式的唯一出口,一定要匹配到值
// 回傳的 m==s 表示超時取消,回傳的 m!=s 表確實匹配到了
if (m != null)
return m;
if (timed) {
nanos = deadline - System.nanoTime();
// 超時了,取消當前執行緒的等待操作
if (nanos <= 0L) {
// 呼叫cancel,使m=s
s.tryCancel();
continue;
}
}
// 如果沒到自旋次數,那么自旋次數減少 1
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0; //
// 如果s沒設定waiter,那么把當前執行緒設定成 waiter
else if (s.waiter == null)
s.waiter = w;
// 如果沒有設定超時,那么直接park當前執行緒
else if (!timed)
LockSupport.park(this); // 當被unpark喚醒時也是在此處,繼續回圈
// 如果設定了超時,呼叫帶有nanos超時時間的park
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
clean():清除過期節點
清除處于堆疊中的過期節點s,大致程序如下:
- 找到s的下一個沒有cancel的節點past
- 判斷head是否cancel
- 從head遍歷連接past
void clean(SNode s) {
s.item = null; // forget item 把item和waiter都置空
s.waiter = null; // forget thread
// 獲得下一個SNode
SNode past = s.next;
// 如果past被cancell了,那么就再past一個
if (past != null && past.isCancelled())
past = past.next;
// 從頭節點開始清除
SNode p;
// 把頭節點鏈接到下一個節點,節點不能為cancelled
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// Unsplice embedded nodes
while (p != null && p != past) {
// 在去除鏈接頭節點以后的節點,同樣也不能為null,
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
1.2 TransferQueue(公平=>FIFO)
static final class TransferQueue<E> extends Transferer<E>{
// 佇列頭
transient volatile QNode head;
// 佇列尾
transient volatile QNode tail;
// 佇列的元素
static final class QNode {...}
//...
}
QNode
static final class QNode {
// 當前元素的值,如果當前元素被阻塞住了,等其他執行緒來喚醒自己時,其他執行緒會把自己 set 到 item 里面
volatile Object item; // CAS'ed to or from null
// 可以阻塞住的當前執行緒
volatile Thread waiter; // to control park/unpark
// true 是 put,false 是 take
final boolean isData;
// 當前元素的下一個元素
volatile QNode next;
// 構造時傳入資料和節點型別
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
// 將節點val通過cas連接在cmp后面
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// 將item的值通過cas變為val
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* Tries to cancel by CAS'ing ref to this as item.
*/
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
// 被取消的節點就是item=this
boolean isCancelled() {
return item == this;
}
// unsafe相關代碼...
}
transfer():入隊&出隊
當前執行緒是如何把自己的資料傳給阻塞執行緒的?為了方便說明,我們假設執行緒 1 往佇列中 take 資料 ,被阻塞住了,變成阻塞執行緒 A ,然后執行緒 2 開始往佇列中 put 資料 B,大致的流程是這樣的:
- 執行緒 1 從佇列中拿資料,發現佇列中沒有資料,于是被阻塞,成為 A ;
- 執行緒 2 往隊尾 put 資料,會從隊尾往前找到第一個被阻塞的節點,假設此時能找到的就是節點 A,然后執行緒 B 把將 put 的資料放到節點 A 的 item 屬性里面,并喚醒執行緒 1;
- 執行緒 1 被喚醒后,就能從 A.item 里面拿到執行緒 2 put 的資料了,執行緒 1 成功回傳,
從這個程序中,我們能看出公平主要體現在,每次 put 資料的時候,都 put 到隊尾上,而每次拿資料時,并不是直接從隊頭拿資料,而是從隊尾往前尋找第一個被阻塞的執行緒,這樣就會按照順序釋放被阻塞的執行緒,
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
// true 是 put,false 是 get
boolean isData = (e != null);
for (;;) {
// 佇列頭和尾的臨時變數,佇列是空的時候,t=h
QNode t = tail;
QNode h = head;
// tail 和 head 沒有初始化時,無限回圈
// 雖然這種continue非常耗cpu,但一般碰不到這種情況,因為tail和head 在 TransferQueue 初始化時就已經被賦值空節點了
if (t == null || h == null)
continue;
-----------------------------------------------------------------------------------------------------------------
// 情況一:首尾節點相同(空佇列)|| 尾節點的操作和當前節點操作一致(比如隊尾是take時阻塞,當前執行緒也是take)
if (h == t || t.isData == isData) {
QNode tn = t.next;
// 當 t 不是 tail 時,即 tail 已經被修改過了,因為 tail 沒有被修改的情況下,t 和 tail 必然相等
if (t != tail)
continue;
// 隊尾后面的值還不為空,t 還不是隊尾,直接把 tn 賦值給 t,這是一步加強校驗,
if (tn != null) {
// CAS修改tail為tn
advanceTail(t, tn);
continue;
}
// 超時直接回傳 null
if (timed && nanos <= 0) // can't wait
return null;
// 構造node節點
if (s == null)
s = new QNode(e, isData);
// 如果把 e 放到隊尾失敗,繼續遞回放進去
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
// awaitFulfill 同 TransferStack,阻塞住自己,等待配對節點x
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
// CAS修改head為s
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
-----------------------------------------------------------------------------------------------------------------
// 情況二:佇列不為空,并且當前操作和隊尾不一致(比如隊尾是因為 take 被阻塞的,那么當前操作必然是 put)
} else { // complementary-mode
// 如果是第一次執行,此處的 m 代表就是 tail
// 也就是這行代碼體現出佇列的公平,每次操作時,從頭開始按照順序進行操作
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
// m 代表堆疊頭
// 這里把當前的操作值賦值給阻塞住的 m 的 item 屬性,所以 m 被釋放時,就可得到此次操作的值
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
// 當前操作放到隊頭
advanceHead(h, m); // successfully fulfilled
// 釋放隊頭阻塞節點
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
2.方法決議 & api
SynchronousQueue 的方法都很簡單,因為已經封裝好了兩種 Transfer 的實作,TransferStack 和 TransferQueue,所以后面的方法直接呼叫就行,
2.1 放入:put
將新元素放進佇列,直到有另外一個執行緒從佇列中取走,成功結束,失敗打斷執行緒
public void put(E e) throws InterruptedException {
// e為空,拋例外
if (e == null) throw new NullPointerException();
// 呼叫transfer方法,傳入e
// 一直等待
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
2.2 取出:take
從佇列頭拿資料并洗掉資料,成功回傳,失敗打斷執行緒
public E take() throws InterruptedException {
// 呼叫transfer方法,傳入null
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
2.3 容量相關方法
與容量相關方法都是默認實作,即寫死的,
peek()
public E peek() {
return null;
}
remove()
public boolean remove(Object o) {
return false;
}
contains()
public boolean contains(Object o) {
return false;
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/145362.html
標籤:其他
上一篇:ad域中巡檢統計資料用到的命令列
下一篇:微信公眾號禁用右上角分享
