主頁 > 後端開發 > RabbitMQ不講武德,發個訊息也這么多花招

RabbitMQ不講武德,發個訊息也這么多花招

2021-01-04 06:07:53 後端開發

前言

本篇博客已被收錄GitHub:https://zhouwenxing.github.io/
文中所涉及的原始碼也已被收錄GitHub:https://github.com/zhouwenxing/lonely-wolf-note (message-queue模塊)

使用訊息佇列必須要保證生產者發送的訊息能被消費者所接收,那么生產者如何接收訊息呢?下圖是 RabbitMQ 的作業模型:

上圖中生產者會將訊息發送到交換機 Exchange 上,再由 Exchange 發送給不同的 Queue ,而 Queue 是用來存盤訊息佇列,那么假如有多個生產者,那么訊息發送到交換機 Exchange 之后,應該如何和 Queue 之間建立系結關系呢?

如何使用 RabbitMQ 發送訊息

RabbitMQ 中提供了3種發送訊息的路由方式,

直連 Direct 模式

通過指定一個精確的系結鍵來實作 Exchange(交換機) 和 Queue(訊息佇列) 之間的系結,也就是說,當創建了一個直連型別的交換機時,生產者在發送訊息時攜帶的路由鍵(routing key),必須與某個系結鍵(binding key)完全匹配時,這條訊息才會從交換機路由到滿足路由關系訊息佇列上,然后消費者根據各自監聽的佇列就可以獲取到訊息(如下如吐所示,Queue1 系結了 order ,那么這時候發送訊息的路由鍵必須為 order 才能分配到 Queue1 上):

主題 Topic 模式

Direct 模式會存在一定的局限性,有時候我們需要按型別劃分,比如訂單類路由到一個佇列,產品類路由到另一個佇列,所以在 RabbitMQ 中,提供了主題模式來實作模糊匹配,使用主題型別連接方式支持兩種通配符:

直連方式只能精確匹配,有時候我們需要實作模糊匹配,那么這時候就需要主題型別的連接方式,在 RabbitMQ 中,使用主題型別連接方式支持兩種通配符:

  • :表示 0 個或者多個單詞

  • *:表示 1 個單詞

PS:使用通配符時,單詞指的是用英文符號的小數點 . 隔開的字符,如:abc.def 就表示有 abcdef 兩個單詞,

下圖所示中,因為 Queue1 系結了 order.#,所以當發送訊息的路由鍵為 order 或者 order.xxx時都可以使得訊息分配到 Queue1 上:

廣播 Fanout 模式

當我們定義了一個廣播型別的交換機時就不需要指定系結鍵,而且生產者發送訊息到交換機上時,也不需要攜帶路由鍵,此時當訊息到達交換機時,所有與其系結的佇列都會收到訊息,這種模式的訊息發送適用于訊息通知類需求,

如下如所示,Queue1Queue2Queue3 三個佇列都系結到了一個 Fanout 交換機上,那么當 Fanout Exchange 收到訊息時,會同時將訊息發送給三個佇列:

RabbitMQ 提供的后臺管理系統中也能查詢到創建的交換機和佇列等資訊,并且可以通過管理后臺直接創建佇列和交換機:

訊息發送實戰

下面通過一個 SpringBoot 例子來體會一下三種發送訊息的方式,

  • 1、application.yml 檔案中添加如下配置:
spring:
  rabbitmq:
    host: ip
    port: 5672
    username: admin
    password: 123456
  • 2、新增一個 RabbitConfig 配置類(為了節省篇幅省略了包名和匯入 ),此類中宣告了三個交換機和三個佇列,并分別進行系結:
@Configuration
public class RabbitConfig {
    //直連交換機
    @Bean("directExchange")
    public DirectExchange directExchange(){
        return new DirectExchange("LONGLY_WOLF_DIRECT_EXCHANGE");
    }

    //主題交換機
    @Bean("topicExchange")
    public TopicExchange topicExchange(){
        return new TopicExchange("LONGLY_WOLF_TOPIC_EXCHANGE");
    }

    //廣播交換機
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("LONGLY_WOLF_FANOUT_EXCHANGE");
    }


    @Bean("orderQueue")
    public Queue orderQueue(){
        return new Queue("LONGLY_WOLF_ORDER_QUEUE");
    }

    @Bean("userQueue")
    public Queue userQueue(){
        return new Queue("LONGLY_WOLF_USER_QUEUE");
    }

    @Bean("productQueue")
    public Queue productQueue(){
        return new Queue("LONGLY_WOLF_PRODUCT_QUEUE");
    }

    //Direct交換機和orderQueue系結,系結鍵為:order.detail
    @Bean
    public Binding bindDirectExchange(@Qualifier("orderQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("order.detail");
    }

    //Topic交換機和userQueue系結,系結鍵為:user.#
    @Bean
    public Binding bindTopicExchange(@Qualifier("userQueue") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("user.#");
    }

    //Fanout交換機和productQueue系結
    @Bean
    public Binding bindFanoutExchange(@Qualifier("productQueue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}
  • 3、新建一個消費者 ExchangeConsumer 類,不同的方法實作分別監聽不同的佇列:
@Component
public class ExchangeConsumer {

    /**
     * 監聽系結了direct交換機的的訊息佇列
     */
    @RabbitHandler
    @RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
    public void directConsumer(String msg){
        System.out.println("direct交換機收到訊息:" + msg);
    }

    /**
     * 監聽系結了topic交換機的的訊息佇列
     */
    @RabbitHandler
    @RabbitListener(queues = "LONGLY_WOLF_USER_QUEUE")
    public void topicConsumer(String msg){
        System.out.println("topic交換機收到訊息:" + msg);
    }

    /**
     * 監聽系結了fanout交換機的的訊息佇列
     */
    @RabbitHandler
    @RabbitListener(queues = "LONGLY_WOLF_PRODUCT_QUEUE")
    public void fanoutConsumer(String msg){
        System.out.println("fanout交換機收到訊息:" + msg);
    }
}
  • 4、新增一個 RabbitExchangeController 類來作為生產者,進行訊息發送:
@RestController
@RequestMapping("/exchange")
public class RabbitExchangeController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value="https://www.cnblogs.com/send/direct")
    public String sendDirect(String routingKey,@RequestParam(value = "https://www.cnblogs.com/lonely-wolf/p/msg",defaultValue = "https://www.cnblogs.com/lonely-wolf/p/no direct message") String msg){
        rabbitTemplate.convertAndSend("LONGLY_WOLF_DIRECT_EXCHANGE",routingKey,msg);
        return "succ";
    }
    @GetMapping(value="https://www.cnblogs.com/send/topic")
    public String sendTopic(String routingKey,@RequestParam(value = "https://www.cnblogs.com/lonely-wolf/p/msg",defaultValue = "https://www.cnblogs.com/lonely-wolf/p/no topic message") String msg){
        rabbitTemplate.convertAndSend("LONGLY_WOLF_TOPIC_EXCHANGE",routingKey,msg);
        return "succ";
    }
    @GetMapping(value="https://www.cnblogs.com/send/fanout")
    public String sendFaout(String routingKey,@RequestParam(value = "https://www.cnblogs.com/lonely-wolf/p/msg",defaultValue = "https://www.cnblogs.com/lonely-wolf/p/no faout message") String msg){
        rabbitTemplate.convertAndSend("LONGLY_WOLF_FANOUT_EXCHANGE",routingKey,msg);
        return "succ";
    }
}

  • 5、啟動服務,當我們呼叫第一個介面時候,路由鍵和系結鍵 order.detail 精確匹配時,directConsumer 就會收到訊息,同樣的,呼叫第二介面時,路由鍵滿足 user.# 時,topicConsumer 就會收到訊息,而只要呼叫第三個介面,不論是否指定路由鍵,fanoutConsumer 都會收到訊息,

訊息過期了怎么辦

簡單的發送訊息我們學會了,難道這就能讓我們就此止步了嗎?顯然是不能的,要玩就要玩高級點,所以接下來讓我們給訊息加點佐料,

TTL(Time-To-Live)

TTL 即 一條訊息在佇列中的最大存活時間,在一條在佇列中超過配置的 TTL 的訊息稱為已死訊息,但是需要注意的是,已死訊息并不能保證會立即從佇列中洗掉,但是能保證已死的訊息不會被投遞出去,

設定 TTL 的方式有兩種:

  • 1、給佇列設定 x-message-ttl,此時所有被投遞到佇列中的訊息,都會在到達 TTL 時成為已死訊息,

    這種情況就會出現當一條訊息同時路由到 N 個帶有 TTL 時間的佇列,而由于每個佇列的 TTL 不一定相同,所以同一條訊息在不同的佇列中可能會在不同時間死亡或者不會死亡(未設定 TTL ),所以一個佇列中的訊息死亡不會影響到其他佇列中的訊息,

  • 2、單獨給某一條訊息設定過期時間,

    此時需要注意的時,當訊息達到 TTL 時,可能不會馬上被丟棄,因為只有處于佇列頭部訊息過期后才會被丟棄,假如佇列頭部的訊息沒有設定 TTL,而第 2 條訊息設定了 TTL,那么即使第 2 條訊息成為了已死訊息,也必須要等到佇列頭部的訊息被消費之后才會被丟棄,而已死訊息在被丟棄之前也會被計入統計資料(比如佇列中的訊息總數),所以為了更好的利用 TTL 特性,建議讓消費者在線消費訊息,這樣才能確保訊息更快的被丟棄,防止訊息堆積,

PS:訊息過期和消費者傳遞之間可能存在自然的競爭條件,例如,訊息可能在發送途中(未到達消費者)過期,

佇列的生存

TTL 針對訊息不同的是,我們可以通過設定過期時間屬性 `x-expires`` 來處理佇列,當在指定過期時間內內未使用佇列時,服務器保證將洗掉佇列(但是無法保證在過期時間過后佇列將以多快的速度被洗掉),

TTL 和過期時間實戰

  • 1、在上面定義的 RabbitConfig 類中,再新增一個 TTL 佇列并將其系結到 direct 交換機上:
@Bean("ttlQueue")
public Queue ttlQueue(){
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("x-message-ttl", 5000);//佇列中所有訊息5秒后過期
    map.put("x-expires", 100000);//佇列閑置10秒后被洗掉
    //引數1-name:佇列名稱
    //引數2-durable:是否持久化
    //引數3-exclusive:是否排他,設定為true時,則該佇列只對宣告當前佇列的連接(Connection)可用,一旦連接斷開,佇列自動被洗掉
    //引數4-autoDelete:是否自動洗掉,前提是必須要至少有一個消費者先連上當前佇列,然后當所有消費者都斷開連接之后,佇列自動被洗掉
    return new Queue("LONGLY_WOLF_TTL_QUEUE",false,false,false,map);
    }

//ttl佇列系結到direct交換機(交換機和佇列可以多對多)
@Bean
public Binding ttlBindFanoutExchange(@Qualifier("ttlQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){
    return BindingBuilder.bind(queue).to(directExchange).with("test.ttl");
}
  • 2、在 ExchangeConsumer 消費者類上監聽 TTL 佇列(和其他消費者不同的時候,這里為了列印出佇列屬性,改成了通過 Message 物件來接收訊息 ):
/**
 * 監聽ttl訊息佇列
*/
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_TTL_QUEUE")
public void ttlConsumer(Message message){
    System.out.println("ttl佇列收到訊息:" + new String(message.getBody()));
    System.out.println("ttl佇列收到訊息:" + JSONObject.toJSONString(message.getMessageProperties()));
}
  • 3、在生產者類 RabbitExchangeController 上新增一個介面用來測驗發送過期訊息,這里通過 MessageProperties 設定的 expiration 屬性就相當于是給單條訊息設定了一個 TTL
@GetMapping(value="https://www.cnblogs.com/send/ttl")
public String sendTtl(String routingKey,@RequestParam(value = "https://www.cnblogs.com/lonely-wolf/p/msg",defaultValue = "https://www.cnblogs.com/lonely-wolf/p/no ttl message") String msg){
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setExpiration("5000");//5秒后被洗掉,即TTL屬性(針對單條訊息)
    Message message = new Message(msg.getBytes(), messageProperties);
    rabbitTemplate.convertAndSend("LONGLY_WOLF_DIRECT_EXCHANGE",routingKey,message);
    return "succ";
}
  • 4、此時如果我們把消費者的監聽去掉之后再發送訊息,在管理后臺就可以看到 5 秒之后訊息會被洗掉,10 秒之后佇列會被洗掉,

PS:如果同時給佇列和單條訊息都設定了 TTL,則會以時間短的為主,

其他屬性

佇列中還有其他一些屬性可以設定,在這里我們就不一一舉例了:

  • x-message-ttl:佇列中訊息的存活時間(毫秒),達到TTL的訊息可能會被洗掉,
  • x-expires:佇列在多長時間(毫秒)沒有被訪問以后會被洗掉,
  • x-max-length:佇列中的最大訊息數,
  • x-max-length-bytes:佇列的最大容量(bytes),
  • overflow:佇列溢位之后的策略,主要可以配置如下引數:reject-publish - 直接丟棄最近發布的訊息,如若啟用了 publisher confirm(發布者確認),發布者將通過發送 basic.nack 訊息通知拒絕,如果當前佇列系結有多個消費者,則訊息在收到 basic.nack 拒絕通知后,仍然會被發布到其他佇列;drop-head - 丟棄佇列頭部訊息(集群模式下只支持這種策略) reject-publish-dlx - 最近發布的訊息會進入死信佇列,
  • x-dead-letter-exchange:佇列的死信交換機,
  • x-dead-letter-routing-key:死信交換機的路由鍵,
  • x-single-active-consumer:true/false,表示是否最多只允許一個消費者消費,如果有多個消費者同時系結,則只會激活第一個,除非第一個消費者被取消或者死亡,才會自動轉到下一個消費者,
  • x-max-priority:佇列中訊息的最大優先級, 訊息的優先級不能超過它,
  • x-queue-mode:3.6.0 版本引入的,主要是為了實作惰性加載,佇列將收到的訊息盡可能快的進行持久化操作到磁盤上,然后只有在用戶請求的時候才會加載到 RAM 記憶體,這個引數支持兩個值:defaultlazy,當不進行設定的時候,就是默認為 default,不做任何改變;當設定為 lazy 就會進行懶加載,
  • x-queue-master-locator:為了保證訊息的 FIFO,所以在高可用集群模式下需要選擇一個節點作為主節點,這個引數主要有三種模式:min-masters- 托管最小數量的系結主機的節點;client-local- 選擇宣告的佇列已經連接到客戶端的節點;random- 隨機選擇一個節點,

神奇的死信佇列(Dead Letter)

上面的引數介紹中,提到了死信佇列,這又是什么新鮮的東西呢?其實從名字上來看很好理解,就是指的已死的訊息,或者說無家可歸的訊息,一個訊息進入死信佇列,主要有以下三種條件:

  • 1、訊息被消費者拒絕并且未設定重回佇列,

  • 2、訊息過期(即設定了 TTL),

  • 3、佇列達到最大長度,超過了 Max lengthMax length bytes,則佇列頭部的訊息會被發送到死信佇列,

死信佇列實戰

  • 1、在上面定義的 RabbitConfig 類中,定義一個死信交換機,并將之前的 ttl 佇列新增一個屬性 x-dead-letter-exchange,最后再將死信佇列和死信交換機進行系結:
//直連死信交換機(也可以用topic或者fanout型別交換機)
@Bean("deatLetterExchange")
public DirectExchange deatLetterExchange(){
    return new DirectExchange("LONGLY_WOLF_DEAD_LETTER_DIRECT_EXCHANGE");
}
@Bean("ttlQueue")
public Queue ttlQueue(){
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("x-message-ttl", 5000);//佇列中所有訊息5秒后過期
    map.put("x-dead-letter-exchange", "LONGLY_WOLF_DEAD_LETTER_DIRECT_EXCHANGE");//已死訊息會進入死信交換機
    return new Queue("LONGLY_WOLF_TTL_QUEUE",false,false,false,map);
}
//死信佇列
@Bean("deadLetterQueue")
public Queue deadLetterQueue(){
    return new Queue("LONGLY_WOLF_DEAD_LETTER_QUEUE");
}
  • 2、在 ExchangeConsumer 消費者類上將監聽 TTL 佇列的監聽取消,注釋掉監聽:
	/**
     * 監聽ttl訊息佇列
     */
    @RabbitHandler
//    @RabbitListener(queues = "LONGLY_WOLF_TTL_QUEUE")
    public void ttlConsumer(Message message){
        System.out.println("ttl佇列收到訊息:" + new String(message.getBody()));
        System.out.println("ttl佇列收到訊息:" + JSONObject.toJSONString(message.getMessageProperties()));
    }
  • 3、此時 TTL 佇列無消費者,并且設定了訊息的 TTL5 秒,所以 5 秒之后就會進入死信佇列,
  • 5、訪問介面:http://localhost:8080/exchange/send/ttl?routingKey=test&msg=測驗死信佇列,發送訊息之后,等待 5 秒就查看訊息,進入死信佇列:

訊息真的發送成功了嗎

了解了訊息的基本發送功能之后,就可以高枕無憂了嗎?訊息發出去之后,消費者真的收到訊息了嗎?訊息發送之后如何知道訊息發送成功了?假如發送訊息路由錯了導致無法路由到佇列怎么辦?大家是不是都有這些疑問呢?別著急,接下來就讓我們來一一來分析一下,

一條訊息從生產者開始發送訊息到消費者消費完訊息主要可以分為以下 4 個階段:

  • 1、生產者將訊息發送到 Broker (即:RabbitMQ 的交換機),
  • 2、交換機將訊息路由到佇列,
  • 3、佇列收到訊息后存盤訊息,
  • 4、消費者從佇列獲取訊息進行消費,

接下來我們就從這 4 個步驟上來逐步分析 RabbitMQ 如何保證訊息發送的可靠性,

訊息真的到達交換機了嗎

當我們發送一條訊息之后,如何知道對方收到訊息了?這就和我們寫信一樣,寫一封信出去,如何知道對方收到我們寄出去的信?最簡單的方式就是對方也給我們回一封信,我們收到對方的回信之后就可以知道自己的信已經成功寄達,

RabbitMQ 中服務端也提供了 2 種方式來告訴客戶端(生產者)是否收到訊息:Transaction(事務)模式和 Confirm(確認)模式,

Transaction(事務) 模式

Java API 編程中開啟事務只需要增加以下代碼即可:

 try {
     channel.txSelect();//開啟事務
     channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
     channel.txCommit();//提交事務
 }catch (Exception e){
     channel.txRollback();//訊息回滾
 }

Spring Boot 中需要對 RabbitTemplate 進行事務設定:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    rabbitTemplate.setChannelTransacted(true);//開啟事務
    return rabbitTemplate;
}

為了了解 RabbitMQ 當中事務機制的原理,我們在 Wireshark 中輸入 ip.addr==192.168.1.1 對本地 ip 進行抓包,發送一條訊息之后,抓到如下資料包:

通過資料包,可以得出開啟事務之后,除了原本的發送訊息之外,多出了開啟事務和事務提交的通信:

開啟事務之后,有一個致命的缺點就是發送訊息流程會被阻塞,也就是說必須一條訊息發送成功之后,才會允許發送另一條訊息,正因為事務模式有這個缺點,所以一般情況下并不建議在生產環境開啟事務,那么有沒有更好的方式來實作訊息的送達確認呢?那么就讓我們再看看Confirm(確認)模式,

Confirm(確認)模式

訊息確認模式又可以分為三種(事務模式和確認模式無法同時開啟):

  • 單條確認模式:發送一條訊息,確認一條訊息,此種確認模式的效率也不高,
  • 批量確認模式:發送一批訊息,然后同時確認,批量發送有一個缺點就是同一批訊息一旦有一條訊息發送失敗,就會收到失敗的通知,需要將這一批訊息全部重發,
  • 異步確認模式:一邊發送一邊確認,訊息可能被單條確認也可能會被批量確認,

Java API 實作確認模式

  • 單條訊息確認模式
channel.confirmSelect();//開啟確認模式
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
if (channel.waitForConfirms()){//wait.ForConfirms(long time)方法可以指定等待時間
    System.out.println("訊息確認發送成功");
}
  • 批量確認模式
channel.confirmSelect();//開啟確認模式
//批量發送
for (int i=0;i<10;i++){
    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
try{
    channel.waitForConfirmsOrDie();
}catch (IOException e){//只要有1條訊息未被確認,就會拋出例外
    System.out.println("有訊息發送失敗了");
}
  • 異步確認模式
channel.addConfirmListener(new ConfirmListener() {
    /**
      * 已確認訊息,即發送成功后回呼
      * @param deliveryTag -唯一標識id(即發送訊息時獲取到的nextPublishSeqNo)
      * @param multiple - 是否批量確認,當multiple=true,表示<=deliveryTag的訊息被批量確認,multiple=false,表示只確認了單條
      */
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {//成功回呼
        System.out.println("收到確認訊息了");
        //TODO 可以做一些想做的事
    }

    /**
       * 發送失敗訊息后回呼
       * @param deliveryTag -唯一標識id(即發送訊息時獲取到的nextPublishSeqNo)
       * @param multiple - 是否批量確認,當multiple=true,表示<=deliveryTag的訊息被批量確認,multiple=false,表示只確認了單條
       */
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {//失敗回呼
        if (multiple) {//批量確認,<deliveryTag的訊息都發送失敗
            //TODO 訊息重發?
        } else {//非批量,=deliveryTag的訊息發送失敗
            //TODO 訊息重發?
        }
    }
});

channel.confirmSelect();//開啟確認模式
for (int i=0;i<10;i++){//批量發送
    long nextSeqNo = channel.getNextPublishSeqNo();//獲取發送訊息的唯一標識(從1開始遞增)
    //TODO 可以考慮把訊息id存起來
    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}

SpringBoot 實作確認模式

通過組態檔 spring.rabbitmq.publisher-confirm-type 引數進行配置確認(舊版本是 spring.rabbitmq.publisher-confirms 引數),

  • 1、新增組態檔屬性配置
spring:
  rabbitmq:
    publisher-confirm-type: correlated # none-表示禁用回呼(默認) simple- 參考RabbitExchangeController#sendWithSimpleConfirm()方法
  • 2、RabbitConfig 組態檔中修改如下:
 @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
//        rabbitTemplate.setChannelTransacted(true);//開啟事務
        //訊息是否成功發送到Exchange
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (!ack){//訊息發送失敗
                    System.out.println("訊息發送失敗,原因為:" + cause);
                    return;
                }
                //訊息發送成功
                System.out.println("訊息發送成功");
            }
        });
        return rabbitTemplate;
    }

這樣當我們發送訊息成功之后,就會收到回呼,

  • 3、當上面的引數配置修改為 simple,則需要在發送訊息的時候使用 invoke 呼叫 waitForConfirms 或者 waitForConfirmsOrDie 方法來確認是否發送成功:
 @GetMapping(value="https://www.cnblogs.com/send/confirm")
 public String sendWithSimpleConfirm(String routingKey,@RequestParam(value = "https://www.cnblogs.com/lonely-wolf/p/msg",defaultValue = "https://www.cnblogs.com/lonely-wolf/p/no direct message") String msg){
       //使用waitForConfirms方法確認
        boolean sendFlag = rabbitTemplate.invoke(operations -> {
            rabbitTemplate.convertAndSend(
                    "LONGLY_WOLF_DIRECT_EXCHANGE",
                    "routingKey",
                    msg
            );
            return rabbitTemplate.waitForConfirms(5000);
        });
        //也可以使用waitForConfirmsOrDie方法確認
        boolean sendFlag2 = rabbitTemplate.invoke(operations -> {
            rabbitTemplate.convertAndSend(
                    "LONGLY_WOLF_DIRECT_EXCHANGE",
                    "routingKey",
                    msg
            );
            try {
                rabbitTemplate.waitForConfirmsOrDie(5000);
            }catch (Exception e){
                return false;
            }
            return true;
        });
        System.out.println(sendFlag);
        System.out.println(sendFlag2);
        return "succ";
    }

訊息無法從交換機路由到正確的佇列怎么辦

上面通過事務或者確認機制確保了訊息成功發送到交換機,那么接下來交換機會負責將訊息路由到佇列,這時候假如佇列不存在或者路由錯誤就會導致訊息路由失敗,這又該如何保證呢?

同樣的,RabbitMQ 中也提供了 2 種方式來確保訊息可以正確路由到佇列:開啟監聽模式或者通過新增備份交換機模式來備份資料,

監聽回呼

上面介紹的是訊息是否發送到交換機的回呼,而從交換機路由到佇列,同樣可以開啟確認模式,

Java API 方式開啟監聽模式

下面就是開啟監聽主要代碼,為了節省篇幅,省略了其余不相干代碼(完成代碼已上傳至 GitHub

channel.addReturnListener(new ReturnListener() {
     @Override
     public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
         System.out.println("收到未路由到佇列的回呼訊息:" + new String(body));
     }
 });
//注意這里的第三個引數,mandatory需要設定為true(發送一個錯誤的路由,即可收到回呼)
channel.basicPublish(EXCHANGE_NAME,"ERROR_ROUTING_KEY",true,null,msg.getBytes());

Spring Boot 開啟監聽模式

RabitConfig 類中添加如下配置:

 @Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);

    rabbitTemplate.setMandatory(true);//開啟監聽回呼
    //訊息是否成功被路由到佇列,沒有路由到佇列時會收到回呼(原setReturnCallback在2.0版本已過期)
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.println("收到未路由到佇列的回呼訊息:" + new String(returnedMessage.getMessage().getBody()));
        }
    });
    return rabbitTemplate;
}

備份交換機

除了開啟監聽的方式,還可以通過定義備份交換機的方式來實作,當原交換機無法正確路由到佇列時,則會進入備份交換機,再由備份交換機路由到正確佇列(要注意區分備份交換機和死信交換機的區別),

Java API 實作備份交換機

下面就是一個實作備份交換機的例子,因為這里備份交換機定義的是 Topic 型別,所有路由必須滿足定義好的路由,實際使用中一般會設定會 Fanout,因為無法預測錯誤的路由到底是多少:

 //宣告交換機且指定備份交換機
Map<String,Object> argMap = new HashMap<String,Object>();
argMap.put("alternate-exchange","TEST_ALTERNATE_EXCHANGE");
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,false,false,argMap);
//佇列和交換機進行系結
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTEING_KEY);

//宣告備份交換機和備份佇列,并系結(為了防止收不到訊息,備份交換機一般建議設定為Fanout型別)
channel.queueDeclare("BAK_QUEUE", false, false, false, null);
channel.exchangeDeclare("TEST_ALTERNATE_EXCHANGE", BuiltinExchangeType.TOPIC);
channel.queueBind("BAK_QUEUE","TEST_ALTERNATE_EXCHANGE","ERROR.#");

String msg = "I'm a bak exchange msg";
channel.basicPublish(EXCHANGE_NAME,"ERROR.ROUTING_KEY",null,msg.getBytes());

Spring Boot 實作備份交換機

Spring Boot 實作備份交換機原理和 Java API 實作相同:

  • 1、首先在 RabbiConfig 中新增兩個交換機,一個是原始交換機,一個是備份交換機,同時新增一個備份佇列和備份交換機進行系結,這里的備份交換機是一個 Fanout 型別,注意因為這里主要是演示備份交換機,所以這里的原始交換機沒有和任何佇列系結,也就無法路由到佇列,從而使得訊息進入備份交換機:
//用于測驗備份交換機的原直連交換機
@Bean("bakDirectEchange")
public DirectExchange bakDirectEchange(){
    Map argMap = new HashMap<>();
    argMap.put("alternate-exchange", "LONGLY_WOLF_BAK_FANOUT_EXCHANGE");
    return new DirectExchange("LONGLY_WOLF_BAK_ORIGIN_DIRECT_EXCHANGE",false,false,argMap);
}

//備份廣播交換機
@Bean("bakFanoutExchange")
public FanoutExchange bakFanoutExchange(){
    return new FanoutExchange("LONGLY_WOLF_BAK_FANOUT_EXCHANGE");
}
//備份佇列
@Bean("bakQueue")
public Queue bakQueue(){
    return new Queue("LONELY_WOLF_BAK_QUEUE");
}
//備份交換機和備份佇列進行系結
@Bean
public Binding BindExchange(@Qualifier("bakQueue") Queue queue, @Qualifier("bakFanoutExchange") FanoutExchange fanoutExchange){
    return BindingBuilder.bind(queue).to(fanoutExchange);
}

2、在消費者類 ExchangeConsumer 中監聽備份佇列:

 /**
  * 監聽備份訊息佇列
  */
@RabbitHandler
@RabbitListener(queues = "LONELY_WOLF_BAK_QUEUE")
public void bakQueueConsumer(Message message){
    System.out.println("備份佇列收到訊息:" + new String(message.getBody()));
}
  • 3、最后在生產者類 RabbitExchangeController 中新增一個訊息發送的方法進行訊息發送:
@GetMapping(value="https://www.cnblogs.com/send/bak")
public String sendBak(String routingKey,@RequestParam(value = "https://www.cnblogs.com/lonely-wolf/p/msg",defaultValue = "https://www.cnblogs.com/lonely-wolf/p/no bak message") String msg){
    rabbitTemplate.convertAndSend("LONGLY_WOLF_BAK_ORIGIN_DIRECT_EXCHANGE",routingKey,msg);
    return "succ";
}

呼叫之后可以看到,備份佇列會收到訊息,從而說明了訊息在無法路由到佇列時會進入到備份佇列,

佇列存盤訊息后發生例外怎么辦

在保證了前面兩個階段的可靠性之后,訊息終于安全抵達了佇列,那么這時候就絕對安全了嗎?

當我們的消費者的消費速度跟不上生產者的生產速度時,就會導致訊息堆積在佇列中,而默認訊息是沒有持久化的,存在于記憶體之中,所以假如服務器宕機等故障發生,就會導致佇列中的資料丟失,

這里的解決方案也很簡單,就是將訊息進行持久化,在 RabbitMQ 當中,持久化也可以分為 3 種:交換機持久化,佇列持久化和訊息持久化,

雖然說持久化能一定程度上保證訊息的可靠性,然而當出現了服務器的磁盤損壞,依然可能出現訊息丟失,所以為了更加完美,RabbitMQ 集群可能是必須的,當然,本文不會涉及到集群的知識,集群的知識以及搭建會放到下次再來分析,

交換機持久化

宣告交換機時,durable 引數設定為 true

佇列持久化

宣告佇列時,durable 引數設定為 true

訊息持久化

發送訊息時可以將訊息設定為持久化,

Java API 訊息持久化

Java API 中,可以通過如下方式設定訊息持久化:

//deliveryMode=2表示訊息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
channel.basicPublish("exchangeName","routingKey",properties,msg.getBytes());

Spring Boot 訊息持久化

Spring Boot 中可以通過如下方式將訊息設定為持久化:

MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//訊息持久化
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("exchangeName","routingKey",message);

消費者消費訊息失敗了怎么辦

踏遍千山萬水,經過 3 層地獄模式,訊息終于被消費者拿到手了,然而悲劇的事情又發生了,消費者消費訊息的時候可能因為消費者本身的問題或者其他意外導致了消費者消費訊息失敗了,這時候訊息還是沒能被正確處理,這時候難道眼睜睜看著最后關頭了束手無策了嗎?

非也,作為一款如此優秀的訊息佇列,怎么可能沒考慮到這種場景呢,還記不記得上面我們提到的確認模式,實際上,上面的兩種確認模式都屬于服務端的確認,在 RabbitMQ 中為消費者也提供了確認模式,這就是消費者的確認,

消費者確認(ack)

佇列當中會把訊息洗掉的前提就是這條訊息被消費者消費掉了,但是服務器如何知道訊息被消費了呢?這就是需要通過消費者確認之后才會洗掉,而我們前面在介紹訊息發送的時候貌似并沒有看到消費者確認流程,這是因為消費者默認在收到訊息后會給服務器一個應答,服務端收到消費者的應答之后,就會洗掉訊息,

Java API 實作消費者應答

Java API 中應答方式有兩種,自動應答和手動應答,當自動應答時,則只要消費者收到訊息就會給服務端確認,不在乎訊息是否消費成功,

  • 1、新建一個消費者 AckConsumer 類(省略了包名和匯入),這里為了實作方便,通過生產者的頭部標記來決定采用何種應答策略:
public class AckConsumer {
    private static String QUEUE_NAME = "ACK_QUEUE";
    public static void main(String[] args) throws Exception{
        //1.宣告連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://username:password@ip:port");

        //2.建立連接
        Connection conn = factory.newConnection();
        //3.創建訊息通道
        Channel channel = conn.createChannel();
        //4.宣告佇列(默認交換機AMQP default,Direct)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" 等待接收訊息...");

        // 創建消費者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("收到訊息: " + new String(body, "UTF-8"));
                Map<String,Object> map = properties.getHeaders();//獲取頭部訊息
                String ackType = map.get("ackType").toString();
                if (ackType.equals("ack")){//手動應答
                    channel.basicAck(envelope.getDeliveryTag(),true);
                }else if(ackType.equals("reject-single")){//拒絕單條訊息
                    //拒絕訊息,requeue引數表示訊息是否重新入隊
                    channel.basicReject(envelope.getDeliveryTag(),false);
                    //                    channel.basicNack(envelope.getDeliveryTag(),false,false);
                }else if (ackType.equals("reject-multiple")){//拒絕多條訊息
                    //拒絕訊息,multiple引數表示是否批量拒絕,為true則表示<deliveryTag的訊息都被拒絕
                    channel.basicNack(envelope.getDeliveryTag(),true,false);
                }
            }
        };

        //開始獲取訊息,第二個引數 autoAck表示是否開啟自動應答
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

  • 2、新建一個生產者 AckProducer 類(省略了包名和匯入):
public class AckProducter {
    private static String QUEUE_NAME = "ACK_QUEUE";//佇列
    private static String EXCHANGE_NAME = "ACK_EXCHANGE";//交換機
    private static String ROUTEING_KEY = "test";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://admin:[email protected]:5672");
        // 建立連接
        Connection conn = factory.newConnection();
        // 創建訊息通道
        Channel channel = conn.createChannel();
        Map<String, Object> headers = new HashMap<String, Object>(1);
        headers.put("ackType", "ack");//請應答
//        headers.put("ackType", "reject-single");//請單條拒絕
//        headers.put("ackType", "reject-multiple");//請多條拒絕

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .contentEncoding("UTF-8")  // 編碼
                .headers(headers) // 自定義屬性
                .messageId(String.valueOf(UUID.randomUUID()))
                .build();

        String msg = "I'm a ack message";
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //宣告交換機
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,false);
        //佇列和交換機進行系結
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTEING_KEY);
        // 發送訊息
        channel.basicPublish(EXCHANGE_NAME, ROUTEING_KEY, properties, msg.getBytes());

        channel.close();
        conn.close();
    }
}

Spring Boot 實作消費者應答

Spring Boot 中消費者給服務端的確認方式分為 3 種:

  • NONE:自動應答(ack),

  • MANUAL:手動應答(ack),如果設定為手動應答,而消費者又遲遲不給服務器應答,那么訊息就會一直存在佇列,可能會造成訊息堆積和重復消費現象,

  • AUTO:當沒有拋出例外時會自動應答(ack),除此外,當發生例外時,分為以下三種情況:

    • 1、當拋出 AmqpRejectAndDontRequeueException 例外時,訊息會被拒絕,也不會重新入隊,
    • 2、當拋出 ImmediateAcknowledgeAmqpException 例外時,消費者會自動發送應答給服務端,
    • 3、當拋出其他例外時,訊息會被拒絕,且會重新入隊,當出現這種情況且消費者只有一個時,非常容易造成死回圈,所以應該極力避免這種情況的發生,
  • 1、Spring Boot 中可以通過引數控制應答型別:

spring:
  rabbitmq:
    listener:
      type: simple # direct型別是2.0之后才有的
      simple:
        acknowledge-mode: manual
  • 2、在消費者類 ExchangeConsumer 中新建一個方法來監聽佇列,其中第一個注釋掉的方法是原本存在的,第二個方法是新增的,主要新增了幾個引數,注意 Channelcom.rabbitmq.client.Channel 包下的:
/**
 * 監聽系結了direct交換機的的訊息佇列
 */
//    @RabbitHandler
//    @RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
//    public void directConsumer(String msg){
//        System.out.println("direct交換機收到訊息:" + msg);
//    }

/**
 * 監聽系結了direct交換機的的訊息佇列,并進行手動應答
 */
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
public void manualDirectConsumer(String msg, Channel channel,Message message) throws IOException {
    System.out.println("direct交換機收到訊息:" + msg + ",此訊息需要手動應答");
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手動應答
}
  • 3、或者也可以通過 SimpleMessageListenerContainer 類實作監聽,新建一個 RabbitAckConfig 類(省略了包名和匯入):
@Configuration
public class RabbitAckConfig {
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("LONGLY_WOLF_ORDER_QUEUE");//設定監聽佇列名
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//手動確認
        container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {//訊息處理
            System.out.println("收到訊息:" + new String(message.getBody()) + ",此訊息需要手動應答");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        });
        return container;
    }
}

PS:需要注意的是,這兩種方式不要同時使用,否則無法保證訊息會被哪個監聽到,

僅靠 RabbitMQ 自身可靠性能實作業務需求嗎

上面介紹的兩種確認模式,服務端確認和消費者確認,其中服務端確認是會回呼給生產者的,所以生產者可以知道訊息是否已經到達服務器且是否正確路由到佇列,然而,對于消費者的確認,生產者是不知道的,這是因為訊息佇列的作用之一就是為了實作生產者和消費者的解耦,換言之,消費者知道訊息成功發送到佇列,但是無法知道訊息是否被消費者消費

所以為了知道訊息是否被成功消費,主要有兩種思路:

  • 1、消費者在消費成功之后需要回呼生產者提供的API來告知訊息已經被消費
  • 2、服務端在收到消費者確認后給生產者一個回執通知

然而假如生產者遲遲沒有收到消費者是否消費成功的資訊,那么可能就需要補償,比如微信支付等都會有補償機制,間隔一定時間就將訊息重發一次,

補償機制同時也會帶來一個問題,假如說消費者消費成功了,但是在告訴生產者的時候失敗了,那么這時候訊息如果再次補償就會造成重復消費,所以消費者需要支持冪等(即無論一條訊息被消費多少次,都不會改變結果),當然,同時還有其他場景需要考慮,比如訊息之間的依賴性等等問題都需要結合具體業務場景來具體處理,

總結

本文主要講述了 RabbitMQ 的訊息發送方式,介紹了 3 種不同交換機的方式,同時最后也從發送訊息的主要 4 個步驟分析了每一個步驟如何保證訊息的可靠性,并分別通過 Java APISpring Boot 提供了示例,中間還提到了死信佇列,死信佇列本質也是一個佇列,只不過存盤的訊息比較特殊,相信通過本文,大家對 RabbitMQ 會有一個更深層次的了解,

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/244115.html

標籤:Java

上一篇:Java生鮮電商平臺-開放API介面簽名驗證(小程式/APP)

下一篇:【Java面向物件】介面的概念與使用 介面和抽象類的區別

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more