程式里有一個應用場景使用到了rabbitmq——當財務確認收到企業的打款金額后,系統會把企業訂單生成用戶付款單,由于訂單記錄資料量大,改為通過mq來異步實作,即財務確認收款操作后,將企業訂單資料放入mq,另一端監聽mq訊息佇列,將收到的企業訂單加工轉換成用戶付款單,并做持久化,
本地開發環境與測驗環境共用一套rabbitmq,當專案部署到測驗環境后,QA測驗程序中,總是“莫名其妙”的發現所保存的用戶付款單資料有問題,
當然,首先要排查程式,檢查Consumer的資料處理的邏輯是否有bug,單元測驗后,發現并不存在測驗環境的bug,
原來,訊息佇列被“非正常”消費了!
Q: 什么情況?
A: 幾個伙伴一起參與的專案,大家總是要除錯自己的程式的,而如果碰巧本地程式監聽到訊息佇列里有訊息,那么,訊息就被本地程式消費掉了,問題正是出現在這里!————團隊開發,大家并不會及時檢出git上最新的程式版本,如果本地的程式版本不是最新的正確的版本,勢必會出現bug,
那么,怎么辦?
每次你改了邏輯,告訴大家獲取最新?
不現實,約定的東西往往不奏效的,
如何保證mq佇列里的訊息只被測驗服務器上的consumer消費,避免本地環境誤消費?
只要肯琢磨,辦法總比困難多!
我們知道,rabbitmq手動ack模式,這還不夠,因為我們怎么讓consumer來決定是否消費呢? 所以,我們需要一個標識————producer設定一個標識,consumer如果匹配這個標識,則消費,否則予以reject放回訊息佇列,

通過查看spring-rabbit/spring-amqp的代碼,發現可以在spring-amqp里的MessageProperties上做文章,生產者與消費者每次訊息傳輸都會攜帶一個MessageProperties,通常我們是不指定的,走MessageProperties的默認設定值,
我的策略:MessageProperties有一個屬性叫AppId,我們程式所部署的測驗機器就一臺,即訊息Producer和訊息Consumer在一臺機器上,那么,我就可以利用機器的IP來識別訊息,只有Producer與Consumer的IP匹配,才消費訊息,程式員本機IP與測驗服務器IP不一樣,就會拒絕接收訊息,會把訊息重新放回訊息佇列,等待測驗服務器的Consumer消費,
話不多說,上代碼吧,
生產者代碼:
package com.sboot.mq; import org.junit.Test; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import java.net.InetAddress; import java.util.UUID; public class MQProducerTest extends BaseTest { @Autowired RabbitTemplate rabbitTemplate; @Test public void test() throws Exception { for (int i = 1; i <= 5; i++) { MessageProperties messageProperties = new MessageProperties(); String ip = InetAddress.getLocalHost().getHostAddress(); messageProperties.setAppId(ip); // messageProperties.setUserId(String.valueOf(i)); MessageConverter messageConverter = new SimpleMessageConverter(); String msg = UUID.randomUUID().toString(); // System.out.println(msg); Message message1 = messageConverter.toMessage(msg, messageProperties); rabbitTemplate.send(MessageQueueConstant.USER_SETTLEMENT_EXCHANGE, "UserSettlementRouting", message1); System.out.println("入隊完成"); Thread.sleep(500L); } } }
消費者手動ACK,要實作ChannelAwareMessageListener介面,感知rabbitmq.client.Channel實體,呼叫channel的basicAck、basicReject等方法:
package com.sboot.mq; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; import java.net.InetAddress; @Component @Profile(value = "dev") @Slf4j public class UserSettlementDevConsumer implements ChannelAwareMessageListener { @RabbitHandler @RabbitListener(queues = MessageQueueConstant.USER_SETTLEMENT_QUEUE, ackMode = "MANUAL") @Override public void onMessage(Message message, Channel channel) throws Exception { Thread.currentThread().setName(UserSettlementDevConsumer.class.getSimpleName() + System.currentTimeMillis()); long tag = message.getMessageProperties().getDeliveryTag(); String appId = message.getMessageProperties().getAppId(); log.info("{}-{}, 訊息出隊", tag, appId); String receiveMsg = ""; try { //核對標識,決定是否消費訊息 String ip = InetAddress.getLocalHost().getHostAddress(); if (!ip.equals(appId)) { log.info("這不是我需要的訊息,放回佇列,{}", receiveMsg); // channel.basicNack(tag, false, true); channel.basicReject(tag, true); // channel.basicRecover(true); return; } MessageConverter messageConverter = new SimpleMessageConverter(); receiveMsg = String.valueOf(messageConverter.fromMessage(message)); ,,,,在這里消費訊息 log.info("success " + receiveMsg); channel.basicAck(tag, false); } catch (Exception e) { log.error("receive message has an error, ", e); channel.basicNack(tag, false, true); } } }
說明一下依賴的spring-rabbit包的版本,我的是2.2.0.RELEASE,如果是2.1.4版本里,@RabbitListener注解沒有ackMode,
解決本案問題程序中的花絮:

@RabbitListener的ackMode的值見列舉org.springframework.amqp.core.AcknowledgeMode
- NONE-- no acks(自動消費 autoAck)
- MANUAL --Manual acks - user must ack/nack via a channel aware listener.(手動消費,Consumer端必須顯式呼叫ack或nack)
- AUTO --

設定了手動消費,上文消費端的deliveryTag會是不同的long值,自動消費的deliveryTag是重復的1和2這樣的,并且,自動消費時,如果要使用channel的ack/nack,會報例外:
2020-06-19 22:26:54.586 [AMQP Connection 192.168.40.20:5672] ERROR o.s.a.rabbit.connection.CachingConnectionFactory:1468 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-06-19 22:26:54.599 [SimpleAsyncTaskExecutor-1] ERROR c.e.z.r.p.modules.mq.UserSettlementAckConsumer:49 -
org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1092)
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/159461.html
標籤:Java
