本文部分摘自《Java 并發編程的藝術》
模式概述
在執行緒的世界里,生產者就是生產資料的執行緒,消費者就是消費資料的資料,生產者和消費者彼此之間不直接通信,而是通過阻塞佇列進行通信,所以生產者生產完資料后不用等待消費者處理,而是直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列取,阻塞佇列相當于一個緩沖區,平衡了生產者和消費者的處理能力
模式實戰
假設現有需求:把各部門的郵件收集起來,統一處理歸納,可以使用生產者 - 消費者模式,啟動一個執行緒把所有郵件抽取到佇列中,消費者啟動多個執行緒處理郵件,Java 代碼如下:
public class QuickCheckEmailExtractor {
private final ThreadPoolExecutor threadsPool;
private final BlockingQueue<EmailDTO> emailQueue;
private final EmailService emailService;
public QuickCheckEmailExtractor() {
emailQueue = new LinkedBlockingQueue<>();
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 101,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
emailService = new EmailService();
}
public void extract() {
// 抽取所有郵件到佇列里
new ExtractEmailTask().start();
// 處理佇列里的郵件
check();
}
private void check() {
try {
while (true) {
// 兩秒內取不到就退出
EmailDTO email = emailQueue.poll(2, TimeUnit.SECONDS);
if (email == null) {
break;
}
threadsPool.submit(new CheckEmailTask());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
protected void extractEmail() {
List<EmailDTO> allEmails = emailService.queryAllEmails();
if (allEmails == null) {
return;
}
for (EmailDTO emailDTO : allEmails) {
emailQueue.offer(emailDTO);
}
}
protected void checkEmail(EmailDTO email) {
System.out.println("郵件" + email.getId() + "已處理");
}
public class ExtractEmailTask extends Thread {
@Override
public void run() {
extractEmail();
}
}
public class CheckEmailTask extends Thread {
private EmailDTO email;
@Override
public void run() {
checkEmail(email);
}
public CheckEmailTask() {
super();
}
public CheckEmailTask(EmailDTO email) {
super();
this.email = email;
}
}
}
多生產者和多消費者場景
在多核時代,多執行緒并發處理速度比單執行緒處理速度更快,所以可以使用多個執行緒來生產資料,多個執行緒來消費資料,更復雜的情況是,消費者消費完的資料,可能還要交給其他消費者繼續處理,如圖所示:

我們在一個長連接服務器中使用這種模式,生產者 1 負責將所有客戶端發送的訊息存放在阻塞佇列 1 里,消費者 1 從佇列里讀訊息,然后通過訊息 ID 進行散列得到 N 個佇列中的一個,然后根據編號將訊息存放在不同的佇列里,每個阻塞佇列會分配一個執行緒來阻塞佇列里的資料,如果消費者 2 無法消費訊息,就將訊息再拋回阻塞佇列 1 中,交給其他消費者處理
public class MsgQueueManager {
/**
* 訊息總佇列
*/
private final BlockingQueue<Message> messageQueue;
/**
* 訊息子佇列集合
*/
private final List<BlockingQueue<Message>> subMsgQueues;
private MsgQueueManager() {
messageQueue = new LinkedBlockingQueue<>();
subMsgQueues = new ArrayList<>();
}
public static MsgQueueManager getInstance() {
return new MsgQueueManager();
}
public void put(Message msg) {
try {
messageQueue.put(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public Message take() {
try {
return messageQueue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
}
/**
* 消費者執行緒獲取子佇列
*/
public BlockingQueue<Message> addSubMsgQueue() {
BlockingQueue<Message> subMsgQueue = new LinkedBlockingQueue<>();
subMsgQueues.add(subMsgQueue);
return subMsgQueue;
}
/**
* 訊息分發執行緒,負責把訊息從大佇列塞到小佇列里
*/
class DispatchMessageTask implements Runnable {
/**
* 控制訊息分發開始與結束
*/
private boolean flag = true;
public void setFlag(boolean flag) {
this.flag = flag;
}
@Override
public void run() {
BlockingQueue<Message> subQueue;
while (flag) {
// 如果沒有資料,則阻塞在這里
Message msg = take();
// 如果為空,表示沒有Session連接,需要等待Session連接上來
while ((subQueue = getSubQueue()) == null) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 把訊息放到小佇列里
try {
subQueue.put(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
/**
* 均衡獲取一個子佇列
*/
public BlockingQueue<Message> getSubQueue() {
List<BlockingQueue<Message>> subMsgQueues = getInstance().subMsgQueues;
if (subMsgQueues.isEmpty()) {
return null;
}
int index = (int) (System.nanoTime() % subMsgQueues.size());
return subMsgQueues.get(index);
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/272751.html
標籤:其他
