RabbitMQ高級內容介紹
RabbitMQ高級特性
- 訊息可靠性投遞
- Consumer ACK
- 消費端限流
- TTL
- 死信佇列
- 延遲佇列
- 日志與監控
- 訊息可靠性分析與追蹤
- 管理
RabbitMQ應用問題
- 訊息可靠請保障
- 訊息冪等性處理
RabbitMQ集群搭建
- RabbitMQ高可用集群
1、RabbitMQ高級特性
1.1、訊息的可靠投遞
1、定義
在使用RabbitMQ的時候,作為訊息發送方希望杜絕任何訊息丟失或投遞失敗場景,RabbitMQ為我們提供了兩種方式用來控制訊息的投遞可靠性模式,
- confirm 確認模式
- return退回模式
rabbitMQ整個訊息投遞的路徑為:
pruducer —>rabbitMQ broker ---->exchange------>queue ----> consumer
- 訊息從producer到exchange則會回傳一個confirmCallback,
- 訊息從exchange–>queue投遞失敗則會回傳一個returnCallback,
我們將利用這兩個callback控制訊息的可靠性投遞,
2、生產者端代碼實作
1)新建spring工程rabbitmq-producer-spring
2) 配置pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>rabbitmq-producer-spring</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
3) 配置rabbitmq相關資訊
rabbitmq.properties
rabbitmq.host=172.16.98.133
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast
4)spring組態檔,定義佇列和交換機
spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載組態檔-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
publisher-returns="true"
/>
<!--定義管理交換機、佇列-->
<rabbit:admin connection-factory="connectionFactory" />
<!--定義rabbitMqTemplate物件操作可以在代碼中方便發送訊息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--訊息可靠性投遞生產端-->
<rabbit:queue id="test_queue_confirm" name="queue_confirm_test"></rabbit:queue>
<rabbit:direct-exchange name="exchange_confirm_test">
<rabbit:bindings>
<rabbit:binding queue="queue_confirm_test" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
5) 撰寫測驗類進行測驗
ProducerTest.java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/*
確認模式:
步驟:
1、確認模式開啟:ConnectionFactory中開啟,publisher-confirms="true"
2、在rabbitTamplate中定義ConfirmCallback回呼函式
*/
@Test
public void testConfirm(){
//2、定義回呼
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/***
*
* @param correlationData 相關配置資訊,在convertAncSend中引數配置資訊
* @param ack exchange交換機,是否成功收到了訊息,true成功,false代表失敗
* @param cause 失敗原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被執行了...");
if(ack){
//接收成功
System.out.println("接收成功:"+cause);
}else{
//接收失敗
System.out.println("接收失敗訊息:"+cause);
//做一些處理,讓訊息再次發送
}
}
});
//3、發送訊息
rabbitTemplate.convertAndSend("exchange_confirm_test111","confirm","message confirm...");
}
/**
* 回退模式:當訊息發送給exchange后,exchange路由到Queue失敗時,才會執行returnCallback
* 步驟:
* 1、開啟回退模式:也是在ConnectionFactory中開啟,publisher-returns="true"
* 2、設定ReturnCallback
* 3、設定Exchange處理訊息的模式:
* ①如果訊息沒有路由到Queue,則丟棄訊息(默認)
* ②如果訊息沒有路由到Queue,回傳給訊息發送方ReturnCallback
*
*/
@Test
public void testReturn(){
//設定交換機處理失敗訊息的模式
//不設定該項的話,默認將訊息丟棄,也不會觸發回呼
rabbitTemplate.setMandatory(true);
//設定ReturnCallback
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 訊息物件
* @param replyCode 錯誤碼
* @param replyText 錯誤資訊
* @param exchange 交換機
* @param routingKey 路由鍵
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("執行了return");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
}
});
//3、發送訊息
rabbitTemplate.convertAndSend("exchange_confirm_test","confirm111","message confirm...");
}
}
6)測驗結果
- 確認模式測驗結果:提示沒有該交換機

- 退回模式測驗結果:
注:當沒有設定交換機處理失敗訊息的模式,為默認模式,會將訊息丟棄,也不會進行呼叫回呼函式,

小結:
- 設定ConnectionFactory的publisher-confirms=“true” 開啟確認模式,
- 使用rabbitTemplate.setConfirmCallback設定回呼函式,當訊息發送到exchange后回呼confirm方法,在方法中判斷ack,如果為true,則發送成功,如果為false,則發送失敗,需要處理,
- 設定ConnectionFactory的publisher-returns=“true” 開啟回退模式,
- 使用rabbitTemplate.setReturnCallback設定退回函式,當訊息從exchange路由到queue失敗后,如果設定rabbitTemplate.setMandatory(true)引數,則會將訊息退回給producer,并執行回呼函式returnedMessage,
- 在RabbitMQ中也提供了事務機制,但是性能較差,此處不作講解,
使用channel下列方法,完成事務控制
- txSelect(),用于將當前channel設定成transaction模式
- txCommit(),用于提交事務
- txRollback(),用于回滾事務
1.2、Consumer Ack
1、定義
ack指Acknowledge,確認,表示消費端收到訊息后的確認方式,
有三種確認方式:
- 自動確認:
acknowledge = "none" - 手動確認:
acknowledge = "manual" - 根據例外情況確認:
acknowledge = "auto",(這種方式使用麻煩,不作講解)
其中自動確認是指,當訊息一旦被Consumer接收到,則自動確認收到,并將相應message從RabbitMQ的訊息快取中移除,
但在實際業務處理中,很可能訊息接收到, 業務處理出現例外,那么該訊息就會丟失,
如果設定了手動確認方式,則需要在業務處理成功后,呼叫channel.basicAck(),手動簽收,如果出現例外,則呼叫channel.basicNack()方法,讓其自動重新發送訊息,
2、消費端代碼實作
1)新建rabbitmq-consumer-spring工程
2)撰寫pom.xml匯入依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>rabbitmq-consumer-spring</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
3)MQ連接配置rabbitmq.properties
rabbitmq.host=172.16.98.133
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast
4)spring組態檔,設定監聽容器
spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載組態檔-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--掃描com.itheima.listener包下的所有類-->
<context:component-scan base-package="com.itheima.listener"></context:component-scan>
<!--定義監聽器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener ref="ackListener" queue-names="queue_confirm_test"></rabbit:listener>
</rabbit:listener-container>
</beans>
5)撰寫監聽類AckListener.java
package com.itheima.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/*
* Consumer ACK機制:
* 1、設定手動簽收,acknowledge=“manual”
* 2、讓監聽器類實作ChannelAwareMessageListener介面
* 3、如果訊息成功處理,呼叫channel的basicAck()簽收
* 4、如果訊息處理失敗,則呼叫channel的basicNack()拒絕簽收,broker重新發送給consumer
* */
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try{
//1.接收轉換訊息
System.out.println(new String(message.getBody()));
//2.處理業務邏輯
System.out.println("處理業務邏輯...");
int i = 3/0;//出現錯誤
//3.手動簽收
channel.basicAck(deliveryTag,true);
}catch (Exception e){
//4.拒絕簽收
/*
* 第三個引數:requeue:重回佇列,如果設定為true,則訊息重新回到queue,broker會重新發送該訊息給消費端
*
* */
channel.basicNack(deliveryTag,true,true);
//channel.basicReject(deliveryTag,true);只允許單挑確認
}
}
}
6)撰寫測驗類進行測驗ConsumerTest.java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void test1(){
while(true){
}
}
}
7)測驗結果
- 目前queue中有一條訊息

- 消費者運行起來之后,以為業務出錯例外,導致訊息沒有被簽收,并且訊息重新回到queue,broker會重新發送該訊息給消費端,如此往復,


Consumer Ack 小結
- 在
rabbit:listener-container標簽中設定acknowledge屬性,設定ack方式none:自動確認,manual:手動確認- 如果在消費端沒有出現例外,則呼叫channel.basicAck(deliveryTag,false);方法確認簽收訊息
- 如果出現例外,則在catch中呼叫basicNack或basicReject,拒絕訊息,讓MQ重新發訊息,
訊息可靠性總結
- 持久化
? - exchange要持久化
? - queue要持久化
? - message要持久化 - 生產方要確認confirm
- 消費方要確認Ack
- Broker高可用
1.3、消費端限流
1、定義
- 為什么限流?
減輕系統處理請求壓力,

2、代碼實作:
1)新建QosListener.java
package com.itheima.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/*
* Consumer 限流機制
* 1、確保Ack機制為手動確認
* 2、listener-container配置屬性
* preFetch = 1,表示消費端每次從mq中拉取1條訊息來消費,直到手動確認消費完畢后,才會繼續拉取下一條訊息,
* */
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
//1、獲取訊息
System.out.println(new String(message.getBody()));
//2、處理業務邏輯
//3、簽收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
2)配置監聽容器spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載組態檔-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<context:component-scan base-package="com.itheima.listener"></context:component-scan>
<!--定義監聽器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!--<rabbit:listener ref="ackListener" queue-names="queue_confirm_test"></rabbit:listener>-->
<rabbit:listener ref="qosListener" queue-names="queue_confirm_test"></rabbit:listener>
</rabbit:listener-container>
</beans>
3)測驗類ConsumerTest.java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void test1(){
while(true){
}
}
}
4)生產端發送10條訊息測驗
@Test
public void testSend(){
for (int i = 0; i < 10; i++) {
//3、發送訊息
rabbitTemplate.convertAndSend("exchange_confirm_test","confirm","message confirm...");
}
}
5)測驗結果
①當消費端未進行手動簽收訊息,preFetch = 1時,消費端只會接收一條訊息,


②當消費端未設定preFetch 數量,并且沒有進行手動簽收時,消費端一次性將10條訊息都獲取到了

③當prefetch=1,并且設定消費端手動簽收時,訊息端會一條一條的拉取訊息

消費端限流小結
- 在
<rabbit:listener-container>中配置prefetch屬性設定消費端一次拉取多少訊息- 消費端的確認模式一定為手動確認,
acknowledge="manual"
1.4、TTL
1、定義
- TTL全稱 Time To Live(存活時間/過期時間),
- 當訊息到達存活時間后,還沒有被消費,會被自動清除,
- RabbitMQ可以對訊息設定過期時間,也可以對整個佇列(Queue)設定過期時間,

小結:
① 設定佇列過期時間使用引數:x-message-ttl,單位:ms(毫秒),會對整個佇列訊息統一過期
② 設定訊息過期時間使用引數:expiration,單位:ms(毫秒),當該訊息在對列頭部時(消費時),會單獨判斷這一訊息是否過期,
③ 如果兩者都進行了設定,以時間段的為準,
2、代碼實作:
1)新建工程rabbitmq-ttl-producer-spring
2)pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>rabbitmq-producer-spring</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
3)配置spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載組態檔-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
publisher-returns="true"
/>
<!--定義管理交換機、佇列-->
<rabbit:admin connection-factory="connectionFactory" />
<!--定義rabbitMqTemplate物件操作可以在代碼中方便發送訊息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--訊息可靠性投遞生產端-->
<rabbit:queue id="test_queue_confirm" name="queue_confirm_test"></rabbit:queue>
<rabbit:direct-exchange name="exchange_confirm_test">
<rabbit:bindings>
<rabbit:binding queue="queue_confirm_test" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--TTL-->
<rabbit:queue id="test_queue_ttl" name="test_queue_ttl">
<!--設定queue的引數-->
<rabbit:queue-arguments>
<!--x-message-ttl指佇列的過期時間-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test-exchange-ttl">
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
4)進行測驗:
ProducerTest.java
/*
* TTL:過期時間
* 1、佇列統一過期時間
*
* 2、訊息單獨過期時間
*
* 注意:
* ①如果設定了訊息的過期時間,也設定了佇列的過期時間,以短的為準
* ②佇列過期后,會將佇列所有訊息全部移除
* ③訊息過期后,只有訊息在佇列頂端,才會判斷其是否過期(移除掉)
* */
@Test
public void testTTl(){
/*
1、佇列統一過期時間
for (int i = 0; i < 10; i++) {
//3、發送訊息
rabbitTemplate.convertAndSend("test-exchange-ttl","ttl.hehe","message ttl...");
}*/
//訊息的后處理物件
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1、設定message的訊息
message.getMessageProperties().setExpiration("5000");
//2、回傳該訊息
return message ;
}
};
/*2、訊息單獨過期時間*/
rabbitTemplate.convertAndSend("test-exchange-ttl","ttl.hehe","message ttl...",messagePostProcessor);
}
5)結果:
①佇列統一過期:

10秒中后,統一過期,訊息被移除

②訊息單獨過期

5秒后過期,訊息位于佇列頂端(等待消費),被移除

1.5、死信佇列
1、定義
死信佇列,英文縮寫:DLX,Dead Letter Exchange(死信交換機),當訊息成為Dead Message后,可以被重新發送到另一個交換機,這個交換機就是DLX,

2、訊息成為死信的三種情況:
1、佇列訊息長度達到限制;
2、消費者拒接消費訊息,basicNack/basicReject,并且不把訊息放入原目標佇列,requeue = false;
3、原佇列存在訊息過期設定,訊息到達超時時間未被消費;
3、佇列系結死信交換機:
給佇列設定引數:x-dead-lette-exchange和x-dead-letter-routing-key

4、代碼實作
1)spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載組態檔-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
publisher-returns="true"
/>
<!--定義管理交換機、佇列-->
<rabbit:admin connection-factory="connectionFactory" />
<!--定義rabbitMqTemplate物件操作可以在代碼中方便發送訊息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--
死信佇列:
1、宣告正常的佇列(test_queue_dlx)和交換機(test_exchange_dlx)
2、宣告死信佇列(queue_dle)和死信交換機(exchange_dlx)
3、正常佇列系結死信交換機
設定兩個引數:
x-dead-letter-exchange:死信交換機名稱
x-dead-letter-routing-key:發給死信交換機的routingkey
-->
<!--
1、宣告正常的佇列(test_queue_dlx)和交換機(test_exchange_dlx)
-->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<!--3、正常佇列系結死信交換機-->
<rabbit:queue-arguments>
<!--3.1、x-dead-letter-exchange:死信交換機名稱-->
<entry key="x-dead-letter-exchange" value="exchange_dlx"></entry>
<!--3.1、x-dead-letter-routing-key:發給死信交換機的routingkey-->
<entry key="x-dead-letter-routing-key" value="dlx.hehe"></entry>
<!--4.1 設定佇列的過期時間 ttl-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
<!--4.2 設定的佇列的長度限制 max-length-->
<entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--
2、宣告死信佇列(queue_dle)和死信交換機(exchange_dlx)
-->
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
2)Producer.java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/*
* 發送測驗死信訊息
* 1、過期時間
* 2、長度限制
* 3、訊息拒收
*
* */
@Test
public void testDlx(){
//1、測驗過期時間,死信訊息
//rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","測驗過期時間,死信訊息");
//2、測驗長度限制后,訊息死信
/* for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","測驗過期時間,死信訊息");
}*/
//3、測驗訊息拒收,死信訊息
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","測驗過期時間,死信訊息");
}
}
3)消費端:DlxListener.java
package com.itheima.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class DlxListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try{
//1.接收轉換訊息
System.out.println(new String(message.getBody()));
//2.處理業務邏輯
System.out.println("處理業務邏輯...");
int i = 3/0;//出現錯誤
//3.手動簽收
channel.basicAck(deliveryTag,true);
}catch (Exception e){
//4.拒絕簽收
/*
* 第三個引數:requeue:重回佇列,如果設定為true,則訊息重新回到queue,broker會重新發送該訊息給消費端
*
* */
System.out.println("出現例外,拒絕接收!");
//拒絕接收,不重回佇列requeue=false
channel.basicNack(deliveryTag,true,false);
//channel.basicReject(deliveryTag,true);只允許單挑確認
}
}
}
4) 消費端:spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載組態檔-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<context:component-scan base-package="com.itheima.listener"></context:component-scan>
<!--定義監聽器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!--<rabbit:listener ref="ackListener" queue-names="queue_confirm_test"></rabbit:listener>-->
<!--<rabbit:listener ref="qosListener" queue-names="queue_confirm_test"></rabbit:listener>-->
<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>
</rabbit:listener-container>
</beans>
5)結果
a)訊息過期生成死信佇列


b)長度限制生成死信佇列
因長度限制,后面發送的10條訊息變成死信

過期時間到達后,正常佇列中的10條訊息也變成死信:

c)訊息拒收生成死信

小結:
1、死信交換機和死信佇列和普通的沒有區別
2、當訊息成為死信后,如果該佇列系結了死信交換機,則訊息會被死信交換機重新路由到死信佇列
3、訊息成為死信的三種情況
(1)佇列訊息長度達到限制
(2)消費者拒絕消費訊息,并且不重回佇列
(3)原佇列存在訊息過期設定,訊息到達超時時間未被消費
1.6、延遲佇列
1、定義
延遲佇列,即訊息進入佇列后不會立即被消費,只有到達制定時間后,才會被消費,
需求:
?1、下單后,30分鐘未支付,取消訂單,回滾庫存,
?2、新用戶注冊7天后,發送短信問候,
實作方式:
?1、定時器
?2、延遲佇列

很可惜,在rabbitMQ中并未提供延遲佇列功能,
但是可以使用:TTL+死信佇列組合實作延遲佇列的效果,

2、代碼實作
1)生產端:spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載組態檔-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
publisher-returns="true"
/>
<!--定義管理交換機、佇列-->
<rabbit:admin connection-factory="connectionFactory" />
<!--定義rabbitMqTemplate物件操作可以在代碼中方便發送訊息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--
延遲佇列:
1、定義正常交換機(order_exchange)和佇列(order_queue)
2、定義死信交換機(order_exchange_dlx)和佇列(order_queue_dlx)
3、系結,設定正常佇列過期時間為30分鐘
-->
<!--1、定義正常交換機(order_exchange)和佇列(order_queue)-->
<rabbit:queue name="order_queue" id="order_queue">
<!--3、系結,設定正常佇列過期時間為30分鐘-->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx"></entry>
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel"></entry>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--2、定義死信交換機(order_exchange_dlx)和佇列(order_queue_dlx)-->
<rabbit:queue name="order_queue_dlx" id="order_queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
2)生產端:ProducerTest.java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testDelay() throws InterruptedException {
//1、發送訂單訊息,將來是在訂單系統中,下單成功后,發送訊息
rabbitTemplate.convertAndSend("order_exchange","order.msg","訂單資訊:id=1,time=333333");
//2、列印倒計時
for (int i = 0; i < 10; i++) {
System.out.println(i+"...");
Thread.sleep(1000);
}
}
}
3)消費端:OrderListener.java
package com.itheima.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class OrderListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try{
//1.接收轉換訊息
System.out.println(new String(message.getBody()));
//2.處理業務邏輯
System.out.println("處理業務邏輯...");
System.out.println("根據訂單id查詢其狀態...");
System.out.println("判斷狀態是否為支付成功?");
System.out.println("取消訂單,回滾庫存!");
//3.手動簽收
channel.basicAck(deliveryTag,true);
}catch (Exception e){
//4.拒絕簽收
/*
* 第三個引數:requeue:重回佇列,如果設定為true,則訊息重新回到queue,broker會重新發送該訊息給消費端
*
* */
System.out.println("出現例外,拒絕接收!");
//拒絕接收,不重回佇列requeue=false
channel.basicNack(deliveryTag,true,false);
//channel.basicReject(deliveryTag,true);只允許單挑確認
}
}
}
4)消費端:spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載組態檔-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<context:component-scan base-package="com.itheima.listener"></context:component-scan>
<!--定義監聽器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!--<rabbit:listener ref="ackListener" queue-names="queue_confirm_test"></rabbit:listener>-->
<!--<rabbit:listener ref="qosListener" queue-names="queue_confirm_test"></rabbit:listener>-->
<!--定義監聽器,監聽正常的佇列-->
<!--<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>-->
<!--注意實作延遲佇列功能時,消費端監聽的是死信佇列-->
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
</rabbit:listener-container>
</beans>
5)消費端:ConsumerTest.java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void test1(){
while(true){
}
}
}
6)結果


小結
1、延遲佇列,指訊息進入佇列后,可以被延遲一定時間,再進行消費
2、RabbitMQ沒有提供延遲佇列功能,但是可以使用:TTL+死信佇列組合實作延遲佇列的效果,
2、RabbitMQ應用問題
2.1、訊息可靠性保障——訊息補償

2.2、訊息冪等性保障
1、定義
冪等性指一次和多次請求某一資源,對于資源本身應該具有同樣的結果,也就是說,其任意多次執行對資源本身所產生的影響均與第一次執行的影響相同,
在MQ中是指,消費多條相同的訊息,得到與消費該訊息一次相同的結果,
2、訊息冪等性保障——樂觀鎖機制

轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/277100.html
標籤:區塊鏈
