一、簡介
- 1、發布訂閱
SUBSCRIBE, UNSUBSCRIBE 和 PUBLISH 實作了 發布/訂閱訊息范例,發送者 (publishers) 不用編程就可以向特定的接受者發送訊息 (subscribers). Rather, 發布的訊息進入通道,不需要知道有沒有訂閱者. 訂閱者發表感興趣的一個或多個通道,并且只接受他們感興趣的訊息,不管發布者是不是存在. 發布者和訂閱者的解耦可以允許更大的伸縮性和更多動態的網路拓撲, - 2、說明
本篇文章是繼:
【SpringBoot】三十四、SpringBoot整合Redis實作序列化存盤Java物件
以及
【SpringBoot】三十五、SpringBoot整合Redis監聽Key過期事件
其中涉及到的知識及代碼,本篇文章不再進行贅述
二、注入訊息發布/訂閱
- 1、添加訊息監聽器
/**
* 訊息監聽
* <p>
* 可以傳入多個 MessageListenerAdapter
*/
@Bean
RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 監聽所有庫的key過期事件
container.setConnectionFactory(connectionFactory);
// 可以添加多個 messageListener,配置不同的通道
container.addMessageListener(messageListenerAdapter, new PatternTopic("user"));
return container;
}
所有的訂閱訊息,都需要在這里進行注冊系結,new PatternTopic(“user”),表示發布的主題資訊
- 小插曲
前面我們學習了監聽 key 過期事件,如果我們只需要監聽當前庫的 key 過期事件,可以這樣寫:
@Value("${spring.redis.database}")
public String redisDatabaseIndex;
先拿到我們專案中使用的 Redis 的庫索引
// 監聽當前庫的key過期
container.addMessageListener(messageListenerAdapter, new PatternTopic("__keyevent@" + redisDatabaseIndex + "__:expired"));
然后使用發布/訂閱模式,訂閱主題為:keyevent@0:expired 的訊息,則表示訂閱資料庫索引為 0 的 key 過期事件,監聽所有的庫則為:keyevent@*:expired
- 2、系結訊息處理器
/**
* 訊息監聽器配接器,系結訊息處理器
* <p>
* 可以配置多個 listenerAdapter,監聽不同的通道
*/
@Bean
MessageListenerAdapter listenerAdapter(RedisMessageListener receiver) {
return new MessageListenerAdapter(receiver, "onMessage");
}
也就是說,當我們訂閱的頻道,當有訊息進來時,指定它的處理類以及處理方法
三、注入訊息處理器
上面我們已經注入了 RedisMessageListener 訊息處理器,并指定了處理方法 onMessage(),代碼如下:
package com.zyxx.common.redis;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
* Redis 訊息接收
*
* @Author Lizhou
**/
@Slf4j
@Component
public class RedisMessageListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
// 接收的topic
log.info("channel:" + new String(pattern));
// 訊息的POJO
log.info("message:" + message.toString());
}
}
需要實作 MessageListener 介面,重寫 onMessage() 方法,然后就可以獲取到通道以及訊息了,從而進行我們的一些業務邏輯處理
四、操作API
在 RedisUtils 中,我們增加一個操作方法
/**
* 向通道發布訊息
*/
public boolean convertAndSend(String channel, Object message) {
if (StringUtils.isBlank(channel)) {
return false;
}
try {
template.convertAndSend(channel, message);
log.info("發送訊息成功,channel:{},message:{}", channel, message);
return true;
} catch (Exception e) {
log.info("發送訊息失敗,channel:{},message:{}", channel, message);
e.printStackTrace();
}
return false;
}
這里的 channel 相當于 我們存入資料的時候的 key,如果該通道不存在,則會新建一個通道
五、測驗
- 1、測驗用例
package com.zyxx.redistest;
import com.zyxx.redistest.common.RedisUtils;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RedisTestApplicationTests {
@Autowired
private RedisUtils redisUtil;
@Test
void contextLoads() {
String message = "Hello World!";
// 發送訊息
redisUtil.convertAndSend("user", message);
}
}
我們向通道 user 發送了一條 “Hello World!” 的訊息
- 2、測驗結果

可以看出,我們的訊息發送成功,再看控制臺

我們接收到通道 user 發送了一條 “Hello World!” 的訊息
如您在閱讀中發現不足,歡迎留言!!!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/189912.html
標籤:其他
