ArrayBlockingQueue介紹
ArrayBlockingQueue是最典型的有界阻塞佇列,其內部是用陣列存盤元素的,初始化時需要指定容量大小,利用 ReentrantLock 實作執行緒安全,
在生產者-消費者模型中使用時,如果生產速度和消費速度基本匹配的情況下,使用ArrayBlockingQueue是個不錯選擇;當如果生產速度遠遠大于消費速度,則會導致佇列填滿,大量生產執行緒被阻塞,
使用獨占鎖ReentrantLock實作執行緒安全,入隊和出隊操作使用同一個鎖物件,也就是只能有一個執行緒可以進行入隊或者出隊操作;這也就意味著生產者和消費者無法并行操作,在高并發場景下會成為性能瓶頸,
ArrayBlockingQueue的原始碼分析
【1】屬性值
/** 佇列元素陣列 */ final Object[] items; /** 下一個被take,poll,peek,remove的元素位置 */ int takeIndex; /** 插入位置包含put,offer,add */ int putIndex; /** 佇列元素的數量 */ int count; /** 重入鎖 */ final ReentrantLock lock; /** 等待獲取的條件佇列 */ private final Condition notEmpty; /** 等待插入的條件佇列 */ private final Condition notFull; //迭代器的共享狀態 transient Itrs itrs = null;
【2】建構式
//默認采用非公平鎖 public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) { //初始化阻塞佇列 this(capacity, fair); //加鎖將陣列元素填入阻塞佇列(主要是考慮到重排序和可見性問題,因為Object[] items 并沒有加上 volatile 屬性) final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; //將插入位置下變更 putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
【3】核心方法分析
1)入隊put方法
public void put(E e) throws InterruptedException { //檢查是否為空 checkNotNull(e); final ReentrantLock lock = this.lock; //加鎖,如果執行緒中斷拋出例外 lock.lockInterruptibly(); try { //阻塞佇列已滿,則將生產者掛起,等待消費者喚醒 //設計注意點: 用while不用if是為了防止虛假喚醒 while (count == items.length) notFull.await(); //佇列滿了,使用notFull等待(生產者阻塞) // 入隊 enqueue(e); } finally { lock.unlock(); // 喚醒消費者執行緒 } } private void enqueue(E x) { final Object[] items = this.items; //入隊 使用的putIndex items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; //設計的精髓: 環形陣列,putIndex指標到陣列盡頭了,回傳頭部 count++; //notEmpty條件佇列轉同步佇列,準備喚醒消費者執行緒,因為入隊了一個元素,肯定不為空了 notEmpty.signal(); }
2)出隊take方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //加鎖,如果執行緒中斷拋出例外 lock.lockInterruptibly(); try { //如果佇列為空,則消費者掛起 while (count == 0) notEmpty.await(); //出隊 return dequeue(); } finally { lock.unlock();// 喚醒生產者執行緒 } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //取出takeIndex位置的元素 items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; //設計的精髓: 環形陣列,takeIndex 指標到陣列盡頭了,回傳頭部 count--; if (itrs != null) itrs.elementDequeued(); //notFull條件佇列轉同步佇列,準備喚醒生產者執行緒,此時佇列有空位 notFull.signal(); return x; }
3)其余offer&poll&peek&remove方法
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } //本質區別在于設定了超時時間,超時后選擇不加入,回傳false public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; //生產執行緒堵塞nanos時間,也有可能被喚醒,如果超過nanos時間還未被喚醒,則nanos=0,再次回圈,就會回傳false nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } //本質區別在于設定了超時時間,超時后選擇不獲取,回傳null public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { //通過下標查找直接回傳 return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } } final E itemAt(int i) { return (E) items[i]; } public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) { removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } } void removeAt(final int removeIndex) { final Object[] items = this.items; if (removeIndex == takeIndex) { // removing front item; just advance items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); } else { final int putIndex = this.putIndex; for (int i = removeIndex;;) { int next = i + 1; if (next == items.length) next = 0; if (next != putIndex) { items[i] = items[next]; i = next; } else { items[i] = null; this.putIndex = i; break; } } count--; if (itrs != null) itrs.removedAt(removeIndex); } notFull.signal(); }
ArrayBlockingQueue總結
【1】有界阻塞佇列,先進先出,存取相互排斥
【2】資料結構:靜態陣列(容量固定須指定長度,沒有擴容機制,沒有元素的位置也占用空間,被null占位)
【3】ReentrantLock鎖保證互斥性:存取都是同一把鎖,操作的是同一個陣列物件,存取相互排斥
【4】阻塞物件(notEmpty【出隊:佇列count=0,無元素可取時,阻塞在該物件上】,notFull【入隊:佇列count=length,放不進元素時,阻塞在該物件上】)
【5】入隊,從隊首開始添加元素,記錄putIndex(到隊尾時設定為0),喚醒notEmpty
【6】出隊,從隊首開始添加元素,記錄takeIndex(到隊尾時設定為0),喚醒notFull
【7】兩個指標都是從隊首向隊尾移動,保證佇列的先進先出原則(亮點:利用指標和陣列,形成環狀結構,重復利用記憶體空間)
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/513663.html
標籤:其他
上一篇:MyBatisPlus筆記
下一篇:選擇排序演算法步驟分析
