簡單理解阻塞佇列(BlockingQueue)中的take/put方法以及Condition存在的作用
-
Condition:可以理解成一把鎖的一個鑰匙,它既可以解鎖(通知放行),又可以加鎖(阻塞)
-
notFull:當佇列元素滿了時,阻塞生產,當佇列元素存在元素但是沒有滿時,去通知消費
-
notEmpty:當佇列中不存在元素時,阻塞消費,當佇列元素存在元素時,去通知生產
while (count >= datas.length) {...}
while (count <= 0) {...}
兩個while回圈判斷,而不用if,是因為執行緒不安全,
導致多執行緒場景下每個執行緒獲取到的回圈條件count的值存在差異,
導致代碼執行例外,用while可以使當前執行緒重新重繪判斷條件count的值,
- 用處:
ThreadPoolExecutor中使用到了阻塞佇列,阻塞佇列中又使用到了ReentrantLock,而ReentrantLock基于AQS,
package com.zhuyz.foundation.juc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MyArrayBlockingQueue<T> {
// 陣列元素
private T[] datas;
// 實際的元素個數(也有索引下標的作用)
private volatile int count;
private final ReentrantLock putLock = new ReentrantLock();
private final ReentrantLock takeLock = new ReentrantLock();
// 通知消費,暫停生產【當陣列full時await(put時),當陣列notFull時signal(take時)】
private Condition notFull = putLock.newCondition();
// 通知生產,暫停消費【當陣列empty時await(take時),當陣列notEmpty時signal(put時)】
private Condition notEmpty = takeLock.newCondition();
public MyArrayBlockingQueue(int cap) {
this.datas = (T[]) new Object[cap];
}
private void put(T t) {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 執行緒不安全,需要回圈判斷,插入值之前判斷一下陣列長度是否達到最大長度
while (count >= datas.length) {
System.out.println("datas is full, please waiting take");
notFull.await();
}
datas[count++] = t;
System.out.println("put: " + t);
} catch (Exception e) {
System.out.println("例外" + e);
} finally {
putLock.unlock();
}
// 通知獲取元素的執行緒繼續執行(take_thread)
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private T take() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
T t = null;
try {
// 執行緒不安全,需要回圈判斷,插入值之前判斷一下陣列中元素個數是否為0
while (count <= 0) {
System.out.println("datas is empty, please waiting put");
notEmpty.await();
}
// 獲取元素
t = datas[--count];
System.out.println("take: " + t);
} catch (Exception e) {
System.out.println("例外" + e);
} finally {
takeLock.unlock();
}
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 通知插入元素的執行緒繼續執行(put_thread)
notFull.signal();
} finally {
putLock.unlock();
}
return t;
}
public static void main(String[] args) throws InterruptedException {
MyArrayBlockingQueue<Integer> myArrayBlockingQueue = new MyArrayBlockingQueue<>(5);
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> myArrayBlockingQueue.put(finalI)).start();
}
TimeUnit.SECONDS.sleep(5L);
for (int i = 0; i < 5; i++) {
new Thread(() -> myArrayBlockingQueue.take()).start();
}
}
}
結果如下:
從結果中可以看出,先put了5個元素,然后另外五個元素被阻塞住了,沒有進去
take消費5個之后,另外五個被阻塞的元素就被put進去了
put: 0
put: 1
put: 2
put: 3
put: 4
datas is full, please waiting take
datas is full, please waiting take
datas is full, please waiting take
datas is full, please waiting take
datas is full, please waiting take
take: 4
put: 5
take: 5
take: 3
put: 6
put: 7
take: 7
put: 8
take: 8
put: 9
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/229092.html
標籤:其他
上一篇:斯坦佛密碼學十三講:1.概論
