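Table of Contents
- Getting Started
- Message Queues
- Features
- The AMQP Protocol
- RabbitMQ Core Concepts
- Installing, Configuring, and Starting RabbitMQ
- Three Ways to Install Erlang
- Installing RabbitMQ on Linux
- Installing Erlang
- Installing RabbitMQ
- Common RabbitMQ Commands
- Installing and Starting on Mac OS
- Installing on Windows
- Code
- helloworld
- Single Consumer (Thread.sleep)
- Multiple Consumers (round-robin dispatch, unfair)
- Multiple Consumers (load-based dispatch, fair)
- Exchange Types
- Fanout (broadcast)
- Direct
- Topic
- Integrating RabbitMQ with Spring Boot
- Result
- Code
- Producer
- Consumer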
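Getting Started
Implementation language: Erlang
Message Queues

Characteristics: business-agnostic, FIFO, disaster tolerance, performance.
Why use one: system decoupling, asynchronous calls, traffic peak shaving.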
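To make decoupling and FIFO ordering concrete, here is a minimal in-process sketch that uses java.util.concurrent.BlockingQueue as a stand-in for a broker. It is not RabbitMQ; the class name QueueSketch and the method run are hypothetical names chosen for this illustration. The producer only knows the queue, the consumer only knows the queue, and messages come out in the order they went in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-process stand-in for a message queue (an illustration, not RabbitMQ).
final class QueueSketch {

    // The producer puts `count` messages on the queue while a consumer thread
    // takes them off. Returns the messages in the order the consumer saw them.
    static List<String> run(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take()); // blocks until a message arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 0; i < count; i++) {
                queue.put("msg-" + i); // the producer never talks to the consumer directly
            }
            consumer.join(); // wait for the consumer; also makes `received` safe to read
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [msg-0, msg-1, msg-2, msg-3, msg-4]
    }
}
```

A real broker adds what this sketch lacks: the two sides run in different processes, and messages can survive a crash of either side.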
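Features

The AMQP Protocol

RabbitMQ Core Concepts



Installing, Configuring, and Starting RabbitMQ
Three Ways to Install Erlang

Installing RabbitMQ on Linux
Official installation guide: https://www.rabbitmq.com/install-rpm.html
Installing Erlang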
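# Create the Erlang yum repository file
vim /etc/yum.repos.d/rabbitmq_erlang.repo
[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq-erlang/rpm/erlang/22/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1
Note: Bintray was shut down in 2021, so the dl.bintray.com URLs in this article may no longer resolve. If they fail, take the current repository from the official installation guide.
# Clear the yum cache
yum clean all
# Rebuild the metadata cache
yum makecache
# Install Erlang
yum install erlang
If yum prints "Loaded plugins: fastestmirror", disable plugins:
vi /etc/yum.conf
plugins=0
The subcommand is clean, not clear; "yum clear all" is rejected as an unknown command:
yum clean all
During installation, confirm that the package comes from the rabbitmq-erlang repository defined above.
If your network is slow, you can also install directly from the package already downloaded in the course materials:
yum install <full path of the package uploaded to the Linux host>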
Installing RabbitMQ
The version installed here is RabbitMQ 3.8.2.
Import the signing key:
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
Download the RPM package:
wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.8.2/rabbitmq-server-3.8.2-1.el7.noarch.rpm
If that is slow, use:
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.2/rabbitmq-server-3.8.2-1.el7.noarch.rpm
Or upload a local copy:
scp /Users/didi/Desktop/RabbitMQ教輔/rabbitmq-server-3.8.2-1.el7.noarch.rpm root@114.55.219.216:/root
When the download finishes, install it:
yum install rabbitmq-server-3.8.2-1.el7.noarch.rpm
If you get an unpacking error, the package was probably downloaded more than once. Run ls -la to see how many copies exist, delete the extras, rename the correct one to rabbitmq-server-3.8.2-1.el7.noarch.rpm, and run yum install again.
RabbitMQ is now installed.
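Common RabbitMQ Commands
Stop RabbitMQ:
$ rabbitmqctl stop
Enable start on boot:
$ systemctl enable rabbitmq-server
Start RabbitMQ:
$ systemctl start rabbitmq-server
Check that the node is up and view its status:
$ rabbitmqctl status
To check the status of the RabbitMQ server service, run:
$ systemctl status rabbitmq-server
Enable the web management UI and create an administrator account:
$ rabbitmq-plugins enable rabbitmq_management
$ rabbitmqctl add_user admin password
$ rabbitmqctl set_user_tags admin administrator
# A new user has no permissions on any vhost; grant admin full access to the default vhost "/" so that clients can connect as admin
$ rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
Open port 15672 in the firewall or security group, then visit:
http://xxx:15672/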
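Installing and Starting on Mac OS
Official installation guide: https://www.rabbitmq.com/install-homebrew.html
On Mac OS X, the brew tool makes it easy to install the RabbitMQ server. The steps are as follows.
Uninstalling and reinstalling brew
https://github.com/homebrew/install#uninstall-homebrew
Uninstall brew:
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/uninstall.sh)"
Install brew from a mirror (choose option 1, the USTC mirror, then answer y):
/bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)"
List installed packages: brew ls
Search for a package: brew search google (replace google with the keyword you are looking for)
Show the brew version: brew -v
Update brew to the latest version: brew update
Install Erlang: brew install erlang
Install RabbitMQ Server: brew install rabbitmq
Fixing brew after a macOS upgrade
After upgrading to macOS Big Sur, brew upgrade fails with "Error: Your CLT does not support macOS 11." Remove the Command Line Tools and reinstall them:
sudo rm -rf /Library/Developer/CommandLineTools
sudo xcode-select --install
RabbitMQ environment variables
After the installation above, the RabbitMQ Server commands are placed in /usr/local/opt/rabbitmq/sbin, which is not added to the user's PATH automatically. To add it:
Open Terminal.
Go to the current user's home directory:
cd ~
Edit the .bash_profile file:
open -e .bash_profile
The file opens in a text editor. Append the environment variables at the end (note that a later assignment overrides an earlier one).
Add the following line to .bash_profile or .profile. The version in the path (3.8.9_1 here) must match the version brew actually installed:
export PATH=$PATH:/usr/local/Cellar/rabbitmq/3.8.9_1/sbin
Reload the environment variables:
source .bash_profile
The RabbitMQ server can now be started with the rabbitmq-server command:
rabbitmq-server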
## ## RabbitMQ 3.8.2
## ##
########## Copyright (c) 2007-2019 Pivotal Software, Inc.
###### ##
########## Licensed under the MPL 1.1. Website: https://rabbitmq.com
Doc guides: https://rabbitmq.com/documentation.html
Support: https://rabbitmq.com/contact.html
Tutorials: https://rabbitmq.com/getstarted.html
Monitoring: https://rabbitmq.com/monitoring.html
Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
/usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log
Config file(s): (none)
Starting broker... completed with 6 plugins.
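Enable the management console:
rabbitmq-plugins enable rabbitmq_management
Add an admin account and give it the administrator tag:
rabbitmqctl add_user admin password
rabbitmqctl set_user_tags admin administrator
Then open the management page in a browser (port 15672 by default).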
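Installing on Windows
Not recommended. The system user name and the computer name must be in English, renaming them on Windows 10 is tedious, and other pitfalls are likely. It also does not match real-world deployments, where Windows is rarely used as the server.


Official installation guide: https://www.rabbitmq.com/install-windows.html
Detailed steps: https://www.cnblogs.com/saryli/p/9729591.html
Install Erlang: get the exe installer from the official download page http://www.erlang.org/downloads, open it, and complete the installation.
Install RabbitMQ: get the exe installer from the official page https://www.rabbitmq.com/install-windows.html and install it. Download link: https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.2/rabbitmq-server-3.8.2.exe
When the download finishes, run the installer.
After installation, RabbitMQ Server registers itself as a service and starts with the default configuration.

In a cmd terminal, run:
cd E:\<your RabbitMQ install path>\sbin
rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
You can then log in as guest at http://127.0.0.1:15672/#/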
Code
helloworld

Producer
package helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/21/20 4:37 PM
*/
public class Send {
private final static String QUEUE_NAME = "helloworld";
public static void main(String[] args) throws IOException, TimeoutException {
// Create the connection factory
ConnectionFactory factory = new ConnectionFactory();
// Set the RabbitMQ host; open port 5672 in the security group and use the user you created
factory.setHost("39.106.75.223");
factory.setUsername("admin");
factory.setPassword("password");
// Open the connection
Connection connection = factory.newConnection();
// Get a channel
Channel channel = connection.createChannel();
// Declare the queue
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// Publish the message
String message = "Hello World 2";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);
// Close the channel and the connection
channel.close();
connection.close();
}
}
Consumer
package helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/21/20 5:17 PM
*/
public class Recv {
private final static String QUEUE_NAME = "helloworld";
public static void main(String[] args) throws IOException, TimeoutException {
// Create the connection factory
ConnectionFactory factory = new ConnectionFactory();
// Set the RabbitMQ host; open port 5672 in the security group and use the user you created
factory.setHost("39.106.75.223");
factory.setUsername("admin");
factory.setPassword("password");
// Open the connection
Connection connection = factory.newConnection();
// Get a channel
Channel channel = connection.createChannel();
// Declare the queue
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// Receive messages (second argument: autoAck = true)
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
});
}
}
Single Consumer (Thread.sleep)

package workqueues;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/22/20 2:38 PM
* Publishes several tasks, each of which takes time to process
*/
public class NewTask {
private final static String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// Create the connection factory
ConnectionFactory factory = new ConnectionFactory();
// Set the RabbitMQ host. For a remote broker, open port 5672 in the security group and use the user you created; for a local broker, start rabbitmq-server first (localhost works with the default guest account)
factory.setHost("localhost");
// Open the connection
Connection connection = factory.newConnection();
// Get a channel
Channel channel = connection.createChannel();
// Declare a durable queue
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// Publish the messages
for (int i = 0; i < 10; i++) {
String message;
message = i + "...";
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
}
channel.close();
connection.close();
}
}
package workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/22/20 2:59 PM
* Consumer: processes the tasks one after another
*/
public class Work {
private final static String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// Create the connection factory
ConnectionFactory factory = new ConnectionFactory();
// Set the RabbitMQ host. For a remote broker, open port 5672 in the security group and use the user you created; for a local broker, start rabbitmq-server first (localhost works with the default guest account)
factory.setHost("localhost");
// Open the connection
Connection connection = factory.newConnection();
// Get a channel
Channel channel = connection.createChannel();
// Declare a durable queue
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// Subscribe and process messages (second argument: autoAck = true)
channel.basicConsume(TASK_QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
try {
doWork(message);
} finally {
System.out.println("Message processed");
}
}
});
}
private static void doWork(String task) {
char[] chars = task.toCharArray();
for (char c : chars) {
if (c == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
Multiple Consumers (round-robin dispatch, unfair)
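By default RabbitMQ hands messages to the consumers of a queue in turn, regardless of how long each message takes to process. The following broker-free simulation sketches why that can be unfair. The class RoundRobinSketch and its methods are hypothetical names for this illustration; the cost rule (one second per dot) mirrors doWork in this article, and the task list assumes a producer that alternates slow and fast tasks.

```java
import java.util.Arrays;

// Simulation of round-robin dispatch (no broker involved).
final class RoundRobinSketch {

    // Cost of one task in seconds: one second per '.', like doWork in the article.
    static int cost(String task) {
        int seconds = 0;
        for (char c : task.toCharArray()) {
            if (c == '.') {
                seconds++;
            }
        }
        return seconds;
    }

    // Hands task i to worker i % workers and returns the busy seconds per worker.
    static int[] dispatch(String[] tasks, int workers) {
        int[] busy = new int[workers];
        for (int i = 0; i < tasks.length; i++) {
            busy[i % workers] += cost(tasks[i]);
        }
        return busy;
    }

    public static void main(String[] args) {
        // Even-numbered tasks are slow ("0..."), odd-numbered tasks are instant ("1").
        String[] tasks = new String[10];
        for (int i = 0; i < tasks.length; i++) {
            tasks[i] = (i % 2 == 0) ? i + "..." : String.valueOf(i);
        }
        System.out.println(Arrays.toString(dispatch(tasks, 2))); // prints [15, 0]
    }
}
```

With two workers, the first one receives every slow task and works for 15 seconds while the second one is idle: the message count is even, the load is not.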


Multiple Consumers (load-based dispatch, fair)
Manual ack

package workqueues;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/22/20 2:38 PM
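* Publishes several tasks; even-numbered tasks take time, odd-numbered tasks are instant
*/
public class NewTask {
private final static String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// Create the connection factory
ConnectionFactory factory = new ConnectionFactory();
// Set the RabbitMQ host. For a remote broker, open port 5672 in the security group and use the user you created; for a local broker, start rabbitmq-server first (localhost works with the default guest account)
factory.setHost("localhost");
// Open the connection
Connection connection = factory.newConnection();
// Get a channel
Channel channel = connection.createChannel();
// Declare a durable queue
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// Publish the messages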
for (int i = 0; i < 10; i++) {
String message;
if (i % 2 == 0) {
message = i + "...";
}else {
message = String.valueOf(i);
}
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
}
channel.close();
connection.close();
}
}
package workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/22/20 2:59 PM
* Consumer: processes tasks with a prefetch limit and manual ack
*/
public class Work {
private final static String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// Create the connection factory
ConnectionFactory factory = new ConnectionFactory();
// Set the RabbitMQ host. For a remote broker, open port 5672 in the security group and use the user you created; for a local broker, start rabbitmq-server first (localhost works with the default guest account)
factory.setHost("localhost");
// Open the connection
Connection connection = factory.newConnection();
// Get a channel
final Channel channel = connection.createChannel();
// Declare a durable queue
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// Subscribe and process messages
// Prefetch count: deliver at most one unacknowledged message to this consumer at a time
channel.basicQos(1);
// Disable autoAck (second argument is false)
channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
try {
doWork(message);
} finally {
// multiple = false: acknowledge only this delivery
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("Message processed");
}
}
});
}
private static void doWork(String task) {
char[] chars = task.toCharArray();
for (char c : chars) {
if (c == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
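With basicQos(1) and manual ack, a consumer gets its next message only after it has acknowledged the previous one, so a busy consumer is skipped. The simulation below is a simplified model of that behaviour, not a trace of the broker: it assumes all tasks are already queued and gives each one to the worker that becomes free first. FairDispatchSketch and its methods are hypothetical names for this illustration.

```java
import java.util.Arrays;

// Simplified model of dispatch with prefetch = 1 and manual ack (no broker involved).
final class FairDispatchSketch {

    // Cost of one task in seconds: one second per '.', like doWork in the article.
    static int cost(String task) {
        int seconds = 0;
        for (char c : task.toCharArray()) {
            if (c == '.') {
                seconds++;
            }
        }
        return seconds;
    }

    // Gives each task to the worker that is free earliest (ties go to the lower index).
    // Returns, per worker, the time at which it finishes its last task.
    static int[] dispatch(String[] tasks, int workers) {
        int[] freeAt = new int[workers];
        for (String task : tasks) {
            int next = 0;
            for (int w = 1; w < workers; w++) {
                if (freeAt[w] < freeAt[next]) {
                    next = w;
                }
            }
            freeAt[next] += cost(task);
        }
        return freeAt;
    }

    public static void main(String[] args) {
        // Same task list as the producer above: even tasks are slow, odd tasks are instant.
        String[] tasks = new String[10];
        for (int i = 0; i < tasks.length; i++) {
            tasks[i] = (i % 2 == 0) ? i + "..." : String.valueOf(i);
        }
        System.out.println(Arrays.toString(dispatch(tasks, 2))); // prints [9, 6]
    }
}
```

In this model the 15 seconds of work are split 9 to 6 between the two workers, instead of all landing on one of them as with plain round-robin dispatch.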
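Exchange Types

Fanout (broadcast)
Use case: logging (print to the console, save to disk).
Messages are not kept for consumers that are not connected, so start the consumers before publishing.
Each consumer declares a temporary queue with a fresh server-generated name; the queue is deleted automatically once its consumer is gone.
Create the exchange, then bind the queue to it.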


package fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/22/20 4:46 PM
* Emits log messages
*/
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = "info: Hello World";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);
channel.close();
connection.close();
}
}
package fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/22/20 4:51 PM
* Receives logs: run several instances in parallel; each binds its own queue to the exchange and gets a copy of every message
*/
public class RecvLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// Get the server-generated name of the temporary queue
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("Waiting for messages");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
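Direct
Use case: logs with different severity levels (save only error to disk, print all levels to the screen).
Messages are routed by routing key.


Sender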
package direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/22/20 5:16 PM
* Emits logs at three levels
*/
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String message1 = "info: Hello World";
String message2 = "error: Hello World";
String message3 = "warn: Hello World";
channel.basicPublish(EXCHANGE_NAME, "info", null, message1.getBytes("UTF-8"));
System.out.println("Sent message: " + message1);
channel.basicPublish(EXCHANGE_NAME, "error", null, message2.getBytes("UTF-8"));
System.out.println("Sent message: " + message2);
channel.basicPublish(EXCHANGE_NAME, "warn", null, message3.getBytes("UTF-8"));
System.out.println("Sent message: " + message3);
channel.close();
connection.close();
}
}
Receiver for all three levels
package direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/22/20 5:19 PM
* Receives logs at all three levels
*/
public class RecvLogDirect1 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// Declare a temporary queue with a server-generated name
String queueName = channel.queueDeclare().getQueue();
// Bind this one queue to the exchange with three routing keys
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "error");
channel.queueBind(queueName, EXCHANGE_NAME, "warn");
System.out.println("Waiting for messages");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
Receiver for one level
package direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/22/20 5:19 PM
* Receives logs at one level (error)
*/
public class RecvLogDirect2 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// Declare a temporary queue with a server-generated name
String queueName = channel.queueDeclare().getQueue();
// Bind the queue to the exchange with a single routing key
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println("Waiting for messages");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
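The routing rule of a direct exchange is small enough to model in memory: a message goes to every queue bound with exactly the routing key it was published with. The sketch below is such a model, not the RabbitMQ client API; DirectExchangeSketch, its methods, and the queue names "all" and "errors" are hypothetical. It replays the bindings of RecvLogDirect1 and RecvLogDirect2 and the three messages of EmitLogDirect.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of direct-exchange routing (an illustration, not the client API).
final class DirectExchangeSketch {

    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    void bind(String queue, String routingKey) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Copies the message into every queue bound with exactly this routing key.
    // A message whose key has no binding is dropped.
    void publish(String routingKey, String message) {
        for (String queue : bindings.getOrDefault(routingKey, Collections.emptyList())) {
            queues.get(queue).add(message);
        }
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        // Like RecvLogDirect1: one queue, three routing keys.
        exchange.bind("all", "info");
        exchange.bind("all", "error");
        exchange.bind("all", "warn");
        // Like RecvLogDirect2: one queue, one routing key.
        exchange.bind("errors", "error");

        exchange.publish("info", "info: Hello World");
        exchange.publish("error", "error: Hello World");
        exchange.publish("warn", "warn: Hello World");

        System.out.println(exchange.messagesIn("all").size());    // prints 3
        System.out.println(exchange.messagesIn("errors"));        // prints [error: Hello World]
    }
}
```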
Topic



package topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/22/20 5:40 PM
* Topic exchange sender: routing keys are matched against binding patterns containing * and #
*/
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String message = "Animal World";
String[] routingKeys = new String[9];
routingKeys[0] = "quick.orange.rabbit";
routingKeys[1] = "lazy.orange.elephant";
routingKeys[2] = "quick.orange.fox";
routingKeys[3] = "lazy.brown.fox";
routingKeys[4] = "lazy.pink.rabbit";
routingKeys[5] = "quick.brown.fox";
routingKeys[6] = "orange";
routingKeys[7] = "quick.orange.male.rabbit";
routingKeys[8] = "lazy.orange.male.rabbit";
for (int i = 0; i < routingKeys.length; i++) {
channel.basicPublish(EXCHANGE_NAME, routingKeys[i], null,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKeys[i] + "':'" + message + "'");
}
channel.close();
connection.close();
}
}
package topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/22/20 5:45 PM
* Receiver bound with one pattern
*/
public class RecvLogTopic1 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// Declare a temporary queue with a server-generated name
String queueName = channel.queueDeclare().getQueue();
String routingKey = "*.orange.*";
// Bind the queue to the exchange with a single pattern
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println("Waiting for messages");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message + ", routingKey: " + envelope.getRoutingKey());
}
};
channel.basicConsume(queueName, true, consumer);
}
}
package topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author bennyrhys
* @Date 12/22/20 5:45 PM
* Receiver bound with two patterns
*/
public class RecvLogTopic2 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// Declare a temporary queue with a server-generated name
String queueName = channel.queueDeclare().getQueue();
String routingKey = "*.*.rabbit";
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
String routingKey2 = "lazy.#";
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
System.out.println("Waiting for messages");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message + ", routingKey: " + envelope.getRoutingKey());
}
};
channel.basicConsume(queueName, true, consumer);
}
}
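In a topic binding pattern, words are separated by dots, * stands for exactly one word, and # stands for zero or more words. To check which of the routing keys above reach which receiver without starting a broker, the matching rule can be sketched in plain Java. TopicMatcher is a hypothetical helper written for this article; it is not part of the RabbitMQ client, and the broker's own implementation may differ in edge cases such as empty words.

```java
// Plain-Java sketch of topic pattern matching (hypothetical helper, not the client API).
final class TopicMatcher {

    // Returns true if the routing key matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may consume zero or more words: try every possible split
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1); // '*' or a literal word consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {
            "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox",
            "orange", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"
        };
        for (String key : keys) {
            boolean toTopic1 = matches("*.orange.*", key);
            boolean toTopic2 = matches("*.*.rabbit", key) || matches("lazy.#", key);
            System.out.println(key + " -> RecvLogTopic1: " + toTopic1 + ", RecvLogTopic2: " + toTopic2);
        }
    }
}
```

Under these rules "orange" and "quick.orange.male.rabbit" match no binding, and "lazy.orange.male.rabbit" reaches RecvLogTopic2 only through lazy.#. A key such as "lazy.pink.rabbit" matches both patterns of RecvLogTopic2 but is delivered to that queue only once.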
Integrating RabbitMQ with Spring Boot
Result

Code
pom
<version>2.2.1.RELEASE</version>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Producer
server.port=8080
spring.application.name=product
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
package com.bennyrhys.rabbitmqproduct;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Description: RabbitMQ configuration class
*/
@Configuration
public class TopicRabbitConfig {
/**
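* The bean name defaults to the method name (queue1, queue2), so Spring can match these beans to the parameter names of the binding methods below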
* @return
*/
@Bean
public Queue queue1() {
return new Queue("queue1");
}
@Bean
public Queue queue2() {
return new Queue("queue2");
}
@Bean
TopicExchange exchange() {
return new TopicExchange("bootExchange");
}
@Bean
Binding bingdingExchangeMessage1(Queue queue1, TopicExchange exchange) {
return BindingBuilder.bind(queue1).to(exchange).with("dog.red");
}
@Bean
Binding bingdingExchangeMessage2(Queue queue2, TopicExchange exchange) {
return BindingBuilder.bind(queue2).to(exchange).with("dog.#");
}
}
package com.bennyrhys.rabbitmqproduct;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Description: sends messages
*/
@Component
public class MsgSender {
@Autowired
private AmqpTemplate rabbitmqTemplate;
public void send1() {
String message = "This is message 1, routing key is dog.red";
System.out.println("Sent: " + message);
this.rabbitmqTemplate.convertAndSend("bootExchange", "dog.red", message);
}
public void send2() {
String message = "This is message 2, routing key is dog.black";
System.out.println("Sent: " + message);
this.rabbitmqTemplate.convertAndSend("bootExchange", "dog.black", message);
}
}
Consumer
server.port=8081
spring.application.name=consumer
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
package com.bennyrhys.rabbitmqconsumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Author bennyrhys
* @Date 12/22/20 7:56 PM
* Consumer 1
*/
@Component
@RabbitListener(queues = "queue1")
public class Receiver1 {
@RabbitHandler
public void process(String message) {
System.out.println("Receive1:" + message);
}
}
package com.bennyrhys.rabbitmqconsumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Author bennyrhys
* @Date 12/22/20 7:56 PM
* Consumer 2
*/
@Component
@RabbitListener(queues = "queue2")
public class Receiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("Receive2:" + message);
}
}
