1.rabbitMQ介紹
rabbitMQ是由erlang語言開發的,基于AMQP協議實作的訊息佇列,他是一種應用程式之間的通信方法,在分布式系統開發中應用非常廣泛,
rabbitMq的有點:
- 使用簡單,功能強大
- 基于AMQP協議
- 社區活躍,檔案完善
- 高并發性能好,erlang語言是專門用于開發高并發程式的
- springBoot默認集成rabbitMq
AMQP(advanced Message Queuing Protocol),是一個提供統一訊息服務的應用標準高級訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中間件設計,基于此協議的客戶端與訊息中間件可傳遞訊息,并不受客戶端中間件的產品不同和開發語言不同的限制,JMS和AMQP的區別在于:JMS是java語言專屬的訊息服務標準,他是在api層定義標準,并且只能用于java應用,而AMQP是在協議層定義的標準,是可以跨語言的,
2.作業流程
發送訊息:
- 生產者和broker建立TCP連接
- 生產者和broker建立通道
- 生產者通過通道訊息發送給broker,由exchange將訊息轉發
- exchange將訊息轉發給指定的queue
接受訊息:
- 消費者和broker建立TCP連接
- 消費者和broker建立通道
- 消費者監聽指定的queue
- 當有訊息到達queue的時候broker默認將訊息推送給消費者
- 消費者接受到訊息并消費
3.安裝

如果不想自己下載,需要我這里的軟體的,可以在下面評論郵箱,我私發給你,
1.安裝erlang的環境,雙擊otp的運行程式,然后一路點擊下一步(next),

配置環境變數

在path中添加erlang的路徑

2.安裝rabbitMq,雙擊rabbitmq的運行程式


安裝完成之后在選單頁面可以看到

安裝完RabbitMQ如果想要訪問管理頁面需要在rabbitmq的sbin目錄中使用cmd執行:rabbitmq-plugins.bat enable rabbitmq_management(管理員身份運行此命令)添加可視化插件,
點擊上圖中的start/stop來開啟/停止服務,然后在瀏覽器上輸入地址查看,rabbitMq的默認埠是15672,默認的用戶名和密碼都是guest

如果安裝失敗,需要卸載重裝的時候或者出現rabbitMq服務注冊失敗時,此時需要進入注冊表清理erlang(搜索rabbitMQ,erlsrv將對應的項洗掉)
4.代碼實作
1.添加依賴
<!--添加rabbitMq的依賴-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
2.生產者代碼實作
package rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.net.ConnectException;
import java.util.concurrent.TimeoutException;
/**
* @className: producer
* @description: rabbitmq的生產者代碼實作
* @author: charon
* @create: 2021-01-03 23:10
*/
public class Producer {
/**
* 宣告佇列名
*/
private static final String QUEUE = "hello charon";
public static void main(String[] args) {
// 創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 設定ip,埠,因為是本機,所以直接設定為127.0.0.1
connectionFactory.setHost("127.0.0.1");
// web埠默認為15672,通信埠為5672
connectionFactory.setPort(5672);
// 設定用戶名和密碼
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 設定虛擬ip,默認為/,一個rabbitmq的服務可以設定多個虛擬機,每個虛擬機就相當于一個獨立的mq
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection();
// 創建通道
channel = connection.createChannel();
// 宣告佇列(佇列名稱,是否持久化,是否排它,是否自動洗掉,佇列的擴展引數比如設定存活時間等)
channel.queueDeclare(QUEUE, true, false, false, null);
String message = "hello charon good evening";
// 發布訊息(交換機,RoutingKey即佇列名,額外的訊息屬性,訊息內容)
channel.basicPublish("", QUEUE, null, message.getBytes());
System.out.println("發送訊息給mq:" + message);
} catch (Exception e) {
e.printStackTrace();
}finally {
// 關閉資源
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
3.消費者代碼實作
package rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @className: Consumer
* @description: 消費者的代碼實作
* @author: charon
* @create: 2021-01-05 08:28
*/
public class Consumer {
/**
* 宣告佇列名
*/
private static final String QUEUE = "hello charon";
public static void main(String[] args) throws IOException, TimeoutException {
// 創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 設定ip,埠,因為是本機,所以直接設定為127.0.0.1
connectionFactory.setHost("127.0.0.1");
// web埠默認為15672,通信埠為5672
connectionFactory.setPort(5672);
// 設定用戶名和密碼
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 設定虛擬ip,默認為/,一個rabbitmq的服務可以設定多個虛擬機,每個虛擬機就相當于一個獨立的mq
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 創建通道
Channel channel = connection.createChannel();
// 宣告佇列(佇列名稱,是否持久化,是否排它,是否自動洗掉,佇列的擴展引數比如設定存活時間等)
channel.queueDeclare(QUEUE, true, false, false, null);
// 實作消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消費者標簽
* @param envelope 信封,可以獲取交換機等資訊
* @param properties 訊息屬性
* @param body 消費內容,位元組陣列,可以轉成字串
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// String exchange = envelope.getExchange();
// long deliveryTag = envelope.getDeliveryTag();
String message = new String(body,"utf-8");
System.out.println("收到的訊息是:"+message);
}
};
// 消費訊息(佇列名,是否自動確認,消費方法)
channel.basicConsume(QUEUE,true,defaultConsumer);
}
}
5.rabbitMq的作業模式
- Work queues 作業佇列(資源競爭)

? 生產者將訊息放入到佇列中,消費者可以有多個,同時監聽同一個佇列,如上圖,消費者c1,c2共同爭搶當前訊息佇列的內容,誰先拿到誰負責消費訊息,缺點是在高并發的情況下,默認會產品一個訊息被多個消費者共同使用,可以設定一個鎖開關,保證一條訊息只能被一個消費者使用,
上面的代碼,可以再添加一個消費者,這樣就可以實作作業佇列的作業模式,
2.Publish/Subscribe 發布訂閱(共享資源)

X代表rabbitMq內部組件交換機,生產者將訊息放入交換機,交換機發布訂閱把訊息發送到所有訊息佇列中,對應的消費者拿到訊息進行消費,對比作業佇列而言,發布訂閱可以實作作業佇列的功能,但是比作業佇列更強大,
特點:
1.每個消費者監聽自己的佇列
2.生產者將訊息發送給Broker,由交換機將訊息轉發到系結的此交換機的每個佇列,每個系結交換機的佇列都將接收到訊息;
生產者:
package rabbitmq.publish;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @className: Producer
* @description: 發布訂閱的生產者
* @author: charon
* @create: 2021-01-07 22:02
*/
public class Producer {
/**郵件的佇列*/
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
/**短信的佇列*/
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
/**交換機*/
public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";
public static void main(String[] args) {
// 創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 設定ip,埠,因為是本機,所以直接設定為127.0.0.1
connectionFactory.setHost("127.0.0.1");
// web埠默認為15672,通信埠為5672
connectionFactory.setPort(5672);
// 設定用戶名和密碼
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 設定虛擬ip,默認為/,一個rabbitmq的服務可以設定多個虛擬機,每個虛擬機就相當于一個獨立的mq
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection();
// 創建通道
channel = connection.createChannel();
// 宣告佇列(佇列名稱,是否持久化,是否排它,是否自動洗掉,佇列的擴展引數比如設定存活時間等)
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
// 交換機(交換機名稱,交換機型別(fanout:發布訂閱,direct:routing,topic:主題,headers:header模式))
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
// 系結交換機(佇列名稱,交換機名稱,routingKey(發布訂閱設定為空))
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
// 發送多條訊息
for (int i = 0; i < 5; i++) {
String message = "hello charon good evening by publish";
// 指定交換機(交換機,RoutingKey即佇列名,額外的訊息屬性,訊息內容)
channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());
System.out.println("發送訊息給mq:" + message);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
// 關閉資源
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
消費email的消費者:
package rabbitmq.publish;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @className: EmailConsumer
* @description: 郵件的訊息消費者
* @author: charon
* @create: 2021-01-07 22:14
*/
public class EmailConsumer {
/**郵件的佇列*/
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
/**短信的佇列*/
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
/**交換機*/
public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";
public static void main(String[] args) throws IOException, TimeoutException {
// 創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 設定ip,埠,因為是本機,所以直接設定為127.0.0.1
connectionFactory.setHost("127.0.0.1");
// web埠默認為15672,通信埠為5672
connectionFactory.setPort(5672);
// 設定用戶名和密碼
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 設定虛擬ip,默認為/,一個rabbitmq的服務可以設定多個虛擬機,每個虛擬機就相當于一個獨立的mq
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 創建通道
Channel channel = connection.createChannel();
// 宣告佇列(佇列名稱,是否持久化,是否排它,是否自動洗掉,佇列的擴展引數比如設定存活時間等)
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
// 實作消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消費者標簽
* @param envelope 信封,可以獲取交換機等資訊
* @param properties 訊息屬性
* @param body 消費內容,位元組陣列,可以轉成字串
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// String exchange = envelope.getExchange();
// long deliveryTag = envelope.getDeliveryTag();
String message = new String(body,"utf-8");
System.out.println("收到的email訊息是:"+message);
}
};
// 消費訊息(佇列名,是否自動確認,消費方法)
channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
}
}
消費短信的消費者:
package rabbitmq.publish;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @className: SmsConsumer
* @description:
* @author: charon
* @create: 2021-01-07 22:17
*/
public class SmsConsumer {
/**郵件的佇列*/
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
/**短信的佇列*/
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
/**交換機*/
public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";
public static void main(String[] args) throws IOException, TimeoutException {
// 創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 設定ip,埠,因為是本機,所以直接設定為127.0.0.1
connectionFactory.setHost("127.0.0.1");
// web埠默認為15672,通信埠為5672
connectionFactory.setPort(5672);
// 設定用戶名和密碼
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 設定虛擬ip,默認為/,一個rabbitmq的服務可以設定多個虛擬機,每個虛擬機就相當于一個獨立的mq
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 創建通道
Channel channel = connection.createChannel();
// 宣告佇列(佇列名稱,是否持久化,是否排它,是否自動洗掉,佇列的擴展引數比如設定存活時間等)
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
// 實作消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消費者標簽
* @param envelope 信封,可以獲取交換機等資訊
* @param properties 訊息屬性
* @param body 消費內容,位元組陣列,可以轉成字串
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"utf-8");
System.out.println("收到的短信訊息是:"+message);
}
};
// 消費訊息(佇列名,是否自動確認,消費方法)
channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
}
}
3.Routing 路由模式

生產者將訊息發送給交換機按照路由判斷,交換機根據路由的key,只能匹配上路由key的對應的訊息佇列,對應的消費者才能消費訊息,
如上圖,rabbitMq根據對應的key,將訊息發送到對應的佇列中,error通知將發送到amqp.gen-S9b上,由消費者c1消費,error,info,warning通知將發送到amqp.gen-Ag1上,由消費者c2消費,
特點:
1.每個消費者監聽自己的佇列,并且設定路由key
2.生產者將訊息發送給交換機,由交換機根據路由key來轉發訊息到指定的佇列
生產者:
package rabbitmq.routing;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @className: Producer
* @description: 路由模式下的生成者
* @author: charon
* @create: 2021-01-07 22:34
*/
public class Producer {
/**郵件的佇列*/
public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email";
/**短信的佇列*/
public static final String QUEUE_ROUTING_SMS = "queue_routing_sms";
/**交換機*/
public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
/** 設定email的路由key */
public static final String ROUTING_EMAIL = "routing_email";
/** 設定sms的路由key */
public static final String ROUTING_SMS = "routing_sms";
public static void main(String[] args) {
// 創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 設定ip,埠,因為是本機,所以直接設定為127.0.0.1
connectionFactory.setHost("127.0.0.1");
// web埠默認為15672,通信埠為5672
connectionFactory.setPort(5672);
// 設定用戶名和密碼
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 設定虛擬ip,默認為/,一個rabbitmq的服務可以設定多個虛擬機,每個虛擬機就相當于一個獨立的mq
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection();
// 創建通道
channel = connection.createChannel();
// 宣告佇列(佇列名稱,是否持久化,是否排它,是否自動洗掉,佇列的擴展引數比如設定存活時間等)
channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null);
// 交換機(交換機名稱,交換機型別(fanout:發布訂閱,direct:routing,topic:主題,headers:header模式))
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
// 系結交換機(佇列名稱,交換機名稱,routingKey)
channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
// 發送多條訊息
for (int i = 0; i < 5; i++) {
String message = "hello charon good evening by routing --email";
// 指定交換機(交換機,RoutingKey,額外的訊息屬性,訊息內容)
channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_EMAIL, null, message.getBytes());
System.out.println("發送訊息給mq:" + message);
}
// 發送多條訊息
for (int i = 0; i < 5; i++) {
String message = "hello charon good evening by routing --sms";
// 指定交換機(交換機,RoutingKey,額外的訊息屬性,訊息內容)
channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_SMS, null, message.getBytes());
System.out.println("發送訊息給mq:" + message);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
// 關閉資源
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
消費email的消費者:
package rabbitmq.routing;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @className: EmailConsumer
* @description: 路由模式下的email消費者
* @author: charon
* @create: 2021-01-07 22:40
*/
public class EmailConsumer {
/**郵件的佇列*/
public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email";
/**交換機*/
public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
/** 設定email的路由key */
public static final String ROUTING_EMAIL = "routing_email";
public static void main(String[] args) throws IOException, TimeoutException {
// 創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 設定ip,埠,因為是本機,所以直接設定為127.0.0.1
connectionFactory.setHost("127.0.0.1");
// web埠默認為15672,通信埠為5672
connectionFactory.setPort(5672);
// 設定用戶名和密碼
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 設定虛擬ip,默認為/,一個rabbitmq的服務可以設定多個虛擬機,每個虛擬機就相當于一個獨立的mq
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 創建通道
Channel channel = connection.createChannel();
// 宣告佇列(佇列名稱,是否持久化,是否排它,是否自動洗掉,佇列的擴展引數比如設定存活時間等)
channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
// 系結佇列并指明路由key
channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
// 實作消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消費者標簽
* @param envelope 信封,可以獲取交換機等資訊
* @param properties 訊息屬性
* @param body 消費內容,位元組陣列,可以轉成字串
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"utf-8");
System.out.println("收到的email訊息是:"+message);
}
};
// 消費訊息(佇列名,是否自動確認,消費方法)
channel.basicConsume(QUEUE_ROUTING_EMAIL,true,defaultConsumer);
}
}
消費短信的消費者:
package rabbitmq.routing;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @className: EmailConsumer
* @description: 路由模式下的email消費者
* @author: charon
* @create: 2021-01-07 22:40
*/
public class SmsConsumer {
/**郵件的佇列*/
public static final String QUEUE_ROUTING_SMS = "queue_routing_sms";
/**交換機*/
public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
/** 設定email的路由key */
public static final String ROUTING_SMS = "routing_sms";
public static void main(String[] args) throws IOException, TimeoutException {
// 創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 設定ip,埠,因為是本機,所以直接設定為127.0.0.1
connectionFactory.setHost("127.0.0.1");
// web埠默認為15672,通信埠為5672
connectionFactory.setPort(5672);
// 設定用戶名和密碼
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 設定虛擬ip,默認為/,一個rabbitmq的服務可以設定多個虛擬機,每個虛擬機就相當于一個獨立的mq
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 創建通道
Channel channel = connection.createChannel();
// 宣告佇列(佇列名稱,是否持久化,是否排它,是否自動洗掉,佇列的擴展引數比如設定存活時間等)
channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
// 系結佇列并指明路由key
channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
// 實作消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消費者標簽
* @param envelope 信封,可以獲取交換機等資訊
* @param properties 訊息屬性
* @param body 消費內容,位元組陣列,可以轉成字串
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"utf-8");
System.out.println("收到的短信訊息是:"+message);
}
};
// 消費訊息(佇列名,是否自動確認,消費方法)
channel.basicConsume(QUEUE_ROUTING_SMS,true,defaultConsumer);
}
}
4.Topic 主題模式

- 星號井號代表通配符
- 星號代表一個單詞,井號代表一個或多個單詞
- 路由功能添加模糊匹配
- 訊息產生者產生訊息,把訊息交給交換機
- 交換機根據key的規則模糊匹配到對應的佇列,由佇列的監聽消費者接收訊息消費
特點:
1.每個消費者監聽自己的佇列,并且設定帶通配符的routingkey
2.生產者將訊息發送給broker,由交換機及根據路由key來轉發訊息到指定的佇列
5.Header 轉發器
取消了路由key,使用header中的key/value(鍵值對)來匹配佇列,
6.RPC 遠程呼叫

基于direct型別交換機實作,生產者將訊息遠程發送給rpc佇列,消費者監聽rpc訊息佇列的訊息并訊息,然后將回傳結果放入到回應佇列中,生產者監聽回應佇列中的訊息,拿到消費者的處理結果,實作遠程RPC遠程呼叫,
參考檔案:
https://www.cnblogs.com/Jeely/p/10784013.html
https://lovnx.blog.csdn.net/article/details/70991021
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/250023.html
標籤:Java
下一篇:[翻譯]正則引擎的幾種分類
