主頁 >  其他 > 訊息佇列:RabbitMQ

訊息佇列:RabbitMQ

2022-01-25 07:31:22 其他

訊息佇列 Message Queue

一、 訊息中間件概述

1.大多應用中,可通過訊息服務中間件來提升系統異步通信、擴展解耦能力

2.訊息服務中兩個重要概念:

  • 訊息代理(message broker)目的地(destination)
  • 當訊息發送者發送訊息以后,將由訊息代理接管,訊息代理保證訊息傳遞到指定目的地,

3.訊息佇列主要有兩種形式的目的地

  • 佇列(queue):點對點訊息通信(point-to-point)
  • 主題(topic):發布(publish)/訂閱(subscribe)訊息通信

4.點對點式

  • 訊息發送者發送訊息,訊息代理將其放入一個佇列中,訊息接收者從佇列中獲 取訊息內容,訊息讀取后被移出佇列
  • 訊息只有唯一的發送者和接受者,但并不是說只能有一個接收者

5.發布訂閱式:

  • 發送者(發布者)發送訊息到主題,多個接收者(訂閱者)監聽(訂閱)這個 主題,那么就會在訊息到達時同時收到訊息

6.JMS(Java Message Service)JAVA訊息服務:

  • 基于JVM訊息代理的規范,ActiveMQ、HornetMQ是JMS實作

7.AMQP(Advanced Message Queuing Protocol)

  • 高級訊息佇列協議,也是一個訊息代理的規范,兼容JMS

  • RabbitMQ是AMQP的實作

8.Spring支持

  • spring-jms提供了對JMS的支持

  • spring-rabbit提供了對AMQP的支持

  • 需要ConnectionFactory的實作來連接訊息代理

  • 提供JmsTemplate、RabbitTemplate來發送訊息

  • @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上監聽訊息代理發布的訊息

  • @EnableJms@EnableRabbit開啟支持

9.Spring Boot自動配置

  • **JmsAutoConfiguration **

  • RabbitAutoConfiguration

10.市面上的MQ產品

?ActiveMQRabbitMQRocketMQKafka

提到訊息中間件就要想到異步消峰解耦

在這里插入圖片描述

在這里插入圖片描述

訊息佇列主要分為兩大類:一類是JMS(Java Message Service)JAVA訊息服務,另一類是:AMQP(Advanced Message Queuing Protocol)

在這里插入圖片描述

二、 RabbitMQ

2.1 RabbitMQ簡介

RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實作,

2.2 核心概念

Message

訊息,訊息是不具名的,它由訊息頭和訊息體組成,訊息體是不透明的,而訊息頭則由一系列的可選屬性組成, 這些屬性包括routing-key(路由鍵)、priority(相對于其他訊息的優先權)、delivery-mode(指出該訊息可 能需要持久性存盤)等,

Publisher

訊息的生產者,也是一個向交換器發布訊息的客戶端應用程式,

Exchange

交換器,用來接收生產者發送的訊息并將這些訊息路由給服務器中的佇列,

Exchange有4種型別:direct(默認)fanout, topic, 和headers,不同型別的Exchange轉發訊息的策略有所區別

Queue

訊息佇列,用來保存訊息直到發送給消費者,它是訊息的容器,也是訊息的終點,一個訊息可投入一個或多個佇列,訊息一直 在佇列里面,等待消費者連接到這個佇列將其取走,

Binding

系結,用于訊息佇列和交換器之間的關聯,一個系結就是基于路由鍵將交換器和訊息佇列連接起來的路由規則,所以可以將交 換器理解成一個由系結構成的路由表,

Exchange 和Queue的系結可以是多對多的關系,

Connection

網路連接,比如一個TCP連接,

Channel

信道,多路復用連接中的一條獨立的雙向資料流通道,信道是建立在真實的TCP連接內的虛擬連接,AMQP 命令都是通過信道 發出去的,不管是發布訊息、訂閱佇列還是接收訊息,這些動作都是通過信道完成,因為對于作業系統來說建立和銷毀 TCP 都 是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接,

Consumer

訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式,

Virtual Host

虛擬主機,表示一批交換器、訊息佇列和相關物件,虛擬主機是共享相同的身份認證和加密環境的獨立服務器域,每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁 有自己的佇列、交換器、系結和權限機制,vhostAMQP 概念的基礎,必須在連接時 指定,RabbitMQ 默認的 vhost 是 / ,

Broker

表示訊息佇列服務器物體,

在這里插入圖片描述

2.3 docker安裝rabbitmq

安裝命令:

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

對應埠號解釋:

  • 4369, 25672 (Erlang發現&集群埠)
  • 5672, 5671 (AMQP埠)
  • 15672 (web管理后臺埠)
  • 61613, 61614 (STOMP協議埠)
  • 1883, 8883 (MQTT協議埠)

可訪問 ip地址 : 15672 訪問控制頁面

2.4 RabbitMQ運行機制

AMQP中的訊息路由

AMQP 中訊息的路由程序和 Java 開 發者熟悉的 JMS 存在一些差別, AMQP 中增加了 Exchange 和 Binding 的角色,

生產者把訊息發布 到 Exchange 上,訊息最終到達佇列 并被消費者接收,而 Binding 決定交 換器的訊息應該發送到那個佇列,

在這里插入圖片描述

Exchange型別

Exchange分發訊息時根據型別的不同分發策略有區別,目前共四種型別:direct、 fanout、topic、headers ,

headers 匹配 AMQP 訊息的 header 而不是路由鍵, headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接 看另外三種型別,

Direct Exchange

訊息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器 就將訊息發到對應的佇列中,路由鍵與隊 列名完全匹配,如果一個佇列系結到交換 機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的訊息,不會轉發 “dog.puppy”,也不會轉發“dog.guard” 等等,它是完全匹配、單播的模式

在這里插入圖片描述

Fanout Exchange

每個發到 fanout 型別交換器的訊息都 會分到所有系結的佇列上去,

fanout 交換器不處理路由鍵,只是簡單的將佇列 系結到交換器上,每個發送到交換器的 訊息都會被轉發到與該交換器系結的所 有佇列上,

很像子網廣播,每臺子網內 的主機都獲得了一份復制的訊息,

fanout 型別轉發訊息是最快的,

在這里插入圖片描述

Topic Exchange

topic 交換器通過模式匹配分配訊息的 路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要系結到一個模式上,

它將路由鍵和系結鍵的字串切分成單詞,這些單詞之間用點隔開,它同樣也會識別兩個通配符:符號“#”和符號“*”,

#匹配0個或多個單詞,* 匹配一個單詞,

在這里插入圖片描述

三、 RabbitMQ整合SpringBoot

向pom.xml中引入springboot-starter:

<!-- 引入RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

觀察RabbitAutoConfiguration類可以看出,該配置類向容器中注入了幾個重要的Bean物件:CachingConnectionFactoryRabbitTemplateAmqpAdmin

(1) CachingConnectionFactory

RabbitTemplate使用CachingConnectionFactory作為連接工廠

配置類上標有這樣的注解:@EnableConfigurationProperties(RabbitProperties.class)

向容器中注入CachingConnectionFactory的代碼中是從組態檔中加載配置資訊的,

spring.rabbitmq為配置的前綴,可以指定一些埠號,ip地址等資訊,

#配置域名和埠號
spring.rabbitmq.host=192.168.190.131
spring.rabbitmq.port=5672
#配置虛擬地址
spring.rabbitmq.virtual-host=/

(2) AmqpAdmin

AmqpAdminorg.springframework.amqp.core下的類,通過此類,可以用代碼的方式創建Exchange、Queue還有Binding,

@Autowired
AmqpAdmin amqpAdmin;

@Test
public void createBinding() {
    // String destination 目的地
    // DestinationType destinationType 系結型別:佇列/交換機
    // String exchange 交換機名稱
    // String routingKey 路由鍵
    //、Map<String, Object> arguments 引數
    Binding binding = new Binding("hello.queue" , Binding.DestinationType.QUEUE, "hello", "hello.queue",null);
    amqpAdmin.declareBinding(binding);
}

@Test
public void createMQ() {
    /**
     * @param name 佇列的名稱
     * @param durable 是否持久化佇列
     * @param exclusive 是否宣告為一個獨占佇列
     * @param autoDelete 如果服務不在使用時是否自動洗掉佇列
     */
    Queue queue = new Queue("hello.queue", true, false, false);
    String s = amqpAdmin.declareQueue(queue);
    log.info("創建queue成功... {}", queue);
}

@Test
public void createExchange() {
    // String name 交換機名稱
    // boolean durable 是否持久化
    // boolean autoDelete 是否自動洗掉
    Exchange exchange = new DirectExchange("hello", true, false);
    amqpAdmin.declareExchange(exchange);
    log.info("創建exchange成功...");
}

(2) RabbitTemplate

通過RabbitTemplate類中的方法,可以像使用Rabbit客戶端一樣向佇列發送訊息以及更多其他的操作,并且多個多載的”send“(發送訊息)方法,

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void test() {
    // 發送訊息
    rabbitTemplate.convertAndSend("hello", "hello.queue"  ,"msg");
}

發送的訊息不僅可以是一個序列化的物件,還可以是Json格式的文本資料,

通過指定不同的MessageConverter來實作,可以向容器中注入我們想要的MessageConverter從而使用,

在這里插入圖片描述

@Configuration
public class MyRabbitConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

(3) @RabbitListener和@RabbitHandler注解

@RabbitListener注解和@RabbitHandler都可以接受訊息佇列中的訊息,并進行處理,

@RabbitListener注解:

可以標記方法或類上進行使用

自定義方法的引數可以為以下型別:

1、Message message:原生訊息詳細資訊,頭 + 體

2、T <發送的訊息的型別> 可以是我們自定義的物件

3、Channel channel :當前傳輸資料的信道,

@RabbitListener(queues = {"hello.queue"})
public String receiveMessage(Message message, OrderEntity content) {
    //訊息體資訊
    byte[] body = message.getBody();
    // 訊息頭資訊
    MessageProperties messageProperties = message.getMessageProperties();
    log.info("收到的訊息: {}", content);
    return "ok";
}

同時要注意:Queue可以由很多方法來監聽,只要收到訊息,佇列就洗掉訊息,并且只能有一個方法收到訊息,并且一個方法接收訊息是一個線性的操作,只有處理完一個訊息之后才能接收下條訊息,

@RabbitHandler注解:

@RabbitHandler標在方法上,

@RabbitHandler標記的方法結合@RabbitListener,@RabbitHandler使用可以變得更加靈活,

比如說,當兩個方法對一個訊息佇列進行監聽時,用于監聽的兩個方法用于接收訊息內容的引數不同,根據訊息的內容可以自動的確定使用那個方法,

@Slf4j
@Controller
@RabbitListener(queues = {"hello.queue"})
public class RabbitController {
    @RabbitHandler
    public String receiveMessage(Message message, OrderReturnReasonEntity content) {
        //訊息體資訊
        byte[] body = message.getBody();
        // 訊息頭資訊
        MessageProperties messageProperties = message.getMessageProperties();

        log.info("收到的訊息: {}", content);
        return "ok";
    }

    @RabbitHandler
    public String receiveMessage2(Message message, OrderEntity content) {
        //訊息體資訊
        byte[] body = message.getBody();
        // 訊息頭資訊
        MessageProperties messageProperties = message.getMessageProperties();

        log.info("收到的訊息: {}", content);
        return "ok";
    }
}

四、 RabbitMQ訊息確認機制

概念:

  • 保證訊息不丟失,可靠抵達,可以使用事務訊息,但是性能會下降250倍,為此引入確認機制

  • publisher confirmCallback 確認模式

  • publisher returnCallback 未投遞到 queue 退回模式

  • consumer ack機制

在這里插入圖片描述

4.1 訊息確認機制-可靠抵達(發送端)

① ConfirmCallback

ConfirmCallbackRetruhnCallback一樣都是RabbitTemplate內部的介面,

訊息只要被 broker 接收到就會執行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才會呼叫 confirmCallback,

也就是說當訊息到達RabbitMQ的服務器就會執行回呼方法,

首先需要修改組態檔:

spring.rabbitmq.publisher-confirms=true

然后準備一個發送訊息使用的介面和兩個用來監聽訊息佇列并接收訊息的方法

發送訊息介面:

@RestController
public class SendMsgController {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg")
    public String sendMsg() {
        for (int i = 0; i < 10; i++) {
            if (i % 2 == 0) {
                OrderEntity orderEntity = new OrderEntity();
                orderEntity.setId(1L);
                orderEntity.setMemberUsername("Tom");
                orderEntity.setReceiveTime(new Date());
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello.news", orderEntity, new CorrelationData(UUID.randomUUID().toString()));
            } else {
                OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
                orderReturnReasonEntity.setCreateTime(new Date());
                orderReturnReasonEntity.setId(2L);
                orderReturnReasonEntity.setName("test");
                orderReturnReasonEntity.setSort(1);
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello.news", orderReturnReasonEntity, new CorrelationData(UUID.randomUUID().toString()));
            }
        }
        return "ok";
    }
}

監聽訊息佇列并接收訊息的方法:

@RabbitListener(queues = {"hello.news"})
@Slf4j
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
    @RabbitHandler
    public void receiveMessage1(Message message, OrderReturnReasonEntity content, Channel channel) {
        //訊息體資訊
        byte[] body = message.getBody();
        // 訊息頭資訊
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("receiveMessage1 接收訊息: " + content);
    }

    @RabbitHandler
    public void receiveMessage2(Message message, OrderEntity content, Channel channel) {
        //訊息體資訊
        byte[] body = message.getBody();
        // 訊息頭資訊
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("receiveMessage2 接收訊息: " + content);
    }
}

第三步,在配置類中定制RedisTemplate:

@Configuration
public class MyRabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @PostConstruct // 該注解表示在初始化構造器之后就呼叫,初始化定制 RabbitTemplate
    public void initRabbitTemplate() {
        // 設定確認回呼
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 當前訊息的唯一相關資料 (這個是訊息的唯一id)
             * @param ack 訊息是否成功收到
             * @param cause 失敗的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback... correlationData: [" + correlationData + "] ==> ack: [" + ack + "] ==> cause: [" + cause + "]");
            }
        });
    }
}

然后訪問localhost:9000/sendMsg,就會發送訊息,觀察結果:

在這里插入圖片描述

用于接收訊息的兩個方法都接收到了訊息,并且自定義的ConfirmCallback回呼方法會列印相關資訊,

② ReturnCallback

被 broker 接收到只能表示 message 已經到達服務器,并不能保證訊息一定會被投遞到目標 queue 里,所以需要用到接下來的 returnCallback

如果在交換機將訊息投遞到queue的程序中,發生了某些問題,最終導致訊息投遞失敗,就會觸發這個方法,

為定制的RabbitTemplate添加這個方法:

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    /**
     * @param message 投遞失敗的訊息的詳細資訊
     * @param replyCode 回復的狀態碼
     * @param replyText 回復的文本內容
     * @param exchange 但是這個訊息發給哪個交換機
     * @param routingKey 當時這個訊息使用哪個路由鍵
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("FailMessage: [" + message + "] ==> replyCode: [" + replyText + "] ==> exchange: [" + exchange + "] ==> routingKey: [" + routingKey + "]");
    }
});

我們在發送訊息的一端故意寫錯路由鍵,致使exchange投遞訊息失敗,最后會看到回呼方法ReturnCallback中列印的內容:

在這里插入圖片描述

FailMessage: [(Body:'{"id":2,"name":"test","sort":1,"status":null,"createTime":1641608721639}' MessageProperties [headers={spring_returned_message_correlation=b6b21f2d-73ad-473d-9639-feec76953c7b, __TypeId__=com.atguigu.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])] ==> replyCode: [NO_ROUTE] ==> exchange: [hello-java-exchange] ==> routingKey: [hello.news1]

補充:在發送訊息的時候還可以指定一個CorrelationData型別的引數(可以回顧上文的發送訊息的方法),這個CorrelationData類的構造器引數可以填一個UUID,代表訊息的唯一id,在重寫ConfirmCallback中的方法的第一個引數就是這個,通過這個引數就可以獲取訊息的唯一id,

注意:監聽方法回傳值必須為void,否則控制臺會不斷列印報錯資訊,(血的教訓)

4.2 訊息確認機制-可靠抵達(消費端)

ACK(Acknowledge)訊息確認機制

消費者獲取到訊息,成功處理,可以回復Ack給Broker

  • basic.ack用于肯定確認;broker將移除此訊息
  • basic.nack用于否定確認;可以指定broker是否丟棄此訊息,可以批量
  • basic.reject用于否定確認;同上,但不能批量

在默認狀況下,ACK訊息確認機制是當訊息一旦抵達消費方法就會直接出隊(洗掉),但是如果在訊息消費程序中服務器宕機了,這些訊息也會被洗掉,這就造成了訊息丟失的問題,

通過配置可以開啟訊息需要經過手動確認,才能從佇列中洗掉訊息

#手動ack訊息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

改寫方法:

@RabbitHandler
public void receiveMessage2(Message message, OrderEntity content, Channel channel) {
    //訊息體資訊
    byte[] body = message.getBody();
    // 訊息頭資訊
    MessageProperties messageProperties = message.getMessageProperties();
    long deliveryTag = messageProperties.getDeliveryTag();
    //手動接收訊息
    //long deliveryTag相當當前訊息派發的標簽,從messageProperties中獲取,并且在Channel中自增的
    //boolean multiple 是否批量確認
    try {
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("receiveMessage2 接收訊息: " + content);
}

我們在上方的代碼打上斷點并觀察RabbitMQ客戶端的狀況:

在這里插入圖片描述

對中總共有5條訊息,并且進入了Unacked,即未被確認的狀態,

但是這里使用debug模式啟動然后關掉服務模擬服務器宕機會發生一個問題,就是在關閉服務之前,idea會將未執行完的方法先執行完再關閉服務,

所以可以在cmd殺掉行程模擬宕機,

這時,由于打了斷點,沒有走到訊息確認的那一行代碼,隨機,服務器宕機,所有沒有確認的訊息都會從Unacked的狀態回呼Ready的狀態,

有接收訊息的方法就有拒絕訊息的方法:basicNackbasicReject

//long deliveryTag 當前訊息派發的標簽
//boolean multiple 是否批量處理
//boolean requeue 拒絕后是否將訊息重新入隊
channel.basicNack(deliveryTag, false, true);
channel.basicReject(deliveryTag, true);

basicNackbasicReject都可以用來拒絕訊息,但是basicNackbasicReject多了一個引數boolean multiple(是否批量處理)

如果將requeue設定為true,被拒絕的訊息就會重新入隊等待消費,

五、 RabbitMQ延時佇列(實作定時任務)

場景:

比如未付款訂單,超過一定時間后,系統自動取消訂單并釋放占有物品,

常用解決方案

spring的 schedule 定時任務輪詢資料庫

缺點

消耗系統記憶體、增加了資料庫的壓力、存在較大的時間誤差

解決:rabbitmq的訊息TTL和死信Exchange結合

(1) 訊息的TTL(Time To Live)

訊息的TTL就是訊息的存活時間

RabbitMQ可以對佇列和訊息分別設定TTL,

對佇列設定就是佇列沒有消費者連著的保留時間,也可以對每一個單獨的訊息做單獨的設定,超過了這個時間,我們認為這個訊息就死了,稱之為死信,

如果佇列設定了,訊息也設定了,那么會取小的,所以一個訊息如果被路由到不同的佇列中,這個訊息死亡的時間有可能不一樣(不同的佇列設定),這里單講單個訊息的

TTL,因為它才是實作延遲任務的關鍵,可以通過設定訊息的expiration欄位或者x-message-ttl屬性來設定時間,兩者是一樣的效果,

(2) Dead Letter Exchanges(DLX)死信路由

一個訊息在滿足如下條件下,會進死信路由,記住這里是路由而不是佇列, 一個路由可以對應很多佇列,

什么是死信?

  • 一個訊息被Consumer拒收了,并且reject方法的引數里requeue是false,也就是說不 會被再次放在佇列里,被其他消費者使用,*(basic.reject/ basic.nack)*requeue=false
  • 上面的訊息的TTL到了,訊息過期了,
  • 佇列的長度限制滿了,排在前面的訊息會被丟棄或者扔到死信路由上,

**Dead Letter Exchange(死信路由)**其實就是一種普通的exchange,和創建其他exchange沒有兩樣,只是在某一個設定Dead Letter Exchange的佇列中有 訊息過期了,會自動觸發訊息的轉發,發送到Dead Letter Exchange中去,

我們既可以控制訊息在一段時間后變成死信,又可以控制變成死信的訊息 被路由到某一個指定的交換機,結合二者,其實就可以實作一個延時佇列,

手動ack&例外訊息統一放在一個佇列處理建議的兩種方式

  • catch例外后,手動發送到指定佇列,然后使用channel給rabbitmq確認訊息已消費
  • 給Queue系結死信佇列,使用nack(requque為false)確認訊息消費失敗

延時佇列的實作:

方式一:設定一個有過期時間的訊息佇列

在這里插入圖片描述

方式二:發送的訊息賦予過期時間,

在這里插入圖片描述

但是基于RabbitMQ對訊息的惰性處理,通常選擇方式一,

(3) 延遲訊息佇列樣例測驗

示意圖:

在這里插入圖片描述

如果沒有RabbitMQ中沒有創建過訊息佇列、交換機等,可以通過@Bean注入容器的方式創建,

配置類:

@Configuration
public class MyRabbitMQConfig {
    @Bean
    public Queue orderDelayQueue() {
        /*
            Queue(String name,  佇列名字
            boolean durable,  是否持久化
            boolean exclusive,  是否排他
            boolean autoDelete, 是否自動洗掉
            Map<String, Object> arguments) 屬性
         */
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000);
        return new Queue("order.delay.queue", true, false, false, arguments);
    }

    @Bean
    public Queue orderReleaseOrderQueue() {
        return new Queue("order.release.order.queue", true, false, false);
    }

    /**
     * TopicExchange
     * @return
     */
    @Bean
    public Exchange orderEventExchange() {
        /**
         *   String name,
         *   boolean durable,
         *   boolean autoDelete,
         *   Map<String, Object> arguments
         */
        return new TopicExchange("order-event-exchange", true, false);
    }

    @Bean
    public Binding orderCreateBinding() {
        /*
         * String destination, 目的地(佇列名或者交換機名字)
         * DestinationType destinationType, 目的地型別(Queue、Exhcange)
         * String exchange,
         * String routingKey,
         * Map<String, Object> arguments
         * */
        return new Binding("order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",
                null);
    }

    @Bean
    public Binding orderReleaseBinding() {

        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    }
}

發送和接收訊息的方法:

@Autowired
RabbitTemplate rabbitTemplate;

@RabbitListener(queues = "order.release.order.queue")
public void listener(Message message, Channel channel, OrderEntity entity) throws IOException {
    System.out.println("收到過期的訊息,準備關閉的訂單:" + entity.getOrderSn());
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}


@ResponseBody
@GetMapping("/test/createOrder")
public String testCreateOrder() {

    OrderEntity entity = new OrderEntity();
    // 設定訂單號
    entity.setOrderSn(UUID.randomUUID().toString());
    entity.setCreateTime(new Date());
    rabbitTemplate.convertAndSend("order-event-exchange",
            "order.create.order",
            entity);
    return "ok";
}

六、 訊息丟失、重復、積壓問題

1、訊息丟失

(1) 訊息發送出去,因為網路問題沒有抵達服務器

解決方案:

  • 做好容錯方法(try-catch),發送訊息可能會網路失敗,失敗后要有重試機制,可記錄到資料庫,采用定期掃描重發的方式,
  • 做好日志記錄,每個訊息狀態是否都被服務器收到都應該記錄,
  • 做好定期重發,如果訊息沒有發送成功,定期去資料庫掃描未成功的訊息進行重發,

(2) 訊息抵達Broker,Broker要將訊息寫入磁盤(持久化)才算成功,此時Broker尚未持久化完成,宕機

解決方案:

  • publisher也必須加入確認回呼機制,確認成功的訊息,修改資料庫訊息狀態,

(3) 自動ACK的狀態下,消費者收到訊息,但沒來得及訊息然后宕機

  • 一定開啟手動ACK,消費成功才移除,失敗或者沒來得及處理就NoAck并重新入隊

2、訊息重復

(1) 訊息消費成功,事務已經提交,ack時,機器宕機,導致沒有ack成功,Broker的訊息重新由unack變為ready,并發送給其他消費者,

(2) 訊息消費失敗,由于重試機制,自動又將訊息發送出去

(3) 成功消費,ack時宕機,訊息由unack變為ready,Broker又重新發送

解決方案:

  • 消費者的業務消費介面應該設計為冪等性的,比如扣庫存有 作業單的狀態標志,
  • 使用防重表(redis/mysql),發送訊息每一個都有業務的唯 一標識,處理過就不用處理,
  • rabbitMQ的每一個訊息都有redelivered欄位,可以獲取是否是被重新投遞過來的,而不是第一次投遞過來的,

3、訊息積壓

(1) 消費者宕機積壓

(2) 消費者消費能力不足積壓

(3) 發送者發送流量太大

解決方案:

  • 上線更多的消費者,進行正常消費
  • 上線專門的佇列消費服務,將訊息先批量取出來,記錄資料庫,離線慢慢處理

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/420481.html

標籤:其他

上一篇:阿里云MaxComputer SQL學習之內置函式

下一篇:返回列表

標籤雲
其他(135984) Python(24228) JavaScript(15073) Java(14739) C(11147) 區塊鏈(8215) AI(6935) 基礎類(6313) MySQL(5230) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4207) Linux(4118) PHP(3814) C#(3716) 爪哇(3561) html(3374) C語言(3288) C++語言(3117) sql(3024) R(2776) 熊猫(2774) Java相關(2746) 数组(2739) 疑難問題(2699) 反应(2482) 單片機工控(2479) css(2105) 数据框(1968) Web開發(1951) 节点.js(1938) VBA(1919) 網絡通信(1793) 蟒蛇-3.x(1774) 數據庫相關(1767) VB基礎類(1755) .NETCore(1671) ASP.NET(1650) 開發(1646) 系統維護與使用區(1617) C++(1582) 列表(1581) 基礎和管理(1579) json(1568) JavaEE(1566) 安卓(1523) HtmlCss(1519) 專題技術討論區(1515) Windows客戶端使用(1484) 扑(1469) iOS(1432) 查询(1408) .NET技术(1404) 打字稿(1376) Unity3D(1366) VCL組件開發及應用(1353) sql-server(1287) Qt(1283) 细绳(1226) HTML(CSS)(1220)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 訊息佇列:RabbitMQ

    總結了訊息中間件的特性、應用以及相關問題。詳細介紹了如何使用RabbitMQ解決分布式相關問題。...

    uj5u.com 2022-01-25 07:31:22 more
  • 阿里云MaxComputer SQL學習之內置函式

    ????在上一節中,我們學習了MaxComputer SQL的DML語言,并用DataWork給大家演示了一遍,今天我們進入內置函式的學習,這一部分中,我們接觸到的內置函式比較多,大家只要記住一些常用的,其他的函式知道有這么一個功能存在就行,對往期內容感興趣的小伙伴可以參考下面的文章????:hadoop專題: hadoop系列文章.spark專題: spark系列文章.阿里云系列: 阿里云MaxComputer SQL學習之DDL.阿里云系列: 阿里云MaxComputer SQL學習之DML....

    uj5u.com 2022-01-25 07:30:05 more
  • 年終獎領完想跳槽?先看看這份程式員薪資調查報告吧

    前言隨著 AI、大資料、云計算、5G 技術的成熟,各類技術人才成為各大行業爭奪重點。據相關資料顯示,大資料方面人才的需求,66% 來自于資訊技術行業,其他需求來源,則分布于智能制造、金融、地產、現代服務業等行業,且這部分需求有進一步增加的趨勢。過去的 2021 年,IT 技術崗位當中什么崗位最受歡迎?各崗位的薪資水平如何呢?你目前的薪資處于什么級別?有沒有技術含量低但高薪的職位呢?各類程式員崗位薪酬漲幅明顯與 2020 年平均薪酬相比,2021 年程式員崗位則是 1-3 年、3-5 年 及 5-1...

    uj5u.com 2022-01-25 07:28:02 more
  • spark技術學習與思考(sparkcore&sparksql)

    Spark 產生之前,已經有 MapReduce 這類非常成熟的并行計算框架存在了,并提供了高層次的API(map/reduce),它在集群上進行計算并提供容錯能力,從而實作分布式計算。所以為什么 spark 會流行呢?...

    uj5u.com 2022-01-25 07:26:53 more
  • Java開發之實時計算--Flink

    簡介介紹計算框架對java開發的重要性介紹flink的架構介紹flink的基本概念:常用算子、checkpoint、state、window介紹flink的編程模型:DataStream、DataSet、Table API、SQL介紹flink的部署計算框架每個Java開發一定要懂至少一個流行的計算框架,因為現在的資料量越來越大,光靠資料庫或者手寫代碼去實作難度已經越來越大,不僅涉及到資源調度,還要考慮分布式,并且還要考慮高可用、容錯等等,因此我們需要借助現有的分布式計算框架來實作我們大規...

    uj5u.com 2022-01-25 07:25:33 more
  • ElasticSearch

    介紹了ElasticSearch的基本概念,以及相關使用,并結合專案中的業務進行了總結。...

    uj5u.com 2022-01-25 07:25:08 more
  • hive中多表full join主鍵重復問題

    目錄0. 其他1. 問題描述2. 問題復現2.1. 建表陳述句2.2. 插入資料2.3. 查詢SQL以及問題3. 問題原因4. 問題解決0. 其他1. 問題描述在Hive中(其他類似SQL,比如PostgreSQL可能也存在此問題),當對多張表(3張及以上)進行full join時,會存在每張表的主鍵都是唯一,但當full join后,會發現主鍵可能有重復。2. 問題復現2.1. 建表陳述句create table realtime_dw......

    uj5u.com 2022-01-25 07:24:53 more
  • RabbitMQ 超詳細入門篇

    RabbitMQ 入門篇????MQ 的基本概念:什么是 MQ ?MQ全稱為Message Queue即訊息佇列"訊息佇列" 是在訊息的傳輸程序中保存訊息的容器它是典型的:生產者————消費者模型生產者不斷向訊息佇列中生產訊息 ———————— 消費者不斷的從佇列中獲取訊息.這樣的好處: 生產者只需要關注發訊息,消費者只需要關注收訊息,二者沒有業務邏輯的侵入,這樣就實作了生產者和消費者的解耦.為什么要使用 MQ?或者說MQ 有什么好處,MQ 主要可以實作三種功能:服務解耦...

    uj5u.com 2022-01-25 07:23:56 more
  • 九章云極DataCanvas公司榮獲機器之心三大獎項,助力產業數智化升級

    近日,國內領先的前沿科技媒體和產業服務平臺機器之心發布了「AI 中國」機器之心 2021 年度榜單。九章云極DataCanvas公司憑借在人工智能領域優秀的技術、豐富的AI解決方案、智能化場景的創新應用以及適合國內市場的商業模式,入選 “最佳人工智能公司 TOP 30”、“最具商業價值解決方案TOP30”、“最具創新價值落地案例TOP30”。圖片來源:機器之心作為人工智能技術及產業發展的風向標,機器之心「AI中國」年度評選自2017年設立,已連續舉辦5屆,成為國內人工智能界的權威年度獎項之一。本次榜單...

    uj5u.com 2022-01-25 07:22:09 more
  • 2021—很有意義的一年

    最大的改變2021年,對于我來說,最大的改變應該就是有在努力想要提高自己的溝通能力。我的溝通能力和表達能力對于我來說,就是一個永遠搬不動的大山,擋在我前進的路上。其實,我知道自己的弱點,也想要改變,但是就是不付諸行動,也不知咋改變這種現狀。直到離開學校,開始步入社會,進入了一個完全陌生的圈子。很慶幸的是,遇到了一個特別好的導師,他給了我很多的幫助,給我提了很多建議,不管是作業上、生活上、還是人生發展,都是很寶貴的建議。由于我的性格,我有時候不敢在群里發言,導師經常鼓勵我要多在群上發言,這樣可以克服交流的...

    uj5u.com 2022-01-25 07:21:57 more