測試環境是 Windows 7 系統,Disruptor 3.4.2,JDK 為 1.8。
單生產者、10個消費者,處理1000000個資料
測驗代碼:
1.disruptor:
//DisruptorTest.java
package iot.cmcc.test.disruptor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
/**
 * Benchmark driver for the Disruptor variant of the test.
 *
 * <p>The main thread is the only producer. It publishes {@link #EVENT_COUNT} events into a
 * ring buffer, and a {@link WorkerPool} of {@link #CONSUMER_COUNT} handlers shares the work so
 * that the consumers do not process the same event twice.
 */
public class DisruptorTest {
    /** Number of events published by the producer. */
    private static final int EVENT_COUNT = 1000000;

    /** Number of work handlers competing for events. */
    private static final int CONSUMER_COUNT = 10;

    /** Ring buffer capacity; must be a power of two. */
    private static final int RING_BUFFER_SIZE = 1024 * 1024;

    /**
     * Builds the ring buffer and worker pool, publishes all events, then waits for the
     * consumers to finish and releases the worker threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // One thread per handler: a work processor keeps its thread until it is halted, so a
        // pool smaller than CONSUMER_COUNT would leave some handlers queued and never started.
        ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_COUNT);

        // Factory used by the ring buffer to pre-allocate its event slots.
        EventFactory<TradeTransaction> factory = TradeTransaction::new;

        WaitStrategy waitStrategy = new YieldingWaitStrategy();

        // Only the main thread publishes, so the single-producer sequencer is the right choice.
        RingBuffer<TradeTransaction> ringBuffer =
                RingBuffer.create(ProducerType.SINGLE, factory, RING_BUFFER_SIZE, waitStrategy);
        SequenceBarrier barrier = ringBuffer.newBarrier();

        // Generic array creation is unavoidable here; every element is a Consumer.
        @SuppressWarnings("unchecked")
        WorkHandler<TradeTransaction>[] workHandlers = new WorkHandler[CONSUMER_COUNT];
        for (int i = 0; i < workHandlers.length; i++) {
            workHandlers[i] = new Consumer();
        }

        WorkerPool<TradeTransaction> workerPool = new WorkerPool<TradeTransaction>(
                ringBuffer, barrier, new IgnoreExceptionHandler(), workHandlers);
        // Gate the producer on the consumers so unprocessed slots are never overwritten.
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        workerPool.start(executor);

        Producer producer = new Producer(ringBuffer);
        // Taken after setup so the elapsed time printed by the consumers does not include
        // allocating the ring buffer and starting the worker threads. Boxed once up front
        // because Producer.onData takes a Long.
        Long startTime = System.currentTimeMillis();
        for (int i = 0; i < EVENT_COUNT; i++) {
            producer.onData(startTime);
        }

        // Wait until every published event has been consumed, stop the workers, and let the
        // executor threads terminate so the JVM can exit.
        workerPool.drainAndHalt();
        executor.shutdown();
    }
}
//TradeTransaction.java
package iot.cmcc.test.disruptor;
/**
 * Mutable event carried through the ring buffer.
 *
 * <p>Holds a {@code Long} value and a {@code String} sequence label. Both fields are public and
 * are also exposed through conventional getters and setters.
 */
public class TradeTransaction {
    /** Sequence label of this event; {@code null} until set. */
    public String seq;

    /** Value attached to this event; {@code null} until set. */
    public Long value;

    /**
     * @return the value currently stored, possibly {@code null}
     */
    public Long getValue() {
        return this.value;
    }

    /**
     * @param value the value to store
     */
    public void setValue(Long value) {
        this.value = value;
    }

    /**
     * @return the sequence label currently stored, possibly {@code null}
     */
    public String getSeq() {
        return this.seq;
    }

    /**
     * @param seq the sequence label to store
     */
    public void setSeq(String seq) {
        this.seq = seq;
    }
}
//Producer.java
package iot.cmcc.test.disruptor;
import com.lmax.disruptor.RingBuffer;
/**
 * Publishes {@link TradeTransaction} events into a Disruptor {@link RingBuffer}.
 */
public class Producer {
    private final RingBuffer<TradeTransaction> ringBuffer;

    /**
     * @param ringBuffer the ring buffer this producer publishes into
     */
    public Producer(RingBuffer<TradeTransaction> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * Claims the next slot, fills it, and publishes it.
     *
     * <p>The event's value is set to {@code time} and its sequence label to the claimed
     * sequence number rendered as a string.
     *
     * @param time the value to store in the event
     */
    public void onData(Long time) {
        // Think of the ring buffer as an event queue: next() claims the next free slot.
        long sequence = ringBuffer.next();
        try {
            TradeTransaction event = ringBuffer.get(sequence);
            event.setValue(time);
            event.setSeq(sequence + "");
        } finally {
            // Always publish a claimed sequence; a claimed-but-unpublished slot would stall
            // the consumers waiting on it.
            ringBuffer.publish(sequence);
        }
    }
}
//Consumer.java
package iot.cmcc.test.disruptor;
import com.lmax.disruptor.WorkHandler;
/**
 * Work handler that prints each event's sequence label and the time elapsed since the
 * timestamp stored in the event.
 */
public class Consumer implements WorkHandler<TradeTransaction> {

    /**
     * Prints two lines to standard output: the event's sequence label, then the difference
     * between the current time in milliseconds and the event's value.
     *
     * @param event the event to report on
     * @throws Exception declared by the handler contract
     */
    @Override
    public void onEvent(TradeTransaction event) throws Exception {
        final String seq = event.getSeq();
        System.out.println("消費者C1消費了一條訊息:" + seq);

        // The clock is read after the first line is printed.
        final long elapsed = System.currentTimeMillis() - event.getValue();
        System.out.println("花費時間 :" + elapsed);
    }
}
//IntEventExceptionHandler.java
package iot.cmcc.test.disruptor;
import org.apache.log4j.Logger;
/**
 * Logs exceptions at error level through log4j.
 *
 * <p>NOTE(review): this class declares no interface, and nothing else in the visible source
 * references it. The method names look like a Disruptor exception-handler contract; confirm
 * whether it was meant to implement one.
 */
public class IntEventExceptionHandler {
    private static final Logger LOG = Logger.getLogger(IntEventExceptionHandler.class);

    /**
     * Logs a failure raised while handling an event.
     *
     * @param ex       the failure to log
     * @param sequence not used by this method
     * @param event    not used by this method
     */
    public void handleEventException(Throwable ex, long sequence, Object event) {
        LOG.error("handleEventException", ex);
    }

    /**
     * Logs a failure raised during start-up.
     *
     * @param ex the failure to log
     */
    public void handleOnStartException(Throwable ex) {
        LOG.error("handleOnStartException", ex);
    }

    /**
     * Logs a failure raised during shutdown.
     *
     * @param ex the failure to log
     */
    public void handleOnShutdownException(Throwable ex) {
        LOG.error("handleOnShutdownException", ex);
    }
}
2.jdk的LinkedBlockingQueue佇列
//jdkTest.java
package iot.cmcc.test.jdkseq;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Benchmark driver for the {@link LinkedBlockingQueue} variant of the test: one producer
 * thread and ten consumer threads share a single bounded queue.
 */
public class jdkTest {
    /** Timestamp in milliseconds taken when this class is initialised. */
    private static final Long START_TIME = System.currentTimeMillis();

    /**
     * Wires the producer and consumers to a shared queue and starts their threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Bounded queue with a capacity of 10000 entries.
        final BlockingQueue<TradeTransaction> queue = new LinkedBlockingQueue<>(10000);

        final Producer producer = new Producer(queue);
        // setTime is static, so it is called on the class rather than on the instance.
        Producer.setTime(START_TIME);
        final Consumer consumer = new Consumer(queue);

        // Start the producer that fills the queue.
        final Thread producerThread = new Thread(producer);
        producerThread.start();

        // Start ten threads that all run the same Consumer instance.
        int started = 0;
        while (started < 10) {
            final Thread consumerThread = new Thread(consumer);
            consumerThread.start();
            started++;
        }

        System.out.println("Producer and Consumer has been started");
    }
}
//TradeTransaction.java
package iot.cmcc.test.jdkseq;
/**
 * Mutable message passed through the blocking queue.
 *
 * <p>Holds a {@code Long} value and a {@code String} sequence label. Both fields are public and
 * are also exposed through conventional getters and setters.
 */
public class TradeTransaction {
    /** Sequence label of this message; {@code null} until set. */
    public String seq;

    /** Value attached to this message; {@code null} until set. */
    public Long value;

    /**
     * @return the value currently stored, possibly {@code null}
     */
    public Long getValue() {
        return this.value;
    }

    /**
     * @param value the value to store
     */
    public void setValue(Long value) {
        this.value = value;
    }

    /**
     * @return the sequence label currently stored, possibly {@code null}
     */
    public String getSeq() {
        return this.seq;
    }

    /**
     * @param seq the sequence label to store
     */
    public void setSeq(String seq) {
        this.seq = seq;
    }
}
//Producer.java
package iot.cmcc.test.jdkseq;
import java.util.concurrent.BlockingQueue;
/**
 * Runnable that puts a fixed number of {@link TradeTransaction} messages onto a
 * {@link BlockingQueue}.
 *
 * <p>Each message carries the shared timestamp configured through {@link #setTime(Long)} and
 * its own index as the sequence label.
 */
public class Producer implements Runnable {
    /** Number of messages produced by one call to {@link #run()}. */
    private static final int MESSAGE_COUNT = 1000000;

    /** Timestamp copied into every produced message; shared by all instances. */
    private static Long time;

    private final BlockingQueue<TradeTransaction> queue;

    /**
     * @param q the queue to put messages onto
     */
    public Producer(BlockingQueue<TradeTransaction> q) {
        this.queue = q;
    }

    /**
     * @return the timestamp copied into produced messages, or {@code null} if never set
     */
    public static Long getTime() {
        return time;
    }

    /**
     * @param time the timestamp to copy into produced messages
     */
    public static void setTime(Long time) {
        Producer.time = time;
    }

    /**
     * Produces the messages in index order, blocking while the queue is full.
     *
     * <p>If the thread is interrupted, production stops and the interrupt status is restored
     * so the owner of the thread can observe it.
     */
    @Override
    public void run() {
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                TradeTransaction msg = new TradeTransaction();
                msg.setValue(getTime());
                msg.setSeq(i + "");
                queue.put(msg);
            }
        } catch (InterruptedException e) {
            // Stop instead of silently dropping the current message and carrying on.
            Thread.currentThread().interrupt();
        }
    }
}
//Consumer.java
package iot.cmcc.test.jdkseq;
import java.util.concurrent.BlockingQueue;
/**
 * Runnable that takes {@link TradeTransaction} messages from a {@link BlockingQueue} and
 * prints each message's sequence label and the time elapsed since its stored timestamp.
 */
public class Consumer implements Runnable {
    private final BlockingQueue<TradeTransaction> queue;

    /**
     * @param q the queue to take messages from
     */
    public Consumer(BlockingQueue<TradeTransaction> q) {
        this.queue = q;
    }

    /**
     * Consumes messages until the thread is interrupted, blocking while the queue is empty.
     *
     * <p>On interruption the loop ends and the interrupt status is restored, so the thread
     * can be stopped by its owner.
     */
    @Override
    public void run() {
        try {
            while (true) {
                TradeTransaction tt = queue.take();
                Long l = tt.getValue();
                System.out.println("消費者消費了一條訊息 :" + tt.getSeq());
                System.out.println("花費時間 :" + (System.currentTimeMillis() - l));
            }
        } catch (InterruptedException e) {
            // Treat interruption as a request to stop rather than retrying forever.
            Thread.currentThread().interrupt();
        }
    }
}
////////////////////////////////////////////////////////////
最后的結果是:jdk的LinkedBlockingQueue比disruptor快將近4倍。這個結果跟預期差很遠,是我的測驗代碼有問題嗎?
uj5u.com熱心網友回復:
不是,是因為佇列要有一個預熱。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/46679.html
標籤:其他技術討論專區
上一篇:搭建HA的時候,zkfc初始化報錯,有沒有大佬知道怎么辦
下一篇:搶紅包演算法(公平版和手速版)
