歡迎訪問我的GitHub
https://github.com/zq2599/blog_demos
內容:所有原創文章分類匯總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
《disruptor筆記》系列鏈接
- 快速入門
- Disruptor類分析
- 環形佇列的基礎操作(不用Disruptor類)
- 事件消費知識點小結
- 事件消費實戰
- 常見場景
- 等待策略
- 知識點補充(終篇)
本篇概覽
- 本文是《disruptor筆記》系列的第三篇,主要任務是編碼實作訊息生產和消費,與《disruptor筆記之一:快速入門》不同的是,本次開發不使用Disruptor類,和Ring Buffer(環形佇列)相關的操作都是自己寫代碼實作;
- 這種脫離Disruptor類操作Ring Buffer的做法,不適合用在生產環境,但在學習Disruptor的程序中,這是種高效的學習手段,經過本篇實戰后,在今后使用Disruptor時,您在開發、除錯、優化等各種場景下都能更加得心應手;
- 簡單的訊息生產消費已不能滿足咱們的學習熱情,今天的實戰要挑戰以下三個場景:
- 100個事件,單個消費者消費;
- 100個事件,三個消費者,每個都獨自消費這個100個事件;
- 100個事件,三個消費者共同消費這個100個事件;
前文回顧
為了完成本篇的實戰,前文《disruptor筆記之二:Disruptor類分析》已做了充分的研究分析,建議觀看,這里簡單回顧以下Disruptor類的幾個核心功能,這也是咱們編碼時要實作的:
- 創建環形佇列(RingBuffer物件)
- 創建SequenceBarrier物件,用于接收ringBuffer中的可消費事件
- 創建BatchEventProcessor,負責消費事件
- 系結BatchEventProcessor物件的例外處理類
- 呼叫ringBuffer.addGatingSequences,將消費者的Sequence傳給ringBuffer
- 啟動獨立執行緒,用來執行消費事件的業務邏輯
- 理論分析已經完成,接下來開始編碼;
原始碼下載
- 本篇實戰中的完整原始碼可在GitHub下載到,地址和鏈接資訊如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 鏈接 | 備注 |
|---|---|---|
| 專案主頁 | https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
| git倉庫地址(https) | https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
| git倉庫地址(ssh) | [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
- 這個git專案中有多個檔案夾,本次實戰的原始碼在disruptor-tutorials檔案夾下,如下圖紅框所示:

- disruptor-tutorials是個父工程,里面有多個module,本篇實戰的module是low-level-operate,如下圖紅框所示:

開發
- 進入編碼階段,今天的任務是挑戰以下三個場景:
- 100個事件,單個消費者消費;
- 100個事件,三個消費者,每個都獨自消費這個100個事件;
- 100個事件,三個消費者共同消費這個100個事件;
- 咱們先把工程建好,然后撰寫公共代碼,例如事件定義、事件工廠等,最后才是每個場景的開發;
- 在父工程disruptor-tutorials新增名為low-level-operate的module,其build.gradle如下:
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'com.lmax:disruptor'
testImplementation('org.springframework.boot:spring-boot-starter-test')
}
- 然后是springboot啟動類:
package com.bolingcavalry;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class LowLevelOperateApplication {
public static void main(String[] args) {
SpringApplication.run(LowLevelOperateApplication.class, args);
}
}
- 事件類,這是事件的定義:
package com.bolingcavalry.service;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@ToString
@NoArgsConstructor
public class StringEvent {
private String value;
}
- 事件工廠,定義如何在記憶體中創建事件物件:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventFactory;
public class StringEventFactory implements EventFactory<StringEvent> {
@Override
public StringEvent newInstance() {
return new StringEvent();
}
}
- 事件生產類,定義如何將業務邏輯的事件轉為disruptor事件發布到環形佇列,用于消費:
package com.bolingcavalry.service;
import com.lmax.disruptor.RingBuffer;
public class StringEventProducer {
// 存盤資料的環形佇列
private final RingBuffer<StringEvent> ringBuffer;
public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(String content) {
// ringBuffer是個佇列,其next方法回傳的是下最后一條記錄之后的位置,這是個可用位置
long sequence = ringBuffer.next();
try {
// sequence位置取出的事件是空事件
StringEvent stringEvent = ringBuffer.get(sequence);
// 空事件添加業務資訊
stringEvent.setValue(content);
} finally {
// 發布
ringBuffer.publish(sequence);
}
}
}
- 事件處理類,收到事件后具體的業務處理邏輯:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventHandler;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Slf4j
public class StringEventHandler implements EventHandler<StringEvent> {
public StringEventHandler(Consumer<?> consumer) {
this.consumer = consumer;
}
// 外部可以傳入Consumer實作類,每處理一條訊息的時候,consumer的accept方法就會被執行一次
private Consumer<?> consumer;
@Override
public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {
log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
// 這里延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100);
// 如果外部傳入了consumer,就要執行一次accept方法
if (null!=consumer) {
consumer.accept(null);
}
}
}
- 定義一個介面,外部通過呼叫介面的方法來生產訊息,再放幾個常量在里面后面會用到:
package com.bolingcavalry.service;
public interface LowLevelOperateService {
/**
* 消費者數量
*/
int CONSUMER_NUM = 3;
/**
* 環形緩沖區大小
*/
int BUFFER_SIZE = 16;
/**
* 發布一個事件
* @param value
* @return
*/
void publish(String value);
/**
* 回傳已經處理的任務總數
* @return
*/
long eventCount();
}
- 以上就是公共代碼了,接下來逐個實作之前規劃的三個場景;
100個事件,單個消費者消費
- 這是最簡單的功能了,實作發布訊息和單個消費者消費的功能,代碼如下,有幾處要注意的地方稍后提到:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Service("oneConsumer")
@Slf4j
public class OneConsumerServiceImpl implements LowLevelOperateService {
private RingBuffer<StringEvent> ringBuffer;
private StringEventProducer producer;
/**
* 統計訊息總數
*/
private final AtomicLong eventCount = new AtomicLong();
private ExecutorService executors;
@PostConstruct
private void init() {
// 準備一個匿名類,傳給disruptor的事件處理類,
// 這樣每次處理事件時,都會將已經處理事件的總數列印出來
Consumer<?> eventCountPrinter = new Consumer<Object>() {
@Override
public void accept(Object o) {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count);
}
};
// 創建環形佇列實體
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);
// 準備執行緒池
executors = Executors.newFixedThreadPool(1);
//創建SequenceBarrier
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
// 創建事件處理的作業類,里面執行StringEventHandler處理事件
BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
ringBuffer,
sequenceBarrier,
new StringEventHandler(eventCountPrinter));
// 將消費者的sequence傳給環形佇列
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
// 在一個獨立執行緒中取事件并消費
executors.submit(batchEventProcessor);
// 生產者
producer = new StringEventProducer(ringBuffer);
}
@Override
public void publish(String value) {
producer.onData(value);
}
@Override
public long eventCount() {
return eventCount.get();
}
}
- 上述代碼有以下幾處需要注意:
- 自己創建環形佇列RingBuffer實體
- 自己準備執行緒池,里面的執行緒用來獲取和消費訊息
- 自己動手創建BatchEventProcessor實體,并把事件處理類傳入
- 通過ringBuffer創建sequenceBarrier,傳給BatchEventProcessor實體使用
- 將BatchEventProcessor的sequence傳給ringBuffer,確保ringBuffer的生產和消費不會出現混亂
- 啟動執行緒池,意味著BatchEventProcessor實體在一個獨立執行緒中不斷的從ringBuffer中獲取事件并消費;
- 為了驗證上述代碼能否正常作業,我這里寫了個單元測驗類,如下所示,邏輯很簡單,呼叫OneConsumerServiceImpl.publish方法一百次,產生一百個事件,再檢查OneConsumerServiceImpl記錄的消費事件總數是不是等于一百:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.LowLevelOperateService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals;
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class LowLeverOperateServiceImplTest {
@Autowired
@Qualifier("oneConsumer")
LowLevelOperateService oneConsumer;
private static final int EVENT_COUNT = 100;
private void testLowLevelOperateService(LowLevelOperateService service, int eventCount, int expectEventCount) throws InterruptedException {
for(int i=0;i<eventCount;i++) {
log.info("publich {}", i);
service.publish(String.valueOf(i));
}
// 異步消費,因此需要延時等待
Thread.sleep(10000);
// 消費的事件總數應該等于發布的事件數
assertEquals(expectEventCount, service.eventCount());
}
@Test
public void testOneConsumer() throws InterruptedException {
log.info("start testOneConsumerService");
testLowLevelOperateService(oneConsumer, EVENT_COUNT, EVENT_COUNT);
}
- 注意,如果您是直接在IDEA上點擊圖示來執行單元測驗,記得勾選下圖紅框中選項,否則可能出現編譯失敗:

- 執行上述單元測驗類,結果如下圖所示,訊息的生產和消費都符合預期,并且消費邏輯是在獨立執行緒中執行的:

- 繼續挑戰下一個場景;
100個事件,三個消費者,每個都獨自消費這個100個事件
- 這個場景在kafka中也有,就是三個消費者的group不同,這樣每一條訊息,這兩個消費者各自消費一次;
- 因此,100個事件,3個消費者每人都會獨立消費這100個事件,一共消費300次;
- 代碼如下,有幾處要注意的地方稍后提到:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Service("multiConsumer")
@Slf4j
public class MultiConsumerServiceImpl implements LowLevelOperateService {
private RingBuffer<StringEvent> ringBuffer;
private StringEventProducer producer;
/**
* 統計訊息總數
*/
private final AtomicLong eventCount = new AtomicLong();
/**
* 生產一個BatchEventProcessor實體,并且啟動獨立執行緒開始獲取和消費訊息
* @param executorService
*/
private void addProcessor(ExecutorService executorService) {
// 準備一個匿名類,傳給disruptor的事件處理類,
// 這樣每次處理事件時,都會將已經處理事件的總數列印出來
Consumer<?> eventCountPrinter = new Consumer<Object>() {
@Override
public void accept(Object o) {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count);
}
};
BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
ringBuffer,
ringBuffer.newBarrier(),
new StringEventHandler(eventCountPrinter));
// 將當前消費者的sequence實體傳給ringBuffer
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
// 啟動獨立執行緒獲取和消費事件
executorService.submit(batchEventProcessor);
}
@PostConstruct
private void init() {
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);
ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);
// 創建多個消費者,并在獨立執行緒中獲取和消費事件
for (int i=0;i<CONSUMER_NUM;i++) {
addProcessor(executorService);
}
// 生產者
producer = new StringEventProducer(ringBuffer);
}
@Override
public void publish(String value) {
producer.onData(value);
}
@Override
public long eventCount() {
return eventCount.get();
}
}
-
上述代碼和前面的OneConsumerServiceImpl相比差別不大,主要是創建了多個BatchEventProcessor實體,然后分別在執行緒池中提交;
-
驗證方法依舊是單元測驗,在剛才的LowLeverOperateServiceImplTest.java中增加代碼即可,注意testLowLevelOperateService的第三個引數是EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM,表示預期的被消費訊息數為300:
@Autowired
@Qualifier("multiConsumer")
LowLevelOperateService multiConsumer;
@Test
public void testMultiConsumer() throws InterruptedException {
log.info("start testMultiConsumer");
testLowLevelOperateService(multiConsumer, EVENT_COUNT, EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM);
}
- 執行單元測驗,如下圖所示,一共消費了300個事件,并且三個消費者在不動執行緒:

100個事件,三個消費者共同消費這個100個事件
-
本篇的最后一個實戰是發布100個事件,然后讓三個消費者共同消費100個(例如A消費33個,B消費33個,C消費34個);
-
前面用到的BatchEventProcessor是用來獨立消費的,不適合多個消費者共同消費,這種多個消費共同消費的場景需要借助WorkerPool來完成,這個名字還是很形象的:一個池子里面有很多個作業者,把任務放入這個池子,作業者們每人處理一部分,大家合力將任務完成;
-
傳入WorkerPool的消費者需要實作WorkHandler介面,于是新增一個實作類:
package com.bolingcavalry.service;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Slf4j
public class StringWorkHandler implements WorkHandler<StringEvent> {
public StringWorkHandler(Consumer<?> consumer) {
this.consumer = consumer;
}
// 外部可以傳入Consumer實作類,每處理一條訊息的時候,consumer的accept方法就會被執行一次
private Consumer<?> consumer;
@Override
public void onEvent(StringEvent event) throws Exception {
log.info("work handler event : {}", event);
// 這里延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100);
// 如果外部傳入了consumer,就要執行一次accept方法
if (null!=consumer) {
consumer.accept(null);
}
}
}
- 新增服務類,實作共同消費的邏輯,有幾處要注意的地方稍后會提到:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.*;
import com.lmax.disruptor.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Service("workerPoolConsumer")
@Slf4j
public class WorkerPoolConsumerServiceImpl implements LowLevelOperateService {
private RingBuffer<StringEvent> ringBuffer;
private StringEventProducer producer;
/**
* 統計訊息總數
*/
private final AtomicLong eventCount = new AtomicLong();
@PostConstruct
private void init() {
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);
ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);
StringWorkHandler[] handlers = new StringWorkHandler[CONSUMER_NUM];
// 創建多個StringWorkHandler實體,放入一個陣列中
for (int i=0;i < CONSUMER_NUM;i++) {
handlers[i] = new StringWorkHandler(o -> {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count);
});
}
// 創建WorkerPool實體,將StringWorkHandler實體的陣列傳進去,代表共同消費者的數量
WorkerPool<StringEvent> workerPool = new WorkerPool<>(ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), handlers);
// 這一句很重要,去掉就會出現重復消費同一個事件的問題
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
workerPool.start(executorService);
// 生產者
producer = new StringEventProducer(ringBuffer);
}
@Override
public void publish(String value) {
producer.onData(value);
}
@Override
public long eventCount() {
return eventCount.get();
}
}
- 上述代碼中,要注意的有以下兩處:
-
StringWorkHandler陣列傳入給WorkerPool后,每個StringWorkHandler實體都放入一個新的WorkProcessor實體,WorkProcessor實作了Runnable介面,在執行workerPool.start時,會將WorkProcessor提交到執行緒池中;
-
和前面的獨立消費相比,共同消費最大的特點在于只呼叫了一次ringBuffer.addGatingSequences方法,也就是說三個消費者共用一個sequence實體;
- 驗證方法依舊是單元測驗,在剛才的LowLeverOperateServiceImplTest.java中增加代碼即可,注意testWorkerPoolConsumer的第三個引數是EVENT_COUNT,表示預期的被消費訊息數為100:
@Autowired
@Qualifier("workerPoolConsumer")
LowLevelOperateService workerPoolConsumer;
@Test
public void testWorkerPoolConsumer() throws InterruptedException {
log.info("start testWorkerPoolConsumer");
testLowLevelOperateService(workerPoolConsumer, EVENT_COUNT, EVENT_COUNT);
}
- 執行單元測驗如下圖所示,三個消費者一共消費100個事件,且三個消費者在不同執行緒:

- 至此,咱們在不用Disruptor類的前提下完成了三種常見場景的訊息生產消費,相信您對Disruptor的底層實作也有了深刻認識,今后不論是使用還是優化Disruptor,一定可以更加得心應手;
你不孤單,欣宸原創一路相伴
- Java系列
- Spring系列
- Docker系列
- kubernetes系列
- 資料庫+中間件系列
- DevOps系列
歡迎關注公眾號:程式員欣宸
微信搜索「程式員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/303152.html
標籤:Java
