企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 1.简单工作模式 > **无交换机**、只有一个队列一个消费者 > 1.声明一个队列 > 2.生产者向队列发送消息 > 3.消费者消费消息,如果队列有多个消费者则变成了work模式 1.声明一个队列 ~~~ @Configuration public class SimpleQueueConfig { /** * 定义简单队列名. */ private final String simpleQueue = "queue_simple"; @Bean public Queue simpleQueue() { return new Queue(simpleQueue); } } ~~~ # 自动创建队列 ``` //1. 手动创建,需在RabbitMQ中手动创建myQueue1 队列,否则报错 @RabbitListener(queues = “myQueue1”) public void process1(String message){ log.info(“MqReceiver1: {}”, message); } //2. 自动创建队列 @RabbitListener(queuesToDeclare = @Queue(“myQueue2”)) public void process2(String message){ log.info(“MqReceiver2: {}”, message); } //3. 自动创建队列,Exchange 与 Queue绑定 @RabbitListener(bindings = @QueueBinding( value = @Queue(“myQueue3”), exchange = @Exchange(“testExChange”) )) public void process3(String message){ log.info(“MqReceiver3: {}”, message); } ``` 2.生产者向队列中发送消息 ~~~ @Slf4j @Component public class SimpleProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage() { for (int i = 0; i < 5; i++) { String message = "简单消息" + i; log.info("我是生产信息:{}", message); rabbitTemplate.convertAndSend( "queue_simple", message); } } } ~~~ 3.消费者从队列中取到消息 ~~~ @Slf4j @Component public class SimpleConsumers { @RabbitListener(queues = "queue_simple") public void readMessage(Message message, Channel channel) throws IOException { //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是消费信息:{}", new String(message.getBody())); } } ~~~ 4.controller ~~~ @RequestMapping("/simple") public String sendSimpleMsg(){ simpleProducer.sendMessage(); return "success"; } ~~~ 访问接口,输出 ``` 2022-06-07 14:58:57.380 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息0 2022-06-07 14:58:57.387 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息1 2022-06-07 14:58:57.387 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息2 2022-06-07 14:58:57.387 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息3 2022-06-07 14:58:57.388 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息4 2022-06-07 14:58:57.397 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息0 2022-06-07 14:58:57.398 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息1 2022-06-07 14:58:57.398 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息2 2022-06-07 14:58:57.400 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息3 2022-06-07 14:58:57.400 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息4 ``` # 2.work队列 就是简单模式(也没有交换机),多了一个消费者,来抢占消息 ~~~ @Slf4j @Component public class WorkConsumers1 { @RabbitListener(queues = "queue_simple") public void readMessage(Message message, Channel channel) throws IOException { //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是work消费信息{}", new String(message.getBody())); } } ~~~ 访问简单模式的controller,输出如下,两个消费者都在消费消息 ``` 2022-06-07 15:29:21.002 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息0 2022-06-07 15:29:21.006 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息1 2022-06-07 15:29:21.006 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息2 2022-06-07 15:29:21.007 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息3 2022-06-07 15:29:21.007 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息4 2022-06-07 15:29:21.015 INFO 16108 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息0 2022-06-07 15:29:21.015 INFO 16108 --- [ntContainer#1-1] c.t.m.r.service.work.WorkConsumers1 : 我是work消费信息简单消息1 2022-06-07 15:29:21.016 INFO 16108 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息2 2022-06-07 15:29:21.016 INFO 16108 --- [ntContainer#1-1] c.t.m.r.service.work.WorkConsumers1 : 我是work消费信息简单消息3 2022-06-07 15:29:21.016 INFO 16108 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息4 ``` # 3. 发布订阅模式FanoutExchange > **多了一个交换机** > 1.创建队列 > 2.创建交换机FanoutExchange > **3.将队列绑定到交换机**,没有route-key,只要绑定到交换机,就给发消息 > 4.生产者发送数据到交换机 > 5.交换机把消息广播到所有绑定的队列 > 6.队列的消费者抢占消费消息 ## **3.1 配置队列和交换机** 如下 生命了三个队列,绑定到了同一个交换机 ~~~ @Configuration public class FanoutMQConfig { @Bean public Queue fanoutQueue1() { return new Queue("queue1", true); } @Bean public Queue fanoutQueue2() { return new Queue("queue2", false); } @Bean public Queue fanoutQueue3() { return new Queue("queue3", true); } //创建扇形交换机,参数为交换机的名称 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("FANOUT_EXCHANGE"); } //将三个队列都与该交换机绑定起来,无需binding_key @Bean public Binding bindingFanoutExchange1() { return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()); } @Bean public Binding bindingFanoutExchange2() { return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()); } @Bean public Binding bindingFanoutExchange3() { return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange()); } } ~~~ ## **3.2 生产者** ~~~ @Service public class FanoutMQSender { @Autowired private AmqpTemplate amqpTemplate; public void send(String message) { //广播消息,无需指定routing_key amqpTemplate.convertAndSend("FANOUT_EXCHANGE", "", message); } } ~~~ ## **3.3 消费者** controller ~~~ @GetMapping("/fanoutExchange/{message}") public String fanoutExchange(@PathVariable("message") String message) { fanoutMQSender.send(message); return "success"; } ~~~ 消费者代码 ~~~ @Service public class FanoutMQReceiver { private static final Logger logger = LoggerFactory.getLogger(FanoutMQReceiver.class); //queue1订阅者1 @RabbitListener(queues = "queue1") public void receive1(String message) { logger.info("queue1 receive : fanout message {}", message); } //queue1订阅者2 @RabbitListener(queues = "queue1") public void receive1_1(String message) { logger.info("queue1_1 receive : fanout message {}", message); } @RabbitListener(queues = "queue2") public void receive2(String message) { logger.info("queue2 receive : fanout message {}", message); } @RabbitListener(queues = "queue3") public void receive3(String message) { logger.info("queue3 receive : fanout message {}", message); } } ~~~ 队列queue1,有两个消费者抢占消费,如下 ``` 2022-06-07 15:48:09.048 INFO 10784 --- [ntContainer#3-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue2 receive : fanout message 001 2022-06-07 15:48:09.048 INFO 10784 --- [ntContainer#2-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue3 receive : fanout message 001 2022-06-07 15:48:09.048 INFO 10784 --- [ntContainer#1-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue1 receive : fanout message 001 2022-06-07 15:48:11.904 INFO 10784 --- [ntContainer#3-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue2 receive : fanout message 002 2022-06-07 15:48:11.904 INFO 10784 --- [ntContainer#2-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue3 receive : fanout message 002 2022-06-07 15:48:11.904 INFO 10784 --- [ntContainer#0-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue1_1 receive : fanout message 002 2022-06-07 15:48:14.370 INFO 10784 --- [ntContainer#1-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue1 receive : fanout message 003 2022-06-07 15:48:14.370 INFO 10784 --- [ntContainer#3-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue2 receive : fanout message 003 2022-06-07 15:48:14.370 INFO 10784 --- [ntContainer#2-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue3 receive : fanout message 003 ``` 界面统计未被消费的消息 ![](https://img.kancloud.cn/2d/3d/2d3da0a53958750a791fe65c9ae0951b_1176x380.png) ## 3.4 新应用接入 如果有新的应用也想订阅消息怎么办?如一个新的springboot项目 ### 3.4.1 将自己创建的队列绑定到已有的交换机 1.应用自己生命一个队列 2.将队列绑定到上边的交换机 ~~~ @Configuration public class FanoutMQConfig { @Bean public Queue newQuene() { return new Queue("new_queue", true); } //创建扇形交换机,参数为交换机的名称 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("FANOUT_EXCHANGE"); } @Bean public Binding bindingFanoutExchange3() { return BindingBuilder.bind(newQuene()).to(fanoutExchange()); } } ~~~ ### 3.4.2 订阅自己的队列即可 ~~~ @Configuration public class FanoutMQConfig { @Bean public Queue newQuene() { return new Queue("new_queue", true); } //创建扇形交换机,参数为交换机的名称 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("FANOUT_EXCHANGE"); } @Bean public Binding bindingFanoutExchange3() { return BindingBuilder.bind(newQuene()).to(fanoutExchange()); } } ~~~ 原有项目 ![](https://img.kancloud.cn/36/1d/361d2fc1fbedcbdc737156805393ae9c_1234x293.png) 新项目,接收到交换机发来的消息消息 ![](https://img.kancloud.cn/e1/a9/e1a91bec7a7be74c9c75bc24a622c6b7_1366x221.png) * [ ] 消费者自动创建队列,没有配置的情况下 项目中引入了RabbitMQ,但是在加了@bean配置交换机和queue,启动项目却没自动化创建队列 原因:RabbitMQ懒加载模式, 需要配置消费者监听才会创建 ``` @RabbitListener(queues = "short_link.add.link.queue") ``` 另外一种方式(若Mq中无相应名称的队列,会自动创建Queue) ``` @RabbitListener(queuesToDeclare = { @Queue("short_link.add.link.queue") }) ``` # 4.路由模式DirectExchange 1.队列和交换机的绑定多了一个key 2.生产者发送消息到交换机带着一个key,交换机根据key,选择消息发送的队列 应用:可根据消息路由队列,如地区、不同应用 ## 4.1 配置 ~~~ @Configuration public class DirectMQConfig { @Bean public Queue directQueue1() { return new Queue("route-queue1", true); } @Bean public Queue directQueue2() { return new Queue("route-queue2", false); } //创建直连交换机,参数为交换机的名称 @Bean public DirectExchange directExchange() { return new DirectExchange("DIRECT_EXCHANGE"); } @Bean public Binding bindingDirectExchange1() { return BindingBuilder.bind(directQueue1()).to(directExchange()).with("key.1"); } @Bean public Binding bindingDirectExchange2() { return BindingBuilder.bind(directQueue2()).to(directExchange()).with("key.2"); } } ~~~ ## 4.2 producer ~~~ @Service public class DirectMQSender { //注入AmqpTemplate接口,该接口定义了发送和接收消息的基本操作 @Autowired private AmqpTemplate amqpTemplate; //直接路由 public void send(String message) { //第一个参数指将消息发送到该名称的交换机,第二个参数为对应的routing_key,第三个参数为发送的具体消息 amqpTemplate.convertAndSend("DIRECT_EXCHANGE", "key.1", "key1:" + message); amqpTemplate.convertAndSend("DIRECT_EXCHANGE", "key.2", "key2:" + message); } } ~~~ ## 4.3 Consumer ~~~ @Service public class DirectMQReceiver { private static final Logger logger = LoggerFactory.getLogger(DirectMQReceiver.class); //此注解表示监听某个队列,参数为队列名 @RabbitListener(queues = "route-queue1") public void receive1(String message) { logger.info("queue1 receive : route direct message {}", message); } @RabbitListener(queues = "route-queue2") public void receive2(String message) { logger.info("queue2 receive : route direct message {}", message); } } ~~~ 控制台: ``` 2022-06-07 16:58:48.546 INFO 4112 --- [ntContainer#4-1] c.t.m.r.s.routedirect.DirectMQReceiver : queue2 receive : route direct message key2:m1 2022-06-07 16:58:48.546 INFO 4112 --- [ntContainer#5-1] c.t.m.r.s.routedirect.DirectMQReceiver : queue1 receive : route direct message key1:m1 ``` # 5. 主体模式TopicExchange 在key的基础上,提供交换机到队列的模式匹配 ## 5.1 配置 ~~~ topic.# 将发送到所有topic.开头的绑定队列 ~~~ ~~~ @Configuration public class TopicMQConfig { @Bean public Queue topicQueue1() { return new Queue("TOPIC_QUEUE1", true); } @Bean public Queue topicQueue2() { return new Queue("TOPIC_QUEUE2", true); } @Bean public TopicExchange topicExchange() { return new TopicExchange("TOPIC_EXCHANGE"); } //将topicQueue1与topicExchange交换机绑定 @Bean public Binding bindQueue1() { return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1"); } //将topicQueue2与topicExchange交换机绑定,队列可以接收topic. 开头的routing_key @Bean public Binding bindQueue2() { return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#"); } } ~~~ ## 5.2 producer ~~~ @Service public class TopicMQSender { //注入AmqpTemplate接口,该接口定义了发送和接收消息的基本操作 @Autowired private AmqpTemplate amqpTemplate; //直接路由 public void send(String message) { //第一个参数指将消息发送到该名称的交换机,第二个参数为对应的routing_key,第三个参数为发送的具体消息 amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.key1", message); amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.#", "#号匹配消息:"+message); } } ~~~ ## 5.3 Consumer ~~~ @Service public class TopicMQReceiver { private static final Logger logger = LoggerFactory.getLogger(TopicMQReceiver.class); @RabbitListener(queues = "TOPIC_QUEUE1") public void receiveQueue1(String message) { logger.info("receive : TOPIC_QUEUE1 {}", message); } @RabbitListener(queues ="TOPIC_QUEUE2") public void receiveQueue2(String message) { logger.info("receive : TOPIC_QUEUE2 {}", message); } } ~~~ 输出 ``` 2022-06-07 17:06:10.944 INFO 11812 --- [ntContainer#7-1] c.t.m.r.service.topic.TopicMQReceiver : receive : TOPIC_QUEUE2 m1 2022-06-07 17:06:10.944 INFO 11812 --- [ntContainer#7-1] c.t.m.r.service.topic.TopicMQReceiver : receive : TOPIC_QUEUE2 #号匹配消息:m1 2022-06-07 17:06:10.945 INFO 11812 --- [ntContainer#8-1] c.t.m.r.service.topic.TopicMQReceiver : receive : TOPIC_QUEUE1 m1 ``` # 6. 参数设置 ## 1. 消费限额 指每次获取多少条消息,设置为1时,指每次接受一条消息,然后接受下一条。如果数据多时,可以适量增加改配置 ``` spring: rabbitmq: listener: simple: prefetch: 1 # qos=1, 默认250 ``` ## 2. 消息积压 >当队列数据消息积压时,可以增加消费者的并发,同时调大上边的消费限额 concurrency min-max 表示并发数,表示有多少个消费者处理队列里的消息 最小-最大数 ``` @RabbitListener(queues = “testDirectQueue”,concurrency=“5-10”) public class DirectReceiver { @RabbitHandler public void process(Map testMessage){ System.out.println(Thread.currentThread().getName()+testMessage.toString()); } } ``` # 7. 配置json序列化与反序列化 ~~~ @Configuration public class RabbitMQConfig implements RabbitListenerConfigurer { //序列化 object -> json @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) { rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); } @Bean MessageHandlerMethodFactory messageHandlerMethodFactory(){ DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory(); messageHandlerMethodFactory.setMessageConverter(mappingJackson2MessageConverter()); return messageHandlerMethodFactory; } @Bean public MappingJackson2MessageConverter mappingJackson2MessageConverter(){ return new MappingJackson2MessageConverter(); } // 反序列化 json -> object @Bean public RabbitTemplate jacksonRabbitTemplate(final ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } } ~~~