阻塞佇列
- 1. 什么是阻塞佇列
- 2. 阻塞佇列的代碼使用
- 3. 生產者消費者模型
- (1)應用一:解耦合
- (2)應用二:削峰填谷
- (3)相關代碼
- 4.阻塞佇列和生產者消費者模型功能的實作
1. 什么是阻塞佇列
- 阻塞佇列是一種特殊的佇列,和資料結構中普通的佇列一樣,也遵守先進先出的原則
- 同時,阻塞佇列是一種能保證執行緒安全的資料結構,并且具有以下兩種特性:當佇列滿的時候,繼續向佇列中插入元素就會讓佇列阻塞,直到有其他執行緒從佇列中取走元素;當佇列為空的時候,繼續出佇列也會讓佇列阻塞,直到有其他執行緒往佇列中插入元素
補充:執行緒阻塞的意思指代碼此時不會被執行,即作業系統在此時不會把這個執行緒調度到CPU上去執行了
2. 阻塞佇列的代碼使用
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingDeque;
public class Test {
public static void main(String[] args) throws InterruptedException {
//不能直接newBlockingDeque,因為它是一個介面,要向上轉型
//LinkedBlockingDeque內部是基于鏈表方式來實作的
BlockingDeque<String> queue=new LinkedBlockingDeque<>(10);//此處可以指定一個具體的數字,這里的的10代表佇列的最大容量
queue.put("hello");
String elem=queue.take();
System.out.println(elem);
elem=queue.take();
System.out.println(elem);
}
}
注意: put方法帶有阻塞功能,但是offer不具有,所以一般用put方法(能使用offer方法的原因是 BlockingDeque繼承了Queue)

列印結果如上所示,當列印了hello后,佇列為空,代碼執行到elem=queue.take();就不會繼續往下執行了,此時執行緒進入阻塞等待狀態,什么也不會列印了,直到有其他執行緒給佇列中放入新的元素為止
3. 生產者消費者模型
生產者消費者模型是在服務器開發和后端開發中比較常用的編程手段,一般用于解耦合和削峰填谷,
高耦合度:兩個代碼模塊的關聯關系比較高
高內聚:一個代碼模塊內各個元素彼此結合的緊密
因此,我們一般追求高內聚低耦合,這樣會加快執行效率,而使用生產者消費者模型就可以解耦合
(1)應用一:解耦合
我們以實際生活中的情況為例,這里有兩臺服務器:A服務器和B服務器,當A服務器傳輸資料給B時,要是直接傳輸的話,那么不是A向B推送資料,就是B從A中拉取資料,都是需要A和B直接互動,所以A和B存在依賴關系(A和B的耦合度比較高),未來如果服務器需要擴展,比如加一個C服務器,讓A給C傳資料,那么改動就比較復雜,且會降低效率,這時我們可以加一個佇列,這個佇列為阻塞佇列,如果A把資料寫到佇列里,B從中取,那么佇列相當于是中轉站(或者說交易場所),A相當于生產者(提供資料),B相當于消費者(接收資料),此時就構成了生產者消費者模型,這樣會讓代碼耦合度更低,維護更方便,執行效率更高,

在計算機中,生產者充當其中一組執行緒,而消費者充當另一組執行緒,而交易場所就可以使用阻塞佇列了
(2)應用二:削峰填谷

實際生活中
在河道中大壩算是一個很重要的組成部分了,如果沒有大壩,大家試想一下結果:當汛期來臨后上游的水很大時,下游就會涌入大量的水發生水災讓莊稼被淹沒;而旱期的話下游的水會很少可能會引發旱災,若有大壩的話,汛期時大壩把多余的水存到大壩中,關閘蓄水,讓上游的水按一定速率往下流,避免突然一波大雨把下游淹了,這樣下游不至于出現水災,旱期時大壩把之前儲存好的水放出來,還是讓讓水按一定速率往下流,避免下流太缺水,這樣既可以避免汛期發生洪澇又可以避免旱期發生旱災了,
峰:相當于汛期
谷:相當于旱期
計算機中
這樣的情況在計算機中也是很典型的,尤其是在服務器開發中,網關通常會把互聯網中的請求轉發給業務服務器,比如一些商品服務器,用戶服務器,商家服務器(存放商家的資訊),直播服務器,但因為互聯網過來的請求數量是多是少不可控,相當于上游的水,如果突然來了一大波請求,網關即使能扛得住,后續的很多服務器收到很多請求也就會崩潰(處理一個請求涉及到一系列的資料庫操作,因為資料庫相關操作效率本身比較低,這樣請求多了就處理不過來了,因此就會崩潰)
所以實際情況中網關和業務服務器之間往往用一個佇列來緩沖,這個佇列就是阻塞佇列(交易場所),用這個佇列來實作生產者(網關)消費者(業務服務器)模型,把請求快取到佇列中,后面的消費者(業務服務器)按照自己固定的速率去讀請求,這樣當請求很多時,雖然佇列服務器可能會稍微受到一定壓力,但能保證業務服務器的安全,
(3)相關代碼
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestDemo {
public static void main(String[] args) {
// 使用一個 BlockingQueue 作為交易場所
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
// 此執行緒作為消費者
Thread customer = new Thread() {
@Override
public void run() {
while (true) {
// 取隊首元素
try {
Integer value = queue.take();
System.out.println("消費元素: " + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
customer.start();
// 此執行緒作為生產者
Thread producer = new Thread() {
@Override
public void run() {
for (int i = 1; i <= 10000; i++) {
System.out.println("生產了元素: " + i);
try {
queue.put(i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
producer.start();
try {
customer.join();
producer.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

列印如上(此代碼是讓生產者通過sleep每過1秒生產一個元素,而消費者不使用sleep,所以每當生產一個元素時,消費者都會立馬消費一個元素)
4.阻塞佇列和生產者消費者模型功能的實作
在學會如何使用BlockingQueue后,那么如何自己去實作一個呢?
主要思路:
1.利用陣列
2.head代表隊頭,tail代表隊尾
3.head和tail重合后到底是空的還是滿的判斷方法:專門定義一個size記錄當前佇列元素個數,入佇列時size加1出佇列時size減1,當size為0表示空,為陣列最大長度就是滿的(也可以浪費一個陣列空間用head和tail重合表示空,用tail+1和head重合表示滿,但此方法較為麻煩,上一個方法較為直觀,因此我們使用上一個方法)
public class Test2 {
static class BlockingQueue {
private int[] items = new int[1000]; // 此處的1000相當于佇列的最大容量, 此處暫時不考慮擴容的問題.
private int head = 0;//定義隊頭
private int tail = 0;//定義隊尾
private int size = 0;//陣列大小
private Object locker = new Object();
// put 用來入佇列
public void put(int item) throws InterruptedException {
synchronized (locker) {
while (size == items.length) {
// 佇列已經滿了,阻塞佇列開始阻塞
locker.wait();
}
items[tail] = item;
tail++;
// 如果到達末尾, 就回到起始位置.
if (tail >= items.length) {
tail = 0;
}
size++;
locker.notify();
}
}
// take 用來出佇列
public int take() throws InterruptedException {
int ret = 0;
synchronized (locker) {
while (size == 0) {
// 對于阻塞佇列來說, 如果佇列為空, 再嘗試取元素, 就要阻塞
locker.wait();
}
ret = items[head];
head++;
if (head >= items.length) {
head = 0;
}
size--;
// 此處的notify 用來喚醒 put 中的 wait
locker.notify();
}
return ret;
}
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new BlockingQueue();
// 消費者執行緒
Thread consumer = new Thread() {
@Override
public void run() {
while (true) {
try {
int elem = queue.take();
System.out.println("消費元素: " + elem);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
consumer.start();
// 生產者執行緒
Thread producer = new Thread() {
@Override
public void run() {
for (int i = 1; i < 10000; i++) {
System.out.println("生產元素: " + i);
try {
queue.put(i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
producer.start();
consumer.join();
producer.join();
}
}

運行結果如上,
注意:
1.wait和notify的正確使用
2.put和take都會產生阻塞情況,但阻塞條件是對立的,wait不會同時觸發(put喚醒take阻塞,take喚醒put阻塞)
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/308751.html
標籤:java
