SynchronousQueue介紹
【1】SynchronousQueue是一個沒有資料緩沖的BlockingQueue,生產者執行緒對其的插入操作put必須等待消費者的移除操作take,

【2】如圖所示,SynchronousQueue 最大的不同之處在于,它的容量為 0,所以沒有一個地方來暫存元素,導致每次取資料都要先阻塞,直到有資料被放入;同理,每次放資料的時候也會阻塞,直到有消費者來取,
【3】需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因為 SynchronousQueue 不需要去持有元素,它所做的就是直接傳遞(direct handoff),由于每當需要傳遞的時候,SynchronousQueue 會把元素直接從生產者傳給消費者,在此期間并不需要做存盤,所以如果運用得當,它的效率是很高的,
SynchronousQueue的原始碼分析
【1】建構式
//默認采用非公平 public SynchronousQueue() { this(false); } //可以選擇模式 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
【2】核心方法分析
//這些方法本質上都是呼叫屬性值transferer的transfer方法 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } public boolean offer(E e) { if (e == null) throw new NullPointerException(); return transferer.transfer(e, true, 0) != null; } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); } public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = transferer.transfer(null, true, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) return e; throw new InterruptedException(); } public E poll() { return transferer.transfer(null, true, 0); }
s
Transferer分析
【1】Transferer是SynchronousQueue的內部抽象類,雙堆疊和雙佇列演算法共享該類,他只有一個transfer方法,用于轉移元素,從生產者轉移到消費者;或者消費者呼叫該方法從生產者取資料,
【2】Transferer有兩個實作類:TransferQueue和TransferStack,
【3】這兩個類的區別就在于是否公平,TransferQueue是公平的,TransferStack非公平,
【4】原始碼展示
// 堆疊和佇列共同的介面,負責執行 put or take abstract static class Transferer<E> { // e 為空的,會直接回傳特殊值,不為慷訓傳遞給消費者 // timed 為 true,說明會有超時時間 abstract E transfer(E e, boolean timed, long nanos); }
TransferQueue分析
【1】節點元素
//佇列節點元素 static final class QNode { // 當前元素的下一個元素 volatile QNode next; // 當前元素的值,如果當前元素被阻塞住了,等其他執行緒來喚醒自己時,其他執行緒會把自己 set 到 item 里面 volatile Object item; // 可以阻塞住的當前執行緒 volatile Thread waiter; // 節點型別:true是 put,false是 take final boolean isData; .... }
【2】構造方法
//佇列頭結點指標 transient volatile QNode head; //佇列尾結點指標 transient volatile QNode tail; TransferQueue() { QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; }
【3】核心方法
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
//根據是否傳入資料 判斷是獲取還是存放
boolean isData = https://www.cnblogs.com/chafry/p/(e != null);
for (;;) {
// 佇列頭和尾的臨時變數,佇列是空的時候,t=h
QNode t = tail;
QNode h = head;
// tail 和 head 沒有初始化時,無限回圈,雖然這種 continue 非常耗cpu,但感覺不會碰到這種情況
// 因為 tail 和 head 在 TransferQueue 初始化的時候,就已經被賦值空節點了
if (t == null || h == null) // saw uninitialized value
continue; // spin
// 首尾節點相同,說明是空佇列
// 或者尾節點的操作和當前節點操作一致
if (h == t || t.isData =https://www.cnblogs.com/chafry/p/= isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) //直至拿到尾節點
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
//超時直接回傳 null
if (timed && nanos <= 0) // can't wait
return null;
//構建新節點
if (s == null)
s = new QNode(e, isData);
//將新建節點塞入佇列
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s);
// 阻塞住自己
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
}
// 佇列不為空,并且當前操作和隊尾不一致,也就是說當前操作是隊尾是對應的操作
// 比如說隊尾是因為 take 被阻塞的,那么當前操作必然是 put
else {
// 也就是這行代碼體現出佇列的公平,每次操作時,從頭開始按照順序進行操作
QNode m = h.next;
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData =https://www.cnblogs.com/chafry/p/= (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
// 當前操作放到隊頭
advanceHead(h, m);
// 釋放隊頭阻塞節點
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
TransferStack分析
【1】節點元素
// 堆疊中節點的幾種型別: // 1. 消費者(請求資料的) static final int REQUEST = 0; // 2. 生產者(提供資料的) static final int DATA = https://www.cnblogs.com/chafry/p/1; // 3. 二者正在匹配中 static final int FULFILLING = 2; // 堆疊中的節點 static final class SNode { // 下一個節點 volatile SNode next; volatile SNode match; // the node matched to this // 等待著的執行緒 volatile Thread waiter; Object item; // 模式,也就是節點的型別,是消費者,是生產者,還是正在匹配中 int mode; ... }
【2】核心方法
// TransferStack.transfer()方法 E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed // 根據e是否為null決定是生產者還是消費者 int mode = (e == null) ? REQUEST : DATA; // 自旋+CAS for (;;) { // 堆疊頂元素 SNode h = head; // 堆疊頂沒有元素,或者堆疊頂元素跟當前元素是一個模式的 // 也就是都是生產者節點或者都是消費者節點 if (h == null || h.mode == mode) { // empty or same-mode // 如果有超時而且已到期 if (timed && nanos <= 0) { // can't wait // 如果頭節點不為空且是取消狀態 if (h != null && h.isCancelled()) // 就把頭節點彈出,并進入下一次回圈 casHead(h, h.next); // pop cancelled node else // 否則,直接回傳null(超時回傳null) return null; } else if (casHead(h, s = snode(s, e, h, mode))) { // 入堆疊成功(因為是模式相同的,所以只能入堆疊) // 呼叫awaitFulfill()方法自旋+阻塞當前入堆疊的執行緒并等待被匹配到 SNode m = awaitFulfill(s, timed, nanos); // 如果m等于s,說明取消了,那么就把它清除掉,并回傳null if (m == s) { // wait was cancelled clean(s); // 被取消了回傳null return null; } // 到這里說明匹配到元素了 // 因為從awaitFulfill()里面出來要不被取消了要不就匹配到了 // 如果頭節點不為空,并且頭節點的下一個節點是s // 就把頭節點換成s的下一個節點 // 也就是把h和s都彈出了 // 也就是把堆疊頂兩個元素都彈出了 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 根據當前節點的模式判斷回傳m還是s中的值 return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (!isFulfilling(h.mode)) { // try to fulfill // 到這里說明頭節點和當前節點模式不一樣 // 如果頭節點不是正在匹配中 // 如果頭節點已經取消了,就把它彈出堆疊 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 頭節點沒有在匹配中,就讓當前節點先入隊,再讓他們嘗試匹配 // 且s成為了新的頭節點,它的狀態是正在匹配中 for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match // 如果m為null,說明除了s節點外的節點都被其它執行緒先一步匹配掉了 // 就清空堆疊并跳出內部回圈,到外部回圈再重新入堆疊判斷 if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; // 如果m和s嘗試匹配成功,就彈出堆疊頂的兩個元素m和s if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m // 回傳匹配結果 return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match // 嘗試匹配失敗,說明m已經先一步被其它執行緒匹配了 // 就協助清除它 s.casNext(m, mn); // help unlink } } } else { // help a fulfiller // 到這里說明當前節點和頭節點模式不一樣 // 且頭節點是正在匹配中 SNode m = h.next; // m is h's match if (m == null) // waiter is gone // 如果m為null,說明m已經被其它執行緒先一步匹配了 casHead(h, null); // pop fulfilling node else { SNode mn = m.next; // 協助匹配,如果m和s嘗試匹配成功,就彈出堆疊頂的兩個元素m和s if (m.tryMatch(h)) // help match // 將堆疊頂的兩個元素彈出后,再讓s重新入堆疊 casHead(h, mn); // pop both h and m else // lost match // 嘗試匹配失敗,說明m已經先一步被其它執行緒匹配了 // 就協助清除它 h.casNext(m, mn); // help unlink } } } } // 三個引數:需要等待的節點,是否需要超時,超時時間 SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 到期時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 當前執行緒 Thread w = Thread.currentThread(); // 自旋次數 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 當前執行緒中斷了,嘗試清除s if (w.isInterrupted()) s.tryCancel(); // 檢查s是否匹配到了元素m(有可能是其它執行緒的m匹配到當前執行緒的s) SNode m = s.match; // 如果匹配到了,直接回傳m if (m != null) return m; // 如果需要超時 if (timed) { // 檢查超時時間如果小于0了,嘗試清除s nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) // 如果還有自旋次數,自旋次數減一,并進入下一次自旋 spins = shouldSpin(s) ? (spins-1) : 0; // 后面的elseif都是自旋次數沒有了 else if (s.waiter == null) // 如果s的waiter為null,把當前執行緒注入進去,并進入下一次自旋 s.waiter = w; // establish waiter so can park next iter else if (!timed) // 如果不允許超時,直接阻塞,并等待被其它執行緒喚醒,喚醒后繼續自旋并查看是否匹配到了元素 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) // 如果允許超時且還有剩余時間,就阻塞相應時間 LockSupport.parkNanos(this, nanos); } } // SNode里面的方向,呼叫者m是s的下一個節點 // 這時候m節點的執行緒應該是阻塞狀態的 boolean tryMatch(SNode s) { // 如果m還沒有匹配者,就把s作為它的匹配者 if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // waiters need at most one unpark waiter = null; // 喚醒m中的執行緒,兩者匹配完畢 LockSupport.unpark(w); } // 匹配到了回傳true return true; } // 可能其它執行緒先一步匹配了m,回傳其是否是s return match == s; }
SynchronousQueue總結
【1】是一個沒有資料緩沖的BlockingQueue,容量為0,它不會為佇列中元素維護存盤空間,它只是多個執行緒之間資料交換的媒介,
【2】資料結構:鏈表,在其內部類中維護了資料
先消費(take),后生產(put);
第一個執行緒Thread0是消費者訪問,此時佇列為空,則入隊(創建Node結點并賦值)
第二個執行緒Thread1也是消費者訪問,與隊尾模式相同,繼續入隊
第三個執行緒Thread2是生產者,攜帶了資料e,與隊尾模式不同,不進行入隊操作,直接將該執行緒攜帶的資料e回傳給隊首的消費者,并喚醒隊首執行緒Thread1(默認非公平策略是堆疊結構),出隊,
反之,先生產(put)后消費(take),原理一樣
【3】鎖:CAS+自旋(無鎖)【阻塞:自旋了一定次數后呼叫 LockSupport.park()】
【4】存取呼叫同一個方法:transfer()
put、offer 為生產者,攜帶了資料 e,為 Data 模式,設定到 SNode或QNode 屬性中,
take、poll 為消費者,不攜帯資料,為 Request 模式,設定到 SNode或QNode屬性中,
【5】程序
執行緒訪問阻塞佇列,先判斷隊尾節點或者堆疊頂節點的 Node 與當前入隊模式是否相同
相同則構造節點 Node 入隊,并阻塞當前執行緒,元素 e 和執行緒賦值給 Node 屬性
不同則將元素 e(不為 null) 回傳給取資料執行緒,隊首或堆疊頂執行緒被喚醒,出隊
【6】公平模式:TransferQueue,隊尾匹配(判斷模式),隊頭出隊,先進先出
【7】非公平模式(默認策略):TransferStack,堆疊頂匹配,堆疊頂出堆疊,后進先出
【8】應用場景
SynchronousQueue非常適合傳遞性場景做交換作業,生產者的執行緒和消費者的執行緒同步傳遞某些資訊、事件或者任務,
SynchronousQueue的一個使用場景是在執行緒池里,如果我們不確定來自生產者請求數量,但是這些請求需要很快的處理掉,那么配合SynchronousQueue為每個生產者請求分配一個消費執行緒是處理效率最高的辦法,Executors.newCachedThreadPool()就使用了SynchronousQueue,這個執行緒池根據需要(新任務到來時)創建新的執行緒,如果有空閑執行緒則會重復使用,執行緒空閑了60秒后會被回收,
Transferer
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/514061.html
標籤:Java
