主頁 > 後端開發 > RabbitMQ 3.9( 基礎 )

RabbitMQ 3.9( 基礎 )

2022-05-07 09:49:21 後端開發

1、認識MQ

1.1、什么是MQ?

  • MQ全稱:message queue 即 訊息佇列
  • 這個佇列遵循的原則:FIFO 即 先進先出
  • 佇列里面存的就是message


1.2、為什么要用MQ?

1.2.1、流量削峰

image

  • 這種情況,要是訪問 1020次 / s呢?這種肯定會讓支付系統宕機了,因為太大了嘛,受不了,所以:流量削峰

image

  • 這樣就讓message排著隊了,然后使用FIFO先進先出,這樣支付系統就可以承受得了了

1.2.2、應用解耦

image

  • 上面這種,只要支付系統或庫存系統其中一個掛彩了,那么訂單系統也要掛彩,因此:解耦唄

image

  • 而采用了MQ之后,支付系統和庫存系統有一個出問題,那么它的處理記憶體是在MQ中的,此時訂單系統就不會有影響,可以正常完成,等到故障恢復了,訂單系統再處理對應的事情,這就提高了系統的可用性

1.2.3、異步處理

image

  • 如上圖,訂單系統要呼叫支付系統的API,而訂單系統并不知道支付系統處理對應的業務需要多久,要解決可以采用訂單系統隔一定時間又去訪問支付系統,看處理完沒有,而使用MQ更容易解決,

image



1.3、RabbitMQ的原理?

image

  • 圖中的東西后續會慢慢見到
  • Broker物體:接收和分發訊息的應用 / RabbitMQ Server / Message Broker
  • 而上圖中RabbitMQ的四個核心就是:Producer生產者、exchange交換機、queue佇列、Consumer消費者
    • Producer生產者:就是負責推送訊息的程式
    • Exchange交換機:接收來自生產者的訊息,并且把訊息放到佇列中
    • queue佇列:就是一個資料結構,本質就是一個很大的訊息緩沖區,許多生產者可以把訊息推送到一個佇列,許多消費者可以從一個佇列中獲得資料
    • Consumer消費者:就是接收訊息的程式
    • 注意:生產者、訊息中間件MQ、消費者大多時候并不是在同一臺機器上的,所以:生產者有時可以是消費者;而消費者有時也可以是生產者
  • Connection鏈接:就是讓Producer生產者、Broker物體、Consumer消費者之間建立TCP鏈接
  • Virtual host虛擬機:處于多租戶和安全因素考慮而設計的,當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個 vhost,每個用戶在自己的 vhost 創建 exchange/queue 等
  • Channel信道:就是發訊息的通道,它是在Connection內部建立的邏輯連接
  • Routes路由策略 / binding系結:交換機以什么樣的策略將訊息發布到Queue,也就是exchange交換機 和 queue佇列之間的聯系,即 二者之間的虛擬連接,它里面可以包含routing key 路由鍵


1.4、RabbitMQ的通訊方式

  • 這個玩意兒在官網中有圖,地址:https://www.rabbitmq.com/getstarted.html 學完之后這張圖最好放在自己腦海里,平時開發玩的就是這些,下面的作業模式在后續會慢慢接觸
  • 另外:下面放的是七種,實質上第六種RPC用得很少

image

  • 1、hello word - 簡單模式
  • 2、work queues - 作業模式
  • 3、publish / subscribe - 發布訂閱模式
  • 4、Routing - 路由模式
  • 5、Topics - 主題模式
  • 6、RPC模式 - 不用了解也行
  • 7、publisher confirms - 發布確認模式


2、安裝RabbitMQ

  • 以下的方式自行選擇一種即可


2.1、在Centos 7下安裝

  • 查看自己的Linux版本
	uname -a

image



2.1.1、使用rpm紅帽軟體

準備作業

  • 1、下載Erlang,因為:RabbitMQ是Erlang語言寫的,Erlang下載地址【 ps:這是官網 】:https://www.erlang.org/downloads,選擇自己要的版本即可
    • 另外:RabbitMQ和Erlang的版本對應關系鏈接地址 https://www.rabbitmq.com/which-erlang.html

image


  • 當然:上面這種是下載gz壓縮包,配置挺煩的,可以直接去github中下載rpm檔案,地址:https://github.com/rabbitmq/erlang-rpm/releases , 選擇自己需要的版本即可,注意一個問題:要看是基于什么Linux的版本

image


  • 要是github下載慢的話,都有自己的文明上網加速方式,要是沒有的話,可以進入 https://github.com/fhefh2015/Fast-GitHub 下載好了然后集成到自己瀏覽器的擴展程式中即可,而如果進入github很慢的話,可以選擇去gitee中搜索一個叫做:dev-sidecar的東西安裝,這樣以后進入github就很快了,還有另外的很多方式,不介紹了,

  • 2、執行rpm -ivh erlang檔案 命令
    • i 就是 install的意思
    • vh 就是顯示安裝進度條
    • 注意:需要保證自己的Linux中有rpm命令,沒有的話,執行yum install rpm指令即可安裝rpm

image


  • 3、安裝RabbitMQ需要的依賴環境
	yum install socat -y

image


  • 4、下載RabbitMQ的rpm檔案,github地址:https://github.com/rabbitmq/rabbitmq-server/releases , 選擇自己要的版本即可

  • 5、安裝RabbitMQ

image


  • 6、啟動RabbitMQ服務

    	啟動服務
    	sbin/service rabbitmq-server start
    
    	停止服務
    	/sbin/service rabbitmq-server stop
    
    	查看啟動狀態
    	/sbin/service rabbitmq-server status
    
    	開啟開機自動
    	chkconfig rabbitmq-server on
    
    

    image

    • 查看啟動狀態

    image

    • 這表示正在啟動,需要等一會兒,看到下面的樣子就表示啟動成功

    image


  • 7、安裝web管理插件
	1、停止RabbitMQ服務
	service rabbitmq-server stop   // 使用上面的命令 /sbin/service rabbitmq-server stop也行

	2、安裝插件
	rabbitmq-plugins enable rabbitmq_management

	3、開啟RabbitMQ服務
	service rabbitmq-server start

image

image

  • 要是訪問不了,看看自己的防火墻關沒關啊
# 查看防火墻狀態
systemctl status firewalld

# 關閉防火墻
systemctl stop firewalld

# 一勞永逸 禁用防火墻
systemctl enable firewalld

  • 同時查看自己的服務器有沒有開放15672埠,不同的東西有不同的處理方式,如我的云服務器直接在服務器網址中添加規則即可,其他的方式自行百度


2.1.2、使用Docker安裝

  • 需要保證自己的Linux中有Docker容器,教程鏈接:https://www.cnblogs.com/xiegongzi/p/15621992.html

  • 使用下面的兩種方式都不需要進行web管理插件的安裝和erlang的安裝


  • 1、查看自己的docker容器中是否已有了rabbitmq這個名字的鏡像
	docker images

image

  • 洗掉鏡像
	docker rmi 鏡像ID  // 如上例的 dockerrmi 16c 即可洗掉鏡像

image


  • 2、拉取RabbitMQ鏡像 并 啟動Docker容器
	docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

image


  • 3、查看Docker容器是否啟動
	docker ps

image


  • 4、再次在瀏覽器進行訪問就可以吃雞了,不需要再安裝插件啊,剛剛上一步拉鏡像和啟動時已經安裝好了

image



2.1.3、使用Docker-compose安裝

  • 采用了第二種方式的話,記得把已經啟動的Docker容器關了,以下是附加的一些Docker的基操

# 拉取鏡像
docker pull 鏡像名稱

# 查看全部鏡像
docker images

# 洗掉鏡像
docker rmi 鏡像ID

# 將本地的鏡像匯出
docker save -o 匯出的路徑 鏡像id

# 加載本地的鏡像檔案
docker load -i 鏡像檔案

# 修改鏡像名稱
docker tag 鏡像id 新鏡像名稱:版本

# 簡單運行操作
docker run 鏡像ID | 鏡像名稱

# 跟引數的運行
docker run -d -p 宿主機埠:容器埠 --name 容器名稱 鏡像ID | 鏡像名稱
# 如:docker run -d -p 8081:8080 --name tomcat b8
# -d:代表后臺運行容器 
# -p 宿主機埠:容器埠:為了映射當前Linux的埠和容器的埠 
# --name 容器名稱:指定容器的名稱

# 查看運行的容器
docker ps [-qa]
# -a:查看全部的容器,包括沒有運行
# -q:只查看容器的標識

# 查看日志
docker logs -f 容器id
# -f:可以滾動查看日志的最后幾行

# 進入容器內部
docker exec -it 容器id bash
# 退出容器:exit

# 將宿主機的檔案復制到容器內部的指定目錄
docker cp 檔案名稱 容器id:容器內部路徑
docker cp index.html 982:/usr/local/tomcat/webapps/ROOT


=====================================================================


# 重新啟動容器 
docker restart 容器id

# 啟動停止運行的容器
docker start 容器id

# 停止指定的容器(洗掉容器前,需要先停止容器)
docker stop 容器id

# 停止全部容器
docker stop $(docker ps -qa)

# 洗掉指定容器 
docker rm 容器id

# 洗掉全部容器
docker rm $(docker ps -qa)

image


  • 1、創建一個檔案夾,這些我很早之前就玩過了,所以建好了的
	# 創建檔案夾
	mkdir 檔案夾名

image


  • 2、進入檔案夾,創建docker-compose.yml檔案,注意:檔案名必須是這個
	# 創建檔案
	touch docker-compose.yml

image


  • 3、編輯docker-compose.yml檔案
	# 編輯檔案
	vim docker-compose.yml

  • 里面撰寫的內容如下,撰寫好保存即可,注意:別用tab縮進啊,會出問題的,另外:每句的后面別有空格,嚴格遵循yml格式的
version: "3.1"
services:
  rabbitmq:
# 鏡像
    image: rabbitmq:3.9-management
# 自啟
    restart: always
# Docker容器名
    container_name: rabbitmq
# 埠號,docker容器內部埠 映射 外部埠
    ports:
      - 5672:5672
      - 15672:15672
# 資料卷映射 把容器里面的東西映射到容器外面去 容易操作,否則每次都要進入容器
    volumes:
      - ./data:/opt/install/rabbitMQ-docker/


  • 4、在docker-compose.yml所在路徑執行如下命令,注意:一定要在此檔案路徑中才行,因為默認是在當前檔案夾下找尋docker-compose檔案
# 啟動
docker-compose up -d
# -d 后臺啟動

=========================================================

# 附加內容:docker-compose的一些命令操作

# 1. 基于docker-compose.yml啟動管理的容器
docker-compose up -d

# 2. 關閉并洗掉容器
docker-compose down

# 3. 開啟|關閉|重啟已經存在的由docker-compose維護的容器
docker-compose start|stop|restart

# 4. 查看由docker-compose管理的容器
docker-compose ps 

# 5. 查看日志 
docker-compose logs -f

# 有興趣的也可以去了解docker-file自定義鏡像

image


  • 去瀏覽器訪問一樣的吃雞
  • 上面就是RabbitMQ的基操做完了,不過默認賬號是guest游客狀態,很多事情還做不了呢,所以還得做一些操作


2.1.4、解決不能登入web管理界面的問題

2.1.4.1、使用rpm紅帽軟體安裝的RabbitMQ
  • 這種方式直接使用guest進行登錄是不得吃的

image


  • 這是因為guest是游客身份,不能進入,需要添加新用戶
	查看當前用戶 / 角色有哪些
	rabbitmqctl list_users

	洗掉用戶
	rabbitmqctl delete_user 用戶名

	添加用戶
	rabbitmqctl add_user 用戶名 密碼

	設定用戶角色
	rabbitmqctl set_user_tags 用戶名 administrator

	設定用戶權限【 ps:guest角色就是沒有這一步 】
	rabbitmqctl set_permissions -p "/" 用戶名 ".*" ".*" ".*"
# 設定用戶權限指令解釋
				set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

image


  • 現在使用admin去瀏覽器登錄就可以了

image



2.1.4.2、使用docker 或 docker-compose安裝的RabbitMQ
  • 這兩種方式直接使用guest就可以進行登錄,后續的操作就是一樣的了

image

image

image



3、開始玩RabbitMQ

  • 創建Maven專案 并匯入如下依賴
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>


  • 回到前面的RabbitMQ原理圖

image



3.1、Hello word 簡單模式

  • 對照原理圖來玩,官網中有Hello word的模式圖

image

  • 即:一個生產者Producer、一個默認交換機Exchange、一個佇列queue、一個消費者Consumer

生產者

  • 就是下圖前面部分

image


package cn.zixieqing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static final String HOST = "ip";		// 放RabbitMQ服務的服務器ip
    public static final int PORT = 5672;		// 服務器中RabbitMQ的埠號,在瀏覽器用的15672是通過5672映射出來的15672
    public static final String USER_NAME = "admin";
    public static final String PASSWORD = "admin";
    public static final String QUEUE_NAME = "hello word";

    public static void main(String[] args) throws IOException, TimeoutException {

        // 1、獲取鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();

        // 2、設定鏈接資訊
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        /*
            當然:這里還可以設定vhost虛擬機 - 前提是自己在web管理界面中添加了vhost
            factory.setVirtualHost();
         */

        // 3、獲取鏈接Connection
        Connection connection = factory.newConnection();

        // 4、創建channel信道 - 它才是去和交換機 / 佇列打交道的
        Channel channel = connection.createChannel();

        // 5、準備一個佇列queue
        // 這里理論上是去和exchange打交道,但是:這里是hello word簡單模式,所以直接使用默認的exchange即可
        /*
            下面這是引數的完整意思,原始碼中偷懶了,沒有見名知意
            queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
            引數1、佇列名字
            引數2、是否持久化( 保存到磁盤 ),默認是在記憶體中的
            引數3、是否共享,即:是否只供一個消費者消費,是否讓多個消費者共享這個佇列中的資訊
            引數4、是否自動洗掉,即:最后一個消費者獲取資訊之后,這個佇列是否自動洗掉
            引數5、其他配置項,這涉及到后面的知識,目前選擇null
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("正在發送資訊!!!");
        // 6、推送資訊到佇列中
        // 準備發送的資訊內容
        String message = "it is hello word";
        /*
            basicPublish( exchangeName,queueName,properties,message )
            引數1、互動機名字 - 目前使用了默認的
            引數2、指定路由規則 - 目前使用佇列名字
            引數3、指定傳遞的訊息所攜帶的properties
            引數4、推送的具體訊息 - byte型別的
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

        // 7、釋放資源 - 倒著關閉即可
        if ( null != channel ) channel.close();

        if ( null != connection ) connection.close();

        System.out.println("訊息發送完畢");

    }
}



  • 運行之后,去瀏覽器管理界面進行查看

image


消費者

image

public class Consumer {

    public static final String HOST = "ip";   // 自己的服務器ip
    public static final int PORT = 5672;
    public static final String USER_NAME = "admin";
    public static final String PASSWORD = "admin";
    public static final String QUEUE_NAME = "hello word";

    public static void main(String[] args) throws IOException, TimeoutException {

        // 1、創建鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();

        // 2、設定鏈接資訊
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        // 3、創建鏈接物件
        Connection connection = factory.newConnection();

        // 4、創建信道channel
        Channel channel = connection.createChannel();

        // 5、從指定佇列中獲取訊息
        /*
            basicConsume( queueName,isAutoAnswer,deliverCallback,cancelCallback )
            引數1、佇列名
            引數2、是否自動應答,為true時,消費者接收到訊息后,會立即告訴RabbitMQ
            引數3、消費者未成功消費訊息的回呼
            引數4、消費者取消消費的回呼
         */
        System.out.println("開始接收訊息!!!");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接收到了訊息:" + new String(message.getBody(), StandardCharsets.UTF_8) );
        };

        CancelCallback cancelCallback = consumerTag -> System.out.println("消費者取消了消費資訊行為");

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

        // 6、釋放資源 - 但是這里不能直接關閉啊,否則:看不到接收的結果的,可以選擇不關,也可以選擇加一句代碼System.in.read();

        // channel.close();
        // connection.close();

    }
}



3.2、work queue作業佇列模式

  • 流程圖就是官網中的

image


  • 一個生產者批量生產訊息
  • 一個默認交換機
  • 一個佇列
  • 多個消費者
  • 換言之:就是有大量的任務 / 密集型任務有待處理( 生產者生產的訊息 ),此時我們就將這些任務推到佇列中去,然后使用多個作業執行緒( 消費者 )來進行處理,否則:一堆任務直接就跑來了,那消費者不得亂套了,因此:這種就需要讓這種模式具有如下的特點:
    • 1、訊息是有序排好的( 也就是在佇列中 )
    • 2、作業執行緒 / 消費者不能同時接收同一個訊息,換言之:生產者推送的任務必須是輪詢分發的,即:作業執行緒1接收第一個,作業執行緒2接收第二個;作業執行緒1再接收第三個,作業執行緒2接收第四個

抽取RabbitMQ鏈接的工具類

package cn.zixieqing.util;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MQUtil {

    public static final String HOST = "自己的ip";
    public static final int PORT = 5672;
    public static final String USER_NAME = "admin";
    public static final String PASSWORD = "admin";

    public static Channel getChannel(String vHost ) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        if ( !vHost.isEmpty() ) factory.setVirtualHost(vHost);

        return factory.newConnection().createChannel();

    }
}


生產者

  • 和hello word沒什么兩樣

package cn.zixieqing.workqueue;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


public class WorkProducer {

    private static final String QUEUE_NAME = "work queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        // 1、宣告佇列
        /*
            下面這是引數的完整意思,原始碼中偷懶了,沒有見名知意
            queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
            引數1、佇列名字
            引數2、是否持久化( 保存到磁盤 ),默認是在記憶體中的
            引數3、是否共享,即:是否只供一個消費者消費,是否讓多個消費者共享這個佇列中的資訊
            引數4、是否自動洗掉,即:最后一個消費者獲取資訊之后,這個佇列是否自動洗掉
            引數5、其他配置項,這涉及到后面的知識,目前選擇null
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 2、準備訊息
        System.out.println("請輸入要推送的資訊,按回車確認:");
        Scanner input = new Scanner(System.in);

        // 3、推送資訊到佇列中
        while (input.hasNext()) {
            /*
                basicPublish( exchangeName,routing key,properties,message )
                引數1、互動機名字 - 目前是使用了默認的
                引數2、指定路由規則 - 目前使用佇列名字
                引數3、指定傳遞的訊息所攜帶的properties
                引數4、推送的具體訊息 - byte型別的
            */
            String message = input.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("訊息====>" + message + "====>推送完畢!");
        }
    }
}


消費者

  • 消費者01
package cn.zixieqing.workqueue;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class WorkConsumer {

    private static final String QUEUE_NAME = "work queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接收到了訊息====>" + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println( consumerTag + "消費者中斷了接收訊息====>" );
        };

        System.out.println("消費者01正在接收訊息......");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

    }
}


  • 消費者02
package cn.zixieqing.workqueue;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class WorkConsumer {

    private static final String QUEUE_NAME = "work queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接收到了訊息====>" + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println( consumerTag + "消費者中斷了接收訊息====>" );
        };

        System.out.println("消費者02正在接收訊息......");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

    }
}

image

image

image



3.3、訊息應答機制

  • 消費者在接收到訊息并且處理該訊息之后,告訴 rabbitmq 它已經處理了,rabbitmq 可以把該訊息洗掉了
  • 目的就是為了保證資料的安全,如果沒有這個機制的話,那么就會造成下面的情況

image

  • 消費者接收佇列中的訊息時,沒接收完,出現例外了,然后此時MQ以為消費者已經把訊息接收并處理了( MQ并沒有接收到訊息有沒有被消費者處理完畢 ),然后MQ就把佇列 / 訊息給刪了,后續消費者例外恢復之后再次接收訊息,就會出現:接收不到了


3.3.1、訊息應答機制的分類

  • 這個東西已經見過了
	/*
            basicConsume( queueName,isAutoAnswer,deliverCallback,cancelCallback )
            引數1、佇列名
            引數2、是否自動應答,為true時,消費者接收到訊息后,會立即告訴RabbitMQ
            引數3、消費者未成功消費訊息的回呼
            引數4、消費者取消消費的回呼
    */
	channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);


3.3.1.1、自動應答
  • 指的是:訊息發送后立即被認為已經傳送成功
  • 需要具備的條件:
    • 1、發送的訊息很多,就是高吞吐量的那種
    • 2、發送的訊息在傳輸方面是安全的
  • 優點:處理效率快,很高效

3.3.1.2、手動應答
  • 就是我們自己去設定,好處是可以批量應答并且減少網路擁堵

  • 呼叫的API如下:

    • 	Channel.basicACK( long, boolean );		// 用于肯定確認,即:MQ已知道該訊息 并且 該訊息已經成功被處理了,所以MQ可以將其丟棄了
      
      	Channel.basicNack( long, boolena, boolean );	// 用于否定確認
      
      	Channel.basicReject( long, boolea );		// 用于否定確認
      		與Channel.basicNack( long, boolena, boolean )相比,少了一個引數,這個引數名字叫做:multiple
      
  • multiple引數說明,它為true和false有著截然不同的意義【 ps:建議弄成false,雖然是挨個去處理,從而應答,效率慢,但是:資料安全,否則:很大可能造成資料丟失 】

    • true 代表批量應答MQ,channel 上未應答 / 消費者未被處理完畢的訊息

    image

    • false 只會處理佇列放到channel信道中當前正在處理的訊息告知MQ是否確認應答 / 消費者處理完畢了

    image



3.3.1.3、訊息重新入隊原理
  • 指的是:如果消費者由于某些原因失去連接(其通道已關閉,連接已關倍訓 TCP 連接丟失),導致訊息未發送 ACK 確認,RabbitMQ 將了解到訊息未完全處理,并將對其重新排隊,如果此時其他消費者可以處理,它將很快將其重新分發給另一個消費者,這樣,即使某個消費者偶爾死亡,也可以確保不會丟失任何訊息
  • 如下圖:訊息1原本是C1這個消費者來接收的,但是C1失去鏈接了,而C2消費者并沒有斷開鏈接,所以:最后MQ將訊息重新入隊queue,然后讓C2來處理訊息1

image



3.3.1.4、手動應答的代碼演示

生產者

package cn.zixieqing.ACK;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


public class AckProducer {

    private static final String QUEUE_NAME = "ack queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        // 宣告佇列
        /*
            下面這是引數的完整意思,原始碼中偷懶了,沒有見名知意
            queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
            引數1、佇列名字
            引數2、是否持久化( 保存到磁盤 ),默認是在記憶體中的
            引數3、是否共享,即:是否只供一個消費者消費,是否讓多個消費者共享這個佇列中的資訊
            引數4、是否自動洗掉,即:最后一個消費者獲取資訊之后,這個佇列是否自動洗掉
            引數5、其他配置項,這涉及到后面的知識,目前選擇null
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        System.out.println("請輸入要推送的訊息:");
        Scanner input = new Scanner(System.in);
        while (input.hasNext()) {
            /*
                basicPublish( exchangeName,routing key,properties,message )
                引數1、互動機名字 - 使用了默認的
                引數2、指定路由規則,使用佇列名字
                引數3、指定傳遞的訊息所攜帶的properties
                引數4、推送的具體訊息 - byte型別的
         */
            String message = input.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("訊息====>" + message + "推送完畢");
        }
    }
}


消費者01

package cn.zixieqing.ACK;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;


public class AckConsumer {

    private static final String QUEUE_NAME = "ack queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try {

                Thread.sleep(5*1000);

                System.out.println("接收到了訊息=====>" + new String( message.getBody(), StandardCharsets.UTF_8 ));

                // 添加手動應答
                /*
                    basicAck( long, boolean )
                    引數1、訊息的標識tag,這個標識就相當于是訊息的ID
                    引數2、是否批量應答multiple
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        System.out.println("消費者01正在接收訊息,需要5秒處理完");
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
            System.out.println("觸發消費者取消消費訊息行為的回呼");
            System.out.println(Arrays.toString(consumerTag.getBytes(StandardCharsets.UTF_8)));
        });
    }
}


消費者02


package cn.zixieqing.ACK;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;


public class AckConsumer {

    private static final String QUEUE_NAME = "ack queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try {

                Thread.sleep(10*1000);

                System.out.println("接收到了訊息=====>" + new String( message.getBody(), StandardCharsets.UTF_8 ));

                // 添加手動應答
                /*
                    basicAck( long, boolean )
                    引數1、訊息的標識tag,這個標識就相當于是訊息的ID
                    引數2、是否批量應答multiple
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        System.out.println("消費者02正在接收訊息,需要10秒處理完");
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
            System.out.println("觸發消費者取消消費訊息行為的回呼");
            System.out.println(Arrays.toString(consumerTag.getBytes(StandardCharsets.UTF_8)));
        });
    }
}

image

image

image



3.4、RabbitMQ的持久化 durable

3.4.1、佇列持久化

  • 這個玩意兒的配置吧,早間就過了,在生產者訊息發送時,有一個宣告佇列的程序,那里面就有一個是否持久化的配置
		/*
            下面這是引數的完整意思,原始碼中偷懶了,沒有見名知意
            queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
            引數1、佇列名字
            引數2、是否持久化( 保存到磁盤 ),默認是在記憶體中的
            引數3、是否共享,即:是否只供一個消費者消費,是否讓多個消費者共享這個佇列中的資訊
            引數4、是否自動洗掉,即:最后一個消費者獲取資訊之后,這個佇列是否自動洗掉
            引數5、其他配置項,這涉及到后面的知識,目前選擇null
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

  • 而如果沒有持久化,那么RabbitMQ服務由于其他什么原因導致掛彩的時候,那么重啟之后,這個沒有持久化的佇列就灰飛煙滅了【 ps:注意和里面的訊息還沒關系啊,不是說佇列持久化了,那么訊息就持久化了 】
  • 在這個佇列持久化配置中,它的默認值就是false,所以要改成true時,需要注意一個點:選擇佇列持久化,那么必須保證當前這個佇列是新的,即:RabbitMQ中沒有當前佇列,否則:需要進到web管理界面把已有的同名佇列刪了,然后重新配置當前佇列持久化選項為true,不然:報錯

image

  • 那么:當我把持久化選項改為true,并 重新發送訊息時

image

  • inequivalent arg 'durable' for queue 'queue durable' in vhost '/': received 'true' but current is 'false'
  • 告知你:vhost虛擬機中已經有了這個叫做durable的佇列,要接收的選項值是true,但是它當前的值是false,所以報錯了唄
  • 解決方式就是去web管理界面,把已有的durable佇列刪了,重新執行

image

  • 再次執行就可以吃雞了,同時去web管理界面會發現它狀態變了,多了一個D標識

image

  • 有了這個玩意兒之后,那么就算RabbitMQ出問題了,后續恢復之后,那么這個佇列也不會丟失


3.4.2、訊息持久化

  • 注意:這里說的訊息持久化不是說配置之后訊息就一定不會丟失,而是:把訊息標記為持久化,然后RabbitMQ盡量讓其持久化到磁盤

  • 但是:也會有意外,比如:RabbitMQ在將訊息持久化到磁盤時,這是有一個時間間隔的,資料還沒完全刷寫到磁盤呢,RabbitMQ萬一出問題了,那么訊息 / 資料還是會丟失的,所以:訊息持久化配置是一個弱持久化,但是:對于簡單佇列模式完全足夠了,強持久化的實作方式在后續的publisher / confirm發布確認模式中

  • 至于配置機極其的簡單,在前面都已經見過這個配置項,就是生產者發訊息時做文章,就是下面的第三個引數,把它改為MessageProperties.PERSISTENT_TEXT_PLAIN即可

		 /*
            basicPublish( exchangeName,routing key,properties,message )
            引數1、互動機名字 - 使用了默認的
            引數2、指定路由規則,使用佇列名字
            引數3、指定傳遞的訊息所攜帶的properties
            引數4、推送的具體訊息 - byte型別的
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

		// 改成訊息持久化
        channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

  • MessageProperties類的原始碼如下:
public class MessageProperties {

    public static final BasicProperties MINIMAL_BASIC = new BasicProperties((String)null, (String)null, (Map)null, (Integer)null, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

    public static final BasicProperties MINIMAL_PERSISTENT_BASIC = new BasicProperties((String)null, (String)null, (Map)null, 2, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

    public static final BasicProperties BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

    public static final BasicProperties PERSISTENT_BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

    public static final BasicProperties TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

    public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

    public MessageProperties() {
    }
}

  • 上面用到了BasicProperties型別,它的屬性如下:
    public static class BasicProperties extends AMQBasicProperties {
        // 訊息內容的型別
        private String contentType;
        // 訊息內容的編碼格式
        private String contentEncoding;
        // 訊息的header
        private Map<String, Object> headers;
        // 訊息是否持久化,1:否,2:是
        private Integer deliveryMode;
        // 訊息的優先級
        private Integer priority;
        // 關聯ID
        private String correlationId;
        // :用于指定回復的佇列的名稱
        private String replyTo;
        // 訊息的失效時間
        private String expiration;
        // 訊息ID
        private String messageId;
        // 訊息的發送時間
        private Date timestamp;
        // 型別
        private String type;
        // 用戶ID
        private String userId;
        // 應用程式ID
        private String appId;
        // 集群ID
        private String clusterId;
   }


3.5、不公平分發 和 預取值

不公平分發

  • 這個東西是在消費者那一方進行設定的
  • RabbitMQ默認是公平分發,即:輪詢分發
  • 輪詢分發有缺點:如前面消費者01( 誰5秒的那個 )和 消費者02 ( 誰10秒的那個 ),這種情況如果采用輪詢分發,那么:01要快一點,而02要慢一點,所以01很快處理完了,然后處于空閑狀態,而02還在拼命奮斗中,最后的結果就是02不停干,而01悠悠閑閑的,浪費了時間,所以:應該壓榨一下01,讓它不能停
  • 設定方式:在消費者接收訊息之前進行channel.basicQos( int prefetchCount )設定
		// 不公平分發,就是在這里接收訊息之前做處理
        /* 
            basicQos( int prefetchCount )
            為0、輪詢分發 也是RabbitMQ的默認值
            為1、不公平分發
         */
        channel.basicQos(1);

        channel.basicConsume("qos queue", true, deliverCallback, consumerTag -> {
            System.out.println("消費者中斷了接收訊息行為觸發的回呼");
        });


預取值

  • 指的是:多個消費者在消費訊息時,讓每一個消費者預計消費多少條訊息

image

  • 而要設定這種效果,和前面不公平分發的設定是一樣的,只是把里面的引數改一下即可

		// 預取值,也是在這里接收訊息之前做處理,和不公平分發調的是同一個API
        /* 
            basicQos( int prefetchCount )	為0、輪詢分發 也是RabbitMQ的默認值;為1、不公平分發
            而當這里的數字變成其他的,如:上圖中上面的那個消費者要消費20條訊息,那么把下面的數字改成對應的即可
            注意點:這是要設定哪個消費的預取值,那就是在哪個消費者代碼中進行設定啊
         */
        channel.basicQos(10);		// 這樣就表示這個代碼所在的消費者需要消費10條訊息了

        channel.basicConsume("qos queue", true, deliverCallback, consumerTag -> {
            System.out.println("消費者中斷了接收訊息行為觸發的回呼");
        });



3.6、publisher / confirms 發布確認模式

3.6.1、發布確認模式的原理

  • 這個玩意兒的目的就是為了持久化

image

  • 在上面的程序中,想要讓資料持久化,那么需要具備以下的條件
    • 1、佇列持久化
    • 2、訊息持久化
    • 3、發布確認
  • 而所謂的發布確認指的就是:資料在刷寫到磁盤時,成功了,那么MQ就回復生產者一下,資料確認刷寫到磁盤了,否則:只具備前面的二者的話,那也有可能出問題,如:資料推到了佇列中,但是還沒來得及刷寫到磁盤呢,結果RabbitMQ宕機了,那資料也有可能會丟失,所以:現在持久化的程序就是如下的樣子:

image


開啟發布確認

  • 在發送訊息之前( 即:調basicPublish() 之前 )調一個API就可以了
	channel.confirmSelect();		// 沒有引數



3.6.2、發布確認的分類

3.6.2.1、單個確認發布
  • 一句話:一手交錢一手交貨,即 生產者發布一條訊息,RabbitMQ就要回復確認狀態,否則不再發放訊息,因此:這種模式是同步發布確認的方式,缺點:很慢,優點:能夠實時地了解到那條訊息出例外 / 哪些訊息都發布成功了
  • 這種方式,
    public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {

        // 單個確認發布
        singleConfirm();        // 單個確認發布發送這些訊息花費4797ms
    }

	public static void singleConfirm() throws IOException, TimeoutException, InterruptedException {

        Channel channel = MQUtil.getChannel("");

        // 開啟確認發布
        channel.confirmSelect();

        // 宣告佇列 并 讓佇列持久化
        channel.queueDeclare("singleConfirm", true, false, false, null);

        long begin = System.currentTimeMillis();

        for (int i = 1; i <= 100; i++) {

            // 發送訊息 并 讓訊息持久化
            channel.basicPublish("","singleConfirm", MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes() );

            // 發布一個 確認一個 channel.waitForConfirms()
            if ( channel.waitForConfirms() )
                System.out.println("訊息".concat( String.valueOf(i) ).concat( "發送成功") );

        }

        long end = System.currentTimeMillis();

        System.out.println("單個確認發布發送這些訊息花費".concat( String.valueOf( end-begin ) ).concat("ms") );
    }



3.6.2.2、批量確認發布
  • 一句話:只要結果,是怎么一個批量管不著,只需要把一堆訊息發布之后,回復一個結果即可,這種發布也是同步的
  • 優點:效率相比單個發布要高
  • 缺點:如果因為什么系統故障而導致發布訊息出現問題,那么就會導致是批量發了一些訊息,然后再回復的,中間有哪個訊息出問題了鬼知道
    public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        // 單個確認發布
        // singleConfirm();        // 單個確認發布發送這些訊息花費4797ms

        // 批量發布
        batchConfirm();         // 批量發布發送的訊息共耗時:456ms

    }

    public static void batchConfirm() throws IOException, TimeoutException, InterruptedException {

        Channel channel = MQUtil.getChannel("");

        // 開啟確認發布
        channel.confirmSelect();

        // 宣告佇列 并 讓佇列持久化
        channel.queueDeclare("batchConfirm", true, false, false, null);

        long begin = System.currentTimeMillis();

        for (int i = 1; i <= 100; i++) {

            // 發送訊息 并 讓訊息持久化
            channel.basicPublish("","batchConfirm", MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes() );

            // 批量發布 并 回復批量發布的結果 - 發了10條之后再確認
            if (i % 10 == 0) {

                channel.waitForConfirms();
                System.out.println("訊息" + ( i-10 ) + "====>" + i + "的訊息發布成功");
            }
        }

        // 為了以防還有另外的訊息未被確認,再次確認一下
        channel.waitForConfirms();

        long end = System.currentTimeMillis();

        System.out.println("批量發布發送的訊息共耗時:" + (end - begin) + "ms");

    }



3.6.2.3、異步確認發布 - 必須會的一種

image

  • 由上圖可知:所謂的異步確認發布的就是:
    • 1、生產者只管發訊息就行,不用管訊息有沒有成功
    • 2、發布的訊息是存在一個map集合中的,其key就是訊息的標識tag / id,value就是訊息內容
    • 3、如果訊息成功發布了,那么物體broker會有一個ackCallback()回呼函式來進行處理【 ps:里面的處理邏輯是需要我們取進行設計的 】
    • 4、如果訊息未成功發布,那么物體broker會呼叫一個nackCallback()回呼函式來進行處理【 ps:里面的處理邏輯是需要我們取進行設計的 】
    • 5、而需要異步處理,就是因為生產者只管發就行了,因此:一輪的訊息肯定是很快就發布過去了,就可以做下一輪的事情了,至于上一輪的結果是怎么樣的,那就需要等到兩個callback回呼執行完了之后給結果,而想要能夠調取到兩個callback回呼,那么:就需要對發送的資訊進行監聽 / 對信道進行監聽
  • 而上述牽扯到一個map集合,那么這個集合需要具備如下的條件:
    • 1、首先此集合應是一個安全且有序的,同時還支持高并發
    • 2、其次能夠將序列號( key ) 和 訊息( value )輕松地進行關聯

代碼實作

    public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        // 單個確認發布
        // singleConfirm();        // 單個確認發布發送這些訊息花費4797ms

        // 批量發布
        // batchConfirm();         // 批量發布發送的訊息共耗時:456ms

        asyncConfirm();             // 異步發布確認耗時:10ms

    }

    // 異步發布確認
    public static void asyncConfirm() throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");
        channel.confirmSelect();
        channel.queueDeclare("async confirm", true, false, false, null);

        // 1、準備符合條件的map
        ConcurrentSkipListMap<Long, Object> messagePoolMap = new ConcurrentSkipListMap<>();

        // 3、對信道channel進行監聽
        // 成功確認發布回呼
        ConfirmCallback ackCallback = (messageTag, multiple) -> {
            System.out.println("確認發布了訊息=====>" + messagePoolMap.headMap(messageTag) );

            // 4、把確認發布的訊息刪掉,減少記憶體開銷
            // 判斷是否是批量洗掉
            if ( multiple ){
                // 通過訊息標識tag 把 確認發布的訊息取出
                messagePoolMap.headMap(messageTag).clear();
                /**
                 * 上面這句代碼拆分寫法
                 *    ConcurrentNavigableMap<Long, Object> confirmed = messagePoolMap.headMap(messageTag);
                 *    confirmed.clear();
                 */
            }else {
                messagePoolMap.remove(messageTag);
            }
        };

        // 沒成功發布確認回呼
        ConfirmCallback nackCallback = (messageTag, multiple) -> {
            System.out.println("未確認的訊息是:" + messagePoolMap.get(messageTag) );
        };

        // 進行channel監聽 這是異步的
        /**
         * channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)
         * 引數1、訊息成功發布的回呼函式 ackCallback()
         * 引數2、訊息未成功發布的回呼函式 nackCallback()
         */
        channel.addConfirmListener( ackCallback,nackCallback );

        long begin = System.currentTimeMillis();

        for (int i = 1; i <= 100; i++) {

            // 2、將要發布的全部資訊保存到map中去
            /*
                channel.getNextPublishSeqNo() 獲取下一次將要發送的訊息標識tag
             */
            messagePoolMap.put(channel.getNextPublishSeqNo(),String.valueOf(i) );
            // 生產者只管發布就行
            channel.basicPublish("","async confirm",MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes());

            System.out.println("訊息=====>" + i + "發送完畢");
        }


        long end = System.currentTimeMillis();

        System.out.println("異步發布確認耗時:" + ( end-begin ) + "ms" );
    }



3.7、交換機

  • 正如前面一開始就畫的原理圖,交換機的作用就是為了接收生產者發送的訊息 并 將訊息發送到佇列中去

image

  • 注意點:前面一直玩的哪些模式,雖然沒有寫交換機,但并不是說RabbitMQ就沒用交換機【 ps:使用的是""空串,也就是使用了abbitMQ的默認交換機 】,生產者發送的訊息只能發到交換機中,從而由交換機來把訊息發給佇列


3.7.1、交換機exchange的分類

  • 直接( direct ) / 默認
  • 主題( topic )
  • 標題 ( heanders ) - 這個已經很少用了
  • 扇出( fancut ) / 發布訂閱

臨時佇列

  • 所謂的臨時佇列指的就是:自動幫我們生成的佇列名 并且 當生產者和佇列斷開之后,這個佇列會被自動洗掉
  • 所以這么一說:前面玩過的一種就屬于臨時佇列,即:將下面的第四個引數改成true即可【 ps:當然讓佇列名隨機生成就完全匹配了 】
		/*
            下面這是引數的完整意思,原始碼中偷懶了,沒有見名知意
            queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
            引數1、佇列名字
            引數2、是否持久化( 保存到磁盤 ),默認是在記憶體中的
            引數3、是否共享,即:是否只供一個消費者消費,是否讓多個消費者共享這個佇列中的資訊
            引數4、是否自動洗掉,即:最后一個消費者獲取資訊之后,這個佇列是否自動洗掉
            引數5、其他配置項,這涉及到后面的知識,目前選擇null
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

  • 而如果要更簡單的生成臨時佇列,那么呼叫如下的API即可
        String queueName = channel.queueDeclare().getQueue();

  • 這樣幫我們生成的佇列效果就和channel.queueDeclare(QUEUE_NAME, false, false, true, null);是一樣的了


3.7.2、fanout扇出 / 發布訂閱模式

  • 這玩意兒吧,好比微信群,一人發,很多人收到訊息,就是原理圖的另一種樣子,生產者發布的一個訊息,可以供多個消費者進行消費

image

  • 實作方式就是讓一個交換機binding系結多個佇列

生產者

package cn.zixieqing.fanout;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


public class FanoutProducer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        /**
         * 定義交換機
         * 引數1、交換機名字
         * 引數2、交換機型別
         */
        channel.exchangeDeclare("fanoutExchange", BuiltinExchangeType.FANOUT);

        System.out.println("請輸入要發送的內容:");
        Scanner input = new Scanner(System.in);
        while (input.hasNext()){
            String message = input.next();
            channel.basicPublish("fanoutExchange","", null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("訊息=====>" + message + "發送完畢");
        }
    }
}


消費者01

package cn.zixieqing.fanout;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class FanoutConsumer01 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        // 系結佇列
        /**
         * 引數1、佇列名字
         * 引數2、交換機名字
         * 引數3、用于系結的routing key / binding key
         */
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "fanoutExchange", "");

        System.out.println("01消費者正在接收訊息........");
        channel.basicConsume(queueName,true,(consumerTag,message)->{
            // 這里面接收到訊息之后就可以用來做其他事情了,如:存到磁盤
            System.out.println("接收到了訊息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
        },consumerTage->{});
    }
}


消費者02

package cn.zixieqing.fanout;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class FanoutConsumer02 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        // 系結佇列
        /**
         * 引數1、佇列名字
         * 引數2、交換機名字
         * 引數3、用于系結的routing key / binding key
         */
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "fanoutExchange", "");

        System.out.println("02消費者正在接收訊息........");
        channel.basicConsume(queueName,true,(consumerTag,message)->{
            // 這里面接收到訊息之后就可以用來做其他事情了,如:存到磁盤
            System.out.println("接收到了訊息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
        },consumerTage->{});
    }
}

image



3.7.3、direct交換機 / routing路由模式

  • 這個玩意兒吧就是發布訂閱模式,也就是fanout型別交換機的變樣板,即:多了一個routing key的配置而已,也就是說:生產者和消費者傳輸訊息就通過routing key進行關聯起來因此:現在就變成了生產者想把訊息發給誰就發給誰

image


生產者

package cn.zixieqing.direct;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


public class DirectProducer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        channel.exchangeDeclare("directExchange", BuiltinExchangeType.DIRECT);

        System.out.println("請輸入要發送的訊息:");
        Scanner input = new Scanner(System.in);

        while (input.hasNext()){
            String message = input.next();
            /**
             * 對第二個引數routing key做文章
             * 假如這里的routing key為zixieqing 那么:就意味著消費者只能是系結了zixieqing的佇列才可以進行接收這里發的訊息內容
             */
            channel.basicPublish("directExchange","zixieqing",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("訊息=====>" + message + "====>發送完畢");
        }
    }
}


消費者01

package cn.zixieqing.direct;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class DirectConsumer01 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        channel.queueDeclare("direct", false, false, false, null);
        /**
         * 佇列系結
         * 引數1、佇列名
         * 引數2、交換機名字
         * 引數3、routing key 這里的routing key 就需要和生產者中的一樣了,這樣才可以通過這個routing key去對應的佇列中取訊息
         */
        channel.queueBind("direct", "directExchange", "zixieqing");

        System.out.println("01消費者正在接收訊息.......");
        channel.basicConsume("direct",true,(consumerTag,message)->{
            System.out.println("01消費者接收到了訊息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
        },consumerTag->{});
    }
}


  • 上面這種,生產者的訊息肯定能夠被01消費者給消費,因為:他們的交換機名字、佇列名字和routing key的值都是相同的

image


  • 而此時再加一個消費者,讓它的routing key值和消費者中的不同
package cn.zixieqing.direct;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class DirectConsumer02 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        channel.queueDeclare("direct", false, false, false, null);
        /**
         * 佇列系結
         * 引數1、佇列名
         * 引數2、交換機名字
         * 引數3、routing key 這里的routing key 就需要和生產者中的一樣了,這樣才可以通過這個routing key去對應的佇列中取訊息
         */
        // 搞點事情:這里的routing key的值zixieqing和生產者的不同
        channel.queueBind("direct", "directExchange", "xiegongzi");

        System.out.println("02消費者正在接收訊息.......");
        channel.basicConsume("direct",true,(consumerTag,message)->{
            System.out.println("02消費者接收到了訊息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
        },consumerTag->{});
    }
}

image



3.7.4、topic交換機 / topic主題模式 - 使用最廣的一個

  • 前面玩的fanout扇出型別的交換機 / 發布訂閱模式是一個生產者發布,多個消費者共享訊息,和qq群類似;而direct直接交換機 / 路由模式是消費者只能消費和消費者相同routing key的訊息
  • 而上述這兩種還有局限性,如:現在生產者的routing key為zi.xie.qing,而一個消費者只消費含xie的訊息,一個消費者只消費含qing的訊息,另一個消費者只消費第一個為zi的零個或無數個單詞的訊息,甚至還有一個消費者只消費最后一個單詞為qing,前面有三個單詞的routing key的訊息呢?
  • 這樣一看,發布訂閱模式和路由模式都不能解決,更別說前面玩的簡單模式、作業佇列模式、發布確認模式了,這些和目前的這個需求更不搭了,因此:就來了這個topic主題模式

topic中routing key的要求

  • 只要交換機型別是topic型別的,那么其routing key就不能亂寫,要求:routing key只能是一個單詞串列,多個單詞之間采用點隔開,如:cn.zixieqing.rabbit
  • 單詞串列的長度不能超過255個位元組

  • 在routing key的規則串列中有兩個替換符可以用
    • 1、* 代表一個單詞
    • 2、# 代表零活無數個單詞

  • 假如有如下的一個系結關系圖

image

  • Q1系結的是:中間帶 orange 帶 3 個單詞的字串(.orange.)
  • Q2系結的是:
    • 最后一個單詞是 rabbit 的 3 個單詞(..rabbit)
    • 第一個單詞是 lazy 的多個單詞(lazy.#)
  • 熟悉一下這種系結關系( 左為一些routes路由規則,右為能匹配到上圖系結關系的結果 )

	quick.orange.rabbit 		被佇列 Q1Q2 接收到
	lazy.orange.elephant 		被佇列 Q1Q2 接收到
	quick.orange.fox 			被佇列 Q1 接收到
	lazy.brown.fox 				被佇列 Q2 接收到
	lazy.pink.rabbit 			雖然滿足兩個系結,但只被佇列 Q2 接收一次
	quick.brown.fox 			不滿足任何系結關系,不會被任何佇列接收到,會被丟棄
	quick.orange.male.rabbit 	是四個單詞,不滿足任何系結關系,會被丟棄
	lazy.orange.male.rabbit 	雖是四個單詞,但匹配 Q2,因:符合lazy.#這個規則

  • 當佇列系結關系是下列這種情況時需要引起注意
    • 當一個佇列系結鍵是#,那么這個佇列將接收所有資料,就有點像 fanout 了
    • 如果佇列系結鍵當中沒有#和*出現,那么該佇列系結型別就是 direct 了

把上面的系結關系和測驗轉換成代碼玩一波

生產者

package cn.zixieqing.topic;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;


public class TopicProducer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC);

        /**
         * 準備大量的routing key 和 message
         */
        HashMap<String, String> routesAndMessageMap = new HashMap<>();
        routesAndMessageMap.put("quick.orange.rabbit", "被佇列 Q1Q2 接收到");
        routesAndMessageMap.put("lazy.orange.elephant", "被佇列 Q1Q2 接收到");
        routesAndMessageMap.put("quick.orange.fox", "被佇列 Q1 接收到");
        routesAndMessageMap.put("lazy.brown.fox", "被佇列 Q2 接收到");
        routesAndMessageMap.put("lazy.pink.rabbit", "雖然滿足兩個系結,但只被佇列 Q2 接收一次");
        routesAndMessageMap.put("quick.brown.fox", "不滿足任何系結關系,不會被任何佇列接收到,會被丟棄");
        routesAndMessageMap.put("quick.orange.male.rabbit", "是四個單詞,不滿足任何系結關系,會被丟棄");
        routesAndMessageMap.put("lazy.orange.male.rabbit ", "雖是四個單詞,但匹配 Q2,因:符合lazy.#這個規則");

        System.out.println("生產者正在發送訊息.......");
        for (Map.Entry<String, String> routesAndMessageEntry : routesAndMessageMap.entrySet()) {
            String routingKey = routesAndMessageEntry.getKey();
            String message = routesAndMessageEntry.getValue();
            channel.basicPublish("topicExchange",routingKey,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("訊息====>" + message + "===>發送完畢");
        }
    }
}


消費者01

package cn.zixieqing.topic;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class TopicConsumer01 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");
        channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC);
        channel.queueDeclare("Q1", false, false, false, null);
        channel.queueBind("Q1", "topicExchange", "*.orange.*");

        System.out.println("消費者01正在接收訊息......");
        channel.basicConsume("Q1",true,(consumerTage,message)->{
            System.out.println("01消費者接收到了訊息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
            System.out.println("此條訊息的交換機名為:" + message.getEnvelope().getExchange() + ",路由鍵為:" + message.getEnvelope().getRoutingKey());
        },consumerTag->{});
    }
}


消費者02

package cn.zixieqing.topic;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class TopicConsumer02 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");
        channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC);
        channel.queueDeclare("Q2", false, false, false, null);
        channel.queueBind("Q2", "topicExchange", "*.*.rabbit");
        channel.queueBind("Q2", "topicExchange", "lazy.#");

        System.out.println("消費者02正在接收訊息......");
        channel.basicConsume("Q2",true,(consumerTage,message)->{
            System.out.println("02消費者接收到了訊息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
            System.out.println("此條訊息的交換機名為:" + message.getEnvelope().getExchange() + ",路由鍵為:" + message.getEnvelope().getRoutingKey());
        },consumerTag->{});
    }
}

image



3.8.2、佇列超過最大長度

3.8.2.1、佇列超過所限制的最大個數
  • 意思就是:某一個佇列要求只能放N個訊息,但是放了N+1個訊息,這就超過佇列的最大個數了

生產者

  • 就是一個正常的生產者發送訊息而已
package cn.zixieqing.dead_letter_queue.queuelength.queuenumber;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        channel.exchangeDeclare("messageNumber_normal_exchange", BuiltinExchangeType.DIRECT);

        for (int i = 1; i < 11; i++) {
            String message = "生產者發送了訊息" + i;
            channel.basicPublish("messageNumber_normal_exchange","zi",null,
                    message.getBytes(StandardCharsets.UTF_8) );
            System.out.println("訊息====>" + message + "====>發送完畢");
        }
    }
}


01消費者

package cn.zixieqing.dead_letter_queue.queuelength.queuenumber;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;


public class Consumer01 {

    /**
     * 正常交換機名稱
     */
    public static final String NORMAL_EXCHANGE = "messageNumber_normal_exchange";

    /**
     * 正常佇列名稱
     */
    public static final String NORMAL_QUEUE = "messageNumber_queue";

    /**
     * 死信交換機名稱
     */
    public static final String DEAD_EXCHANGE = "messageNumber_dead_exchange";

    /**
     * 死信佇列名稱
     */
    public static final String DEAD_QUEUE = "messageNumber_dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        // 宣告正常交換機、死信交換機
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 宣告死信佇列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        // 死信交換機和死信佇列進行系結
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "xie");

        // 宣告正常佇列 并 考慮達到條件時和死信交換機進行聯系
        HashMap<String, Object> params = new HashMap<>();
        // 死信交換機
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 死信路由鍵
        params.put("x-dead-letter-routing-key", "xie");
        // 達到佇列能接受的最大個數限制就多了如下的配置
        params.put("x-max-length", 6);
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
        // 正常佇列和正常交換機進行系結
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zi");

        System.out.println("01消費者正在接收訊息......");
        channel.basicConsume(NORMAL_QUEUE,true,(consumeTag,message)->{
            System.out.println("01消費者接收到了訊息:" + new String( message.getBody(), StandardCharsets.UTF_8));
        },consumeTag->{});
    }
}

  • 啟動01消費者,然后關掉( 模仿例外 ),最后啟動生產者,那么:生產者發送了10個訊息,由于01消費者這邊做了配置,所以有6個訊息是在正常佇列中,余下的4個訊息就會進入死信佇列

image



3.8.2.2、超過佇列能接受訊息的最大位元組長度
  • 和前面一種相比,在01消費者方做另一個配置即可
	params.put("x-max-length-bytes", 255);

image


注意:關于兩種情況同時使用的問題

  • 如配置的如下兩個
        params.put("x-max-length", 6);
        params.put("x-max-length-bytes", 255);

  • 那么先達到哪個上限設定就執行哪個


3.8.3、訊息被拒收

  • 注意點:必須開啟手動應答
	// 第二個引數改成false
	channel.basicConsume(NORMAL_QUEUE,false,(consumeTag,message)->{},consumeTag->{});


生產者

package cn.zixieqing.dead_letter_queue.reack;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");
        channel.exchangeDeclare("reack_normal_exchange", BuiltinExchangeType.DIRECT);

        for (int i = 1; i < 11; i++) {
            String message = "生產者發送的訊息" + i;
            channel.basicPublish("reack_normal_exchange","zixieqing",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("訊息===>" + message + "===>發送完畢");
        }
    }
}


消費者

package cn.zixieqing.dead_letter_queue.reack;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;


public class Consumer01 {

    public static final String NORMAL_EXCHANGE = "reack_normal_exchange";
    public static final String DEAD_EXCHANGE = "reack_dead_exchange";

    public static final String DEAD_QUEUE = "reack_dead_queue";
    public static final String NORMAL_QUEUE = "reack_normal_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        // 宣告正常交換機、死信交換機
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 宣告死信佇列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        // 死信佇列系結死信交換機
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "xie");

        // 宣告正常佇列
        HashMap<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        params.put("x-dead-letter-routing-key", "xie");
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zixieqing");

        System.out.println("01消費者正在接收訊息.....");
        // 1、注意:需要開啟手動應答( 第二個引數為false )
        channel.basicConsume(NORMAL_QUEUE,false,(consumeTag,message)->{
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);

            // 如果發送的訊息為:生產者發送的訊息5  則:拒收
            if ( "生產者發送的訊息5".equals( msg ) ) {
                System.out.println("此訊息====>" + msg + "===>是拒收的");
                // 2、做拒收處理 - 注意:第二個引數設為false,表示不再重新入正常佇列的隊,這樣訊息才可以進入死信佇列
                channel.basicReject( message.getEnvelope().getDeliveryTag(),false);
            }else {
                System.out.println("01消費者接收到了訊息=====>" + msg);
            }
        },consumeTag->{});
    }
}

image

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/469882.html

標籤:其他

上一篇:go-micro集成RabbitMQ實戰和原理

下一篇:Day14 note1

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more