問題:訊息傳遞到了rabbitmq,到了exchange,但是到不了consumer。
提前說明:本地環境與線上環境都是ok的,只有測驗環境會出現訊息丟失的情況。
詳細說明:用的是rabbitmqTemplate進行的訊息傳遞,初始化時配置了CachingConnectionFactory的
setPublisherConfirms(true)、setPublisherReturns(true)、setChannelCacheSize(100)、setChannelCheckoutTimeout(600)和
SimpleRabbitListenerContainerFactory的setPrefetchCount(100)、setConcurrentConsumers(10)、setAcknowledgeMode(AcknowledgeMode.MANUAL)。 這是一些生產者的確認、回呼模式的配置,消費者確認模式的配置和channel、channel中訊息數量的配置,其余均為默認配置。
然后訊息發送到rabbitmq,生產者收到confirm確認,回傳都是ack=true,并且沒有觸發returnMessage回呼,說明訊息確實都發送到了rabbitmq的exchange,并且都找到了匹配的queue(本地測驗的時候unbind queue后,確實會觸發returnMessage回呼)。但是從consumer的日志可以看出,每次都會丟失訊息。值得一提的是每次啟動專案后,第一條訊息丟失的概率很大,當然,重發這條訊息就ok。
求指點。
代碼:
生產者
public void sendMsg(String content) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
logger.info("rabbitMq send msg:{} ", content);
rabbitTemplate.convertAndSend("exchange", "routingKey", content, correlationId);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info(" 回呼data:{}, ack:{}, cause:{}", correlationData, ack, cause);
if (ack) {
logger.info("訊息成功消費");
} else {
logger.warn("訊息消費失敗:" + cause);
//省略重發代碼...
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.warn("訊息丟失,將會重新發送。:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
//省略重發代碼...
}
消費者:
@RabbitListener(queues = "queue", containerFactory = "simpleRabbitListenerContainerFactory")
public class RabbitMqReceiver {
@RabbitHandler
public void process(String content, @Headers Map<String, Object> headers, Channel channel) throws Exception {
logger.info("rabbitMq received msg:{} ", content);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag,false);
//省略業務代碼...
}
}
uj5u.com熱心網友回復:
補一下tracing日志{"timestamp":"2020-10-21 2:03:45:689","type":"published","node":"rabbit@BackupServer","connection":"127.0.0.1:54098 -> 127.0.0.1:5672","vhost":"/","user":"guest","channel":12,"exchange":"exchange","queue":"none","routed_queues":["queue"],"routing_keys":["routingKey"],"properties":{"priority":0,"delivery_mode":2,"headers":{"spring_returned_message_correlation":"8ea40148-7b31-4e7f-a39d-e5f94cea8ae4","spring_listener_return_correlation":"2e76a86a-7024-4fd0-8b3a-5a69825c3a96"},"content_encoding":"UTF-8","content_type":"text/plain"},"payload":"eyJ0YXNrSWQiOiIxNGJjZGM0My01Y2YzLTRkZjYtOTBlOC0wYmNjM2ZkOTgyZmQiLCJ1cmwiOiIvZGF0YS93ZWIvcHJvamVjdC9tZmsveWlucGluL3JlY29yZF95aW5waW4vMzgzNC82LndhdiJ9"}
{"timestamp":"2020-10-21 2:03:45:689","type":"received","node":"rabbit@BackupServer","connection":"127.0.0.1:54098 -> 127.0.0.1:5672","vhost":"/","user":"guest","channel":2,"exchange":"exchange","queue":"queue","routed_queues":"none","routing_keys":["routingKey"],"properties":{"priority":0,"delivery_mode":2,"headers":{"spring_returned_message_correlation":"8ea40148-7b31-4e7f-a39d-e5f94cea8ae4","spring_listener_return_correlation":"2e76a86a-7024-4fd0-8b3a-5a69825c3a96"},"content_encoding":"UTF-8","content_type":"text/plain"},"payload":"eyJ0YXNrSWQiOiIxNGJjZGM0My01Y2YzLTRkZjYtOTBlOC0wYmNjM2ZkOTgyZmQiLCJ1cmwiOiIvZGF0YS93ZWIvcHJvamVjdC9tZmsveWlucGluL3JlY29yZF95aW5waW4vMzgzNC82LndhdiJ9"}
uj5u.com熱心網友回復:
我只發送了7條訊息,tracing日志里面有全部的已發送的訊息,但是consumer日志卻只有部分已發送的訊息,而且tracing并沒有新增訊息,說明訊息并沒有被重新排隊。而且看rabbitmq控制臺,可以看到沒有unacked訊息,那么訊息應該就是被丟棄了,然后怎么處理呢。。。uj5u.com熱心網友回復:
參考以下訊息丟失的解決方法吧https://www.jianshu.com/p/19e0927315da
uj5u.com熱心網友回復:
你看看你的rabbitmq server上的訊息日志,每個訊息都是以日志的方式在檔案中有記錄的uj5u.com熱心網友回復:
路徑是/app/rabbitmq/var/log/rabbitmq 么
我看到的是一些啟動、連接日志,沒有看到訊息日志呢
uj5u.com熱心網友回復:
這是一個工具類專案,兩分鐘一次輪詢進行重發,所以訊息丟失到不是很重要,還不至于搞到持久化+輪詢解決這個問題的程度,但是就很納悶,總感覺研究了一天官方檔案,結果是個假貼子
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/184797.html
標籤:Java EE
上一篇:2020-10-21:go中channel的send流程是什么?
下一篇:鏈表空指標
