主頁 > 軟體設計 > 【RabbitMQ】RabbitMQ應用

【RabbitMQ】RabbitMQ應用

2020-12-23 11:06:57 軟體設計

文章目錄

  • 初識
    • 訊息佇列
    • 特點
    • AMQP協議
    • Rabbit核心概念
  • RabbitMQ的安裝 配置 啟動
    • 安裝Erlang三種方案
    • Linux下安裝RabbitMQ
      • 安裝erlang
      • 安裝RabbitMQ
      • RabbitMQ常用命令
    • Mac OS下的安裝和啟動
    • Windows安裝
  • 代碼
    • helloworld
    • 單個消費者處理(Thread.sleep)
    • 多消費者批處理(回圈平均分配,不公平)
    • 多消費者批處理(壓力平均分配,公平)
  • 交換機模式
    • 廣播fanout
    • 直接direct
    • Topic模式
  • SpringBoot整合MQ
    • 效果圖
    • 代碼
    • poducer
    • Consumer

初識

語言Erlang

訊息佇列

在這里插入圖片描述
特性:業務無關、FIFO、容災、性能

使用理由:系統解耦、異步呼叫、流量消峰、

特點

在這里插入圖片描述

AMQP協議

在這里插入圖片描述

Rabbit核心概念

在這里插入圖片描述在這里插入圖片描述在這里插入圖片描述

RabbitMQ的安裝 配置 啟動

安裝Erlang三種方案

在這里插入圖片描述

Linux下安裝RabbitMQ

官方安裝指南:https://www.rabbitmq.com/install-rpm.html

安裝erlang

// 創建erlang源
vim /etc/yum.repos.d/rabbitmq_erlang.repo
[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq-erlang/rpm/erlang/22/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1

//yum清理
yum clear all
//下載快取的生成
yum makecache
//下載erlang
yum install erlang

解決yum命令出現Loaded plugins: fastestmirror
vi /etc/yum.conf
plugins=0

解決no clear
yum clean all

要確認源是“rabbitmq_erlang
如果你網路不好,也可以使用教輔下載好的直接安裝

yum install安裝包上傳到linux的完整路徑名

安裝RabbitMQ

我們將要安裝的RabbitMQ的版本是3.8.2

匯入密鑰
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

下載rpm安裝包:
wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.8.2/rabbitmq-server-3.8.2-1.el7.noarch.rpm

如果速度比較慢,就用:
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.2/rabbitmq-server-3.8.2-1.el7.noarch.rpm

 或者本地上傳
scp  /Users/didi/Desktop/RabbitMQ教輔/rabbitmq-server-3.8.2-1.el7.noarch.rpm root@114.55.219.216:/root

下載完成后,安裝:
yum install rabbitmq-server-3.8.2-1.el7.noarch.rpm

如果出現解壓錯誤,說明下載了多次,用ls -la看一下有幾個檔案,如果有多個安裝包,要把多余的刪掉,把正確的改名為rabbitmq-server-3.8.2-1.el7.noarch.rpm,再執行yum install來安裝

到這里RabbitMQ就安裝好了

RabbitMQ常用命令

停止RabbitMQ
$rabbitmqctl stop

設定開機啟動
$ systemctl enable rabbitmq-server

啟動RabbitMQ
$ systemctl start rabbitmq-server

看看埠有沒有起來,查看狀態
$ rabbitmqctl status

要檢查RabbitMQ服務器的狀態,請運行:

systemctl status rabbitmq-server

開啟web管理界面
rabbitmq-plugins enable rabbitmq_management

rabbitmq-plugins enable rabbitmq_management
rabbitmqctl add_user admin password
rabbitmqctl set_user_tags admin administrator

//開啟安全策略15672
http://xxx:15672/

Mac OS下的安裝和啟動

官方安裝指南:https://www.rabbitmq.com/install-homebrew.html

在Mac OS X中使用brew工具,可以很容易的安裝RabbitMQ的服務端,只需要按如下命令操作即可:

brew的卸載安裝
https://github.com/homebrew/install#uninstall-homebrew

卸載brew
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/uninstall.sh)"
安裝鏡像中科大 序列號1 ,y
/bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)"
本地軟體庫串列:brew ls
        查找軟體:brew search google(其中google替換為要查找的軟體關鍵字)
        查看brew版本:brew -v  更新brew版本:brew update

brew更新到最新版本,執行:brew update
安裝Erlang,執行:brew install erlang
安裝RabbitMQ Server,執行:brew install rabbitmq

解決mac升級
macOS升級至macOS Big Sur 使用brew upgrade更新軟體報Error: Your CLT does not support macOS 11.

sudo rm -rf /Library/Developer/CommandLineTools
sudo xcode-select --install

mq環境變數
通過上面的命令安裝后,RabbitMQ Server的命令會被安裝到/usr/local/opt/rabbitmq/sbin,并不會自動加到用戶的環境變數中去:
啟動Terminal

進入當前用戶的home目錄

輸入 cd ~
編輯.bash_profile檔案

輸入open -e .bash_profile,這時./bash_profile就會打開,可以在后面加入要寫入的環境變數(注意:從后往前讀,注意覆寫)
所以我們需要在.bash_profile或.profile檔案中增加下面內容:
export PATH=$PATH:/usr/local/Cellar/rabbitmq/3.8.9_1/sbin


更新剛配置的環境變數
輸入source .bash_profile
這樣,我們就可以通過rabbitmq-server命令來啟動RabbitMQ的服務端了,
rabbitmq-server

0##  ##      RabbitMQ 3.8.2
  ##  ##
  ##########  Copyright (c) 2007-2019 Pivotal Software, Inc.
  ######  ##
  ##########  Licensed under the MPL 1.1. Website: https://rabbitmq.com

  Doc guides: https://rabbitmq.com/documentation.html
  Support:    https://rabbitmq.com/contact.html
  Tutorials:  https://rabbitmq.com/getstarted.html
  Monitoring: https://rabbitmq.com/monitoring.html

  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
        /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log

  Config file(s): (none)

  Starting broker... completed with 6 plugins.

打開管理后臺
rabbitmq-plugins enable rabbitmq_management

添加admin賬號,賦予administrator權限

rabbitmqctl add_user admin password
rabbitmqctl set_user_tags admin administrator

然后訪問瀏覽器進入管理頁面

Windows安裝

不推薦,因為要求系統用戶名和計算機名必須是英文,而Win10改名比較麻煩,而且可能會有其他坑,而且和未來的實際作業場景嚴重不符,沒有Windows作為服務器的,
在這里插入圖片描述
在這里插入圖片描述

官方安裝指南:https://www.rabbitmq.com/install-windows.html

詳細步驟:https://www.cnblogs.com/saryli/p/9729591.html

安裝Erland,通過官方下載頁面http://www.erlang.org/downloads獲取exe安裝包,直接打開并完成安裝,

安裝RabbitMQ,通過官方下載頁面https://www.rabbitmq.com/install-windows.html獲取exe安裝包并安裝,下載地址:https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.2/rabbitmq-server-3.8.2.exe

下載完成后,直接運行安裝程式,

RabbitMQ Server安裝完成之后,會自動的注冊為服務,并以默認配置啟動起來,

在這里插入圖片描述

用終端cmd輸入:
cd E:\你的RabbitMQ按照地址\sbin

rabbitmq-server

rabbitmq-plugins enable rabbitmq_management

然后就可以用guest訪問http://127.0.0.1:15672/#/

代碼

helloworld

在這里插入圖片描述

生產者

package helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/21/20 4:37 PM
 */
public class Send {
    private final static String QUEUE_NAME = "helloworld";
    public static void main(String[] args) throws IOException, TimeoutException {
        //創建鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定RabbitMQ地址 注意開放安全組5672 用自己創建的用戶
        factory.setHost("39.106.75.223");
        factory.setUsername("admin");
        factory.setPassword("password");
        //建立連接
        Connection connection = factory.newConnection();
        //獲得信道
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //發布訊息
        String message = "Hello World 2";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("發送了訊息" + message);
        //關閉連接
        channel.close();
        connection.close();
    }
}

消費者

package helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/21/20 5:17 PM
 */
public class Recv {
    private final static String QUEUE_NAME = "helloworld";
    public static void main(String[] args) throws IOException, TimeoutException {
        //創建鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定RabbitMQ地址 注意開放安全組5672 用自己創建的用戶
        factory.setHost("39.106.75.223");
        factory.setUsername("admin");
        factory.setPassword("password");
        //建立連接
        Connection connection = factory.newConnection();
        //獲得信道
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //接收訊息
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到了訊息" + message);
            }
        });
    }
}

單個消費者處理(Thread.sleep)

在這里插入圖片描述

package workqueues;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/22/20 2:38 PM
 * 任務有所耗時,多個任務
 */
public class NewTask {
    private final static String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //創建鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定RabbitMQ地址 注意開放安全組5672 用自己創建的用戶(本地啟動localhost 默認guest,先啟動本地 rabbitmq-server)
        factory.setHost("localhost");
        //建立連接
        Connection connection = factory.newConnection();
        //獲得信道
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        //發布訊息
        for (int i = 0; i < 10; i++) {
            String message;
            message = i + "...";
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
        }
        channel.close();
        connection.close();
    }
}

package workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/22/20 2:59 PM
 * 消費者 批量處理
 */
public class Work {
    private final static String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //創建鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定RabbitMQ地址 注意開放安全組5672 用自己創建的用戶(本地啟動localhost 默認guest,先啟動本地 rabbitmq-server)
        factory.setHost("localhost");
        //建立連接
        Connection connection = factory.newConnection();
        //獲得信道
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        //訂閱訊息 批處理
        channel.basicConsume(TASK_QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到了訊息:" + message);
                try {
                    doWork(message);
                }finally {
                    System.out.println("訊息處理完成");
                }
            }
        });
    }

    private static void doWork(String task) {
        char[] chars = task.toCharArray();
        for (char c : chars) {
            if (c == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

多消費者批處理(回圈平均分配,不公平)

在這里插入圖片描述
在這里插入圖片描述

多消費者批處理(壓力平均分配,公平)

手動ack
在這里插入圖片描述

package workqueues;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/22/20 2:38 PM
 * 任務有所耗時,多個任務
 */
public class NewTask {
    private final static String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //創建鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定RabbitMQ地址 注意開放安全組5672 用自己創建的用戶(本地啟動localhost 默認guest,先啟動本地 rabbitmq-server)
        factory.setHost("localhost");
        //建立連接
        Connection connection = factory.newConnection();
        //獲得信道
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        //發布訊息
        for (int i = 0; i < 10; i++) {
            String message;
            if (i % 2 == 0) {
                message = i + "...";
            }else {
                message = String.valueOf(i);
            }
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
        }
        channel.close();
        connection.close();
    }
}
package workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/22/20 2:59 PM
 * 消費者 批量處理
 */
public class Work {
    private final static String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //創建鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定RabbitMQ地址 注意開放安全組5672 用自己創建的用戶(本地啟動localhost 默認guest,先啟動本地 rabbitmq-server)
        factory.setHost("localhost");
        //建立連接
        Connection connection = factory.newConnection();
        //獲得信道
        final Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        //訂閱訊息 批處理
        //希望處理的數量
        channel.basicQos(1);
        //關閉自動ack
        channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到了訊息:" + message);
                try {
                    doWork(message);
                }finally {
                    //同時多個確認false
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    System.out.println("訊息處理完成");
                }
            }
        });
    }

    private static void doWork(String task) {
        char[] chars = task.toCharArray();
        for (char c : chars) {
            if (c == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

交換機模式

在這里插入圖片描述

廣播fanout

日志 (列印,存盤)
佇列訊息不積壓,必先開啟消費者,
當沒有消費者時,自動洗掉佇列,每次重新,新建新的佇列名
創建交換機,系結交換機與佇列
在這里插入圖片描述
在這里插入圖片描述

package fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/22/20 4:46 PM
 * 發送日志
 */
public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        String message = "info: Hello World";

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println("發送了訊息:" + message);
        channel.close();
        connection.close();
    }
}
package fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/22/20 4:51 PM
 * 接收日志:開啟配置并行處理多個佇列,系結交換機獲取相同內容
 */
public class RecvLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        //獲取每次自動創建的佇列名
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println("開始接收訊息");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到訊息:" + message);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

直接direct

日志不同級別(記錄error,螢屏列印全部)
根據關鍵字分發
在這里插入圖片描述
在這里插入圖片描述
發送

package direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/22/20 5:16 PM
 * 日志發送 三種級別
 */
public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String message1 = "info: Hello World";
        String message2 = "error: Hello World";
        String message3 = "warn: Hello World";

        channel.basicPublish(EXCHANGE_NAME, "info", null, message1.getBytes("UTF-8"));
        System.out.println("發送了訊息:" + message1);
        channel.basicPublish(EXCHANGE_NAME, "error", null, message2.getBytes("UTF-8"));
        System.out.println("發送了訊息:" + message2);
        channel.basicPublish(EXCHANGE_NAME, "warn", null, message3.getBytes("UTF-8"));
        System.out.println("發送了訊息:" + message3);
        channel.close();
        connection.close();
    }
}

接收三種

package direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/22/20 5:19 PM
 * 接收日志 三種級別
 */
public class RecvLogDirect1 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //生成一個隨機的臨時的queue
        String queueName = channel.queueDeclare().getQueue();
        //一個交換機同時系結三個queue
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        channel.queueBind(queueName, EXCHANGE_NAME, "warn");

        System.out.println("開始接收訊息");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到訊息:" + message);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

接收一種

package direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/22/20 5:19 PM
 * 接收日志 一種級別
 */
public class RecvLogDirect2 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //生成一個隨機的臨時的queue
        String queueName = channel.queueDeclare().getQueue();
        //一個交換機同時系結1個queue
        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println("開始接收訊息");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到訊息:" + message);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

Topic模式

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述

package topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/22/20 5:40 PM
 * Topic交換機 發送 多種匹配* #匹配
 */
public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String message = "Animal World";

        String[] routingKeys = new String[9];
        routingKeys[0] = "quick.orange.rabbit";
        routingKeys[1] = "lazy.orange.elephant";
        routingKeys[2] = "quick.orange.fox";
        routingKeys[3] = "lazy.brown.fox";
        routingKeys[4] = "lazy.pink.rabbit";
        routingKeys[5] = "quick.brown.fox";
        routingKeys[6] = "orange";
        routingKeys[7] = "quick.orange.male.rabbit";
        routingKeys[8] = "lazy.orange.male.rabbit";
        for (int i = 0; i < routingKeys.length; i++) {
            channel.basicPublish(EXCHANGE_NAME, routingKeys[i], null,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKeys[i] + "':'" + message + "'");
        }

        channel.close();
        connection.close();
    }
}
package topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/22/20 5:45 PM
 * 接收1種匹配
 */
public class RecvLogTopic1 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        //生成一個隨機的臨時的queue
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "*.orange.*";
        //一個交換機同時系結1個queue
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

        System.out.println("開始接收訊息");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到訊息:" + message + "roukingKey:" + envelope.getRoutingKey());
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
package topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author bennyrhys
 * @Date 12/22/20 5:45 PM
 * 接收1種匹配
 */
public class RecvLogTopic2 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        //生成一個隨機的臨時的queue
        String queueName = channel.queueDeclare().getQueue();

        String routingKey = "*.*.rabbit";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
        String routingKey2 = "lazy.#";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);

        System.out.println("開始接收訊息");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到訊息:" + message + "roukingKey:" + envelope.getRoutingKey());
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

SpringBoot整合MQ

效果圖

在這里插入圖片描述

代碼

pom

        <version>2.2.1.RELEASE</version>
        
		<!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

poducer

server.port=8080
spring.application.name=product

spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
package com.bennyrhys.rabbitmqproduct;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 描述:     rabbitmq配置類
 */
@Configuration
public class TopicRabbitConfig {

    /**
     * 此處queue回傳值和方法名一樣,方便spring識別
     * @return
     */
    @Bean
    public Queue queue1() {
        return new Queue("queue1");
    }

    @Bean
    public Queue queue2() {
        return new Queue("queue2");
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("bootExchange");
    }

    @Bean
    Binding bingdingExchangeMessage1(Queue queue1, TopicExchange exchange) {
        return BindingBuilder.bind(queue1).to(exchange).with("dog.red");
    }

    @Bean
    Binding bingdingExchangeMessage2(Queue queue2, TopicExchange exchange) {
        return BindingBuilder.bind(queue2).to(exchange).with("dog.#");
    }
}
package com.bennyrhys.rabbitmqproduct;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 描述:     發送訊息
 */
@Component
public class MsgSender {

    @Autowired
    private AmqpTemplate rabbitmqTemplate;

    public void send1() {
        String message = "This is message 1, routing key is dog.red";
        System.out.println("發送了:"+message);
        this.rabbitmqTemplate.convertAndSend("bootExchange", "dog.red", message);
    }

    public void send2() {
        String message = "This is message 2, routing key is dog.black";
        System.out.println("發送了:"+message);
        this.rabbitmqTemplate.convertAndSend("bootExchange", "dog.black", message);
    }
}

Consumer

server.port=8081
spring.application.name=consumer

spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
package com.bennyrhys.rabbitmqconsumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author bennyrhys
 * @Date 12/22/20 7:56 PM
 * 消費者1
 */
@Component
@RabbitListener(queues = "queue1")
public class Receiver1 {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Receive1:" + message);
    }
}
package com.bennyrhys.rabbitmqconsumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author bennyrhys
 * @Date 12/22/20 7:56 PM
 * 消費者2
 */
@Component
@RabbitListener(queues = "queue2")
public class Receiver2 {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Receive2:" + message);
    }
}
瑞 新 CSDN認證博客專家 分布式 Java 架構
求職中 ? Java全堆疊養成計劃
公眾號 ? 讓我遇見相似的靈魂
回復領取:競賽 書籍 專案 面試

左手代碼,右手吉他,這就是天下:如果有一天我遇見相似的靈魂 那它肯定是步履艱難 不被理解 喜黑怕光的,如果可以的話 讓我觸摸一下吧 它也一樣孤獨得太久, 不一樣的文藝青年,不一樣的程式猿,

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/239096.html

標籤:其他

上一篇:海康工業相機拍照存圖控制臺demo

下一篇:Java中高級核心知識全面決議——Redis(簡介、基本資料結構、跳躍表【簡介、實作】)上

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more