MQ,中文是訊息佇列(MessageQueue),字面來看就是存放訊息的佇列,也就是事件驅動架構中的Broker,
快速入門
1.publisher實作
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立連接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設定連接引數,分別是:主機名、埠號、vhost、用戶名、密碼
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("rabbit");
factory.setPassword("123321");
// 1.2.建立連接
Connection connection = factory.newConnection();
// 2.創建通道Channel
Channel channel = connection.createChannel();
// 3.創建佇列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.發送訊息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("發送訊息成功:【" + message + "】");
// 5.關閉通道和連接
channel.close();
connection.close();
}
}
2.consumer實作
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立連接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設定連接引數,分別是:主機名、埠號、vhost、用戶名、密碼
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立連接
Connection connection = factory.newConnection();
// 2.創建通道Channel
Channel channel = connection.createChannel();
// 3.創建佇列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.訂閱訊息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.處理訊息
String message = new String(body);
System.out.println("接收到訊息:【" + message + "】");
}
});
System.out.println("等待接收訊息,,,,");
}
}
SpringAMQP-集成SpringBoot
1.Basic Queue 簡單佇列模型
a.匯入依賴
<!--AMQP依賴,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
b.訊息發送
首先配置MQ地址,在publisher服務的application.yml中添加配置:
spring:
rabbitmq:
host: 192.168.150.101 # 主機名
port: 5672 # 埠
virtual-host: / # 虛擬主機
username: rabbit # 用戶名
password: 123321 # 密碼
然后在publisher服務中撰寫測驗類SpringAmqpTest,并利用RabbitTemplate實作訊息發送:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 佇列名稱
String queueName = "simple.queue";
// 訊息
String message = "hello, spring amqp!";
// 發送訊息
rabbitTemplate.convertAndSend(queueName, message);
}
}
c.訊息接收
首先配置MQ地址,在consumer服務的application.yml中添加配置:
spring:
rabbitmq:
host: 192.168.150.101 # 主機名
port: 5672 # 埠
virtual-host: / # 虛擬主機
username: rabbit # 用戶名
password: 123321 # 密碼
然后在consumer服務的cn.itcast.mq.listener包中新建一個類SpringRabbitListener,代碼如下:
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消費者接收到訊息:【" + msg + "】");
}
}
2.WorkQueue-任務佇列
Work queues,也被稱為(Task queues),任務模型,簡單來說就是讓多個消費者系結到一個佇列,共同消費佇列中的訊息,
當訊息處理比較耗時的時候,可能生產訊息的速度會遠遠大于訊息的消費速度,長此以往,訊息就會堆積越來越多,無法及時處理,
此時就可以使用work 模型,多個消費者共同處理訊息處理,速度就能大大提高了,
但在默認情況下,訊息是平均分配給每個消費者,并沒有考慮到消費者的處理能力,這樣顯然是有問題的,
在spring中有一個簡單的配置,可以解決這個問題,我們修改consumer服務的application.yml檔案,添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能獲取一條訊息,處理完成才能獲取下一個訊息
3.Fanout-廣播
在廣播模式下,訊息發送流程是這樣的:
- 1) 可以有多個佇列
- 2) 每個佇列都要系結到Exchange(交換機)
- 3) 生產者發送的訊息,只能發送到交換機,交換機來決定要發給哪個佇列,生產者無法決定
- 4) 交換機把訊息發送給系結過的所有佇列
- 5) 訂閱佇列的消費者都能拿到訊息
a.宣告佇列和交換機
Spring提供了一個介面Exchange,來表示所有不同型別的交換機,
在consumer中創建一個類,宣告佇列和交換機:
@Configuration
public class FanoutConfig {
/**
* 宣告交換機
* @return Fanout型別交換機
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("rabbit.fanout");
}
/**
* 第1個佇列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 系結佇列和交換機
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2個佇列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 系結佇列和交換機
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
b.訊息發送
在publisher服務的SpringAmqpTest類中添加測驗方法:
@Test
public void testFanoutExchange() {
// 佇列名稱
String exchangeName = "rabbit.fanout";
// 訊息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
c.訊息接收
在consumer服務的SpringRabbitListener中添加兩個方法,作為消費者:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消費者1接收到Fanout訊息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消費者2接收到Fanout訊息:【" + msg + "】");
}
4.Direct-指定
在Fanout模式中,一條訊息,會被所有訂閱的佇列都消費,但是,在某些場景下,我們希望不同的訊息被不同的佇列消費,這時就要用到Direct型別的Exchange,
在Direct模型下:
- 佇列與交換機的系結,不能是任意系結了,而是要指定一個
RoutingKey(路由key) - 訊息的發送方在 向 Exchange發送訊息時,也必須指定訊息的
RoutingKey, - Exchange不再把訊息交給每一個系結的佇列,而是根據訊息的
Routing Key進行判斷,只有佇列的Routingkey與訊息的Routing key完全一致,才會接收到訊息
案例需求如下:
-
利用@RabbitListener宣告Exchange、Queue、RoutingKey
-
在consumer服務中,撰寫兩個消費者方法,分別監聽direct.queue1和direct.queue2
-
在publisher中撰寫測驗方法,向itcast. direct發送訊息
a.基于注解宣告佇列和交換機
基于@Bean的方式宣告佇列和交換機比較麻煩,Spring還提供了基于注解方式來宣告,
在consumer的SpringRabbitListener中添加兩個消費者,同時基于注解來宣告佇列和交換機:
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/V-Notes/archive/2023/02/15/@Queue(name ="direct.queue1"),
exchange = @Exchange(name = "rabbit.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消費者接收到direct.queue1的訊息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/V-Notes/archive/2023/02/15/@Queue(name ="direct.queue2"),
exchange = @Exchange(name = "rabbit.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消費者接收到direct.queue2的訊息:【" + msg + "】");
}
b.訊息發送
在publisher服務的SpringAmqpTest類中添加測驗方法:
@Test
public void testSendDirectExchange() {
// 交換機名稱
String exchangeName = "rabbit.direct";
// 訊息
String message = "紅色警報!日本亂排核廢水,導致海洋生物變異,驚現哥斯拉!";
// 發送訊息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
5.Topic-主題
Topic型別的Exchange與Direct相比,都是可以根據RoutingKey把訊息路由到不同的佇列,只不過Topic型別Exchange可以讓佇列在系結Routing key 的時候使用通配符!
案例需求:
實作思路如下:
-
并利用@RabbitListener宣告Exchange、Queue、RoutingKey
-
在consumer服務中,撰寫兩個消費者方法,分別監聽topic.queue1和topic.queue2
-
在publisher中撰寫測驗方法,向itcast. topic發送訊息
a.訊息發送
在publisher服務的SpringAmqpTest類中添加測驗方法:
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交換機名稱
String exchangeName = "rabbit.topic";
// 訊息
String message = "喜報!孫悟空大戰哥斯拉,勝!";
// 發送訊息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
b.訊息接收
在consumer服務的SpringRabbitListener中添加方法:
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/V-Notes/archive/2023/02/15/@Queue(name ="topic.queue1"),
exchange = @Exchange(name = "rabbit.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消費者接收到topic.queue1的訊息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/V-Notes/archive/2023/02/15/@Queue(name ="topic.queue2"),
exchange = @Exchange(name = "rabbit.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消費者接收到topic.queue2的訊息:【" + msg + "】");
}
6.訊息轉換器
Spring會把你發送的訊息序列化為位元組發送給MQ,接收訊息的時候,還會把位元組反序列化為Java物件,
只不過,默認情況下Spring采用的序列化方式是JDK序列化,眾所周知,JDK序列化存在下列問題:
- 資料體積過大
- 有安全漏洞
- 可讀性差
因此可以使用JSON方式來做序列化和反序列化,
在publisher和consumer兩個服務中都引入依賴:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.5</version>
</dependency>
配置訊息轉換器,
在啟動類中添加一個Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
#安裝(Docker)
1.拉取鏡像
docker pull rabbitmq:3-management
2.部署容器
docker run \
-e RABBITMQ_DEFAULT_USER=rabbit \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
--restart=always \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/543981.html
標籤:其他
上一篇:Maven
下一篇:資料型別之字串、資料型別之串列、資料型別之字典、資料型別之布林值、資料型別之元組、資料型別之集合、與用戶互動、格式化輸出、基本運算子
