大家好,我是陶朱公Boy,又跟大家見面了,
前言
今天跟大家分享一款基于“生產者消費者模式”下實作的組件,
該組件是作者偶然在翻閱公司一中間件原始碼的時候碰到的,覺得設計的非常精美、巧妙,花了點時間整理成文分享給大家,
生產者和消費者彼此之間不進行通信,中間通過一個容器(如阻塞佇列)來解決強解耦問題,
阻塞佇列起到了一定的資料緩沖作用,平衡了生產者和消費者對資料的處理能力,by—《Java并發編程的藝術》
組件介紹
該組件基于生產者消費者模式來編碼實作,是一款本地化解決流量削峰、解耦、異步的利器,
此組件由以下知識點構成:執行緒池、阻塞佇列、LockSupport、Executor框架、final、volatile,此外你還能接觸到hash取模演算法、介面回呼等機制,
組件本身代碼量并不大,但知識點比較密集,所以希望大家能花一點時間認真看完,我將從適用場景、架構設計、原始碼決議這三個角度給大家講介紹這款組件,
適用場景
☆場景一:報表下載
現在很多后臺下載功能,普適的做法是先篩選轉換資料,然后對接云存盤平臺進行保存,最后生成一個可訪問的檔案地址,整個程序非常耗時,
其實完全可以生產者發送一個下載請求就結束回應,服務端異步的去消費這個任務請求,處理完生成地址后,再進行通知(比如更新對應資料庫檔案欄位)這是一種異步體現,也解耦了生產者與消費者原來的同步互動方式,整體效率會更高,
☆場景二:日志埋點
有些應用它的QPS非常高,產生的資料本身并不是特別重要比如埋點的日志,如果實時呼叫埋點平臺可能給平臺側造成非常大的訪問壓力,所以這個時候中間的阻塞佇列就起到了一定的緩沖作用,等一段時間或佇列資料量達到一定量(參賽可動態配置)再一次性拿出來轉換后,最后批量傳遞出去,
☆場景三:Yana(阿里內部一款基于郵件分享技術文章的工具)
《Java并發編程的藝術》作者方騰飛有分享過他們基于生產者消費者模式實作的一個案例,
他們團隊早期有一個習慣,大家如果在平時作業當中遇到比較好的文章,會通過郵件轉發到專屬郵箱進行內部分享,這樣其他成員就能看到這篇文章,甚至大家會在底部評論、回復、交流,
但期間遇到一個問題:一旦時間一長,以前的文章很難被檢閱,郵件串列的可視化太差,也不能進行歸類,有些新入職員工也看不到以往其他成員分享過的文章,
基于這些問題,有幾個小伙伴自發的趁業余時間開發了一個簡易工具--yana,該工具功能就是:生產者執行緒會先往郵箱里將所有分享的郵件下載下來(包括附件、圖片、郵件回復等內容),下載完成后,通過confluence的Web Service介面,把文章保存到confluence中去,這樣不僅好維護,而且留存問題也得到了解決,
不過隨著這款工具在其他部門的推廣,發現系統回應時間越來越長,只要單位時間內積累郵件一多,一次處理完可能就要花費幾分鐘,
于是他們升級了方案,把架構演進到了V2.0版本,整體思路是使用了生產者消費者模式來處理,
思路如下:生產者執行緒去郵件系統下載完郵件后,不會立即呼叫confluence的web service介面,而是選擇把下載的內容放入阻塞佇列后立即回傳,而消費者啟動CPU*2個執行緒數來并行處理佇列中的郵件,從之前的單執行緒演變成了多執行緒處理,生產者和消費者實作了異步、解耦,經過觀察,比起V1.0同步處理,速度比之前要快好了幾倍,
...
架構設計
☆物件圖
該組件支持“多生產者多消費者”場景,在多核時代充分利用CPU多核機制,消費者多執行緒并行處理阻塞佇列中的資料,加快任務處理速度,
☆邏輯架構圖
該組件內部持有一個作業執行緒物件陣列,當生產者提交資料的時候,會先經過一個route組件(采用hash取模演算法),動態路由到其中一個執行緒物件內的阻塞佇列中存盤起來,等到滿足一定條件,作業執行緒就會將自身執行緒物件內阻塞佇列中的資料轉換成指定容量的List物件(BlockQueue的drainto方法有支持),然后呼叫已經注冊的回呼函式把資料傳遞出去,
☆流程圖
我們一起來看下這張作業執行緒內部運行流程圖:
首選我們說此組件物件內部持有一個作業執行緒物件陣列,每個作業執行緒物件內部持有一個有界阻塞佇列實體物件(ArrayBlockingQueue),方法有run(),add(),timeout()方法,
生產者呼叫組件自身的add方法后,add方法內部通過hash取模演算法動態路由到某個作業執行緒物件內部的blockingQueue中去,
timeout方法是這款組件設計的一個亮點(容錯性設計)??,
假如實際運行程序中,作業執行緒內部的阻塞佇列內一直只占少許幾個物件,如果僅僅只判斷佇列中的元素個數是否超出指定閾值,再去處理佇列中的資料,一旦長時間未超出,作業執行緒就會一直被阻塞,也將導致佇列中的資料長時間堆積,
所以新增的這個timeout()這個機制能應對一旦佇列中的資料長時間積壓,它會根據時間差即判斷當前時間距離上次任務處理時間是否超出指定閾值(可配置),如果超出了也會強制處理佇列中的資料,
原始碼賞析
public class ProducerAndConsumerComponet {
private final static Logger log = LoggerFactory.getLogger(ProducerAndConsumerComponet.class);
//組件持有一個作業執行緒物件陣列
private final WorkThread<T>[] workThreads;
private AtomicInteger index;
private static final Random r = new Random();
//任務定時器
private static ScheduledExecutorService scheduleThreadPool = new ScheduledThreadPoolExecutor(1);
//組件初始化完成作業執行緒的新建
private static ExecutorService executorService = Executors.newCachedThreadPool();
/**
* 構造器
* @param threadNum 默認新建的消費者執行緒個數
* @param limitSize 佇列長度閾值;超過將喚醒阻塞的執行緒
* @param period 前后兩個任務的執行周期 (for example :200ms 代表前面一次任務執行完畢后,200毫秒后下一個任務繼續執行)
* @param capacity 作業執行緒內部的有界阻塞佇列的初始容量大小
* @param processor 回呼介面(初始化組價實體的時候需要傳遞)
*/
public ProducerAndConsumerComponet(int threadNum,int limitSize, int period, int capacity, Processor<T> processor) {
this.workThreads = new WorkThread[threadNum];
if (threadNum > 1) {
this.index = new AtomicInteger();
}
for(int i = 0; i < threadNum; ++i) {
WorkThread<T> workThread = new WorkThread("workThread"+ "_" + i, limitSize, period, capacity, processor);
this.workThreads[i] = workThread;
executorService.submit(workThread);
//呼叫scheduleAtFixedRate時,會向ScheduledThreadPoolExecutor的DelayQueue添加一個實作了RunableScheduleFuture介面的
//ScheduleFutureTask
scheduleThreadPool.scheduleAtFixedRate(workThread::timeout, r.nextInt(50), period, TimeUnit.MILLISECONDS);
}
}
/**
* 生產者執行緒將物件添加到對應消費者執行緒物件內部的阻塞佇列中去<br>
* 內部采用HASH取模演算法進行動態路由
* @param item 待添加的物件
* @return true:添加成功 false:添加失敗
*/
public boolean add(T item) {
// log.info("add item={}",item);
int len = this.workThreads.length;
//log.info("add len..."+len);
if (len == 1) {
return this.workThreads[0].add(item);
} else {
int mod = this.index.incrementAndGet() % len;
// log.info("路由到this.workThreads[mod]={}",mod);
return this.workThreads[mod].add(item);
}
}
/**
* 消費者執行緒
*/
private static class WorkThread<T> implements Runnable {
/**
* 作業執行緒命名
*/
private final String threadName;
/**
* 佇列中允許存放元素個數限制<br>
* 超出將從佇列中取出此大小的元素轉成List物件
*/
private final int queueSizeLimit;
/**
* 前后兩個任務的執行周期
*/
private int period;
/**
* 用來記錄任務的即時處理時間
*/
private volatile long lastFlushTime;
/**
* 當前作業執行緒物件
*/
private volatile Thread currentThread;
/**
* 作業執行緒物件內部的阻塞佇列
*/
private final BlockingQueue<T> queue;
/**
* 回呼介面
*/
private final Processor<T> processor;
/**
* 消費者執行緒構造器
* @param threadName 執行緒名
* @param queueSizeLimit 指定佇列閾值(可配置)
* @param period 前后兩個任務的執行周期(可配置)
* @param capacity 阻塞佇列初始容量
* @param processor 回呼介面
*/
public WorkThread(String threadName, int queueSizeLimit, int period, int capacity, Processor<T> processor) {
this.threadName = threadName;
this.queueSizeLimit = queueSizeLimit;
this.period = period;
this.lastFlushTime = System.currentTimeMillis();
this.processor = processor;
this.queue = new ArrayBlockingQueue(capacity);
}
/**
* 往阻塞佇列中添加元素
* @param item 添加的物件
* @return true:添加成功 false:添加失敗
*/
public boolean add(T item) {
// log.info("add result:"+item);
boolean result = this.queue.offer(item);
// log.info("resultP{}",result);
this.checkQueueSize();
return result;
}
/**
* 當前時間與上次任務處理時間差是否超過指定閾值;如果超過觸發start方法
*/
public void timeout() {
// log.info("{}====check timeout",currentThread.getName());
if (System.currentTimeMillis() - this.lastFlushTime >= (long)this.period) {
log.info("當前時間距離上次任務處理時間周期={}超出指定閾值={}",System.currentTimeMillis() - this.lastFlushTime ,period);
this.start();
}
}
/**
* 喚醒被阻塞的作業執行緒
*/
private void start() {
log.info("執行start方法,喚醒被阻塞的執行緒"+currentThread.getName());
LockSupport.unpark(this.currentThread);
}
/**
* 判斷佇列實際長度是否超過指定閾值;如果超過觸發start方法
*/
private void checkQueueSize() {
if (this.queue.size() > this.queueSizeLimit) {
log.info("{}佇列大小={}超出指定閾值={}",currentThread.getName(),this.queue.size() ,queueSizeLimit);
this.start();
}
}
/**
* 將佇列中的元素通過呼叫<code>drainTo</code>方法,轉成List物件(容量受queueSizeLimit限制),最后呼叫回呼函式傳遞List物件
*/
public void flush() {
if(queue.isEmpty()){
return;
}
this.lastFlushTime = System.currentTimeMillis();
List<T> temp = new ArrayList(this.queueSizeLimit);
int size = this.queue.drainTo(temp, this.queueSizeLimit);
if (size > 0) {
log.info("{}被喚醒后,開始執行任務:從佇列中騰出大小為{}的資料且轉成List物件",currentThread.getName(),size);
try {
//執行回呼函式
this.processor.process(temp);
} catch (Throwable var4) {
System.out.println("process error");
}
}
}
/**
* 判斷佇列實際大小是否超過指定閾值亦或距離上次任務處理時間差是否超過指定閾值
* @return true:滿足觸發條件 false:不滿足觸發條件
*/
private boolean canFlush() {
return this.queue.size() > this.queueSizeLimit || System.currentTimeMillis() - this.lastFlushTime > (long)this.period;
}
@Override
public void run() {
this.currentThread = Thread.currentThread();
this.currentThread.setName(this.threadName);
//當前執行緒沒有被其他執行緒打斷
while(!this.currentThread.isInterrupted()) {
//死回圈的判斷是否滿足觸發條件(佇列實際大小是否超出指定閾值或距離上次任務時間是否超出指定閾值),如果未滿足將阻塞當前執行緒,避免死回圈給系統帶來性能開銷
while(!this.canFlush()) {
//當前作業執行緒被阻塞
log.info("執行緒被阻塞...");
LockSupport.park(this);
}
//一旦add方法執行的時候判斷存放的阻塞佇列元素大小超出自定制閾值亦或距離上次任務處理時間差超出指定閾值,就會呼叫LockSupport.unpark方法解除阻塞的執行緒
//一旦執行緒被解除阻塞,就會觸發此方法,將佇列元素轉成List物件且呼叫已經注冊的回呼函式
// log.info("阻塞執行緒被喚醒");
this.flush();
}
}
}
復制代碼
測驗用例
/**
* 前置條件:
* #主件初始化默認新增兩個作業執行緒
* config.threadNum=2
* config.period=12000
* config.queueSizeLimit=3
* config.capacity=10
*/
@DisplayName("佇列大小超出指定閾值")
@Test
void add() {
for(int i=0;i<10;i++){
producerAndConsumerComponet.add("1");
}
try {
TimeUnit.SECONDS.sleep(10);
}catch (Exception e){
e.printStackTrace();
}
}
結果列印:
2022-10-29 21:04:51,656 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:165] - workThread_1佇列大小=4超出指定閾值=3
2022-10-29 21:04:51,658 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 執行start方法,喚醒被阻塞的執行緒workThread_1
2022-10-29 21:04:51,659 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_1被喚醒后,開始執行任務:從佇列中騰出大小為3的資料且轉成List物件
2022-10-29 21:04:51,659 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 執行緒被阻塞...
2022-10-29 21:04:51,659 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:165] - workThread_0佇列大小=4超出指定閾值=3
2022-10-29 21:04:51,659 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 執行start方法,喚醒被阻塞的執行緒workThread_0
2022-10-29 21:04:51,660 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_0被喚醒后,開始執行任務:從佇列中騰出大小為3的資料且轉成List物件
2022-10-29 21:04:51,660 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 執行緒被阻塞...
2022-10-29 21:04:53,374 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:146] - 當前時間距離上次任務處理時間周期=1714超出指定閾值=1000
2022-10-29 21:04:53,374 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 執行start方法,喚醒被阻塞的執行緒workThread_0
2022-10-29 21:04:53,375 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_0被喚醒后,開始執行任務:從佇列中騰出大小為2的資料且轉成List物件
2022-10-29 21:04:53,375 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 執行緒被阻塞...
2022-10-29 21:04:53,379 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:146] - 當前時間距離上次任務處理時間周期=1720超出指定閾值=1000
2022-10-29 21:04:53,379 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 執行start方法,喚醒被阻塞的執行緒workThread_1
2022-10-29 21:04:53,380 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_1被喚醒后,開始執行任務:從佇列中騰出大小為2的資料且轉成List物件
2022-10-29 21:04:53,380 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 執行緒被阻塞...
復制代碼
可能有部分小伙伴對在生產環境如何正確使用這款組件有疑慮,對此完整版我已經開源到GitHub上(基于springboot構建內含如何正確初始化此組件以及完整的測驗用例),有興趣的小伙伴可以自取,
github地址:https://github.com/TaoZhuGongBoy/ProducerAndConsumerComponet
總結
好了這款組件介紹已接近尾聲,接下來讓我們一起做下總結:
☆是什么
我們說這款工具是一款基于“生產者-消費者模式”實作的組件,以前生產者必須同步呼叫、等待相關業務操作的處理結果后才能回傳(一旦有些業務場景生產者生產的速度過快,方法內部自身業務處理又比較耗時)這時如果同步等待呼叫結果回傳,系統整體吞吐量會極具降低,現在換了一種思路即生產者不需要同步等待業務的處理結果,當它發送一個請求后立即回傳,耗時的處理由一致多個消費者執行緒來異步處理,加快任務整體處理速度,(異步、解耦)
☆適用場景
它比較適合處理一些重要程度不是很高的資料(比如埋點日志、下載請求等),當生產者生產資料過快,業務本身處理又比較耗時,那用這個方案是比較合適的,
為什么在這里要強調重要程度不是很高這句話呢?因為BlockQueue畢竟是基于記憶體的資料結構,極端情況下是存在資料丟失風險的,像埋點日志、下載請求這種資料小部分丟失其實對業務影響不大,
看到這里可能有部分小伙伴會產生一個疑問:怎么看這個組件的功能跟MQ這么像,對的,功能是相似的,但這款組件是一個本地化的解決方案,目的就是為了降低引入訊息佇列的復雜度才設計(設計意圖),
☆怎么實作的
每個消費者執行緒物件內部持有一個有界阻塞佇列,當外部生產者呼叫組件的add方法后,add方法內部實作路由,最終保存到指定的阻塞佇列內,
消費者執行緒本身死回圈來判斷阻塞佇列中的元素是否滿足條件,如果不滿足,執行緒就會被阻塞(避免死回圈給系統造成性能影響;通過Locksupport.park實作), 一旦消費者執行緒物件內部的add、timeout方法滿足觸發條件后,被阻塞的執行緒就會被喚醒,然后執行緒繼續執行余下業務邏輯:從阻塞佇列中取出資料,然后轉換成有初始容量限制的List物件后,呼叫回呼介面傳遞資料,
寫到最后
這款組件幾乎囊括了并發編程領域半壁江山的技術點,能把這些散的點串起來,用好用對,著實不容易,也體現出組件作者深厚的基礎技術功底,
如果你是《并發編程》的初學者亦或有幾年經驗的老兵,都建議好好揣摩與學習一下這款組件的架構設計與編碼實作;如果在你的生產場景中你剛好也碰到類似問題與場景,那么這款組件也許能幫助你,
本文完!
關注我
如果這篇文章你看了對你有幫助或啟發,麻煩點贊、關注一下作者,你的肯定是作者創作源源不斷的動力,
公眾號

里面不僅匯集了硬核的干貨技術、還匯集了像左耳朵耗子、張朝陽總結的高效學習方法論、職場升遷竅門、軟技能,希望能輔助你達到你想夢想之地!
公眾號內回復關鍵字“電子書”下載pdf格式的電子書籍(JAVAEE、Spring、JVM、并發編程、Mysql、Linux、kafka、分布式等)、“開發手冊”獲取阿里開發手冊2本、"面試"獲取面試PDF資料,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/523077.html
標籤:Java
