相關文章
面試官:你手寫過堵塞佇列嗎?
ArrayBlockingQueue講解及原始碼決議
1.概述
自java5后,jdk增加了concurrent包,concurrent中的BlockingQueue,也就是堵塞佇列,BlockingQueue只是一個介面,jdk為其提供了豐富的實作類,適用于不同的場景,這篇講的是LinkedBlockingQueue,
2.簡介
LinkedBlockingQueue繼承了AbstractQueue類和實作了BlockingQueue介面,是一個基于內部鏈表的有界佇列,如果初始化不設定大小,默認設定大小為Integer.MAX_VALUE(無界佇列),鎖是基于ReentrantLock實作,和ArrayBlockingQueue不同的是,其有兩個鎖物件,這就可以實作生產/消費并行,
3.應用
- 在實際應用中,應該設定大小,否則變為無界佇列,生產者速度大于消費者,則會導致記憶體溢位,
- 不支持null元素,否則報NullPointerException例外,
- LinkedBlockingQueue相較于ArrayBlockingQueue更適用于處理高并發,因為實作了鎖分離,可以更快的存盤資料,但是因為LinkedBlockingQueue多了Node,導致GC需要額外回收一個Node物件,
主要方法

1.插入資料
(1)offer(E e):如果佇列沒滿,回傳true,如果佇列已滿,回傳false(不堵塞),
(2)offer(E e, long timeout, TimeUnit unit):可以設定等待時間,如果佇列已滿,則進行等待,超過等待時間,則回傳false,
(3)put(E e):無回傳值,一直等待,直至佇列空出位置,
2.獲取資料
(1)poll():如果有資料,出隊,如果沒有資料,回傳null,
(2)poll(long timeout, TimeUnit unit):可以設定等待時間,如果沒有資料,則等待,超過等待時間,則回傳null,
(3)take():如果有資料,出隊,如果沒有資料,一直等待(堵塞),
4.為什么采用鏈表?
LinkedBlockingQueue通過Node實作了鏈表存盤,鏈表是可伸縮的,而陣列大小是不可變的, 在處理大資料量時,如果不可伸縮,佇列已滿后,會進行堵塞,如果通過陣列進行擴容,只能通過新陣列的方式,對資源消耗大,
5.為什么采用單向鏈表?
因為佇列是先進先出的,所以只需要知道下一個節點是誰就行,
6.原始碼決議
AbstractQueue
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
單向鏈表
//單向鏈表
static class Node<E> {
E item;
/**
* 下一個節點
*/
Node<E> next;
Node(E x) {
item = x;
}
}
成員變數
/**
* 佇列容量,默認Integer.MAX_VALUE
*/
private final int capacity;
/**
* 佇列大小
*/
private final AtomicInteger count = new AtomicInteger();
/**
* 鏈表頭部
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* 鏈表尾部
* Invariant: last.next == null
*/
private transient Node<E> last;
/**
* 讀鎖
*/
private final ReentrantLock takeLock = new ReentrantLock();
/**
* 讀條件
*/
private final Condition notEmpty = takeLock.newCondition();
/**
* 寫鎖
*/
private final ReentrantLock putLock = new ReentrantLock();
/**
* 寫條件
*/
private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue采用的是雙鎖機制,可以并行執行,多執行緒環境下,必然存在并發問題,所以采用了AtomicInteger,可以實作原子操作,防止并發問題
建構式
/**
* 創建無界佇列
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 創建有界佇列
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
/**
* 創建一個無界佇列,并將傳入的集合賦值到當前佇列中
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
put
/**
* 插入元素
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
//創建節點
Node<E> node = new Node<E>(e);
//獲取寫鎖
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//如果佇列已滿
while (count.get() == capacity) {
//堵塞生產者執行緒
notFull.await();
}
//向隊尾添加元素
enqueue(node);
//獲取元素數量
c = count.getAndIncrement();
//如果還可以繼續繼續添加,喚醒下一個消費者執行緒
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//如果佇列原來是空的,現在已經增加元素了,所以喚醒讀執行緒
if (c == 0)
signalNotEmpty();
}
/**
* 出隊信號,put和offer呼叫
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//喚醒消費執行緒
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* 在佇列尾部增加元素
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
1.當佇列已滿,會進行堵塞
2.添加完元素后,如果佇列還可以繼續添加,則喚醒下一個生產者執行緒
3.如果佇列原來是空的,說明讀執行緒堵塞了,現在已經增加元素了,所以喚醒讀執行緒
offer
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//判斷元素是否為空
if (e == null) throw new NullPointerException();
//獲取等待時間
long nanos = unit.toNanos(timeout);
int c = -1;
//獲取寫鎖
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//如果佇列已滿
while (count.get() == capacity) {
//如果超過等待時間
if (nanos <= 0)
return false;
//生產執行緒堵塞nanos時間,也有可能被喚醒,如果超過nanos時間還未被喚醒,則nanos=0,再次回圈,就會回傳false
nanos = notFull.awaitNanos(nanos);
}
//向隊尾添加元素
enqueue(new Node<E>(e));
c = count.getAndIncrement();
//如果還可以繼續繼續添加,喚醒下一個生產者執行緒
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//如果佇列原來是空的,現在已經增加元素了,所以喚醒讀執行緒
if (c == 0)
signalNotEmpty();
return true;
}
public boolean offer(E e) {
//判斷元素是否為空
if (e == null) throw new NullPointerException();
//判斷佇列是否已滿
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
//獲取寫鎖
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
//寫入元素
enqueue(node);
//獲取當前佇列大小
c = count.getAndIncrement();
//如果還可以繼續繼續添加,喚醒下一個生產者執行緒
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
//c==0,其實已經變為1了,所以喚醒消費執行緒
//TODO getAndIncrement()回傳的是原值,incrementAndGet()回傳的是原值+1
if (c == 0)
signalNotEmpty();
return c >= 0;
}
/**
* 出隊信號,put和offer呼叫
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//喚醒消費執行緒
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* 在佇列尾部增加元素
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
offer方法有兩種,一種是堵塞offer方法,另一種是不堵塞offer方法
堵塞offer方法
1.當佇列已滿,會根據設定的堵塞時間進行堵塞,如果超過堵塞時間,還未被喚醒,則回傳false
2.添加完元素后,如果佇列還可以繼續添加,則喚醒下一個生產者執行緒
3.如果佇列原來是空的,說明讀執行緒堵塞了,現在已經增加元素了,所以喚醒讀執行緒
不堵塞offer方法
1.當佇列已滿 ,直接回傳false
2.添加完元素后,如果佇列還可以繼續添加,則喚醒下一個生產者執行緒
3.如果佇列原來是空的,說明讀執行緒堵塞了,現在已經增加元素了,所以喚醒讀執行緒
take
public E take() throws InterruptedException {
E x;
int c = -1;
//獲取佇列元素數量
final AtomicInteger count = this.count;
//獲取讀鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//如果佇列為空
while (count.get() == 0) {
//堵塞讀執行緒
notEmpty.await();
}
//從隊尾獲取元素
x = dequeue();
//獲取數量
c = count.getAndDecrement();
//如果佇列還有元素,喚醒下一個消費者執行緒
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果原來佇列是滿的,喚醒生產者執行緒
if (c == capacity)
signalNotFull();
return x;
}
/**
*入隊信號,take和poll呼叫
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
/**
* 從佇列頭部獲取元素
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
1.當佇列為空 ,進行堵塞,直至被喚醒
2.獲取到元素后,如果佇列還可以繼續獲取,則喚醒下一個消費者執行緒
3.如果佇列原來是滿的,說明寫執行緒堵塞了,現在已經獲取元素,佇列可以繼續添加元素,所以喚醒寫執行緒
poll
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
//獲取等待時間
long nanos = unit.toNanos(timeout);
//獲取佇列元素數量
final AtomicInteger count = this.count;
//獲取讀鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//如果佇列為空
while (count.get() == 0) {
//如果超過等待時間
if (nanos <= 0)
return null;
//消費執行緒堵塞nanos時間,也有可能被喚醒,如果超過nanos時間還未被喚醒,則nanos=0,再次回圈,就會回傳false
nanos = notEmpty.awaitNanos(nanos);
}
//從隊尾獲取元素
x = dequeue();
c = count.getAndDecrement();
//如果佇列還有元素,喚醒下一個消費者執行緒
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果原來佇列是滿的,喚醒生產者執行緒
if (c == capacity)
signalNotFull();
return x;
}
public E poll() {
//獲取佇列元素數量
final AtomicInteger count = this.count;
//如果佇列為空,回傳null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
//獲取讀鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//如果佇列元素數量大于0
if (count.get() > 0) {
//從隊尾獲取元素
x = dequeue();
c = count.getAndDecrement();
//如果佇列還有元素,喚醒下一個消費者執行緒
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//如果原來佇列是滿的,喚醒生產者執行緒
if (c == capacity)
signalNotFull();
return x;
}
/**
*入隊信號,take和poll呼叫
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
/**
* 從佇列頭部獲取元素
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
poll方法有兩種,一種是堵塞poll方法,另一種是不堵塞poll方法
堵塞poll方法
1.當佇列為空 ,會根據設定的堵塞時間進行堵塞,如果超過堵塞時間,還未被喚醒,則回傳null
2.獲取到元素后,如果佇列還可以繼續獲取,則喚醒下一個消費者執行緒
3.如果佇列原來是滿的,說明寫執行緒堵塞了,現在已經獲取元素,佇列可以繼續添加元素,所以喚醒寫執行緒
不堵塞poll方法
1.當佇列為空 ,直接回傳null
2.獲取到元素后,如果佇列還可以繼續獲取,則喚醒下一個消費者執行緒
3.如果佇列原來是滿的,說明寫執行緒堵塞了,現在已經獲取元素,佇列可以繼續添加元素,所以喚醒寫執行緒
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/211101.html
標籤:java
上一篇:資料庫管理(填空題)
