作者:海向
來源:https://www.cnblogs.com/haixiang/p/10826710.html
RabbitMQ 簡述
RabbitMQ是一個訊息代理:它接受并轉發訊息, 您可以將其視為郵局:當您將要把寄發的郵件投遞到郵箱中時,您可以確信Postman 先生最侄訓將郵件發送給收件人, 在這個比喻中,RabbitMQ是一個郵箱,郵局和郵遞員,用來接受,存盤和轉發二進制資料塊的訊息,
佇列就像是在RabbitMQ中扮演郵箱的角色, 雖然訊息經過RabbitMQ和應用程式,但它們只能存盤在佇列中, 佇列只受主機的記憶體和磁盤限制的限制,它本質上是一個大的訊息緩沖區, 許多生產者可以發送到一個佇列的訊息,許多消費者可以嘗試從一個佇列接收資料,
producer即為生產者,用來產生訊息發送給佇列,consumer是消費者,需要去讀佇列內的訊息,producer,consumer和broker(rabbitMQ server)不必駐留在同一個主機上;確實在大多數應用程式中它們是這樣分布的,
簡單佇列
簡單佇列是最簡單的一種模式,由生產者、佇列、消費者組成,生產者將訊息發送給佇列,消費者從佇列中讀取訊息完成消費,
在下圖中,“P”是我們的生產者,“C”是我們的消費者, 中間的框是佇列 - RabbitMQ代表消費者的訊息緩沖區,

java 方式
生產者
package com.anqi.mq.nat;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class MyProducer {
private static final String QUEUE_NAME = "ITEM_QUEUE";
public static void main(String[] args) throws Exception {
//1. 創建一個 ConnectionFactory 并進行設定
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通過連接工廠來創建連接
Connection connection = factory.newConnection();
//3. 通過 Connection 來創建 Channel
Channel channel = connection.createChannel();
//實際場景中,訊息多為json格式的物件
String msg = "hello";
//4. 發送三條資料
for (int i = 1; i <= 3 ; i++) {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("Send message" + i +" : " + msg);
}
//5. 關閉連接
channel.close();
connection.close();
}
}
/**
* Declare a queue
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
/**
* Publish a message
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
/**
* Start a non-nolocal, non-exclusive consumer, with
* a server-generated consumerTag.
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback an interface to the consumer object
* @return the consumerTag generated by the server
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
消費者
package com.anqi.mq.nat;
import com.rabbitmq.client.*;
import java.io.IOException;
public class MyConsumer {
private static final String QUEUE_NAME = "ITEM_QUEUE";
public static void main(String[] args) throws Exception {
//1. 創建一個 ConnectionFactory 并進行設定
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通過連接工廠來創建連接
Connection connection = factory.newConnection();
//3. 通過 Connection 來創建 Channel
Channel channel = connection.createChannel();
//4. 宣告一個佇列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
/*
true:表示自動確認,只要訊息從佇列中獲取,無論消費者獲取到訊息后是否成功消費,都會認為訊息已經成功消費
false:表示手動確認,消費者獲取訊息后,服務器會將該訊息標記為不可用狀態,等待消費者的反饋,如果消費者一
直沒有反饋,那么該訊息將一直處于不可用狀態,并且服務器會認為該消費者已經掛掉,不會再給其發送訊息,
直到該消費者反饋,
*/
//5. 創建消費者并接收訊息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//6. 設定 Channel 消費者系結佇列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
Send message1 : hello
Send message2 : hello
Send message3 : hello
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'hello'
[x] Received 'hello'
[x] Received 'hello'
當我們啟動生產者之后查看RabbitMQ管理后臺可以看到有一條訊息正在等待被消費,

當我們啟動消費者之后再次查看,可以看到積壓的一條訊息已經被消費,

總結
佇列宣告queueDeclare的引數:第一個引數表示佇列名稱、第二個引數為是否持久化(true表示是,佇列將在服務器重啟時生存)、第三個引數為是否是獨占佇列(創建者可以使用的私有佇列,斷開后自動洗掉)、第四個引數為當所有消費者客戶端連接斷開時是否自動洗掉佇列、第五個引數為佇列的其他引數,
basicConsume的第二個引數autoAck: 應答模式,true:自動應答,即消費者獲取到訊息,該訊息就會從佇列中洗掉掉,false:手動應答,當從佇列中取出訊息后,需要程式員手動呼叫方法應答,如果沒有應答,該訊息還會再放進佇列中,就會出現該訊息一直沒有被消費掉的現象,
這種簡單佇列的模式,系統會為每個佇列隱式地系結一個默認交換機,交換機名稱為" (AMQP default)",型別為直連 direct,當你手動創建一個佇列時,系統會自動將這個佇列系結到一個名稱為空的 Direct 型別的交換機上,系結的路由鍵 routing key 與佇列名稱相同,相當于channel.queueBind(queue:"QUEUE_NAME", exchange:"(AMQP default)“, routingKey:"QUEUE_NAME");雖然實體沒有顯式宣告交換機,但是當路由鍵和佇列名稱一樣時,就會將訊息發送到這個默認的交換機中,這種方式比較簡單,但是無法滿足復雜的業務需求,所以通常在生產環境中很少使用這種方式,
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.默認交換機隱式系結到每個佇列,其中路由鍵等于佇列名稱,不可能顯式系結到,或從預設交換中解除系結,它也不能被洗掉,
——引自 RabbitMQ 官方檔案
spring-amqp方式
引入 Maven 依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
spring 組態檔
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd">
<rabbit:connection-factory id="connectionFactory" host="localhost" virtual-host="/"
username="guest" password="guest"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="MY-QUEUE"/>
</beans>
使用測驗
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Main {
public static void main(String[] args) {
ApplicationContext app = new ClassPathXmlApplicationContext("spring/rabbit-context.xml");
AmqpTemplate amqpTemplate = app.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("MY-QUEUE", "Item");
String msg = (String) amqpTemplate.receiveAndConvert("MY-QUEUE");
System.out.println(msg);
}
}
參考方法
/**
* Convert a Java object to an Amqp {@link Message} and send it to a specific exchange
* with a specific routing key.
*
* @param exchange the name of the exchange
* @param routingKey the routing key
* @param message a message to send
* @throws AmqpException if there is a problem
*/
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
/**
* Receive a message if there is one from a specific queue and convert it to a Java
* object. Returns immediately, possibly with a null value.
*
* @param queueName the name of the queue to poll
* @return a message or null if there is none waiting
* @throws AmqpException if there is a problem
*/
@Nullable
Object receiveAndConvert(String queueName) throws AmqpException;
近期熱文推薦:
1.1,000+ 道 Java面試題及答案整理(2021最新版)
2.終于靠開源專案弄到 IntelliJ IDEA 激活碼了,真香!
3.阿里 Mock 工具正式開源,干掉市面上所有 Mock 工具!
4.Spring Cloud 2020.0.0 正式發布,全新顛覆性版本!
5.《Java開發手冊(嵩山版)》最新發布,速速下載!
覺得不錯,別忘了隨手點贊+轉發哦!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/288948.html
標籤:Java
