1.訊息概述
可通過訊息服務中間件來提升系統異步通信、擴展解耦能力
訊息服務中兩個重要概念:訊息代理(message broker)和目的地(destination)當訊息發送者發送訊息以后,將由訊息代理接管,訊息代理保證訊息傳遞到指定目的地,
訊息佇列主要有兩種形式的目的地:
佇列(queue):點對點訊息通信(point-to-point)
主題(topic):發布(publish)/訂閱(subscribe)訊息通信
注:通過ActiveMQ的學習即可知道以上的概念

在未引入訊息中間件的情況下,回應時間并不能降到最低;在引入訊息中間件時,回應時間由150ms降低為55ms;

在秒殺系統中,我們可以引入訊息佇列進行流量削峰,如,5件商品,100人搶購,如果搶購完了,則后面搶購的訊息全部拒絕,
(1).點對點式(佇列)
訊息發送者發送訊息,訊息代理將其放入一個佇列中,訊息接收者從佇列中獲取訊息內容,訊息讀取后被移出佇列
訊息只有唯一的發送者和接受者,但并不是說只能有一個接收者
(2).發布訂閱式(主題)
發送者(發布者)發送訊息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題,那么就會在訊息到達時同時收到訊息.類比微信公眾號
(3).JMS(Java Message Service)
JAVA訊息服務(Java Message Service),基于JVM訊息代理的規范,ActiveMQ、HornetMQ是JMS實作
(4).AMQP(Advanced Message Queuing Protocol)
高級訊息佇列協議(Advanced Message Queuing Protocol),也是一個訊息代理的規范,兼容JMS.RabbitMQ是AMQP的實作;
(5).JMS和AMQP區別

(6).Spring支持
spring-jms提供了對JMS的支持;
spring-rabbit提供了對AMQP的支持;
需要ConnectionFactory的實作來連接訊息代理;
提供JmsTemplate、RabbitTemplate來發送訊息;
@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上監聽訊息代理發布的訊息;
@EnableJms、@EnableRabbit開啟支持;
(7).Spring Boot自動配置
JmsAutoConfiguration
RabbitAutoConfiguration
2.RabbitMQ簡介
RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實作,
(1).核心概念
Message:訊息,訊息是不具名的,它由訊息頭和訊息體組成,訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他訊息的優先權)、delivery-mode(指出該訊息可能需要持久性存盤)等,
Publisher:訊息的生產者,也是一個向交換器發布訊息的客戶端應用程式,
Exchange:交換器,用來接收生產者發送的訊息并將這些訊息路由給服務器中的佇列,Exchange有4種型別:direct(默認),fanout, topic, 和headers,不同型別的Exchange轉發訊息的策略有所區別.
Queue:訊息佇列,用來保存訊息直到發送給消費者,它是訊息的容器,也是訊息的終點,一個訊息可投入一個或多個佇列,訊息一直在佇列里面,等待消費者連接到這個佇列將其取走,
Binding:系結,用于訊息佇列和交換器之間的關聯,一個系結就是基于路由鍵將交換器和訊息佇列連接起來的路由規則,所以可以將交換器理解成一個由系結構成的路由表,Exchange 和Queue的系結可以是多對多的關系,
Connection:網路連接,比如一個TCP連接,
Channel:信道,多路復用連接中的一條獨立的雙向資料流通道,信道是建立在真實的TCP連接內的虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布訊息、訂閱佇列還是接收訊息,這些動作都是通過信道完成,因為對于作業系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接,
Consumer:訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式,
Virtual Host虛擬主機,表示一批交換器、訊息佇列和相關物件,虛擬主機是共享相同的身份認證和加密環境的獨立服務器域,每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的佇列、交換器、系結和權限機制,vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是/,
Broker:表示訊息佇列服務器物體,

(2).RabbitMQ運行機制
[1].AMQP(Advanced Message Queuing Protocol)訊息路由
AMQP 中訊息的路由程序和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了 Exchange 和 Binding 的角色,生產者把訊息發布到 Exchange 上,訊息最終到達佇列并被消費者接收,而 Binding 決定交換器的訊息應該發送到那個佇列,

[2].Direct Exchange型別
Exchange分發訊息時根據型別的不同分發策略有區別,目前共四種型別:direct、fanout、topic、headers;headers 匹配 AMQP 訊息的 header 而不是路由鍵, headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種型別:

[3].Fanout Exchange型別 
[4].Topic Exchange型別

如,usa.news和usa.weather會匹配到usa.#上;而usa.weather和europe.weather就會匹配到#.weather;類似于模糊匹配
3.RabbitMQ整合
(1).搭建docker
[1].安裝docker
|
https://www.cnblogs.com/HOsystem/p/13789551.html |
[2].加速docker
|
https://www.cnblogs.com/HOsystem/p/13789551.html |
(2).搭建rabbitmq
在docker上pull rabbitmq
|
[root@hosystem ~]# docker pull rabbitmq Using default tag: latest latest: Pulling from library/rabbitmq 171857c49d0f: Pull complete 419640447d26: Pull complete 61e52f862619: Pull complete 856781f94405: Pull complete 125d5ee3d600: Pull complete 42de77c4d197: Pull complete 4d65f87814dd: Pull complete f6c0bf06039f: Pull complete 01671add1b7b: Pull complete 088ff84cf8cb: Pull complete Digest: sha256:3da3bcd2167a1fc9bdbbc40ec0ae2b195df5df05e3c10c64569c969cb3d86435 Status: Downloaded newer image for rabbitmq:latest docker.io/library/rabbitmq:latest [root@hosystem ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE redis latest 62f1d3402b78 4 days ago 104MB rabbitmq latest ea2bf0a30abf 4 weeks ago 156MB hello-world latest bf756fb1ae65 10 months ago 13.3kB |
通過docker啟動rabbitmq
|
[root@hosystem ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE rabbitmq latest ea2bf0a30abf 4 weeks ago 156MB [root@hosystem ~]# docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq ea2bf0a30abf e687835a6ea784d55717dc402d5d447d62e486e78f6c770ec703dfdec3d64f16 [root@hosystem ~]# |
-d:表示后臺啟動
-p:進行埠映射
--name:重新名,修改成我們想要的名字 
訪問rabbitmq,因為我的ip為192.168.188.198所以只要在瀏覽器上輸入192.168.188.198:15672即可;賬號:guest 密碼:guest 

注:rabbitMQ啟動后用web訪問顯示服務器拒絕訪問,用以下方法解決
|
#添加防火墻規則 [root@hosystem ~]# firewall-cmd --permanent --zone=public --add-port=15672/tcp success [root@hosystem ~]# firewall-cmd --reload success |
|
#https://blog.csdn.net/tl1242616458/article/details/105586984 [root@hosystem ~]# docker exec -it myrabbitmq /bin/bash [root@e687835a6ea7:/]# rabbitmq-plugins enable rabbitmq_management |
(3).rabbitmq web操作
[1].添加exchange
按照上圖依次添加,exchange.direct、exchange.fanout、exchange.topic這三個exchange.效果如下

[2].添加queues

[3].系結關系
點擊需要的exchange,進去后在bingdings里填寫與之系結的queues,
①.direct bindings

②.fanout bindings

③.topic bindings

[4].發送訊息
①exchange.direct
②exchange.fanout

③.exchange.topic

我們發送key為hello.news的訊息,因為我們topic有#.news,所以只要有#.news都可以接收

[5].獲取訊息
①.hos queues

②.hosystem.news queues


(4).IDEA整合RabbitMQ
[1].創建工程
①.引入rabbit
|
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> |
②.application.yml
|
#rabbitmq配置資訊 spring.rabbitmq.host=192.168.188.198 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #spring.rabbitmq.port= #spring.rabbitmq.virtual-host= |
③.測驗RabbitMQ
AmqpAdmin:管理組件
RabbitTemplate:訊息發送處理組件
[2].查看RabbitAutoConfiguration

[3].查看CachingConnectionFactory

[4].查看RabbitProperties
RabbitProperties是封裝RabbitMQ相關配置的類

[5].查看RabbitTemplate
RabbitTemplate是用于RabbitMQ發送和接收訊息

[6].查看AmqpAdmin
AmqpAdmin是RabbitMQ系統管理功能組件

[7].application.properties
配置rabbitmq引數
|
#rabbitmq配置資訊 spring.rabbitmq.host=192.168.188.198 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #spring.rabbitmq.port= #spring.rabbitmq.virtual-host= |
[8].發送訊息
發送訊息到rabbitmq,默認使用java-serialized序列化
|
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
/** * 單播(點對點) */ @Test void contextLoads() { //message需要自己定義;定義訊息體內容和訊息頭(org.springframework.amqp.core.Message()) // rabbitTemplate.send(exchange,routekey,message);
//object默認當成訊息體,只需要傳入要發送的物件,自動序列化發送給rabbitmq // rabbitTemplate.convertAndSend(exchange,routekey,message); Map<String,Object> map = new HashMap<>(); map.put("msg","test1"); map.put("data", Arrays.asList("helloworld",123,true)); //物件被默認序列化(java-serialized-object)后發送 // rabbitTemplate.convertAndSend("exchange.direct","hos.news",map);
} |

[9].接收訊息
|
/** * 接收rabbitmsq訊息 * 將資料轉為json發送出去(private MessageConverter messageConverter = new SimpleMessageConverter();) */ @Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("hos.news"); System.out.println(o.getClass()); System.out.println(o); Object o1 = rabbitTemplate.receiveAndConvert("hos.news"); System.out.println(o1.getClass()); System.out.println(o1);
} |

[10].資料轉json
將資料轉為json發送出去MessageConverter messageConverter = new SimpleMessageConverter();(org.springframework.amqp.support.converter.MessageConverter)
①.MyAMQPConfig.java
|
package com.hosystem.amqp.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class MyAMQPConfig {
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } } |
②.Book.java
|
package com.hosystem.amqp.bean;
public class Book {
private String bookName; private String author;
public Book() { }
public Book(String bookName, String author) {
this.bookName = bookName; this.author = author; }
public String getBookName() { return bookName; }
public String getAuthor() { return author; }
public void setBookName(String bookName) { this.bookName = bookName; }
public void setAuthor(String author) { this.author = author; } } |
③.自定義物件
使用自定義物件發送給rabbitmq
|
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
/** * 單播(點對點) */ @Test void contextLoads() { //message需要自己定義;定義訊息體內容和訊息頭(org.springframework.amqp.core.Message()) // rabbitTemplate.send(exchange,routekey,message);
//object默認當成訊息體,只需要傳入要發送的物件,自動序列化發送給rabbitmq // rabbitTemplate.convertAndSend(exchange,routekey,message); Map<String,Object> map = new HashMap<>(); map.put("msg","test1"); map.put("data", Arrays.asList("helloworld",123,true)); //發送自定義物件 rabbitTemplate.convertAndSend("exchange.direct","hos.news",new Book("Linux","linux"));
} } |
④.接收json資料
|
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
/** * 接收rabbitmsq訊息 * 將資料轉為json發送出去(private MessageConverter messageConverter = new SimpleMessageConverter();) */ @Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("hos.news"); System.out.println(o.getClass()); System.out.println(o); } |

[11].廣播發送訊息
|
package com.hosystem.amqp;
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
/** * 廣播 */ @Test public void sendMsg(){ rabbitTemplate.convertAndSend("exchange.fanout","",new Book("python書籍","python作者")); } } |
[12].@EnableRabbit + @RabbitListener
@EnableRabbit + @RabbitListener 監聽訊息佇列內容
①.BookService.java
|
package com.hosystem.amqp.service;
import com.hosystem.amqp.bean.Book; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;
@Service public class BookService {
@RabbitListener(queues = "hos.news") public void receive(Book book){ System.out.println("收到訊息:"+book); }
@RabbitListener(queues = "hos") public void receive02(Message message){ System.out.println(message.getBody()); System.out.println(message.getMessageProperties()); } } |
②.Springboot02AmqpApplication.java
|
@EnableRabbit //開啟基于注解的rabbitmq模式 @SpringBootApplication public class Springboot02AmqpApplication {
public static void main(String[] args) { SpringApplication.run(Springboot02AmqpApplication.class, args); }
} |
[13].AmqpAdmin
AmqpAdmin(org.springframework.amqp.core.AmqpAdmin):RabbitMQ系統管理功能組件,
AmqpAdmin:創建和洗掉Queue、exchange、binding
|
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
// @Bean // @ConditionalOnSingleCandidate(ConnectionFactory.class) // @ConditionalOnProperty( // prefix = "spring.rabbitmq", // name = {"dynamic"}, // matchIfMissing = true // ) // @ConditionalOnMissingBean // public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { // return new RabbitAdmin(connectionFactory); // } @Autowired AmqpAdmin amqpAdmin;
//創建exchange queues binding @Test public void createExchange(){ //org.springframework.amqp.core.Exchange //org.springframework.amqp.core.DirectExchange //創建exchange // amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange")); // System.out.println("創建成功");
//創建queues //org.springframework.amqp.core.AmqpAdmin // amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
//org.springframework.amqp.core.Binding //創建binding amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null)); } } |
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/226531.html
標籤:其他
上一篇:6、Sping Boot訊息
下一篇:誰幫幫忙寫一下程式呀


