企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # MessageListenerAdapter详解 消息监听适配器(adapter),通过反射将消息处理委托给目标监听器的处理方法,并进行灵活的消息类型转换。允许监听器方法对消息内容类型进行操作,完全独立于Rabbit API 默认情况下,传入Rabbit消息的内容在被传递到目标监听器方法之前被提取,以使目标方法对消息内容类型进行操作以String或者byte类型进行操作,而不是原始Message类型。 (消息转换器) 消息类型转换委托给`MessageConverter`接口的实现类。 默认情况下,将使用`SimpleMessageConverter`。 (如果您不希望进行这样的自动消息转换, 那么请自己通过`#setMessageConverter MessageConverter`设置为null) 如果目标监听器方法返回一个非空对象(通常是消息内容类型,例如String或byte数组),它将被包装在一个Rabbit Message 中,并发送使用来自`Rabbit ReplyTo`属性或通过`#setResponseRoutingKey(String)`指定的`routingKey`的`routingKey`来传送消息。(使用rabbitmq 来实现异步rpc功能时候会使用到这个属性)。 注意:发送响应消息仅在使用ChannelAwareMessageListener入口点(通常通过Spring消息监听器容器)时可用。 用作MessageListener不支持生成响应消息。 # Demo 配置类MQConfig ~~~java import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class MQConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672"); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); return rabbitAdmin; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setExchange("zhihao.direct.exchange"); rabbitTemplate.setRoutingKey("zhihao.debug"); return rabbitTemplate; } @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("order","pay","zhihao.miao.order"); MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler()); //设置处理器的消费消息的默认方法,如果没有设置,那么默认的处理器中的默认方式是handleMessage方法 adapter.setDefaultListenerMethod("onMessage"); Map<String, String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put("order","onorder"); queueOrTagToMethodName.put("pay","onpay"); queueOrTagToMethodName.put("zhihao.miao.order","oninfo"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); container.setMessageListener(adapter); return container; } } ~~~ Handler类`MessageHandler`,`MessageHandler`类中定义的方法也就是上面翻译的目标监听器的处理方法: ~~~ public class MessageHandler { //没有设置默认的处理方法的时候,方法名是handleMessage public void handleMessage(byte[] message){ System.out.println("---------handleMessage-------------"); System.out.println(new String(message)); } //通过设置setDefaultListenerMethod时候指定的方法名 public void onMessage(byte[] message){ System.out.println("---------onMessage-------------"); System.out.println(new String(message)); } //以下指定不同的队列不同的处理方法名 public void onorder(byte[] message){ System.out.println("---------onorder-------------"); System.out.println(new String(message)); } public void onpay(byte[] message){ System.out.println("---------onpay-------------"); System.out.println(new String(message)); } public void oninfo(byte[] message){ System.out.println("---------oninfo-------------"); System.out.println(new String(message)); } } ~~~ 启动应用类: ~~~java @ComponentScan public class Application { public static void main(String[] args) throws Exception{ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); System.out.println("===start up======"); TimeUnit.SECONDS.sleep(60); context.close(); } } ~~~ # 总结 使用`MessageListenerAdapter`处理器进行消息队列监听处理,如果容器没有设置`setDefaultListenerMethod`,则处理器中默认的处理方法名是handleMessage,如果设置了`setDefaultListenerMethod`,则处理器中处理消息的方法名就是`setDefaultListenerMethod`方法参数设置的值。也可以通过`setQueueOrTagToMethodName`方法为不同的队列设置不同的消息处理方法。 # 源码分析: 我们知道`MessageListenerAdapter`继承`AbstractAdaptableMessageListener`类,实现`MessageListener`和`ChannelAwareMessageListener`接口,而我们知道`MessageListener`和`ChannelAwareMessageListener`接口的`onMessage`方法就是具体容器监听队列处理队列消息的方法。 `MessageListenerAdapter`的`onMessage`方法 ~~~ @Override public void onMessage(Message message, Channel channel) throws Exception { // Check whether the delegate is a MessageListener impl itself. // In that case, the adapter will simply act as a pass-through. Object delegate = getDelegate(); if (delegate != this) { if (delegate instanceof ChannelAwareMessageListener) { if (channel != null) { ((ChannelAwareMessageListener) delegate).onMessage(message, channel); return; } else if (!(delegate instanceof MessageListener)) { throw new AmqpIllegalStateException("MessageListenerAdapter cannot handle a " + "ChannelAwareMessageListener delegate if it hasn't been invoked with a Channel itself"); } } if (delegate instanceof MessageListener) { ((MessageListener) delegate).onMessage(message); return; } } // Regular case: find a handler method reflectively. Object convertedMessage = extractMessage(message); //获取处理消息的方法名 String methodName = getListenerMethodName(message, convertedMessage); if (methodName == null) { throw new AmqpIllegalStateException("No default listener method specified: " + "Either specify a non-null value for the 'defaultListenerMethod' property or " + "override the 'getListenerMethodName' method."); } // Invoke the handler method with appropriate arguments. Object[] listenerArguments = buildListenerArguments(convertedMessage); Object result = invokeListenerMethod(methodName, listenerArguments, message); if (result != null) { handleResult(result, message, channel); } else { logger.trace("No result object given - no result to handle"); } } ~~~ 获取处理消息的方法名 ~~~ protected String getListenerMethodName(Message originalMessage, Object extractedMessage) throws Exception { if (this.queueOrTagToMethodName.size() > 0) { MessageProperties props = originalMessage.getMessageProperties(); String methodName = this.queueOrTagToMethodName.get(props.getConsumerQueue()); if (methodName == null) { methodName = this.queueOrTagToMethodName.get(props.getConsumerTag()); } if (methodName != null) { return methodName; } } return getDefaultListenerMethod(); } ~~~ # 结论 `MessageListenerAdapter` 1.可以把一个没有实现`MessageListener`和`ChannelAwareMessageListener`接口的类适配成一个可以处理消息的处理器 2.默认的方法名称为:`handleMessage`,可以通过`setDefaultListenerMethod`设置新的消息处理方法 3.`MessageListenerAdapter`支持不同的队列交给不同的方法去执行。使用`setQueueOrTagToMethodName`方法设置,当根据queue名称没有找到匹配的方法的时候,就会交给默认的方法去处理