一、背景
單機節點下,WebSocket連接成功后,可以直接發送訊息,而多節點下,連接時通過nginx會代理到不同節點,
假設一開始用戶連接了node1的socket服務,觸發訊息發送的條件的時候也通過nginx進行代理,假如代理轉到了node2節點上,那么node2節點的socket服務就發送不了訊息,因為一開始用戶注冊的是node1節點,這就導致了訊息發送失敗,

為了解決這一方案,訊息發送時,就需要一個中間件來記錄,這樣,三個節點都可以獲取訊息,然后在根據條件進行訊息推送,
二、解決方案(springboot 基于 Redis發布訂閱)
1、依賴
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2、創建業務處理類 Demo.class,該類可以實作MessageListener介面后重寫onMessage方法,也可以不實作,自己寫方法,
import com.alibaba.fastjson.JSON;
import com.dy.service.impl.OrdersServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
/**
* @program:
* @description: redis訊息訂閱-業務處理
* @author: zhang yi
* @create: 2021-01-25 16:46
*/
@Component
public class Demo implements MessageListener {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void onMessage(Message message, byte[] pattern) {
logger.info("訊息訂閱成功---------");
logger.info("內容:"+message.getBody());
logger.info("交換機:"+message.getChannel());
}
}
3、創建PubSubConfig配置類
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
* @program:
* @description: redis發布訂閱配置
* @author: zhang yi
* @create: 2021-01-25 16:49
*/
@Configuration
@EnableCaching
public class PubSubConfig {
Logger logger = LoggerFactory.getLogger(this.getClass());
//如果是多個交換機,則引數為(RedisConnectionFactory connectionFactory,
// MessageListenerAdapter listenerAdapter,
// MessageListenerAdapter listenerAdapter2)
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多個 messageListener,配置不同的交換機
container.addMessageListener(listenerAdapter, new PatternTopic("channel:demo"));
//container.addMessageListener(listenerAdapter2, new PatternTopic("channel:demo2"));
return container;
}
/**
* 訊息監聽器配接器,系結訊息處理器,利用反射技術呼叫訊息處理器的業務方法
* @param demo 第一步的業務處理類
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(Demo demo) {
logger.info("----------------訊息監聽器加載成功----------------");
// onMessage 就是方法名,基于反射呼叫
return new MessageListenerAdapter(demo, "onMessage");
}
/**
* 多個交換機就多寫一個
* @param subCheckOrder
* @return
*/
//@Bean
//MessageListenerAdapter listenerAdapter2(SubCheckOrder subCheckOrder) {
// logger.info("----------------訊息監聽器加載成功----------------");
// return new MessageListenerAdapter(subCheckOrder, "onMessage");
//}
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
}
4、訊息發布
@Autowired
private RedisTemplate<String, Object> redisTemplate;
redisTemplate.convertAndSend("channel:demo", "我是內容");
三、具體用法
- socket連接成功,
- socket訊息推送時,把資訊發布到redis中,
- socket服務訂閱redis的訊息,訂閱成功后進行推送,集群下的socket都能訂閱到訊息,但是只有之前連接成功的節點能推送成功,其余的無法推送,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/253084.html
標籤:java
下一篇:一小時教會你單執行緒爬取微博熱搜
