歡迎訪問我的GitHub
https://github.com/zq2599/blog_demos
內容:所有原創文章分類匯總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
《disruptor筆記》系列鏈接
- 快速入門
- Disruptor類分析
- 環形佇列的基礎操作(不用Disruptor類)
- 事件消費知識點小結
- 事件消費實戰
- 常見場景
- 等待策略
- 知識點補充(終篇)
本篇概覽
本文是《disruptor筆記》的第七篇,咱們一起閱讀原始碼,學習一個重要的知識點:等待策略,由于Disruptor的原始碼短小精干、簡單易懂,因此本篇是個輕松愉快的原始碼學習之旅;
提前小結
如果您時間不充裕,可以通過以下提前小結的內容,對等待策略有個大體的認識:
- BlockingWaitStrategy:用了ReentrantLock的等待&&喚醒機制實作等待邏輯,是默認策略,比較節省CPU
- BusySpinWaitStrategy:持續自旋,JDK9之下慎用(最好別用)
- DummyWaitStrategy:回傳的Sequence值為0,正常環境是用不上的
- LiteBlockingWaitStrategy:基于BlockingWaitStrategy,在沒有鎖競爭的時候會省去喚醒操作,但是作者說測驗不充分,不建議使用
- TimeoutBlockingWaitStrategy:帶超時的等待,超時后會執行業務指定的處理邏輯
- LiteTimeoutBlockingWaitStrategy:基于TimeoutBlockingWaitStrategy,在沒有鎖競爭的時候會省去喚醒操作
- SleepingWaitStrategy:三段式,第一階段自旋,第二階段執行Thread.yield交出CPU,第三階段睡眠執行時間,反復的的睡眠
- YieldingWaitStrategy:二段式,第一階段自旋,第二階段執行Thread.yield交出CPU
- PhasedBackoffWaitStrategy:四段式,第一階段自旋指定次數,第二階段自旋指定時間,第三階段執行Thread.yield交出CPU,第四階段呼叫成員變數的waitFor方法,這個成員變數可以被設定為BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy這三個中的一個
關于等待策略
- 回顧一下前面的文章中實體化Disruptor的代碼:
disruptor = new Disruptor<>(new OrderEventFactory(),
BUFFER_SIZE,
new CustomizableThreadFactory("event-handler-"));
- 展開上述構造方法,會見到創建RingBuffer的代碼,默認使用了BlockingWaitStrategy作為等待策略:
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize)
{
return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
}
- 繼續展開上面的createMultiProducer方法,可見每個Sequencer(注意不是Sequence)都有自己的watStrategy成員變數:

- 這個waitStrategy的最終用途是創建SequenceBarrier的時候,傳給SequenceBarrier做成員變數:

- 在看看SequenceBarrier是如何使用waitStrategy的,一共兩處用到,第一處如下圖紅框,原來是waitFor方法內部會用到,這個waitFor咱們前面已經了解過,對消費者來說,等待環形佇列的指定位置有可用資料時,就是呼叫SequenceBarrier的waitFor完成的:

- SequenceBarrier第二處用到waitStrategy是喚醒的時候:
@Override
public void alert()
{
alerted = true;
waitStrategy.signalAllWhenBlocking();
}
- 現在咱們知道了WaitStrategy的使用場景,接下來看看這個介面有哪些具體實作吧,這樣咱們在編程中就知道如何選擇才最適合自己
BlockingWaitStrategy
- 作為默認的等待策略,BlockingWaitStrategy還有個特點就是代碼量小(不到百行),很容易理解,其實就是用ReentrantLock+Condition來實作等待和喚醒操作的,如下圖紅框:

- 如果您更傾向于節省CPU資源,對高吞吐量和低延時的要求相對低一些,那么BlockingWaitStrategy就適合您了;
BusySpinWaitStrategy(慎用)
- 前面的BlockingWaitStrategy有個特點,就是一旦環形佇列指定位置來了資料,由于執行緒是等待狀態(底層呼叫了native的UNSAFE.park方法),因此還要喚醒后才能執行業務邏輯,在一些場景中希望資料一到就盡快消費,此時BusySpinWaitStrategy就很合適了,代碼太簡單,全部貼出:
public final class BusySpinWaitStrategy implements WaitStrategy
{
@Override
public long waitFor(
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
ThreadHints.onSpinWait();
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking()
{
}
}
- 上述代碼顯示,整個while回圈的關鍵就是ThreadHints.onSpinWait做了什么,原始碼如下,這里要格外注意,如果ON_SPIN_WAIT_METHOD_HANDLE為空,意味著外面的while回圈是個非常消耗CPU的自旋:
public static void onSpinWait()
{
if (null != ON_SPIN_WAIT_METHOD_HANDLE)
{
try
{
ON_SPIN_WAIT_METHOD_HANDLE.invokeExact();
}
catch (final Throwable ignore)
{
}
}
}
- ON_SPIN_WAIT_METHOD_HANDLE為空是很可怕的事情,咱們來看看它是何方神圣?代碼還是在ThreadHints.java中,如下所示,真相一目了然,它就是Thread類的onSpinWait方法,如果Thread類沒有onSpinWait方法,那么使用BusySpinWaitStrategy作為等待策略就有很高的代價了,環形佇列里沒有資料時消費執行緒會執行自旋,很耗費CPU:
static
{
final MethodHandles.Lookup lookup = MethodHandles.lookup();
MethodHandle methodHandle = null;
try
{
methodHandle = lookup.findStatic(Thread.class, "onSpinWait", methodType(void.class));
}
catch (final Exception ignore)
{
}
ON_SPIN_WAIT_METHOD_HANDLE = methodHandle;
}
-
好吧,還剩兩個問題:Thread類有沒有onSpinWait方法還不能確定嗎?這個onSpinWait方法是何方神圣?
-
去看JDK官方檔案,如下圖,原來這方法是從JDK9才有的,所以對于JDK8使用者來說來說,選用BusySpinWaitStrategy就意味著要面對沒做啥事兒的while回圈了:

- 第二個問題,onSpinWait方法干了些啥?前面的官方檔案,以欣宸的英語水平顯然是無法理解的,去看stackoverflow吧,如下圖,簡單的說,就是告訴CPU當前執行緒處于回圈查詢的狀態,CPU得知后就會調度更多CPU資源給其他執行緒:

-
至此真像大白:環形佇列的條件就緒后,BusySpinWaitStrategy策略是通過whlie死回圈來做到快速回應的,如果JDK是9或者更高版本,這個死回圈帶來的CPU損耗由Thread.onSpinWait幫助緩解,如果JDK版本低于9,這里就是個簡單的while死回圈,至于這種死回圈有多消耗CPU,您可以寫段簡單代碼感受一下...
-
難怪Disruptor原始碼中會提醒最好是將使用此實體的執行緒系結到指定CPU核:

DummyWaitStrategy
固定回傳0,個人覺得這個策略在正常開發中用不上,因為環形佇列可用位置始終是0的話,不論是生產還是消費都難以實作:

LiteBlockingWaitStrategy
- 看名字,LiteBlockingWaitStrategy是BlockingWaitStrategy策略的輕量級實作,在鎖沒有競爭的時候(例如獨立消費的場景),會省略掉喚醒操作,不過如下圖紅框所示,作者說他沒有充分驗證過正確性,因此建議只用于體驗,太好了,這個策略我不學了!!!

TimeoutBlockingWaitStrategy
- 顧名思義,TimeoutBlockingWaitStrategy表示只等待某段時長,超過了就算超時,其代碼和BlockingWaitStrategy類似,只是等待的時候有個時長限制,如下圖,一目了然:

- 其實我對拋出例外后的處理很感興趣,去看看吧,外面是熟悉的BatchEventProcessor類,熟悉的processEvents方法,如下圖,每次超時例外都交給notifyTimeout處理,而外部的主流程不受影響,依舊不斷的從環形佇列中等待和獲取資料:

- 進入notifyTImeout方法,可見實際上是交給成員變數timeoutHandler去處理的,而且處理程序中發生的任何例外都會被捕獲,不會拋出去影響外部呼叫:

- 再來看看成員變數是哪來的,如下圖,真相大白,咱們開發的EventHandler實作類,如果也實作了Timeouthandler,就被當做成員變數timeoutHandler了:

- 至此TimeoutBlockingWaitStrategy也搞清楚了:用于有時間限制的場景,每次等待超時后都會呼叫業務定制的超時處理邏輯,這個邏輯寫到EventHandler實作類中,這個實作類要實作Timeouthandler介面
LiteTimeoutBlockingWaitStrategy
- LiteTimeoutBlockingWaitStrategy與TimeoutBlockingWaitStrategy的關系,就像BlockingWaitStrategy與LiteBlockingWaitStrategy的關系:作為TimeoutBlockingWaitStrategy的變體,有TimeoutBlockingWaitStrategy的超時處理特性,而且沒有鎖競爭的時候,省略掉喚醒操作;
- 作者說LiteBlockingWaitStrategy可用于體驗,但正確性并未經過充分驗證,但是在LiteTimeoutBlockingWaitStrategy的注釋中沒有看到這種說法,看樣子這是個靠譜的等待策略,可以用,用在有超時處理的需求,而且沒有鎖競爭的場景(例如獨立消費)
SleepingWaitStrategy
- 和前面幾個不同的是,SleepingWaitStrategy沒有用到鎖,這意味這無需呼叫signalAllWhenBlocking方法做喚醒處理,相當于省去了生產執行緒的通知操作,官方原始碼注釋有這么句話引起了我的興趣,如下圖紅框,大意是該策略在性能和CPU資源消耗之間取得了平衡,接下來去看看關鍵代碼,來了解這個特性:

- 如下圖,等到可用資料的程序是個死回圈:

- 接下來是關鍵代碼了,如下圖,可見整個等待程序分為三段:計數器高于100時就只有一個減一的操作(最快回應),計數器在100到0之間時每次都交出CPU執行時間(最省資源),其他時候就睡眠固定時間:

YieldingWaitStrategy
- 看過SleepingWaitStrategy之后,再看YieldingWaitStrategy就很容易理解了,和SleepingWaitStrategy相比,YieldingWaitStrategy先做指定次數的自旋,然后不斷的交出CPU時間:

- 由于在不斷的執行Thread.yield()方法,因此該策略雖然很消耗CPU,不過一旦其他執行緒有CPU需求,很容易從這個執行緒得到;
PhasedBackoffWaitStrategy
- 最后是PhasedBackoffWaitStrategy,該策略的特點是將整個等待程序分成下圖的四段,四個方塊代表一個時間線上的四個階段:

- 這里說明一下上圖的四個階段:
- 首先是自旋指定的次數,默認10000次;
- 自旋過后,開始帶計時的自旋,執行的時長是spinTimeoutNanos的值;
- 執行時長達到spinTimeoutNanos的值后,開始執行Thread.yield()交出CPU資源,這個邏輯的執行時長是yieldTimeoutNanos-spinTimeoutNanos;
- 執行時長達到yieldTimeoutNanos-spinTimeoutNanos的值后,開始呼叫fallbackStrategy.waitFor,這個呼叫沒有時間或者次數限制;
- 現在問題來了fallbackStrategy是何方神圣?PhasedBackoffWaitStrategy類準備了三個靜態方法,咱們可以按需選用,讓fallbackStrategy是BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy這三個中的一個:
public static PhasedBackoffWaitStrategy withLock(
long spinTimeout,
long yieldTimeout,
TimeUnit units)
{
return new PhasedBackoffWaitStrategy(
spinTimeout, yieldTimeout,
units, new BlockingWaitStrategy());
}
public static PhasedBackoffWaitStrategy withLiteLock(
long spinTimeout,
long yieldTimeout,
TimeUnit units)
{
return new PhasedBackoffWaitStrategy(
spinTimeout, yieldTimeout,
units, new LiteBlockingWaitStrategy());
}
public static PhasedBackoffWaitStrategy withSleep(
long spinTimeout,
long yieldTimeout,
TimeUnit units)
{
return new PhasedBackoffWaitStrategy(
spinTimeout, yieldTimeout,
units, new SleepingWaitStrategy(0));
}
- 至此,Disruptor的九種等待策略就全部分析完畢了,除了選用等待策略的時候更加得心應手,還有個識訓就是積攢了閱讀優秀原始碼的經驗,在讀原始碼的路上更加有信心了;
你不孤單,欣宸原創一路相伴
- Java系列
- Spring系列
- Docker系列
- kubernetes系列
- 資料庫+中間件系列
- DevOps系列
歡迎關注公眾號:程式員欣宸
微信搜索「程式員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/304444.html
標籤:其他
