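I have been working as a programmer for seven or eight years, and this is the first article I have ever written, which is a little embarrassing. Until now, work meant searching Baidu and Google whenever I needed something, and I never sat down to study the principles or read the source code myself, so I am still a rookie.
Now to the point. This demo uses Nacos as both the configuration center and the service registry. If you are not familiar with it, have a look at the Nacos website. A vulnerability was recently reported in the component, but its functionality is undeniably powerful. Official site: https://nacos.io/zh-cn/
The project is developed with JDK 1.8 and uses Maven for dependency management. It is split into several submodules: the common module for base services (Redis, MongoDB and RocketMQ integration, plus utility classes such as UUID, DateUtil, and encryption/decryption), the service provider demo-provider, the service consumer demo-consumer, the WebSocket service, and others. The structure of the demo project is as follows: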

The POM of the parent project shared by all submodules is shown below. It pulls in zipkin and sleuth, but I have not implemented anything with them yet; I will write another article once they are integrated. If you do not need them, simply delete them:
Parent POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>XX.XX</groupId>
    <artifactId>platform</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>platform</name>
    <description>XX middle-platform services</description>
    <packaging>pom</packaging>

    <properties>
        <spring-boot.version>2.0.4.RELEASE</spring-boot.version>
        <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
    </properties>

    <modules>
        <module>common</module>
        <module>gateway</module>
        <module>servers/demo-provider</module>
        <module>servers/demo-consumer</module>
        <module>servers/taskinput</module>
        <module>servers/imchat</module>
    </modules>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- <dependency>-->
        <!--     <groupId>org.springframework.boot</groupId>-->
        <!--     <artifactId>spring-boot-starter-security</artifactId>-->
        <!-- </dependency>-->
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-zipkin</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
The gateway module follows.
POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>XX.XX</groupId>
        <artifactId>platform</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <groupId>XX.XX.platform</groupId>
    <artifactId>gateway</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>gateway</name>
    <description>XX middle-platform service gateway</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.SR1</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>0.2.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
            <version>0.2.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>XX.XX.platform</groupId>
            <artifactId>common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <finalName>../../docker/XX-gateway</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- must match the fully qualified name of the startup class -->
                    <mainClass>com.smartgis.GatewayStarterApplication</mainClass>
                    <layout>ZIP</layout>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <!-- packages all dependencies into the generated jar -->
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Gateway configuration file (application.yml):
server:
  port: 8001
spring:
  application:
    name: demo-gateway
  cloud:
    nacos:
      discovery:
        namespace: 0ee7047e-90db-45d9-a85c-b3aa206aba5b
        server-addr: 127.0.0.1:8848
    gateway:
      # If this is not commented out, connections that the gateway forwards to the
      # WebSocket service drop automatically with a 504 error
      # default-filters:
      #   - name: Hystrix
      #     args:
      #       name: myfallback
      #       fallbackUri: forward:/defaultfallback
      globalcors:
        corsConfigurations:
          '[/**]':
            allowedHeaders: "*"
            allowedOrigins: "*"
            allowedMethods: "*"
      # If this is not commented out, connections that the gateway forwards to the
      # WebSocket service drop automatically with a 504 error
      # discovery:
      #   locator:
      #     enabled: true # create routes dynamically from the registry, routing by service name
      routes:
        - id: taskinput
          uri: lb://taskinput
          order: 0
          predicates:
            - Path=/taskinput/**
          filters:
            - StripPrefix=1 # strip the prefix; see StripPrefixGatewayFilterFactory for the implementation
            - AddResponseHeader=X-Response-Default-Foo, Default-Bar
            - name: Hystrix
              args:
                name: myfallback
                fallbackUri: forward:/defaultfallback
        - id: demo-provider
          uri: lb://demo-provider
          order: 0
          predicates:
            - Path=/provider/**
          filters:
            - StripPrefix=1 # strip the prefix; see StripPrefixGatewayFilterFactory for the implementation
            - AddResponseHeader=X-Response-Default-Foo, Default-Bar
        - id: demo-consumer
          uri: lb://demo-consumer
          order: 0
          predicates:
            - Path=/consumer/**
          filters:
            - StripPrefix=1 # strip the prefix; see StripPrefixGatewayFilterFactory for the implementation
            - AddResponseHeader=X-Response-Default-Foo, Default-Bar
        - id: imchat
          uri: lb:ws://imchat
          predicates:
            - Path=/ws/**
          filters:
            - StripPrefix=1
        # - id: imchat
        #   uri: lb://imchat
        #   predicates:
        #     - Path=/ws/info/**
        # - id: imchat
        #   uri: lb:ws://imchat
        #   predicates:
        #     - Path=/ws/**
        # The two bullet routes were both named "bulletscreen"; route ids should be unique
        - id: bulletscreen-info
          uri: lb://bullet
          predicates:
            - Path=/bullet/info/**
        - id: bulletscreen-ws
          uri: lb:ws://bullet
          predicates:
            - Path=/bullet/**
# The actual timeout is (ReadTimeout+ConnectTimeout)*(MaxAutoRetries+1)*(MaxAutoRetriesNextServer+1)
# If MaxAutoRetries and MaxAutoRetriesNextServer are both 0, the actual timeout is simply ReadTimeout+ConnectTimeout
ribbon:
  eureka:
    enabled: false
  eager-load:
    enabled: true # eager loading: create the Ribbon clients at startup instead of on first use
  ConnectTimeout: 2000 # in ms, timeout for establishing the connection
  ReadTimeout: 4000 # in ms, timeout for processing the request
  OkToRetryOnAllOperations: false # whether to retry all kinds of requests
  MaxAutoRetriesNextServer: 0 # number of retries on a different instance
  MaxAutoRetries: 0 # number of retries on the current instance
  ServerListRefreshInterval: 2000 # interval to refresh the server list from the source
  NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RetryRule
management:
  endpoints:
    web:
      exposure:
        include: refresh
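The timeout comment in the configuration above is easier to check with numbers plugged in. The following is a minimal sketch of that arithmetic, assuming the formula in the comment applies to this setup; the class and method names are hypothetical and are not part of Ribbon.

```java
/** Hypothetical helper that evaluates the timeout formula quoted in the YAML comment. */
final class RibbonTimeoutSketch {

    /** (ReadTimeout + ConnectTimeout) * (MaxAutoRetries + 1) * (MaxAutoRetriesNextServer + 1) */
    static long worstCaseTimeoutMs(long connectTimeoutMs, long readTimeoutMs,
                                   int maxAutoRetries, int maxAutoRetriesNextServer) {
        return (readTimeoutMs + connectTimeoutMs)
                * (maxAutoRetries + 1)
                * (maxAutoRetriesNextServer + 1);
    }

    public static void main(String[] args) {
        // Values from the gateway configuration: no retries at all.
        System.out.println(worstCaseTimeoutMs(2000, 4000, 0, 0)); // 6000
        // Illustrative only: one retry on the same instance and one on the next instance.
        System.out.println(worstCaseTimeoutMs(2000, 4000, 1, 1)); // 24000
    }
}
```

With the values used in this demo a request is given up after at most 6 seconds; enabling a single retry at each level would quadruple that.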
Hystrix fallback controller:
import XX.XX.platform.common.base.Result;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DefaultHystrixController {

    // Target of fallbackUri: forward:/defaultfallback in the gateway configuration
    @RequestMapping("/defaultfallback")
    @ResponseBody
    public Result defaultFallBack() {
        return Result.createFail("Service unavailable, please try again later.");
    }
}
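Startup class:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.context.config.annotation.RefreshScope;

@SpringBootApplication
@EnableDiscoveryClient
@RefreshScope
public class GatewayApplication {

    public static void main(String[] args) {
        SpringApplication.run(GatewayApplication.class, args);
    }
}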
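The IMChat service follows.
Why is it called IMChat rather than websocket? When I first thought about a WebSocket service, the goal was to build an IM service. Combined with an MQ, the idea is that the WebSocket service connects the front end to the back end while messages are forwarded through the MQ, which makes the service distributed. Judging from the demo, the approach seems feasible. I do not yet know what problems it may run into later, as my experience is limited, so any advice is welcome.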
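To make the idea concrete, here is a minimal in-memory sketch of the routing scheme: every connected user subscribes with their own user id as the tag, and a message is published under the recipient's id, so it reaches the recipient no matter which service instance holds their connection. The Broker and Node classes are hypothetical stand-ins for illustration only; they are not RocketMQ's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-memory model of "WebSocket nodes + MQ"; not RocketMQ's API. */
final class TagRoutingSketch {

    /** Stand-in for the MQ: delivers each message to the listeners subscribed to its tag. */
    static final class Broker {
        private final Map<String, List<Consumer<String>>> listenersByTag = new ConcurrentHashMap<>();

        void subscribe(String tag, Consumer<String> listener) {
            listenersByTag.computeIfAbsent(tag, t -> new CopyOnWriteArrayList<>()).add(listener);
        }

        void publish(String tag, String body) {
            List<Consumer<String>> listeners = listenersByTag.get(tag);
            if (listeners == null) {
                listeners = Collections.emptyList();
            }
            listeners.forEach(listener -> listener.accept(body));
        }
    }

    /** Stand-in for one WebSocket service instance; "delivered" replaces session.sendMessage. */
    static final class Node {
        final List<String> delivered = new ArrayList<>();
        private final Broker broker;

        Node(Broker broker) {
            this.broker = broker;
        }

        /** Called when a user connects to this instance: subscribe with the user id as tag. */
        void onUserConnected(String userId) {
            broker.subscribe(userId, body -> delivered.add(userId + " <- " + body));
        }

        /** Called when a connected client sends a message addressed to another user. */
        void onClientMessage(String toUserId, String body) {
            broker.publish(toUserId, body);
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        Node nodeA = new Node(broker);
        Node nodeB = new Node(broker);
        nodeA.onUserConnected("alice");
        nodeB.onUserConnected("bob");

        // alice is connected to node A, bob to node B; the broker bridges the two.
        nodeA.onClientMessage("bob", "hi");
        System.out.println(nodeB.delivered); // [bob <- hi]
        System.out.println(nodeA.delivered); // []
    }
}
```

The sender's instance never needs to know where the recipient is connected, which is what lets the WebSocket service scale to several instances behind the gateway.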
POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>XX.XX</groupId>
        <artifactId>platform</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <groupId>XX.XX.platform</groupId>
    <artifactId>imchat</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>imchat</name>
    <description>Chat / push service</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>0.2.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
            <version>0.2.1.RELEASE</version>
        </dependency>
        <!-- web components -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>XX.XX.platform</groupId>
            <artifactId>common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <finalName>../../docker/imchat-server</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Configuration file (application.yml):
server:
  port: 1999
spring:
  application:
    name: imchat
  cloud:
    nacos:
      discovery:
        namespace: 0ee7047e-90db-45d9-a85c-b3aa206aba5b
        server-addr: 127.0.0.1:8848
  data:
    mongodb:
      uri: mongodb://root:root@XX.XX.XX.XX:27017/bhplatform?maxpoolsize=10&minpoolsize=1&maxidletimems=600000&maxlifetimems=1800000
  cache:
    redis:
      time-to-live: 60s
    type: redis
  # Redis configuration
  redis:
    cluster:
      nodes: 127.0.0.1:6379,127.0.0.1:6380,127.0.0.1:6381,127.0.0.1:6382,127.0.0.1:6383,127.0.0.1:6384
    timeout: 6000ms
    lettuce:
      pool:
        max-active: 8
        max-wait: -1ms
        max-idle: 8
        min-idle: 0
    database: 0
  jackson:
    time-zone: GMT+8
apache:
  rocketmq:
    consumer:
      pushConsumer: XXPushConsumer
    producer:
      producerGroup: XX
    namesrvAddr: 127.0.0.1:9876
logging:
  path: /java-log
  level:
    root: info
management:
  endpoints:
    web:
      exposure:
        include: refresh
MyWebSocketHandler
import com.alibaba.fastjson.JSONObject;
import XX.XX.common.DateUtil;
import XX.XX.common.base.BaseCode;
import XX.XX.common.cache.redis.RedisService;
import XX.XX.entity.IM.IMChatMessage;
import XX.XX.entity.IM.IMChatRequest;
import XX.XX.entity.IM.IMChatResponse;
import XX.XX.common.mq.producer.Producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.*;
import com.XX.XX.taskinput.entity.UserVO;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Service
public class MyWebSocketHandler implements WebSocketHandler {

    @Autowired
    private RedisService redisService;

    @Autowired
    private Producer rocketProducer;

    /**
     * NameServer address
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    /** One session and one consumer per user, keyed by userid + suffix. */
    private static final ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        log.info("connect websocket successful!");
        IMChatMessage chat = new IMChatMessage();
        chat.setGmtCreat(DateUtil.getCurrentDateTime());
        chat.setData(session.getId());
        chat.setType(BaseCode.SINGLELOGIN);
        String userid = session.getAttributes().get(BaseCode.USERID).toString();
        log.info(userid);
        chat.setFrom(userid);
        // Single-login notice, published under this user's own id
        sendMessageToUser(userid, chat);

        // Close the previous session and consumer of the same user, if any
        WebSocketSession sessiono = (WebSocketSession) map.get(userid + BaseCode.SESSION);
        if (sessiono != null) {
            log.info("close original session-start");
            try {
                sessiono.close();
            } catch (IOException e) {
                log.info("close original session failed");
            }
        }
        DefaultMQPushConsumer consumero = (DefaultMQPushConsumer) map.get(userid + BaseCode.CONSUMER);
        if (consumero != null) {
            log.info("close original consumer-start");
            consumero.shutdown();
        }

        // TODO: start a thread to read group and friend messages
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(userid);
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            // Subscribe to the IMCHAT topic, only for messages tagged with this user's id
            consumer.subscribe(BaseCode.IMCHAT, userid);
            // On first start, consume from the head of the message queue
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Number of messages consumed per batch; the default is one
            consumer.setConsumeMessageBatchMaxSize(1);
            // Consume messages in this listener and return the consume status
            consumer.registerMessageListener((MessageListenerOrderly) (list, consumeOrderlyContext) -> {
                if (!session.isOpen()) {
                    consumer.shutdown();
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                for (MessageExt messageExt : list) {
                    try {
                        String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                        session.sendMessage(new TextMessage(body));
                        log.info("consumed: MsgId:" + messageExt.getMsgId() + ",msgBody:" + body
                                + ",tag:" + messageExt.getTags() + ",topic:" + messageExt.getTopic());
                    } catch (IOException e) {
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
                return ConsumeOrderlyStatus.SUCCESS;
            });
            log.info("start consumer");
            consumer.start();
            // Register the session first so that a late close callback of the old
            // session no longer matches and cannot remove the new entries
            map.put(userid + BaseCode.SESSION, session);
            map.put(userid + BaseCode.CONSUMER, consumer);
        } catch (Exception e) {
            log.error("start consumer exception", e);
        }
    }

    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
        log.info("handle message start");
        try {
            IMChatRequest chat = JSONObject.parseObject((String) message.getPayload(), IMChatRequest.class);
            UserVO user = (UserVO) redisService.get(session.getAttributes().get(BaseCode.TOKEN).toString());
            if (user == null) {
                session.sendMessage(new TextMessage(
                        new IMChatResponse(BaseCode.FAILCODE, BaseCode.FAILSSMSG, "token is invalid").toString()));
                session.close();
            } else {
                user.setToken("");
                IMChatMessage chatMessage = new IMChatMessage();
                chatMessage.setGmtCreat(DateUtil.getCurrentDateTime());
                chatMessage.setData(chat.getData());
                chatMessage.setType(chat.getType());
                chatMessage.setFrom(user);
                if (chat.getTouserid() != null && chat.getTouserid().length > 0) {
                    for (String toid : chat.getTouserid()) {
                        sendMessageToUser(toid, chatMessage);
                    }
                }
            }
        } catch (Exception e) {
            log.error("handle message failed", e);
        }
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        log.error("connect error", exception);
        if (session.isOpen()) {
            session.close();
        }
        cleanup(session);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
        log.info("connection closed: " + closeStatus);
        cleanup(session);
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * Shuts down the user's consumer and clears the map entries, but only if the
     * map still points at this session (a newer login may have replaced it).
     */
    private void cleanup(WebSocketSession session) {
        String userid = session.getAttributes().get(BaseCode.USERID).toString();
        if (!map.remove(userid + BaseCode.SESSION, session)) {
            return;
        }
        DefaultMQPushConsumer consumer = (DefaultMQPushConsumer) map.remove(userid + BaseCode.CONSUMER);
        if (consumer != null) {
            consumer.shutdown();
        }
    }

    /**
     * Sends a message to the given user by publishing it to the MQ with the user id as tag.
     *
     * @param clientId id of the receiving user
     * @param message  message to send
     * @return always true
     */
    public boolean sendMessageToUser(String clientId, IMChatMessage message) {
        log.info("to userid:" + clientId + ",tomessage:" + message.toString());
        rocketProducer.sendMsg(BaseCode.IMCHAT, clientId, message.toString());
        return true;
    }
}
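The handler keeps one session per user and closes the previous one when the same user logs in again. One pitfall with this pattern is that the close callback of the replaced session may run after the new session has been stored, so an unconditional remove would delete the new entry. The following is a minimal sketch of a guard against that, using the two-argument ConcurrentMap.remove; the SingleLoginRegistry class is hypothetical, and the type parameter S stands in for WebSocketSession.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical per-user session registry; S stands in for WebSocketSession. */
final class SingleLoginRegistry<S> {

    private final ConcurrentMap<String, S> sessionsByUser = new ConcurrentHashMap<>();

    /** Stores the new session and returns the one it replaced, or null if there was none. */
    S register(String userId, S session) {
        return sessionsByUser.put(userId, session);
    }

    /** Removes the entry only if it still maps to the given session. */
    boolean unregister(String userId, S session) {
        return sessionsByUser.remove(userId, session);
    }

    S current(String userId) {
        return sessionsByUser.get(userId);
    }

    public static void main(String[] args) {
        SingleLoginRegistry<String> registry = new SingleLoginRegistry<>();
        registry.register("u1", "session-old");
        String replaced = registry.register("u1", "session-new");
        System.out.println(replaced);                                 // session-old

        // The late close callback of the old session must not evict the new one.
        System.out.println(registry.unregister("u1", "session-old")); // false
        System.out.println(registry.current("u1"));                   // session-new
    }
}
```

The caller closes whatever register returns, and each close callback calls unregister with its own session, so cleanup for a stale session becomes a no-op.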
WebSocketInterceptor
import XX.XX.common.base.BaseCode;
import XX.XX.platform.common.cache.redis.RedisService;
import XX.XX.platform.taskinput.entity.UserVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import java.util.Map;

@Slf4j
@Service
public class WebSocketInterceptor implements HandshakeInterceptor {

    @Autowired
    private RedisService redisService;

    // Runs before the handshake. Return true to continue the handshake, false to abort it.
    // Values put into the attributes map become attributes of the WebSocketSession.
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes) {
        if (request instanceof ServletServerHttpRequest) {
            // The token is the last path segment; use the path so a query string is not included
            String path = request.getURI().getPath();
            String token = path.substring(path.lastIndexOf("/") + 1);
            UserVO user = (UserVO) redisService.get(token);
            log.info("current token is:" + token);
            if (user == null) {
                return false;
            }
            attributes.put(BaseCode.TOKEN, token);
            attributes.put(BaseCode.USERID, user.getUserid());
            log.info(user.getUserid());
        }
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception exception) {
        log.info("coming webSocketInterceptor afterHandshake method...");
    }
}
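The interceptor reads the token from the last segment of the handshake URI. The sketch below, using only java.net.URI, shows why taking that segment from the URI's path is more robust than cutting the full URI string: a query string would otherwise end up inside the token. The class name, method name, and sample URL are made up for illustration.

```java
import java.net.URI;

/** Hypothetical helper showing two ways to take the token from a handshake URI. */
final class TokenFromPathSketch {

    /** Returns the last path segment, ignoring any query string or fragment. */
    static String lastPathSegment(URI uri) {
        String path = uri.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        // Sample URL, assumed for illustration; "abc123" plays the role of the token.
        URI uri = URI.create("ws://127.0.0.1:1999/imchat/abc123?v=1");

        // Path-based extraction yields just the token.
        System.out.println(lastPathSegment(uri)); // abc123

        // Cutting the raw string keeps the query string attached.
        String raw = uri.toString();
        System.out.println(raw.substring(raw.lastIndexOf('/') + 1)); // abc123?v=1
    }
}
```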
WebSocketConfig
import XX.XX.platform.imchat.interceptor.WebSocketInterceptor;
import XX.XX.platform.imchat.listener.MyWebSocketHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * Registers MyWebSocketHandler at /imchat/{TOKEN} and attaches the handshake interceptor.
 * This uses Spring's WebSocketConfigurer; no ServerEndpointExporter bean or
 * @ServerEndpoint annotation is involved.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Autowired
    private MyWebSocketHandler myWebSocketHandler;

    @Autowired
    private WebSocketInterceptor webSocketInterceptor;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myWebSocketHandler, "/imchat/{TOKEN}")
                .setAllowedOrigins("*")
                .addInterceptors(webSocketInterceptor);
    }
}
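Putting the gateway route and this handler registration together: a client connects to the gateway under /ws/imchat/{token}, the StripPrefix=1 filter drops the leading /ws segment, and the imchat service receives /imchat/{token}. The sketch below is a simplified model of that path rewrite, not the gateway's own implementation; the class and method names are hypothetical.

```java
/** Hypothetical, simplified model of what a StripPrefix=N filter does to the request path. */
final class StripPrefixSketch {

    /** Drops the first {@code parts} non-empty path segments and rebuilds the path. */
    static String stripPrefix(String path, int parts) {
        StringBuilder result = new StringBuilder();
        int skipped = 0;
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) {
                continue; // ignore the empty piece before the leading slash
            }
            if (skipped < parts) {
                skipped++;
                continue;
            }
            result.append('/').append(segment);
        }
        return result.length() == 0 ? "/" : result.toString();
    }

    public static void main(String[] args) {
        // Route "imchat": Path=/ws/** with StripPrefix=1
        System.out.println(stripPrefix("/ws/imchat/abc123", 1)); // /imchat/abc123
        // Route "demo-provider": Path=/provider/** with StripPrefix=1
        System.out.println(stripPrefix("/provider/hello", 1));   // /hello
    }
}
```

With the ports from the two configuration files, a browser would therefore open ws://127.0.0.1:8001/ws/imchat/{token} rather than talking to port 1999 directly.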
ImchatApplication
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@EnableDiscoveryClient
@ComponentScan({"XX.XX.platform"})
public class ImchatApplication {

    public static void main(String[] args) {
        SpringApplication.run(ImchatApplication.class, args);
    }
}
Result:

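That is all the code for this demo. If you need the code of the other modules it touches, leave a comment. Many thanks also to the author of hasor, @哈庫納瑪塔塔, for the help and support. I would also like to recommend his open-source framework dataway, which we have been using recently and find very good. Official site: https://www.hasor.net/doc/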
