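I first set up an interceptor for outgoing messages, and it works fine. But when I try to intercept incoming messages in the consumer, the postProcessMessage method is skipped and the message goes straight to the method annotated with @RabbitListener. Below is my whole code flow; I have omitted the unimportant parts.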
Producer
RabbitMQProducerInterceptor
@Component
@Slf4j
public class RabbitMQProducerInterceptor implements MessagePostProcessor {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        log.info("Getting the current HttpServletRequest");
        HttpServletRequest req = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes())
                .getRequest();
        log.info("Extracting the X-REQUEST-ID from the header of the HttpServletRequest");
        String XRequestId = req.getHeader(ShareableConstants.X_REQUEST_ID_HEADER);
        log.info("Adding X-REQUEST-ID {} to the RabbitMQ Producer Header", XRequestId);
        message.getMessageProperties().getHeaders().put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);
        return message;
    }
}
RabbitMQProducerConfig
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setReplyTimeout(60000);
    MessagePostProcessor[] processors = {new RabbitMQProducerInterceptor()};
    rabbitTemplate.addBeforePublishPostProcessors(processors);
    return rabbitTemplate;
}
Sending the message to the consumer
UserProducer
public UserRegistrationResponseDTO register(UserRegistrationDTO userRegistrationDTO) {
    log.info("Sending user registration request {}", userRegistrationDTO);
    UserRegistrationDTO response = (UserRegistrationDTO) rabbitTemplate
            .convertSendAndReceive(ShareableConstants.EXCHANGE_NAME,
                    ShareableConstants.CREATE_USER_ROUTING_KEY,
                    userRegistrationDTO);
    return UserRegistrationResponseDTO.builder()
            .username(response.getUsername())
            .id(response.getId())
            .createdAt(response.getCreatedAt()).build();
}
Consumer
RabbitMQConsumerConfig
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    MessagePostProcessor[] processors = {new RabbitMQConsumerInterceptor()};
    rabbitTemplate.setAfterReceivePostProcessors(processors);
    return rabbitTemplate;
}
RabbitMQConsumerInterceptor
@Component
public class RabbitMQConsumerInterceptor implements MessagePostProcessor {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        String XRequestId = message.getMessageProperties().getHeader(ShareableConstants.X_REQUEST_ID_HEADER);
        MDC.put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);
        return message;
    }
}
UserConsumer
@RabbitListener(bindings =
        @QueueBinding(exchange = @Exchange(ShareableConstants.EXCHANGE_NAME),
                key = ShareableConstants.CREATE_USER_ROUTING_KEY,
                value = @Queue(ShareableConstants.USER_REGISTRATION_QUEUE_NAME)))
public UserRegistrationDTO receiveUser(UserRegistrationDTO userRegistrationDTO) {
    log.info("receiving user {} to register ", userRegistrationDTO);
    User user = Optional.of(userRegistrationDTO).map(User::new).get();
    User createdUser = userService.register(user);
    UserRegistrationDTO registrationDTO = UserRegistrationDTO.builder()
            .id(createdUser.getId())
            .username(createdUser.getUsername())
            .createdAt(createdUser.getCreationDate())
            .build();
    return registrationDTO;
}
That is the code. No exception is thrown; the only problem is that the consumer interceptor is skipped.
Answer:
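The RabbitTemplate is not used to receive messages for a @RabbitListener; those messages are received by a listener container. You must set the afterReceivePostProcessors on the listener container factory instead.
If you are using Spring Boot, simply add the auto-configured SimpleRabbitListenerContainerFactory as a parameter to one of your other @Bean methods and set the MPP (MessagePostProcessor) on it.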
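A minimal sketch of that suggestion, assuming Spring Boot's auto-configured SimpleRabbitListenerContainerFactory is in use and that RabbitMQConsumerInterceptor remains a @Component so it can be injected. Reusing the consumer's existing rabbitTemplate bean method as the place to configure the factory is just one possible choice; any other @Bean method would do.

```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConsumerConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(
            ConnectionFactory connectionFactory,
            // Assumption: this is the factory auto-configured by Spring Boot
            SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
            // Assumption: the interceptor from the question, injected as a bean
            RabbitMQConsumerInterceptor interceptor) {
        // Containers built by this factory (the ones behind @RabbitListener)
        // apply the interceptor to each delivery before the listener method runs.
        rabbitListenerContainerFactory.setAfterReceivePostProcessors(interceptor);

        // The template itself no longer needs the after-receive post-processor;
        // that setting only affects receive calls made through the template.
        return new RabbitTemplate(connectionFactory);
    }
}
```

With this in place, postProcessMessage in RabbitMQConsumerInterceptor should be invoked for messages delivered to receiveUser. If the application defines its own listener container factory rather than relying on the auto-configured one, the same setAfterReceivePostProcessors call would go on that factory instead.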
