package com.dong.mytest.demo.client; import cn.hutool.extra.spring.SpringUtil; import com.dong.mytest.demo.common.dto.DelayMessage; import com.dong.mytest.demo.common.util.DateUtil; import com.dong.mytest.demo.service.delayqueue.DelayQueueConsumer; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * @author dong */ @Slf4j @Component public class RedissonDelayQueueClient implements InitializingBean { @Resource private RedissonClient redissonClient; private final Map<String, RDelayedQueue<DelayMessage>> delayQueueMap = new ConcurrentHashMap<>(16); public void addDelayMessage(DelayMessage delayMessage) { log.info("delayMessage={}", delayMessage); if (delayQueueMap.get(delayMessage.getQueueName()) == null) { log.warn("queueName={},該延遲佇列不存在,請確認后再試...", delayMessage.getQueueName()); return; } delayMessage.setCreateTime(DateUtil.getNowFormatStr()); RDelayedQueue<DelayMessage> rDelayedQueue = delayQueueMap.get(delayMessage.getQueueName()); rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit() == null ? 
TimeUnit.SECONDS : delayMessage.getTimeUnit()); } @Override public void afterPropertiesSet() throws Exception { // 有新的延遲佇列在這里添加,佇列消費類需要繼承DelayQueueConsumer,并且service名稱為 ${queueName}Consumer List<String> queueNameList = Lists.newArrayList("orderAutoCancelDelayQueue"); // 加載延遲佇列 for (String queueName : queueNameList) { DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueName + "Consumer"); if (delayQueueConsumer == null) { throw new RuntimeException("queueName=" + queueName + ",delayQueueConsumer=null,請檢查配置..."); } // Redisson的延時佇列是對另一個佇列的再包裝,使用時要先將延時訊息添加到延時佇列中,當延時佇列中的訊息達到設定的延時時間后,該延時訊息才會進行進入到被包裝佇列中,因此,我們只需要對被包裝佇列進行監聽即可, RBlockingQueue<DelayMessage> rBlockingQueue = redissonClient.getBlockingDeque(queueName); RDelayedQueue<DelayMessage> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue); delayQueueMap.put(queueName, rDelayedQueue); // 訂閱新元素的到來,呼叫的是takeAsync(),異步執行 rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute); } } }
package com.dong.mytest.demo.common.dto;

import com.alibaba.fastjson.JSON;
import lombok.Data;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/**
 * Message carried through a delayed queue.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}; {@link #toString()} is overridden
 * explicitly below.
 *
 * @author dong
 */
@Data
public class DelayMessage implements Serializable {

    // Name of the delayed queue this message is addressed to.
    private String queueName;

    // Amount of delay, expressed in timeUnit.
    private Long delayTime;

    // Unit of delayTime; may be null (RedissonDelayQueueClient substitutes SECONDS in that case).
    private TimeUnit timeUnit;

    // Message payload. NOTE(review): the payload format is not established by this class.
    private String msgBody;

    // Creation timestamp as a formatted string; RedissonDelayQueueClient sets it when enqueuing.
    private String createTime;

    /**
     * Renders this message as a JSON string.
     *
     * @return the fastjson serialization of this object
     */
    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}
package com.dong.mytest.demo.service.delayqueue;

import com.dong.mytest.demo.common.dto.DelayMessage;

/**
 * Callback contract for handling messages taken from a delayed queue.
 *
 * @author dong
 */
public interface DelayQueueConsumer {

    /**
     * Executes (handles) a delayed message.
     *
     * @param delayMessage the delayed message to handle
     */
    void execute(DelayMessage delayMessage);
}
package com.dong.mytest.demo.service.delayqueue.impl;

import com.dong.mytest.demo.common.dto.DelayMessage;
import com.dong.mytest.demo.service.delayqueue.DelayQueueConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * {@link DelayQueueConsumer} registered under the bean name
 * {@code orderAutoCancelDelayQueueConsumer}. It currently only logs each message it receives.
 *
 * @author dong
 */
@Slf4j
@Service("orderAutoCancelDelayQueueConsumer")
public class OrderAutoCancelDelayQueueConsumer implements DelayQueueConsumer {

    /** Log template used for every received message. */
    private static final String RECEIVED_TEMPLATE = "====OrderAutoCancelConsumer=====delayMessage={}";

    /**
     * Logs the received delayed message at INFO level.
     *
     * @param delayMessage the message delivered by the delayed queue
     */
    @Override
    public void execute(DelayMessage delayMessage) {
        log.info(RECEIVED_TEMPLATE, delayMessage);
    }
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/413687.html
標籤:Java
