1. Define multiple LinkedBlockingQueue instances
private List<LinkedBlockingQueue<KpiSendInfo>> createLinkedBlockingQueues() {
    List<LinkedBlockingQueue<KpiSendInfo>> kpiInfoQueueList = new ArrayList<>();
    for (int index = 0; index < maxPoolSize; index++) {
        // Queue capacity is 10000; if no capacity is given to the constructor, it defaults to Integer.MAX_VALUE
        LinkedBlockingQueue<KpiSendInfo> linkedBlockingQueue = new LinkedBlockingQueue<>(10000);
        kpiInfoQueueList.add(index, linkedBlockingQueue);
    }
    return kpiInfoQueueList;
}
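To see what the bounded capacity means in practice, here is a minimal sketch. It assumes String as a stand-in for KpiSendInfo, and the class name BoundedQueueDemo and the helper createQueues are made up for illustration. It only shows that a full queue rejects offer() instead of growing without limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class BoundedQueueDemo {
    // Hypothetical helper: builds `count` independent queues, each holding at most `capacity` items.
    static <T> List<LinkedBlockingQueue<T>> createQueues(int count, int capacity) {
        List<LinkedBlockingQueue<T>> queues = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            queues.add(new LinkedBlockingQueue<>(capacity));
        }
        return queues;
    }

    public static void main(String[] args) {
        // Tiny capacity (2) so the "queue full" case is easy to reach.
        List<LinkedBlockingQueue<String>> queues = createQueues(3, 2);
        LinkedBlockingQueue<String> first = queues.get(0);

        System.out.println(first.offer("a")); // true
        System.out.println(first.offer("b")); // true
        System.out.println(first.offer("c")); // false: the queue is full and offer() does not block
        System.out.println(first.remainingCapacity()); // 0
    }
}
```

Note that put(), which the routing code in this post uses, blocks the producer when the queue is full, while offer() returns false immediately. Which one fits depends on whether you would rather apply back pressure or drop messages.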
2. One thread per LinkedBlockingQueue; messages within a queue are polled in order
private void init() {
    try {
        List<LinkedBlockingQueue<KpiSendInfo>> kpiInfoQueueList = createLinkedBlockingQueues();
        KafkaSyncProducer kafkaSyncProduce = producer();
        // A single receiver thread takes incoming messages and distributes them across the queues
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(new RecMsgTask(msgManageService, kpiInfoQueueList));
        // One sender task per queue, so each queue is consumed by exactly one thread;
        // taskExecutor must have at least as many threads as there are queues
        for (int i = 0; i < kpiInfoQueueList.size(); i++) {
            taskExecutor.execute(new SendMsgTask(kpiInfoQueueList.get(i), alarmService));
        }
    } catch (Exception e) {
        log.error("AlarmThread init err ...", e);
    }
}
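The post does not show SendMsgTask itself, so the following is only a sketch of what a one-thread-per-queue consumer could look like. Everything in it is an assumption: the names QueueWorkerDemo, Worker and process, String as the message type, and the STOP sentinel used to shut the worker down so the example can terminate. The point is that a single thread polling a single queue handles messages in exactly the order they were enqueued.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueWorkerDemo {
    // Hypothetical shutdown marker; compared by identity, so it cannot clash with a real message.
    static final String STOP = new String("STOP");

    // Hypothetical consumer: drains exactly one queue, in FIFO order.
    static class Worker implements Runnable {
        private final LinkedBlockingQueue<String> queue;
        private final List<String> sink;

        Worker(LinkedBlockingQueue<String> queue, List<String> sink) {
            this.queue = queue;
            this.sink = sink;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    // Timed poll: returns null if nothing arrives within 100 ms
                    String msg = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (msg == null) continue;
                    if (msg == STOP) break;
                    sink.add(msg); // stand-in for actually sending the message
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Pushes the messages through one worker thread and returns them in processing order.
    static List<String> process(List<String> messages) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
        List<String> sink = new ArrayList<>();
        Thread worker = new Thread(new Worker(queue, sink));
        worker.start();
        try {
            for (String m : messages) queue.put(m);
            queue.put(STOP);
            worker.join(); // after join() it is safe to read sink from this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("m1", "m2", "m3"))); // [m1, m2, m3]
    }
}
```

Ordering is only guaranteed inside one queue. Messages that land in different queues are handled by different threads and may be sent in any relative order.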
3. Route messages to different LinkedBlockingQueues by hashing the ID
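// Math.floorMod already returns a non-negative result for a positive divisor, so the index
// is always valid; the Math.abs call is redundant. The same ID always maps to the same queue.
int mod = (int) Math.floorMod(Math.abs(Id), kpiInfoQueueList.size());
// put() blocks while the target queue is full
kpiInfoQueueList.get(mod).put(kpiSendInfo);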
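The routing step can be sketched on its own as follows. This assumes the ID is a long, and the names QueueRouterDemo and indexFor are made up for illustration. Unlike the snippet above, the sketch drops Math.abs, so negative IDs may land in a different queue than they would with the original code, but every ID still maps consistently to one queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class QueueRouterDemo {
    // Maps an ID to an index in [0, queueCount). For a positive divisor,
    // Math.floorMod never returns a negative value, even for negative IDs.
    static int indexFor(long id, int queueCount) {
        return (int) Math.floorMod(id, (long) queueCount);
    }

    public static void main(String[] args) throws InterruptedException {
        List<LinkedBlockingQueue<Long>> queues = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            queues.add(new LinkedBlockingQueue<>(10000));
        }

        long[] ids = {10L, -3L, 10L, 6L};
        for (long id : ids) {
            // Same ID, same queue, hence same consumer thread and preserved order
            queues.get(indexFor(id, queues.size())).put(id);
        }

        System.out.println(indexFor(10L, 4));            // 2
        System.out.println(indexFor(-3L, 4));            // 1
        System.out.println(indexFor(Long.MIN_VALUE, 4)); // 0
        System.out.println(queues.get(2));               // [10, 10, 6]
    }
}
```

Because all messages with the same ID go through the same queue and the same consumer thread, per-ID ordering is kept while different IDs are still processed in parallel.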
