💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
[TOC] `RabbitTemplate`类是简化RabbitMQ访问的工具类(发送和接收消息) > 总结: > 1.使用RabbitTemplate进行消息的发送。 > 2.使用SimpleMessageListenerContainer类监听队列,进行消息的消费。 ~~~xml <dependencies> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.3.RELEASE</version> </dependency> </dependencies> ~~~ **配置类** 将`ConnectionFactory`和`RabbitTemplate`纳入到spring容器中 ~~~java import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672"); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); return rabbitAdmin; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //设置Exchange默认操作的exchange和routingkey rabbitTemplate.setExchange("zhihao.direct.exchange"); rabbitTemplate.setRoutingKey("zhihao.debug"); return rabbitTemplate; } } ~~~ **测试类** `RabbitTemplate#send`方法发送消息, ~~~java import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; @ComponentScan public class Application { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); System.out.println(rabbitTemplate); MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc","消息发送"); messageProperties.getHeaders().put("type",10); Message message = new Message("hello".getBytes(),messageProperties); /** * 调用rabbitTemplate的send方法发送消息,如果没有指定exchange,Routing,则使用声明Exchange指定的 * exchange,Routing * 如果RabbitTemplate没有设置,则默认的exchange 是DEFAULT_EXCHANGE为"", * 默认的routkey是DEFAULT_ROUTING_KEY为"" */ //1. //rabbitTemplate.send(message); //2.指定Routingkey,而exchange是Rabbitmq默认指定的 //rabbitTemplate.send("zhihao.error",message); //3.即指定exchange,又指定了routing_key //rabbitTemplate.send("zhihao.login","ulogin",message); /** * 4.使用默认的defaultExchange进行投递消息,route key就是队列名,指定correlation_id属性,correlation_id属性是rabbitmq 进行异步rpc进行标识每次请求的唯一 * id,下面会讲到 */ rabbitTemplate.send("","zhihao.order.queue",message,new CorrelationData("spring.amqp")); context.close(); } } ~~~ 查看web管控台发现消息都发送成功了。 使用`RabbitTemplate#convertAndSend`方法发送消息, ~~~ import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; @ComponentScan public class Application2 { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); System.out.println(rabbitTemplate); /** * 使用convertAndSend方法,接收的参数是Object对象,其实是将接收的对象转换成Message对象,不指定exchange和routing key,那么就 * 使用RabbitTemplate中设置的exchange和routing key */ //rabbitTemplate.convertAndSend("this is my message"); //指定exchange或者指定exchage和routing key //rabbitTemplate.convertAndSend("zhihao.error","this is my message order111"); //rabbitTemplate.convertAndSend("","zhihao.user.queue","this is my message order222"); //发送消息的后置处理器,MessagePostProcessor类的postProcessMessage方法得到的Message就是将参数Object内容转换成Message对象 /* rabbitTemplate.convertAndSend("", "zhihao.user.queue", "this is my message processor", new MessagePostProcessor() { //在后置处理器上加上order和count属性 @Override public Message postProcessMessage(Message message) throws AmqpException { System.out.println("-------处理前message-------------"); System.out.println(message); message.getMessageProperties().getHeaders().put("order",10); message.getMessageProperties().getHeaders().put("count",1); return message; } }); */ rabbitTemplate.convertAndSend("", "zhihao.user.queue", "message before", message1 -> { //使用lamdba的语法 MessageProperties properties = new MessageProperties(); properties.getHeaders().put("desc","消息发送"); properties.getHeaders().put("type",10); Message messageafter = new Message("message after".getBytes(),properties); return messageafter; }); context.close(); } } ~~~ **消息的消费** 使用容器的方式进行消费 认识一个接口`org.springframework.amqp.rabbit.listener.MessageListenerContainer`, 其默认实现类`org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer`。 `MessageListenerContainer#setMessageListener`方法,接收的参数类型 `org.springframework.amqp.core.MessageListener`或者`org.springframework.amqp.rabbit.core.ChannelAwareMessageListener`接口 代码: 将`ConnectionFactory`,`RabbitTemplate`,`SimpleMessageListenerContainer`实例纳入到spring容器中进行管理。 ~~~java import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672"); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); return rabbitAdmin; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } /* @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //监听队列zhihao.user.queue,监听队列可以多个,参数类型是String[] container.setQueueNames("zhihao.user.queue"); container.setMessageListener(new MessageListener() { //具体的消费逻辑 @Override public void onMessage(Message message) { System.out.println("====接收到消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); } }); return container; } */ @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //队列可以是多个,参数是String的数组 container.setQueueNames("zhihao.user.queue"); container.setMessageListener(new ChannelAwareMessageListener(){ @Override //得到了Channel参数,具体使用会在下面的博客详细讲解 public void onMessage(Message message, Channel channel) throws Exception { System.out.println("====接收到消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); } }); return container; } } ~~~ 关于setMessageListener接收的类型参数 当接收的参数不是`MessageListener`或者`ChannelAwareMessageListener`类型,则会抛出异常,具体的逻辑在`checkMessageListener(messageListener);`方法 应用启动类,向zhihao.user.queue对象发送消息,并启动了spring容器,发现监听到队列并且消费了。 ~~~java import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; import java.util.concurrent.TimeUnit; @ComponentScan public class Application { public static void main(String[] args) throws Exception{ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); System.out.println(rabbitTemplate); rabbitTemplate.convertAndSend("","zhihao.user.queue","hello spring amqp"); TimeUnit.SECONDS.sleep(30); context.close(); } } ~~~ **原理分析** 稍微分析一下原理 `org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer`接口,它继承`AbstractMessageListenerContainer`类,实现`SmartLifecycle`接口然后继承`Lifecycle`接口,意味着一旦`SimpleMessageListenerContainer`实例被spring容器管理,其生命周期就托管与spring容器来管理了,意味着当spring容器运行起来的时候,`SimpleMessageListenerContainer`容器启动,spring容器关闭的时候,`SimpleMessageListenerContainer`容器也关闭了。 设置在spring容器初始化的时候设置SimpleMessageListenerContainer不启动,(`container.setAutoStartup(false);`) ~~~ @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //队列可以是多个,参数是String的数组 container.setQueueNames("zhihao.miao.order"); //设置autoStartUp为false表示SimpleMessageListenerContainer没有启动 container.setAutoStartup(false); container.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { System.out.println("====接收到消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); } }); return container; } } ~~~ 此时不能消费消息,也可以在应用启动类启动`SimpleMessageListenerContainer`容器,在应用启动类中启动 ~~~java import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; import java.util.concurrent.TimeUnit; @ComponentScan public class Application { public static void main(String[] args) throws Exception{ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); //在spring容器中启动SimpleMessageListenerContainer context.getBean(SimpleMessageListenerContainer.class).start(); TimeUnit.SECONDS.sleep(30); context.close(); } } ~~~ 以上说明只有`org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer`启动了,才会消费消息。 **总结** `SimpleMessageListenerContainer`可以托管到spring容器中,由spring容器进行`SimpleMessageListenerContainer`的生命周期管理,默认情况下spring容器启动的时候,启动`SimpleMessageListenerContainer`,spring容器关闭,会stop掉`SimpleMessageListenerContainer`,也可以设置`SimpleMessageListenerContainer`手动启动`(context.getBean(SimpleMessageListenerContainer.class).start();`)