主頁 >  其他 > RabbitMQ基礎快速入門和代碼demo

RabbitMQ基礎快速入門和代碼demo

2021-07-29 06:56:38 其他

一、MQ的基本概念

1、MQ概述

在這里插入圖片描述

2、MQ優勢

1、應用解耦
在這里插入圖片描述
在這里插入圖片描述
2、異步提速
在這里插入圖片描述
在這里插入圖片描述
3、削峰填谷
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
優勢小結
在這里插入圖片描述

3、MQ劣勢

在這里插入圖片描述

4、常見的MQ產品

在這里插入圖片描述

5、RabbitMQ簡介

在這里插入圖片描述
2007年,Rabbit 技術公司基于 AMQP 標準開發的 RabbitMQ 1.0 發布,RabbitMQ 采用 Erlang 語言開發,Erlang 語言由 Ericson 設計,專門為開發高并發和分布式系統的一種語言,在電信領域使用廣泛,

RabbitMQ 基礎架構如下圖
在這里插入圖片描述

6、MQ中的相關概念介紹

  • Broker:接收和分發訊息的應用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網路中的 namespace 概念,當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個用戶在自己的 vhost 創建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之間的 TCP 連接
  • Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在訊息量大的時候建立 TCP Connection的開銷將是巨大的,效率也較低,Channel 是在 connection 內部建立的邏輯連接,如果應用程式支持多執行緒,通常每個thread創建單獨的 channel 進行通訊,AMQP method 包含了channel id 幫助客戶端和message broker 識別 channel,所以 channel 之間是完全隔離的,Channel 作為輕量級的 Connection
  • Exchange:message 到達 broker 的第一站,根據分發規則,匹配查詢表中的 routing key,分發訊息到queue 中去,常用的型別有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:訊息最終被送到這里等待 consumer 取走
  • Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key,Binding 資訊被保存到 exchange 中的查詢表中,用于 message 的分發依據

7、RabbitMQ中的6種作業模式

RabbitMQ 提供了 6 種作業模式:簡單模式、work queues、Publish/Subscribe 發布與訂閱模式、Routing 路由模式、Topics 主題模式、RPC 遠程呼叫模式(遠程呼叫,不太算 MQ;暫不作介紹),
在這里插入圖片描述

二、RabbitMQ 的作業模式

1、作業模式介紹

1、Work Queues作業佇列模式
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
2、發布訂閱模式(Pub/Sub 訂閱模式 )
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
3、Routing 路由模式
在這里插入圖片描述
在這里插入圖片描述
4、Topics 通配符模式

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述

2、作業模式總結

在這里插入圖片描述

3、RabbitMQ訊息確認機制

在這里插入圖片描述
在這里插入圖片描述


注意,Confirm只表示訊息被送到Broker中,至于消費者是否會消費掉Broker中的訊息,是無法確定的,就像是信件被送到了郵局,到底能不能送到收件人手中,還不確定,

當然,我們也可以通過ACK機制來完成消費者端的消費確認,代碼如下:

	channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消費業務即ACK確認消費
                System.out.println("【新浪天氣】收到氣象訊息:" + new String(body));

                // 確認消費
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

在這里插入圖片描述

三、代碼demo

依賴:

<dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>5.3.0</version>
</dependency>

1、簡單模式

生產者代碼

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.131.171");
        connectionFactory.setPort(5672);  // 默認埠是5672
        connectionFactory.setUsername("jihu");
        connectionFactory.setPassword("jihu");
        connectionFactory.setVirtualHost("/jihu");

        // 建立TCP長連接
        Connection connection = connectionFactory.newConnection();
        // 創建通信通道,相當于TCP中的虛擬連接
        Channel channel = connection.createChannel();
        // 創建佇列,宣告并創建一個佇列,如果佇列已經存在,則使用這個佇列
        // 引數1:佇列名稱ID
        // 引數2:是否持久化,false對應不持久化資料,MQ停掉資料就會丟失
        // 引數3:是否佇列私有化,false則代表所有消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用
        // 引數4:是否自動洗掉 false代表連接停掉后不自動洗掉這個佇列
        // 引數5:其他的額外引數
        channel.queueDeclare("helloworld", true, false, false, null);

        String message = "jihu666";
        // 引數1:交換機,簡單模式不需要指定交換機,會有默認的交換機
        // 引數2:佇列名稱
        // 引數3:額外的設定屬性
        // 引數4:訊息的位元組陣列
        channel.basicPublish("", "helloworld", null, message.getBytes());

        channel.close();
        connection.close();
        System.out.println("==== 資料發送成功 =====");
    }
}

消費者代碼

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.131.171");
        connectionFactory.setPort(5672);  // 默認埠是5672
        connectionFactory.setUsername("jihu");
        connectionFactory.setPassword("jihu");
        connectionFactory.setVirtualHost("/jihu");

        // 建立TCP長連接
        Connection connection = connectionFactory.newConnection();
        // 創建通信通道,相當于TCP中的虛擬連接
        Channel channel = connection.createChannel();
        // 創建佇列,宣告并創建一個佇列,如果佇列已經存在,則使用這個佇列
        // 引數1:佇列名稱ID
        // 引數2:是否持久化,false對應不持久化資料,MQ停掉資料就會丟失
        // 引數3:是否佇列私有化,false則代表所有消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用
        // 引數4:是否自動洗掉 false代表連接停掉后不自動洗掉這個佇列
        // 引數5:其他的額外引數
        channel.queueDeclare("helloworld", true, false, false, null);

        // 從MQ服務器中獲取訊息
        // 引數1:佇列名稱
        // 引數2:是否自動確認收到訊息,false代表手動編程來確認訊息,沒這事MQ的推薦做法
        // 引數2:
        channel.basicConsume("helloworld", false, new Receiver(channel));

        // 消費者的長連接不能關閉,因為需要持續監聽佇列獲取訊息
    }
}

class Receiver extends DefaultConsumer {
    private Channel channel;

    // 重寫建構式
    public Receiver(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body);
        long deliveryTag = envelope.getDeliveryTag();
        System.out.println("[消費者] 接收到的訊息:" + msg + ". 訊息的tagId: " + deliveryTag);

        // 簽收訊息, false代表只確認當前訊息,設定為true代表簽收該消費者所有未簽收的訊息
        channel.basicAck(deliveryTag, false);
    }
}

測驗,先啟動消費者,然后我們查看rabbitmq頁面:
在這里插入圖片描述
我們再啟動生成者:
在這里插入圖片描述
在這里插入圖片描述
從運行log來看,已經成功實作了訊息的發送和消費,

我們關閉消費者之后,再啟動生產者發送一條新訊息,然后來看看overview頁面:
在這里插入圖片描述
在這里插入圖片描述

可以看到,此時的ready數量為1,代表已經收到了一條訊息,

這個頁面的重繪我們可以手動重繪,也可以在右下角設定重繪頻率:
在這里插入圖片描述

2、workqueue模式(模擬12306短信服務)

為了復用代碼,我們建立一個工廠類用來獲取連接:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitUntils {
    private static ConnectionFactory connectionFactory = new ConnectionFactory();

    static {
        connectionFactory.setHost("192.168.131.171");
        connectionFactory.setPort(5672);  // 默認埠是5672
        connectionFactory.setUsername("jihu");
        connectionFactory.setPassword("jihu");
        connectionFactory.setVirtualHost("/jihu");
    }

    public static Connection getConnection() {
        Connection connection = null;
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
         throw new RuntimeException(e);
        }
    }
}

我們來模擬一個12306的短信發送功能:
在這里插入圖片描述
我們先來新建三個短信發送者來消費訊息:

import com.jihu.rabbitmq.constant.RabbitConstant;
import com.jihu.rabbitmq.utils.RabbitUntils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class SMSSender1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUntils.getConnection();

        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
	
		// 如果不寫channel.basicQos(1),則MQ自動會采用輪訓演算法將請求平均發送給所有消費者
        // basicQos:MQ不再對消費者一次發送多個請求,而是消費者處理完一個訊息之后(確認消費后),再從佇列中獲取新的訊息
        // channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String sms = new String(body);
                System.out.println("SMSSender1-短信發送成功:" + sms);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}


public class SMS {
    String name;
    String mobile;
    String content;
    .....
 }   

在這里插入圖片描述
在這里插入圖片描述
然后構建生產者,訂單:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.jihu.rabbitmq.constant.RabbitConstant;
import com.jihu.rabbitmq.utils.RabbitUntils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class OrderSystem {
    private static ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUntils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

        for (int i = 1; i <= 100; i++) {
            SMS sms = new SMS("乘客+" + i, "1390000000" + i, "您的車票已預定成功!");
            byte[] bytes = objectMapper.writeValueAsBytes(sms);
            channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, bytes);
        }

        System.out.println("資料發送成功!");
        channel.close();
        connection.close();
    }
}

在這里插入圖片描述
從log可以看到,訊息已經被三個訊息發送器都消費完了,

默認情況下,RabbitMQ使用的是輪訓演算法,

其實我們可以設定讓消費者處理完一個訊息被確認消費后,再獲取一個新的訊息,

// 如果不寫channel.basicQos(1),則MQ自動會采用輪訓演算法將請求平均發送給所有消費者
// basicQos:MQ不再對消費者一次發送多個請求,而是消費者處理完一個訊息之后(確認消費后),再從佇列中獲取新的訊息
channel.basicQos(1);

這樣處理之后,訊息并不是平均的分發給消費者們,而是消費者性能越好處理越快的,消費的訊息獲取的則越多,反之亦然,

我們可以來模擬一下這個場景,給SMSSender1休眠10ms,SMSSender2休眠100ms,SMSSender3休眠1200ms:

channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String sms = new String(body);
                System.out.println("SMSSender1-短信發送成功:" + sms);

                try {
                    TimeUnit.MICROSECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

這樣的話,相當于服務器性能SMSSender1 > SMSSender2 > SMSSender3.

啟動一下測驗,發現SMSSender3消費的訊息最少:
在這里插入圖片描述

3、Publish / Subscribe 發布訂閱模式(模擬天氣預報)

在這里插入圖片描述
注意:使用到的交換機必須提前宣告,否則消費者消費的時候會報錯!這和佇列不一樣,佇列默認會自動添加!
在這里插入圖片描述
注意交換機的型別,此時我們需要的是廣播模式的交換機:
在這里插入圖片描述

我們來創建一個廣播型別的交換機:
在這里插入圖片描述

然后我們先來創建氣象局,可以發送天氣訊息:

public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUntils.getConnection();

        String input = new Scanner(System.in).next();
        Channel channel = connection.createChannel();

        // 引數1:交換機名稱
        // 引數2:佇列名稱, 佇列會和交換機系結,我們在消費端處理即可
        // 引數3:額外引數
        // 引數4:傳輸資料
        channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, input.getBytes());

        channel.close();
        connection.close();
    }
}

然后我們創建新浪天氣和百度天氣:

public class Sina {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUntils.getConnection();

        final Channel channel = connection.createChannel();

        // 宣告佇列資訊
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false,
                false, null);

        // 佇列系結交換機
        // 引數1:佇列名
        // 引數1:交換機名
        // 引數1:路由key,在發布訂閱模式中用不到
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
        // 消費完當前訊息后再獲取下一條訊息
        channel.basicQos(1);

        channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消費業務即ACK確認消費
                System.out.println("【新浪天氣】收到氣象訊息:" + new String(body));

                // 確認消費
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

百度天氣只是佇列名稱和新浪不一樣,這兩個佇列都是被系結在weather交換機上,用來將這個交換機上的訊息拉取到自己獨立的佇列中去消費,

建好之后我們啟動它們,在rabbitmq界面可以看到新建的佇列及交換機的系結關系:
在這里插入圖片描述
接著我們來發布天氣訊息:
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
可以看到,此時已經實作了廣播模式,

4、Routing 路由模式(模擬接收特定地區和日期的天氣)

在這里插入圖片描述
我們首先來創建一個direct型別的交換機:
在這里插入圖片描述
注意:一個佇列可以系結多個路由key!

然后我們來創建氣象局,此時氣象局值給固定路由的佇列發訊息:

public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 天氣資訊, key作為路由
        Map<String, String> area = new LinkedHashMap<>();
        area.put("china.hunan.changsha.20211127", "中國湖南長沙20201127天氣資料");
        area.put("china.hubei.wuhan.20211127", "中國湖北武漢20201127天氣資料");
        area.put("china.hunan.zhuzhou.20211127", "中國湖南株洲20201127天氣資料");
        area.put("us.cal.lsj.20211127", "美國加州洛杉磯20201127天氣資料");

        area.put("china.hebei.shijiazhuang.20211128", "中國河北石家莊20201128天氣資料");
        area.put("china.hubei.wuhan.20211128", "中國湖北武漢20201128天氣資料");
        area.put("china.henan.zhengzhou.20211128", "中國河南鄭州20201128天氣資料");
        area.put("us.cal.lsj.20211128", "美國加州洛杉磯20211128天氣資料");


        Connection connection = RabbitUntils.getConnection();

        Channel channel = connection.createChannel();

        // 回圈發布氣象局的天氣訊息
        area.forEach((key, value) -> {
            try {
                // map的key作為訊息的routing key
                channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, key, null, value.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        System.out.println("訊息發送完畢!");
        channel.close();
        connection.close();
    }
}

然后讓新浪只接受特定地區特定日期的天氣:

public class Sina {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUntils.getConnection();

        final Channel channel = connection.createChannel();

        // 宣告佇列資訊
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false,
                false, null);

        // ==== 一個佇列可以系結多個路由key =====
        // 指定交換機和佇列以及路由key的關系
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20211127");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20211127");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20211128");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.zhuzhou.20211127");

        // 消費完當前訊息后再獲取下一條訊息
        channel.basicQos(1);

        channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消費業務即ACK確認消費
                System.out.println("【新浪天氣】收到氣象訊息:" + new String(body));

                // 確認消費
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
public class Baidu {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUntils.getConnection();

        final Channel channel = connection.createChannel();

        // 宣告佇列資訊
        channel.queueDeclare(RabbitConstant.QUEUE_BAI_DU, false, false,
                false, null);

        channel.queueBind(RabbitConstant.QUEUE_BAI_DU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20211127");
        channel.queueBind(RabbitConstant.QUEUE_BAI_DU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20211128");
        // 消費完當前訊息后再獲取下一條訊息
        channel.basicQos(1);

        channel.basicConsume(RabbitConstant.QUEUE_BAI_DU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消費業務即ACK確認消費
                System.out.println("【百度天氣】收到氣象訊息:" + new String(body));

                // 確認消費
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

然后我們分別啟動新浪和百度,最后啟動氣象局類:
在這里插入圖片描述
在這里插入圖片描述
此時來查看rabbitmq頁面上的交換機系結:
在這里插入圖片描述

5、Topics 通配符模式(模擬接收中國地區的天氣資訊)

在這里插入圖片描述
我們首先來新建一個交換機:
在這里插入圖片描述
創建氣象局:

public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 天氣資訊, key作為路由
        Map<String, String> area = new LinkedHashMap<>();
        area.put("china.hunan.changsha.20211127", "中國湖南長沙20201127天氣資料");
        area.put("china.hubei.wuhan.20211127", "中國湖北武漢20201127天氣資料");
        area.put("china.hunan.zhuzhou.20211127", "中國湖南株洲20201127天氣資料");
        area.put("us.cal.lsj.20211127", "美國加州洛杉磯20201127天氣資料");

        area.put("china.hebei.shijiazhuang.20211128", "中國河北石家莊20201128天氣資料");
        area.put("china.hubei.wuhan.20211128", "中國湖北武漢20201128天氣資料");
        area.put("china.henan.zhengzhou.20211128", "中國河南鄭州20201128天氣資料");
        area.put("us.cal.lsj.20211128", "美國加州洛杉磯20211128天氣資料");


        Connection connection = RabbitUntils.getConnection();

        Channel channel = connection.createChannel();

        // 回圈發布氣象局的天氣訊息
        area.forEach((key, value) -> {
            try {
                // map的key作為訊息的routing key
                channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, key, null, value.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        System.out.println("訊息發送完畢!");
        channel.close();
        connection.close();
    }
}

接下來我們規定,新浪只被允許接收中國地區的訊息:

public class Sina {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUntils.getConnection();

        final Channel channel = connection.createChannel();

        // 宣告佇列資訊
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false,
                false, null);

        // 指定交換機和佇列以及路由key的關系
        // 交換機的型別在創建的時候就設定好了
        // #表示匹配多個,*表示匹配一個
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#");

        // 消費完當前訊息后再獲取下一條訊息
        channel.basicQos(1);

        channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消費業務即ACK確認消費
                System.out.println("【新浪天氣】收到氣象訊息:" + new String(body));

                // 確認消費
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

在這里插入圖片描述
百度只被允許接收20211127這一天的訊息:

public class Baidu {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUntils.getConnection();

        final Channel channel = connection.createChannel();

        // 宣告佇列資訊
        channel.queueDeclare(RabbitConstant.QUEUE_BAI_DU, false, false,
                false, null);

        channel.queueBind(RabbitConstant.QUEUE_BAI_DU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20211127");
        // 消費完當前訊息后再獲取下一條訊息
        channel.basicQos(1);

        channel.basicConsume(RabbitConstant.QUEUE_BAI_DU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消費業務即ACK確認消費
                System.out.println("【百度天氣】收到氣象訊息:" + new String(body));

                // 確認消費
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

在這里插入圖片描述

6、訊息確認機制

在這里插入圖片描述
在這里插入圖片描述
1、Confirm測驗

public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 天氣資訊, key作為路由
        Map<String, String> area = new LinkedHashMap<>();
        area.put("china.hunan.changsha.20211127", "中國湖南長沙20201127天氣資料");
        area.put("china.hubei.wuhan.20211127", "中國湖北武漢20201127天氣資料");
        area.put("china.hunan.zhuzhou.20211127", "中國湖南株洲20201127天氣資料");
        area.put("us.cal.lsj.20211127", "美國加州洛杉磯20201127天氣資料");

        area.put("china.hebei.shijiazhuang.20211128", "中國河北石家莊20201128天氣資料");
        area.put("china.hubei.wuhan.20211128", "中國湖北武漢20201128天氣資料");
        area.put("china.henan.zhengzhou.20211128", "中國河南鄭州20201128天氣資料");
        area.put("us.cal.lsj.20211128", "美國加州洛杉磯20211128天氣資料");


        Connection connection = RabbitUntils.getConnection();
        Channel channel = connection.createChannel();

        // 開啟confirm監聽模式
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                // 第二個引數代表接收的引數是否為批量接收
                System.out.println("訊息已經被Broker接收. Tag: " + 1);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("訊息已被Brokder拒收. Tag: " + 1);
            }
        });

        // 回圈發布氣象局的天氣訊息
        area.forEach((key, value) -> {
            try {
                // map的key作為訊息的routing key
                channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, key, null, value.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        System.out.println("訊息發送完畢!");
      // === 注意,此時開啟了Confirm確認機制,不能直接關閉Channel!關閉后無法完成監聽!
//        channel.close();
//        connection.close();
    }
}

在這里插入圖片描述
注意,此時的Confirm確認代表訊息已經成功發送到Broker中去了,即再等待訊息者完成消費,如果Broker中的佇列滿了或者因為其他原因可能導致產生Return情況,即訊息被Broker退還給生產者,
在這里插入圖片描述
2、Return測驗

public class WeatherBureauReturn {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 天氣資訊, key作為路由
        Map<String, String> area = new LinkedHashMap<>();
        area.put("china.hunan.changsha.20211127", "中國湖南長沙20201127天氣資料");
        area.put("china.hubei.wuhan.20211127", "中國湖北武漢20201127天氣資料");
        area.put("china.hunan.zhuzhou.20211127", "中國湖南株洲20201127天氣資料");
        area.put("us.cal.lsj.20211127", "美國加州洛杉磯20201127天氣資料");

        area.put("china.hebei.shijiazhuang.20211128", "中國河北石家莊20201128天氣資料");
        area.put("china.hubei.wuhan.20211128", "中國湖北武漢20201128天氣資料");
        area.put("china.henan.zhengzhou.20211128", "中國河南鄭州20201128天氣資料");
        area.put("us.cal.lsj.20211128", "美國加州洛杉磯20211128天氣資料");


        Connection connection = RabbitUntils.getConnection();
        Channel channel = connection.createChannel();

        // 開啟confirm監聽模式
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                // 第二個引數代表接收的引數是否為批量接收
                System.out.println("訊息已經被Broker接收. Tag: " + deliveryTag);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("訊息已被Brokder拒收. Tag: " + deliveryTag);
            }
        });

        // 開啟Return監聽模式
        channel.addReturnListener(new ReturnCallback() {
            @Override
            public void handle(Return returnMessage) {
                System.err.println("=================================");

                System.err.println("Return編碼:" + returnMessage.getReplyCode() + "-Return描述:"
                        + returnMessage.getReplyText());
                System.err.println("交換機:" + returnMessage.getExchange() + "-路由key:"
                        + returnMessage.getRoutingKey());
                System.err.println("Return主題:" + new String(returnMessage.getBody()));

                System.err.println("=================================");
            }
        });

        // 回圈發布氣象局的天氣訊息
        area.forEach((key, value) -> {
            try {
                // map的key作為訊息的routing key
                channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, key, null, value.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        System.out.println("訊息發送完畢!");
        // === 注意,此時開啟了Confirm確認機制,不能直接關閉Channel!關閉后無法完成監聽!
//        channel.close();
//        connection.close();
    }
}

然后我們創建新浪接收,因為新浪只接受中國地區的天氣資訊,所以我們發送的其他天氣訊息要被退回:

注意,如果要得到回傳結果:basicPublish方法的第三個引數mandatory true代表如果訊息無法正常投遞則return回生產者,如果false,則直接將訊息放棄,

channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, key, true, null, value.getBytes());
public class WeatherBureauReturn {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 天氣資訊, key作為路由
        Map<String, String> area = new LinkedHashMap<>();
        area.put("china.hunan.changsha.20211127", "中國湖南長沙20201127天氣資料");
        area.put("china.hubei.wuhan.20211127", "中國湖北武漢20201127天氣資料");
        area.put("china.hunan.zhuzhou.20211127", "中國湖南株洲20201127天氣資料");
        area.put("us.cal.lsj.20211127", "美國加州洛杉磯20201127天氣資料");

        area.put("china.hebei.shijiazhuang.20211128", "中國河北石家莊20201128天氣資料");
        area.put("china.hubei.wuhan.20211128", "中國湖北武漢20201128天氣資料");
        area.put("china.henan.zhengzhou.20211128", "中國河南鄭州20201128天氣資料");
        area.put("us.cal.lsj.20211128", "美國加州洛杉磯20211128天氣資料");


        Connection connection = RabbitUntils.getConnection();
        Channel channel = connection.createChannel();

        // 開啟confirm監聽模式
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                // 第二個引數代表接收的引數是否為批量接收
                System.out.println("訊息已經被Broker接收. Tag: " + deliveryTag);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("訊息已被Brokder拒收. Tag: " + deliveryTag);
            }
        });

        // 開啟Return監聽模式
        channel.addReturnListener(new ReturnCallback() {
            @Override
            public void handle(Return returnMessage) {
                System.err.println("=================================");

                System.err.println("Return編碼:" + returnMessage.getReplyCode() + "-Return描述:"
                        + returnMessage.getReplyText());
                System.err.println("交換機:" + returnMessage.getExchange() + "-路由key:"
                        + returnMessage.getRoutingKey());
                System.err.println("Return主題:" + new String(returnMessage.getBody()));

                System.err.println("=================================");
            }
        });

        // 回圈發布氣象局的天氣訊息
        area.forEach((key, value) -> {
            try {
                // map的key作為訊息的routing key
                // 注意:第三個引數為:mandatory true代表如果訊息無法正常投遞則return回生產者,如果false,則直接將訊息放棄,
                channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, key, true,null, value.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        System.out.println("訊息發送完畢!");
        // === 注意,此時開啟了Confirm確認機制,不能直接關閉Channel!關閉后無法完成監聽!
//        channel.close();
//        connection.close();
    }
}

我們先來啟動新浪,然后再啟動氣象發布:
在這里插入圖片描述
此時將不滿足條件的訊息直接回傳給了生產者,當然我們可以設定成為直接丟棄等策略,注意,此時的到達和退回指的是訊息被發動到Broker容器中的佇列中,不是消費者已經確認了消費!

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/290606.html

標籤:其他

上一篇:資料治理-資料質量-資料質量參考架構

下一篇:淺聊 kafka-如何保證生產資料不丟失方案?可用簡單

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more