[TOC]

# MessageConverter in Detail

~~~
org.springframework.amqp.support.converter.MessageConverter
~~~

~~~
// Converts a Java object plus its message properties into a Message.
Message toMessage(Object object, MessageProperties messageProperties);

// Converts a Message back into a Java object.
Object fromMessage(Message message) throws MessageConversionException;
~~~

## Demo

**Define the Config class**

~~~java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.order");

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
        // Set the message converter used for incoming messages
        adapter.setMessageConverter(new TestMessageConverter());
        // Set the handler method that consumes messages by default
        adapter.setDefaultListenerMethod("onMessage");
        container.setMessageListener(adapter);
        return container;
    }
}
~~~
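
The adapter configured above works in two steps: it converts the incoming message, then calls the named method on the delegate with the converted object. The following is a minimal plain-Java sketch of that idea, not Spring AMQP's actual implementation. `AdapterDispatchSketch`, `Handler`, and `dispatch` are hypothetical names, and the converter is modeled as a simple `Function<byte[], Object>`.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class AdapterDispatchSketch {

    /** Hypothetical handler, shaped like the MessageHandler used in this demo. */
    static class Handler {
        String last;

        public void onMessage(String message) {
            last = message;
        }
    }

    /**
     * Converts the raw body, then invokes the public method called methodName
     * whose single parameter accepts the converted object.
     */
    static void dispatch(Object delegate, String methodName,
                         Function<byte[], Object> converter, byte[] body) {
        Object converted = converter.apply(body);
        for (Method m : delegate.getClass().getMethods()) {
            if (m.getName().equals(methodName)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isInstance(converted)) {
                try {
                    m.invoke(delegate, converted);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("listener method failed", e);
                }
                return;
            }
        }
        throw new IllegalArgumentException(
                "no method " + methodName + " accepting " + converted.getClass().getName());
    }

    public static void main(String[] args) {
        Handler handler = new Handler();
        dispatch(handler, "onMessage",
                b -> new String(b, StandardCharsets.UTF_8),
                "order-1".getBytes(StandardCharsets.UTF_8));
        System.out.println(handler.last); // prints "order-1"
    }
}
```

Because the lookup matches on the converted object's type, changing what the converter returns also changes which handler signature is required.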
The message converter set on the `MessageListenerAdapter` decides what the consumer receives. With the converter below, the incoming `Message` is converted to a `String`.

~~~java
import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

public class TestMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        System.out.println("=======toMessage=========");
        // Use an explicit charset instead of the platform default
        return new Message(object.toString().getBytes(StandardCharsets.UTF_8), messageProperties);
    }

    // The type returned by fromMessage is the type the consumer's handler method receives
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.out.println("=======fromMessage=========");
        return new String(message.getBody(), StandardCharsets.UTF_8);
    }
}
~~~

**The handler that consumes messages**

~~~java
public class MessageHandler {

    public void onMessage(String message) {
        System.out.println("---------onMessage-------------");
        System.out.println(message);
    }
}
~~~

**Startup class**

~~~java
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

/**
 * A MessageConverter can convert a Java object into a Message, and a Message into a Java object.
 *
 * MessageListenerAdapter uses the MessageConverter internally to convert the Message into a Java
 * object, then finds the matching handler method, whose parameter is the converted Java object.
 */
@ComponentScan
public class Application {

    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        System.out.println("===start up======");
        // Keep the context alive long enough to receive a test message
        TimeUnit.SECONDS.sleep(30);
        context.close();
    }
}
~~~

Start the application and send a message to the `zhihao.miao.order` queue. The console prints the following (the last line is the body of the message that was sent):

~~~
===start up======
=======fromMessage=========
---------onMessage-------------
String类型的消息
~~~

The output shows that the message is converted before the consumer handles it: `fromMessage` of `TestMessageConverter` is called first, and then the handler's `onMessage` method runs with a `String` argument.

### Extension

Define a custom `MyBody` type and convert the incoming `Message` to `MyBody` instead:
~~~java
import java.nio.charset.StandardCharsets;

public class MyBody {

    private byte[] bodys;

    public MyBody(byte[] bodys) {
        this.bodys = bodys;
    }

    @Override
    public String toString() {
        return new String(bodys, StandardCharsets.UTF_8);
    }
}
~~~

Next, change `fromMessage` in `TestMessageConverter` to return a `MyBody`. The handler's consuming method must then take a `MyBody` parameter as well.

~~~java
import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

public class TestMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        System.out.println("=======toMessage=========");
        return new Message(object.toString().getBytes(StandardCharsets.UTF_8), messageProperties);
    }

    // The type returned by fromMessage is the type the consumer's handler method receives
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.out.println("=======fromMessage=========");
        return new MyBody(message.getBody());
    }
}
~~~

The handler method's parameter is now of type `MyBody`:

~~~java
public class MessageHandler {

    public void onMessage(MyBody message) {
        System.out.println("---------onMessage---MyBody-------------");
        System.out.println(message);
    }
}
~~~

The console now prints:

~~~
===start up======
=======fromMessage=========
---------onMessage---MyBody-------------
Mybody类型的消息
~~~

## Summary

We also tested the case without a custom converter. When the message's `content_type` property starts with `text` (for example `text/plain`), the default conversion yields a `String`. When `content_type` is not set, the handler receives a `byte[]`.

## Source Code Analysis

Step into the `setMessageConverter` method of `MessageListenerAdapter`:

~~~
/**
 * Set the converter that will convert incoming Rabbit messages to listener method arguments, and objects returned
 * from listener methods back to Rabbit messages.
 * <p>
 * The default converter is a {@link SimpleMessageConverter}, which is able to handle "text" content-types.
 * @param messageConverter The message converter.
 */
public void setMessageConverter(MessageConverter messageConverter) {
    this.messageConverter = messageConverter;
}
~~~

~~~
private MessageConverter messageConverter = new SimpleMessageConverter();
~~~

The default `MessageConverter` is therefore `SimpleMessageConverter`. Its `fromMessage` method contains the default conversion logic:

~~~
@Override
public Object fromMessage(Message message) throws MessageConversionException {
    Object content = null;
    MessageProperties properties = message.getMessageProperties();
    if (properties != null) {
        String contentType = properties.getContentType();
        // If contentType starts with "text", convert the message body to a String
        if (contentType != null && contentType.startsWith("text")) {
            String encoding = properties.getContentEncoding();
            if (encoding == null) {
                encoding = this.defaultCharset;
            }
            try {
                content = new String(message.getBody(), encoding);
            }
            catch (UnsupportedEncodingException e) {
                throw new MessageConversionException(
                        "failed to convert text-based Message content", e);
            }
        }
        // If content_type is application/x-java-serialized-object, deserialize the body into a Java object
        else if (contentType != null &&
                contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) {
            try {
                content = SerializationUtils.deserialize(
                        createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
            }
            catch (IOException e) {
                throw new MessageConversionException(
                        "failed to convert serialized Message content", e);
            }
            catch (IllegalArgumentException e) {
                throw new MessageConversionException(
                        "failed to convert serialized Message content", e);
            }
            catch (IllegalStateException e) {
                throw new MessageConversionException(
                        "failed to convert serialized Message content", e);
            }
        }
    }
    if (content == null) {
        // Nothing matched, so return the raw byte array
        content = message.getBody();
    }
    return content;
}
~~~

Takeaways from the source:

1. A `MessageConverter` can convert a Java object into a `Message`, and a `Message` into a Java object.
2. `MessageListenerAdapter` uses the `MessageConverter` internally to convert the `Message` into a Java object, then finds the matching handler method, whose parameter is the converted Java object.
3. `SimpleMessageConverter` applies the following logic:
    - If `content_type` starts with `text`, the body is converted to a `String`.
    - If `content_type` is `application/x-java-serialized-object`, the body is deserialized into a Java object.
    - Otherwise, the body is returned as a byte array.
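
The three branches of `SimpleMessageConverter` can be tried without a broker. Below is a minimal plain-Java sketch that mirrors only the content-type dispatch. It assumes UTF-8 as the text charset and leaves out the configurable charset and codebase URL seen in the source. `ContentTypeDispatchSketch`, `convert`, and `serialize` are hypothetical names, not part of Spring AMQP.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

class ContentTypeDispatchSketch {

    // The content type named in the source for serialized Java objects
    static final String SERIALIZED = "application/x-java-serialized-object";

    /** Hypothetical stand-in for the dispatch inside fromMessage. */
    static Object convert(byte[] body, String contentType) {
        // Branch 1: text content types become a String (UTF-8 assumed here)
        if (contentType != null && contentType.startsWith("text")) {
            return new String(body, StandardCharsets.UTF_8);
        }
        // Branch 2: serialized objects are deserialized
        if (SERIALIZED.equals(contentType)) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
                return in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("failed to convert serialized content", e);
            }
        }
        // Branch 3: anything else stays a raw byte array
        return body;
    }

    /** Helper that produces a serialized body for the demo below. */
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] text = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(convert(text, "text/plain").getClass().getSimpleName());        // String
        System.out.println(convert(serialize(42), SERIALIZED).getClass().getSimpleName()); // Integer
        System.out.println(convert(text, null).getClass().getSimpleName());                // byte[]
    }
}
```

The last call shows why a handler sees a `byte[]` when the producer sets no `content_type`: neither of the first two branches matches, so the body is passed through unchanged.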