一、RabbitMQ簡介
RabbitMQ 是一個在AMQP(高級訊息佇列協議)基礎上完成的,可復用的企業訊息系統,是當前最主流的訊息中間件之一,
在使用RabbitMQ的程序當中,經常會遇到的例外場景有:
- 消費者啟動后,未顯示在佇列的Consumers;
- 消費者啟動后,消費入庫時報錯;
- 消費者啟動后,輸入正確的json,重復入庫;
- 消費者啟動后,消費但未入庫;
- 消費者啟動后,消費者剛開始顯示,但后來消失(消費者假死);
- 消費者啟動后,輸入錯誤的json,消費失敗;
- 消費者啟動后,消費者堵塞(佇列阻塞,無法繼續添加資料,可能導致服務掛掉),
二、RabbitMQ常見測驗場景
1. RabbitMQ弄丟了資料
測驗場景:簡而言之,就是RabbitMQ自己弄丟了資料,
解決辦法:這個需要開啟RabbitMQ的持久化,就是訊息寫入之后會持久化到磁盤中,哪怕是RabbitMQ自己掛了,恢復之后會自動讀取之前存盤的資料,一般資料不會丟,
持久化的步驟有兩個,第一個是創建queue的時候將其設定為持久化的,這樣就可以保證RabbitMQ持久化queue的元資料,但是不會持久化queue里的資料,
第二個是發送訊息的時候將訊息的deliveryMode設定為2,就是將訊息設定為持久化的,此時RabbitMQ就會將訊息持久化到磁盤上去,必須要同時設定這兩個持久化才行,RabbitMQ哪怕是掛了,再次重啟,也會從磁盤上重啟恢復queue,恢復這個queue里的資料,
2. 生產者弄丟了資料
測驗場景:生產者給RabbitMQ發送資料的時候,可能由于網路原因,資料沒遇發送到RabbitMQ,
解決辦法:這個時候我們可以選擇RabbitMQ的事務功能,在生產者發送資料之前開啟事務(channel.txSelect),然后發送訊息,如果訊息沒有順利的被RabbitMQ接收,這個時候生產者就會收到例外的報錯資訊,可以回滾事務(channel.txRollback),重試訊息發送,
這個事務有個缺陷,就是比較損耗RabbitMQ服務器的性能,
第二種方法:開啟RabbitMQ的confirm模式,每次生產者給RabbitMQ發送訊息的時候,都會分配一個唯一的id,如果寫入RabbitMQ中,RabbitMQ會回傳一個ack訊息,告訴你訊息發送成功;如果寫入RabbitMQ失敗,會回呼一個nack介面,告訴你訊息發送失敗,可以進行重試,
事務機制和cnofirm機制不同之處在于,事務是同步的,你提交一個事務之后會阻塞在那兒,等待事務處理才能進行下一個事務,但是confirm機制是異步的,發送這個訊息之后就可以發送下一個訊息,然后訊息被RabbitMQ接收了之后,會異步回呼通知你這個訊息接收到了,
所以生產者為了避免資料丟失,都是用confirm機制的,
3.消費端弄丟了資料
測驗場景:RabbitMQ如果丟失了資料,主要是因為消費資料的時候,還沒處理,結果服務掛了,比如服務進行了重啟,但是RabbitMQ認為你都消費了,這種情況資料就丟了,
解決辦法:這個時候得用RabbitMQ提供的ack機制,簡單來說,就是你關閉RabbitMQ自動ack,可以通過一個api來呼叫就行,然后每次你自己代碼里確保處理完的時候,再程式里ack一把,
這樣的話,如果你還沒處理完,不就沒有ack?那RabbitMQ就認為你還沒處理完,這個時候RabbitMQ會把這個消費分配給別的consumer去處理,訊息是不會丟的,
4.資料積壓
大量訊息在RabbitMQ里積壓,沒有被及時消費,
測驗場景:上萬條資料在RabbitMQ里積壓四個小時,從下午的5點持續到晚上的9點,這個是我真實遇到過的一個場景,主要是因為線上系統故障,
解決辦法:擴容服務器集群,新建一個topic,partition是之前的十倍,將現有的consumer都停止,修復問題之后,新建一個新的consumer服務,部署上去消費積壓的資料,消費不做耗時處理,之間輪詢寫入新的queue,
5.RabbitMQ與JAVA
生產者連接rabbitMQ的代碼:
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class RabbitProducer {
private static final String EXCHANGE_NAME = "exchange_demo";
private static final String ROUTING_KEY = "routingkey_demo";
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "127.0.0.1";
private static final int PORT = 5672; // RabbitMQ服務端默認埠號為5672
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("zifeiy");
factory.setPassword("passwd");
Connection connection = factory.newConnection(); // 建立連接
Channel channel = connection.createChannel(); // 創建信道
// 創建一個type="direct"、持久化的、非自動洗掉的交換器
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
// 創建一個持久化、非排他的、非自動洗掉的佇列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 將交換器和佇列通過路由系結
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 發送一條持久化的訊息:hello world!
String message = "hello,world!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
// 關閉資源
channel.close();
connection.close();
}
}
消費者連接rabbitMQ的代碼:
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Connection;
public class RabbitConsumer {
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "127.0.0.1";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Address[] addresses = new Address[] {
new Address(IP_ADDRESS, PORT)
};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("zifeiy");
factory.setPassword("passwd");
// 這里的連接方式與生產者的demo略有不同,注意區分
Connection connection = factory.newConnection(addresses); // 創建連接
final Channel channel = connection.createChannel(); // 創建信道
channel.basicQos(64); // 設定客戶端最多接受未被ack的訊息的個數
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv message: " + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, consumer);
// 等待回呼函式執行完畢后,關閉資源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
}
}

注:僅供參考,有疑問或沒疑問都可私信博主哦,共同學習,一起進步,奧利給!!!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/340685.html
標籤:其他
