MyDisruptor V5版本介紹
在v4版本的MyDisruptor實作多執行緒生產者后,按照計劃,v5版本的MyDisruptor需要支持更便于用戶使用的DSL風格的API,
由于該文屬于系列博客的一部分,需要先對之前的博客內容有所了解才能更好地理解本篇博客
- v1版本博客:從零開始實作lmax-Disruptor佇列(一)RingBuffer與單生產者、單消費者作業原理決議
- v2版本博客:從零開始實作lmax-Disruptor佇列(二)多消費者、消費者組間消費依賴原理決議
- v3版本博客:從零開始實作lmax-Disruptor佇列(三)多執行緒消費者WorkerPool原理決議
- v4版本博客:從零開始實作lmax-Disruptor佇列(四)多執行緒生產者MultiProducerSequencer原理決議
為什么Disruptor需要DSL風格的API
通過前面4個版本的迭代,MyDisruptor已經實作了disruptor的大多數功能,但對程式可讀性有要求的讀者可能會注意到,之前給出的demo示例代碼中對于構建多個消費者之間的依賴關系時細節有點多,
構建一個有上游消費者依賴的EventProcessor消費者一般來說需要通過以下幾步完成:
- 獲得所要依賴的上游消費者序列集合,并在創建EventProcessor時通過引數傳入
- 獲得所創建的EventProcessor對應的消費者序列物件
- 將獲得的消費者序列物件注冊到RingBuffer中
- 通過執行緒池或者start等方式啟動EventProcessor執行緒,開始監聽消費
目前的版本中,每創建一個消費者都需要寫一遍上述的模板代碼,對于理解Disruptor原理的人來說還勉強能接受,但還是很繁瑣且容易在細節上犯錯,更遑論對disruptor底層不大了解的普通用戶,
基于上述原因,disruptor提供了更加簡單易用的DSL風格API,使得對disruptor底層各組件間互動不甚了解的用戶也能很方便的使用disruptor,去構建不同消費者組間的依賴關系,
什么是DSL風格的API?
DSL即Domain Specific Language,領域特定語言,DSL是針對特定領域抽象出的一個特定語言,通過進一層的抽象來代替大量繁瑣的通用代碼段,如sql、shell等都是常見的dsl,
而DSL風格的API最大的特點就是介面的定義貼合業務場景,因此易于理解和使用,
MyDisruptor DSL風格API實作詳解
Disruptor
首先要介紹的就是Disruptor類,disruptor類主要用于創建一個符合用戶需求的RingBuffer,并提供一組易用的api以屏蔽底層組件互動的細節,
MyDisruptor類的建構式有五個引數,分別是:
- 用戶自定義的事件生產器(EventFactory)
- RingBuffer的容量大小
- 消費者執行器(juc的Executor實作類)
- 生產者型別列舉(指定單執行緒生產者 or 多執行緒生產者)
- 消費者阻塞策略實作(WaitStrategy)
以上都是需要用戶自定義或者指定的核心引數,構建好的disruptor的同時,也生成了RingBuffer和指定型別的生產者序列器,
/**
* disruptor dsl(仿Disruptor.Disruptor)
* */
public class MyDisruptor<T> {
private final MyRingBuffer<T> ringBuffer;
private final Executor executor;
private final MyConsumerRepository<T> consumerRepository = new MyConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
public MyDisruptor(
final MyEventFactory<T> eventProducer,
final int ringBufferSize,
final Executor executor,
final ProducerType producerType,
final MyWaitStrategy myWaitStrategy) {
this.ringBuffer = MyRingBuffer.create(producerType,eventProducer,ringBufferSize,myWaitStrategy);
this.executor = executor;
}
/**
* 獲得當親Disruptor的ringBuffer
* */
public MyRingBuffer<T> getRingBuffer() {
return ringBuffer;
}
// 注意:省略了大量無關代碼
}
EventHandlerGroup
創建好Disruptor后,便可以按照需求編排各種消費者的依賴邏輯了,創建消費者時除了用戶自定義的消費量邏輯介面(EventHandler/WorkHandler),還有兩個關鍵要素需要指定,一是指定是單執行緒生產者還是多執行緒,二是指定當前消費者的上游消費者序列集合(或者沒有),
兩兩組合四種情況,為此Disruptor類一共提供了四個方法用于創建消費者:
- handleEventsWith(創建無上游消費者依賴的單執行緒消費者)
- createEventProcessors(創建有上游消費者依賴的單執行緒消費者)
- handleEventsWithWorkerPool(創建無上游消費者依賴的多執行緒消費者)
- createWorkerPool(創建有上游消費者依賴的多執行緒消費者)
這四個方法的回傳值都是EventHandlerGroup物件,其中提供了關鍵的then/thenHandleEventsWithWorkerPool方法用來鏈式的編排多個消費者組,
實際上disruptor中的EventHandlerGroup還提供了等更多的dsl風格的方法(如and),限于篇幅MyDisruptor中只實作了最關鍵的幾個方法,
/**
* DSL事件處理器組(仿Disruptor.EventHandlerGroup)
* */
public class MyEventHandlerGroup<T> {
private final MyDisruptor<T> disruptor;
private final MyConsumerRepository<T> myConsumerRepository;
private final MySequence[] sequences;
public MyEventHandlerGroup(MyDisruptor<T> disruptor,
MyConsumerRepository<T> myConsumerRepository,
MySequence[] sequences) {
this.disruptor = disruptor;
this.myConsumerRepository = myConsumerRepository;
this.sequences = sequences;
}
@SafeVarargs
public final MyEventHandlerGroup<T> then(final MyEventHandler<T>... myEventHandlers) {
return handleEventsWith(myEventHandlers);
}
@SafeVarargs
public final MyEventHandlerGroup<T> handleEventsWith(final MyEventHandler<T>... handlers) {
return disruptor.createEventProcessors(sequences, handlers);
}
@SafeVarargs
public final MyEventHandlerGroup<T> thenHandleEventsWithWorkerPool(final MyWorkHandler<T>... handlers) {
return handleEventsWithWorkerPool(handlers);
}
@SafeVarargs
public final MyEventHandlerGroup<T> handleEventsWithWorkerPool(final MyWorkHandler<T>... handlers) {
return disruptor.createWorkerPool(sequences, handlers);
}
}
MyDisruptor完整代碼
/**
* disruptor dsl(仿Disruptor.Disruptor)
* */
public class MyDisruptor<T> {
private final MyRingBuffer<T> ringBuffer;
private final Executor executor;
private final MyConsumerRepository<T> consumerRepository = new MyConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
public MyDisruptor(
final MyEventFactory<T> eventProducer,
final int ringBufferSize,
final Executor executor,
final ProducerType producerType,
final MyWaitStrategy myWaitStrategy) {
this.ringBuffer = MyRingBuffer.create(producerType,eventProducer,ringBufferSize,myWaitStrategy);
this.executor = executor;
}
/**
* 注冊單執行緒消費者 (無上游依賴消費者,僅依賴生產者序列)
* */
@SafeVarargs
public final MyEventHandlerGroup<T> handleEventsWith(final MyEventHandler<T>... myEventHandlers){
return createEventProcessors(new MySequence[0], myEventHandlers);
}
/**
* 注冊單執行緒消費者 (有上游依賴消費者,僅依賴生產者序列)
* @param barrierSequences 依賴的序列屏障
* @param myEventHandlers 用戶自定義的事件消費者集合
* */
public MyEventHandlerGroup<T> createEventProcessors(
final MySequence[] barrierSequences,
final MyEventHandler<T>[] myEventHandlers) {
final MySequence[] processorSequences = new MySequence[myEventHandlers.length];
final MySequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
int i=0;
for(MyEventHandler<T> myEventConsumer : myEventHandlers){
final MyBatchEventProcessor<T> batchEventProcessor =
new MyBatchEventProcessor<>(ringBuffer, myEventConsumer, barrier);
processorSequences[i] = batchEventProcessor.getCurrentConsumeSequence();
i++;
// consumer物件都維護起來,便于后續start時啟動
consumerRepository.add(batchEventProcessor);
}
// 更新當前生產者注冊的消費者序列
updateGatingSequencesForNextInChain(barrierSequences,processorSequences);
return new MyEventHandlerGroup<>(this,this.consumerRepository,processorSequences);
}
/**
* 注冊多執行緒消費者 (無上游依賴消費者,僅依賴生產者序列)
* */
@SafeVarargs
public final MyEventHandlerGroup<T> handleEventsWithWorkerPool(final MyWorkHandler<T>... myWorkHandlers) {
return createWorkerPool(new MySequence[0], myWorkHandlers);
}
/**
* 注冊多執行緒消費者 (有上游依賴消費者,僅依賴生產者序列)
* @param barrierSequences 依賴的序列屏障
* @param myWorkHandlers 用戶自定義的事件消費者集合
* */
public MyEventHandlerGroup<T> createWorkerPool(
final MySequence[] barrierSequences, final MyWorkHandler<T>[] myWorkHandlers) {
final MySequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
final MyWorkerPool<T> workerPool = new MyWorkerPool<>(ringBuffer, sequenceBarrier, myWorkHandlers);
// consumer都保存起來,便于start啟動
consumerRepository.add(workerPool);
final MySequence[] workerSequences = workerPool.getCurrentWorkerSequences();
updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
return new MyEventHandlerGroup<>(this, consumerRepository,workerSequences);
}
private void updateGatingSequencesForNextInChain(final MySequence[] barrierSequences, final MySequence[] processorSequences) {
if (processorSequences.length != 0) {
// 這是一個優化操作:
// 由于新的消費者通過ringBuffer.newBarrier(barrierSequences),已經是依賴于之前ringBuffer中已有的消費者序列
// 消費者即EventProcessor內部已經設定好了老的barrierSequences為依賴,因此可以將ringBuffer中已有的消費者序列去掉
// 只需要保存,依賴當前消費者鏈條最末端的序列即可(也就是最慢的序列),這樣生產者可以更快的遍歷注冊的消費者序列
for(MySequence sequenceV4 : barrierSequences){
ringBuffer.removeConsumerSequence(sequenceV4);
}
for(MySequence sequenceV4 : processorSequences){
// 新設定的就是當前消費者鏈條最末端的序列
ringBuffer.addConsumerSequence(sequenceV4);
}
}
}
/**
* 啟動所有已注冊的消費者
* */
public void start(){
// cas設定啟動標識,避免重復啟動
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("Disruptor只能啟動一次");
}
// 遍歷所有的消費者,挨個start啟動
this.consumerRepository.getConsumerInfos().forEach(
item->item.start(this.executor)
);
}
/**
* 獲得當親Disruptor的ringBuffer
* */
public MyRingBuffer<T> getRingBuffer() {
return ringBuffer;
}
}
Disruptor內部消費者依賴編排的性能小優化
- 在上面完整的MyDisruptor實作中可以看到,在每次構建消費者后都執行了updateGatingSequencesForNextInChain這個方法,方法中將當前消費者序列號注冊進RingBuffer的同時,還將傳入的上游barrierSequence集合從當前RingBuffer中移除,
這樣做主要是為了提高生產者在獲取當前最慢消費者時的性能, - 在沒有這個優化之前,所有的消費者的序列號都會被注冊到RingBuffer中,而生產者通過getMinimumSequence方法遍歷所有注冊的消費者序列集合獲得其中最小的序列值(最慢的消費者),
- 我們知道,通過Disruptor的DSL介面創建的消費者之間是存在依賴關系的,每個消費者的實作內部保證了其自身的序列號不會超過上游的消費者序列,所以在存在上下游依賴關系的、所有消費者序列的集合中,最慢的消費者必然是處于下游的消費者序列號,
所以在RingBuffer中就可以不再維護更上游的消費者序列號,從而加快getMinimumSequence方法中遍歷陣列的速度,
MyDisruptorV5版本demo示例
下面通過一個簡單但不失一般性的示例,來展示一下DSL風格API到底簡化了多少復雜度,
不使用DSL風格API的示例
public class MyRingBufferV5DemoOrginal {
/**
* 消費者依賴關系圖(簡單起見都是單執行緒消費者):
* A -> BC -> D
* -> E -> F
* */
public static void main(String[] args) {
// 環形佇列容量為16(2的4次方)
int ringBufferSize = 16;
// 創建環形佇列
MyRingBuffer<OrderEventModel> myRingBuffer = MyRingBuffer.createSingleProducer(
new OrderEventProducer(), ringBufferSize, new MyBlockingWaitStrategy());
// 獲得ringBuffer的序列屏障(最上游的序列屏障內只維護生產者的序列)
MySequenceBarrier mySequenceBarrier = myRingBuffer.newBarrier();
// ================================== 基于生產者序列屏障,創建消費者A
MyBatchEventProcessor<OrderEventModel> eventProcessorA =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerA"), mySequenceBarrier);
MySequence consumeSequenceA = eventProcessorA.getCurrentConsumeSequence();
// RingBuffer監聽消費者A的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceA);
// ================================== 通過消費者A的序列號創建序列屏障(構成消費的順序依賴),創建消費者B
MySequenceBarrier mySequenceBarrierB = myRingBuffer.newBarrier(consumeSequenceA);
MyBatchEventProcessor<OrderEventModel> eventProcessorB =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerB"), mySequenceBarrierB);
MySequence consumeSequenceB = eventProcessorB.getCurrentConsumeSequence();
// RingBuffer監聽消費者B的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceB);
// ================================== 通過消費者A的序列號創建序列屏障(構成消費的順序依賴),創建消費者C
MySequenceBarrier mySequenceBarrierC = myRingBuffer.newBarrier(consumeSequenceA);
MyBatchEventProcessor<OrderEventModel> eventProcessorC =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerC"), mySequenceBarrierC);
MySequence consumeSequenceC = eventProcessorC.getCurrentConsumeSequence();
// RingBuffer監聽消費者C的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceC);
// ================================== 消費者D依賴上游的消費者B,C,通過消費者B、C的序列號創建序列屏障(構成消費的順序依賴)
MySequenceBarrier mySequenceBarrierD = myRingBuffer.newBarrier(consumeSequenceB,consumeSequenceC);
// 基于序列屏障,創建消費者D
MyBatchEventProcessor<OrderEventModel> eventProcessorD =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerD"), mySequenceBarrierD);
MySequence consumeSequenceD = eventProcessorD.getCurrentConsumeSequence();
// RingBuffer監聽消費者D的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceD);
// ================================== 通過消費者A的序列號創建序列屏障(構成消費的順序依賴),創建消費者E
MySequenceBarrier mySequenceBarrierE = myRingBuffer.newBarrier(consumeSequenceA);
MyBatchEventProcessor<OrderEventModel> eventProcessorE =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerE"), mySequenceBarrierE);
MySequence consumeSequenceE = eventProcessorE.getCurrentConsumeSequence();
// RingBuffer監聽消費者E的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceE);
// ================================== 通過消費者E的序列號創建序列屏障(構成消費的順序依賴),創建消費者F
MySequenceBarrier mySequenceBarrierF = myRingBuffer.newBarrier(consumeSequenceE);
MyBatchEventProcessor<OrderEventModel> eventProcessorF =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerF"), mySequenceBarrierF);
MySequence consumeSequenceF = eventProcessorF.getCurrentConsumeSequence();
// RingBuffer監聽消費者F的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceF);
Executor executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
// 啟動消費者執行緒A
executor.execute(eventProcessorA);
// 啟動消費者執行緒B
executor.execute(eventProcessorB);
// 啟動消費者執行緒C
executor.execute(eventProcessorC);
// 啟動消費者執行緒D
executor.execute(eventProcessorD);
// 啟動消費者執行緒E
executor.execute(eventProcessorE);
// 啟動消費者執行緒F
executor.execute(eventProcessorF);
// 生產者發布100個事件
for(int i=0; i<100; i++) {
long nextIndex = myRingBuffer.next();
OrderEventModel orderEvent = myRingBuffer.get(nextIndex);
orderEvent.setMessage("message-"+i);
orderEvent.setPrice(i * 10);
System.out.println("生產者發布事件:" + orderEvent);
myRingBuffer.publish(nextIndex);
}
}
}
使用DSL風格APi的示例
public class MyRingBufferV5DemoUseDSL {
/**
* 消費者依賴關系圖(簡單起見都是單執行緒消費者):
* A -> BC -> D
* -> E -> F
* */
public static void main(String[] args) {
// 環形佇列容量為16(2的4次方)
int ringBufferSize = 16;
MyDisruptor<OrderEventModel> myDisruptor = new MyDisruptor<>(
new OrderEventProducer(), ringBufferSize,
new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()),
ProducerType.SINGLE,
new MyBlockingWaitStrategy()
);
MyEventHandlerGroup<OrderEventModel> hasAHandlerGroup = myDisruptor.handleEventsWith(new OrderEventHandlerDemo("consumerA"));
hasAHandlerGroup.then(new OrderEventHandlerDemo("consumerB"),new OrderEventHandlerDemo("consumerC"))
.then(new OrderEventHandlerDemo("consumerD"));
hasAHandlerGroup.then(new OrderEventHandlerDemo("consumerE"))
.then(new OrderEventHandlerDemo("consumerF"));
// 啟動disruptor中注冊的所有消費者
myDisruptor.start();
MyRingBuffer<OrderEventModel> myRingBuffer = myDisruptor.getRingBuffer();
// 生產者發布100個事件
for(int i=0; i<100; i++) {
long nextIndex = myRingBuffer.next();
OrderEventModel orderEvent = myRingBuffer.get(nextIndex);
orderEvent.setMessage("message-"+i);
orderEvent.setPrice(i * 10);
System.out.println("生產者發布事件:" + orderEvent);
myRingBuffer.publish(nextIndex);
}
}
}
- 可以看到實作同樣的業務邏輯時,使用DSL風格的API由于減少了大量的模板代碼,代碼量大幅減少的同時還增強了程式的可讀性,這證明了disruptor的DSL風格API設計是很成功的,
總結
- 本篇博客介紹了Disruptor的DSL風格的API最核心的實作邏輯,并且通過對比展示了相同業務下DSL風格的API簡單易理解的特點,
- 限于篇幅,自己實作的MyDisruptor中并沒有將disruptor中DSL風格的API功能全部實作,而僅僅實作了最常用、最核心的一部分,
感興趣的讀者可以在理解當前v5版本MyDisruptor的基礎之上,通過閱讀disruptor的原始碼做進一步了解, - 目前v5版本的MyDisruptor已經實作了disruptor的絕大多數功能,最后的v6版本中將會對MyDisruptor中已有的缺陷進行進一步的優化,
v6版本的MyDisruptor將會解決偽共享、優雅終止等關鍵問題并進行對應原理的決議,敬請期待,
disruptor無論在整體設計還是最終代碼實作上都有很多值得反復琢磨和學習的細節,希望能幫助到對disruptor感興趣的小伙伴,
本篇博客的完整代碼在我的github上:https://github.com/1399852153/MyDisruptor 分支:feature/lab5
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/499312.html
標籤:Java
上一篇:final關鍵字簡介說明
