文章目錄
- 1. LinkedBlockingQueue 原理
- 1.1 基本入隊出隊
- 入隊
- 出隊
- 1.2 加鎖分析
- put 操作
- take 操作
- 1.3 與ArrayBlockingQueue的性能比較
- 2. ConcurrentLinkedQueue 原理
- 3. CopyOnWriteArrayList
- get 弱一致性
- 迭代器弱一致性
1. LinkedBlockingQueue 原理
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// Node為LinkedBlockingQueue內部的節點
static class Node<E> { E item;
/**
* 下列三種情況之一
* - 真正的后繼節點
* - 自己, 發生在出隊時
* - null, 表示是沒有后繼節點, 是最后了
*/
Node<E> next;
Node(E x) { item = x; }
}
}
初始化鏈表 last = head = new Node(null); Dummy 節點用來占位,item 為 null

1.1 基本入隊出隊
入隊
當一個節點入隊 last = last.next = node

再來一個節點入隊 last = last.next = node;

出隊
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
h = head

first = h.next

h.next = h

head = first

E x = first.item;
first.item = null;
return x;

1.2 加鎖分析
高明之處在于用了兩把鎖和 dummy 節點
- 用一把鎖,同一時刻,最多只允許有一個執行緒(生產者或消費者,二選一)執行
- 用兩把鎖,同一時刻,可以允許兩個執行緒同時(一個生產者與一個消費者)執行
- 消費者與消費者執行緒仍然串行
- 生產者與生產者執行緒仍然串行
- 執行緒安全分析
- 當節點總數大于 2 時(包括
dummy節點),putLock保證的是last節點的執行緒安全,takeLock保證的是head 節點的執行緒安全,兩把鎖保證了入隊和出隊沒有競爭 - 當節點總數等于 2 時(即一個
dummy節點,一個正常節點)這時候,仍然是兩把鎖鎖兩個物件,不會競爭 - 當節點總數等于 1 時(就一個
dummy節點)這時take執行緒會被notEmpty條件阻塞,有競爭,會阻塞
- 當節點總數大于 2 時(包括
// 用于 put(阻塞) offer(非阻塞)
private final ReentrantLock putLock = new ReentrantLock();
// 用戶 take(阻塞) poll(非阻塞)
private final ReentrantLock takeLock = new ReentrantLock();
put 操作
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// count 用來維護元素計數
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 滿了等待
while (count.get() == capacity) {
// 倒過來讀就好: await notFull
notFull.await();
}
// 有空位, 入隊且計數加一
enqueue(node);
// 回傳的是c加1之前的數值
c = count.getAndIncrement();
// 除了自己 put 以外, 如果佇列還有空位, 由自己叫醒其他 put 執行緒
// 這里與其它地方不同的是,put執行緒的喚醒是由其它put執行緒喚醒的;
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果佇列中只有一個元素, 叫醒 take 執行緒
if (c == 0)
// 這里呼叫的是 notEmpty.signal() 而不是 notEmpty.signalAll() 是為了減少競爭
signalNotEmpty();
}
take 操作
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// 喚醒其它take執行緒
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果佇列中只有一個空位時, 叫醒 put 執行緒
// 如果有多個執行緒進行出隊, 第一個執行緒滿足 c == capacity, 但后續執行緒 c < capacity
if (c == capacity)
// 這里呼叫的是 notFull.signal() 而不是 notFull.signalAll() 是為了減少競爭
signalNotFull();
return x;
}
1.3 與ArrayBlockingQueue的性能比較
主要列舉 LinkedBlockingQueue 與 ArrayBlockingQueue 的性能比較
- Linked 支持有界,Array 強制有界
- Linked 實作是鏈表,Array 實作是陣列
- Linked 是懶惰的,而 Array 需要提前初始化 Node 陣列
- Linked 每次入隊會生成新 Node,而 Array 的 Node 是提前創建好的
- Linked 兩把鎖,Array 一把鎖
2. ConcurrentLinkedQueue 原理
ConcurrentLinkedQueue 的設計與 LinkedBlockingQueue 非常像,也是
- 兩把【鎖】,同一時刻,可以允許兩個執行緒同時(一個生產者與一個消費者)執行
- dummy 節點的引入讓兩把【鎖】將來鎖住的是不同物件,避免競爭
- 只是這【鎖】使用了 cas 來實作
事實上,ConcurrentLinkedQueue 應用還是非常廣泛的
例如之前講的 Tomcat 的 Connector 結構時,Acceptor 作為生產者向 Poller 消費者傳遞事件資訊時,正是采用了ConcurrentLinkedQueue 將 SocketChannel 給 Poller 使用

3. CopyOnWriteArrayList
CopyOnWriteArraySet 是它的馬甲
底層實作采用了 寫入時拷貝 的思想,增刪改操作會將底層陣列拷貝一份,更改操作在新陣列上執行,這時不影響其它執行緒的并發讀,讀寫分離, 以新增為例
public boolean add(E e) {
synchronized (lock) {
// 獲取舊的陣列
Object[] es = getArray();
int len = es.length;
// 拷貝新的陣列(這里是比較耗時的操作,但不影響其它讀執行緒)
es = Arrays.copyOf(es, len + 1);
// 添加新元素
es[len] = e;
// 替換舊的陣列
setArray(es);
return true;
}
}
這里的原始碼版本是 Java 11,在 Java 1.8 中使用的是可重入鎖而不是 synchronized
其它讀操作并未加鎖,例如:
public void forEach(Consumer<? super E> action) {
Objects.requireNonNull(action);
for (Object x : getArray()) {
@SuppressWarnings("unchecked") E e = (E) x;
action.accept(e);
}
}
適合『讀多寫少』的應用場景
get 弱一致性

| 時間點 | 操作 |
|---|---|
| 1 | Thread-0 getArray() |
| 2 | Thread-1 getArray() 先得到陣列再進行拷貝 |
| 3 | Thread-1 setArray(arrayCopy) 修改完陣列再將陣列設定回去 |
| 4 | Thread-0 array[index] 讀到的還是舊陣列的資訊,就是說array[0]雖然被Thread-1洗掉了,但是還是能Thread-0可以讀取到 |
不容易測驗,但問題確實存在
迭代器弱一致性
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
list.add(1);
list.add(2);
list.add(3);
Iterator<Integer> iter = list.iterator();
new Thread(() -> {
list.remove(0);
System.out.println(list);
}).start();
sleep1s();
while (iter.hasNext()) {
System.out.println(iter.next());
}
不要覺得弱一致性就不好
- 資料庫的 MVCC 都是弱一致性的表現
- 并發高和一致性是矛盾的,需要權衡
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/252083.html
標籤:其他
下一篇:pta linux 題目
