問題
最近在使用RabbitMq時遇到了一個問題,明明是轉換成json發送到mq中的資料,消費者接收到的卻是一串數字也就是byte陣列,但是使用mq可視化頁面查看資料卻是正常的,之前在使用程序中從未遇到過這種情況,遇到的情況如下所示:

生產者發送訊息的代碼如下所示:
public void sendJsonStrMsg(String jsonStr){
rabbitTemplate.convertAndSend(JSON_QUEUE, jsonStr);
}
消費者代碼如下所示:
@RabbitHandler
@RabbitListener(queuesToDeclare = {@Queue(name=ProducerService.JSON_QUEUE, durable = "true")},containerFactory = "prefetchTenRabbitListenerContainerFactory")
public void listenJsonMsg(String msg, Channel channel, Message message){
log.debug("json字串型別訊息>>>>{}",msg);
}
引入的containerFactory如下所示:
@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchTenRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
MessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); //<x>
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter);
return factory;
}
注意代碼中標有<x>的地方,這里就是我們解決問題的關鍵,
解決方案
我們先說解決方案,再說原因,解決方案其實很簡單,在保持上述代碼不變的情況下,只需要再注入如下的bean即可:
@Bean
public MessageConverter jackson2JsonMessageConverter(){
return new Jackson2JsonMessageConverter("*");
}
解決方案就是這么簡單,只需要在原來的代碼的基礎上注入Jackson2JsonMessageConverter就可以了,但是原理是什么呢?且往后看,
原理分析
關于原理的解釋我們從原始碼層面來說,畢竟原始碼面前沒有秘密.
生產者原始碼分析
首先看我們發送訊息到mq的方法rabbitTemplate.convertAndSend(JSON_QUEUE, jsonStr),從此方法進去后,經過多載的方法后最終到達下面所示的方法:
@Override
public void convertAndSend(String exchange, String routingKey, final Object object,
@Nullable CorrelationData correlationData) throws AmqpException {
send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
}
著重看convertMessageIfNecessary方法,方法名已經很直白的告訴我們了,如果需要就轉換訊息,我們點進去看一下這個方法:
protected Message convertMessageIfNecessary(final Object object) {
if (object instanceof Message) { //<1>
return (Message) object;
}
return getRequiredMessageConverter().toMessage(object, new MessageProperties()); //<2>
}
<1>處是說如果要發送到mq的物件是Message的實體,那么就直接轉換成Message型別回傳,否則就獲取MessageConverter后呼叫toMessage()方法回傳Message物件,
我們先看一下RabbitTemplate#getRequiredMessageConverter(),如下所示:
private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
MessageConverter converter = getMessageConverter();
if (converter == null) {
throw new AmqpIllegalStateException(
"No 'messageConverter' specified. Check configuration of RabbitTemplate.");
}
return converter;
}
public MessageConverter getMessageConverter() {
return this.messageConverter; //<1>
}
<1>處的代碼表明需要一個messageConverter物件,我在RabbitTemplate原始碼中找到了對應的set方法,由于我們沒有呼叫set方法取設定messageConverter的值,那么就需要取查找默認值,默認值的設定如下代碼所示:
/**
* Convenient constructor for use with setter injection. Don't forget to set the connection factory.
*/
public RabbitTemplate() {
initDefaultStrategies(); // NOSONAR - intentionally overridable; other assertions will check
}
/**
* Set up the default strategies. Subclasses can override if necessary.
設定默認策略,子類在必須的時候可以重寫
*/
protected void initDefaultStrategies() {
setMessageConverter(new SimpleMessageConverter());
}
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
我們點進去SimpleMessageConverter#toMessage()方法看一下是如何將一個java物件轉換成Message物件的,可惜的是在SimpleMessageConverter中未找到toMessage方法,我們在此先看一下SimpleMessageConverter繼承情況,類圖如下:

去掉了一些無用的介面和類之后,剩下的類圖如下所示,沿著類圖向上找,在AbstractMessageConverter中找到了toMessage方法:
@Override
public final Message toMessage(Object object, @Nullable MessageProperties messagePropertiesArg,
@Nullable Type genericType)
throws MessageConversionException {
MessageProperties messageProperties = messagePropertiesArg;
if (messageProperties == null) {
messageProperties = new MessageProperties();
}
Message message = createMessage(object, messageProperties, genericType); //<1>
messageProperties = message.getMessageProperties();
if (this.createMessageIds && messageProperties.getMessageId() == null) {
messageProperties.setMessageId(UUID.randomUUID().toString());
}
return message;
}
該方法中沒有我們需要的內容,繼續看<1>處的方法,該方法需要回傳到SimpleMessageConverter中:
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) { //<1>
bytes = (byte[]) object;
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES); //<1.x>
}
else if (object instanceof String) { //<2>
try {
bytes = ((String) object).getBytes(this.defaultCharset);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert to Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);//<2.x>
messageProperties.setContentEncoding(this.defaultCharset);
}
else if (object instanceof Serializable) { //<3>
try {
bytes = SerializationUtils.serialize(object);
}
catch (IllegalArgumentException e) {
throw new MessageConversionException(
"failed to convert to serialized Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);//<3.x>
}
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
return new Message(bytes, messageProperties);
}
throw new IllegalArgumentException(getClass().getSimpleName()
+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); //<4>
}
這個方法就比較有意思了,在<1>、<2>、<3>三處分別判斷了發送的訊息是否是byte[]、String、Serializable,并且在判斷之后將訊息的content_type屬性分別設定為application/octet-stream、text/plain、application/x-java-serialized-object三種型別,除了以上三種型別之外的資料將被拋出例外,很顯然我們前面發送的是字串訊息,那么content_type屬性的值必定是text/plain了,可以在mq可視化頁面上看到:

經過以上的步驟,java物件已經轉換成Message物件并且發送到了MQ中,下面就是消費者的原始碼分析了,
消費者原始碼分析
本來想從SpringBoot啟動開始到Bean加載、注冊一直到獲取訊息的原始碼分析下來的,奈何IoC和AOP的原始碼還沒看完,實在是心有余而力不足,此處留個坑待以后再戰,
前面生產者是呼叫MessageConverter.toMessage()方法將java物件轉換成Message物件發送到MQ中的,那么消費者應該是反其道而行之,呼叫MessageConverter.formMessage()方法將Message物件轉換成java物件,我們就從formMessage方法開始看,生產者使用的是SimpleMessageConverter,那么此處還是查看此類的fromMessage方法:
/**
* Converts from a AMQP Message to an Object.
*/
@Override
public Object fromMessage(Message message) throws MessageConversionException {
Object content = null;
MessageProperties properties = message.getMessageProperties();
if (properties != null) {
String contentType = properties.getContentType();//<1>
if (contentType != null && contentType.startsWith("text")) { //<2>
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = this.defaultCharset;
}
try {
content = new String(message.getBody(), encoding);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert text-based Message content", e);
}
}
else if (contentType != null &&
contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) { //<3>
try {
content = SerializationUtils.deserialize(
createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
}
catch (IOException | IllegalArgumentException | IllegalStateException e) {
throw new MessageConversionException(
"failed to convert serialized Message content", e);
}
}
}
if (content == null) {
content = message.getBody(); //<4>
}
return content;
}
以上代碼很容易理解
<1>處是從訊息屬性中獲取到訊息的content_type屬性
<2>處和<3>處則是分別判斷是否text/plain以及application/x-java-serialized-object
如果以上兩種都不符合,那么只能是呼叫message.getBody()回傳一個byte[]型別的byte陣列,這也就是文章開頭回傳一串數字的由來,
問題解決
雖然消費者原始碼分析得到了一個回傳一串數字的緣由,但是這并不是造成本次問題的根本原因,我們再回顧一下問題中消費者所使用的一個containerFactory
@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchTenRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
MessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); //<1>
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter); //<2>
return factory;
}
<1>和<2>處使用的messageConver是Jackson2JsonMessageConverter,通過前面類圖我們可以知道它也是實作了MessageConvert介面,我們看一下這個類的原始碼:
/**
* JSON converter that uses the Jackson 2 Json library.
*/
public class Jackson2JsonMessageConverter extends AbstractJackson2MessageConverter {
public Jackson2JsonMessageConverter() {
this("*");
}
public Jackson2JsonMessageConverter(String... trustedPackages) {
this(new ObjectMapper(), trustedPackages);
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
public Jackson2JsonMessageConverter(ObjectMapper jsonObjectMapper) {
this(jsonObjectMapper, "*");
}
public Jackson2JsonMessageConverter(ObjectMapper jsonObjectMapper, String... trustedPackages) {
super(jsonObjectMapper, MimeTypeUtils.parseMimeType(MessageProperties.CONTENT_TYPE_JSON), trustedPackages); //<1>
}
}
我刪掉了一些無用的代碼以及注釋,可以在類注釋上很顯然的看到這個轉換器是使用jackson的JSON轉換器,也就是說這個轉換器只對json資料有效,該類中并沒有找到fromMessage和toMessage方法,那么只能從其父類AbstractJackson2MessageConverter中查找fromMessage方法,如下所示,注意上面代碼中<1>的地方,傳遞的content_type型別是application/json
// AbstractJackson2MessageConverter
@Override
public Object fromMessage(Message message, @Nullable Object conversionHint) throws MessageConversionException {
Object content = null;
MessageProperties properties = message.getMessageProperties();
if (properties != null) {
String contentType = properties.getContentType();//<1>
//supportedContentType即為建構式中傳遞的MimeType
if (contentType != null && contentType.contains(this.supportedContentType.getSubtype())) {//<2>
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = getDefaultCharset();
}
try {
if (conversionHint instanceof ParameterizedTypeReference) {
content = convertBytesToObject(message.getBody(), encoding,
this.objectMapper.getTypeFactory().constructType(
((ParameterizedTypeReference<?>) conversionHint).getType()));
}
else if (getClassMapper() == null) {
JavaType targetJavaType = getJavaTypeMapper()
.toJavaType(message.getMessageProperties());
content = convertBytesToObject(message.getBody(),
encoding, targetJavaType);
}
else {
Class<?> targetClass = getClassMapper().toClass(// NOSONAR never null
message.getMessageProperties());
content = convertBytesToObject(message.getBody(),
encoding, targetClass);
}
}
catch (IOException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
}
else {
if (this.log.isWarnEnabled()) {
this.log.warn("Could not convert incoming message with content-type ["
+ contentType + "], '" + this.supportedContentType.getSubtype() + "' keyword missing."); //<3>
}
}
}
if (content == null) {
content = message.getBody();
}
return content;
}
上述代碼可以這么理解,Jackson2JsonMessageConverter初始化時將json格式的content_type傳遞到父類AbstractJackson2MessageConverter中,當消費者將Message訊息轉換為Java物件時實際上是呼叫的AbstractJackson2MessageConverter#fromMessage()方法,由于該方法只支持json格式的content_type,因此執行了<3>處的代碼,列印出了文章開頭所示的提示資訊,
因此最終的解決方案其實有2種
1.發送訊息時也使用
Jackson2JsonMessageConverter,這種方式用來支持json格式的資料傳輸;
2.洗掉containerFactory中設定的MessageConvert,使用默認的SimpleMessageConverter,這樣就只支持字串、byte陣列以及序列化物件三種訊息了,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/207116.html
標籤:其他
上一篇:技術點2:CSS
