[TOC]
# 1.简单工作模式
> **无交换机**、只有一个队列一个消费者
> 1.声明一个队列
> 2.生产者向队列发送消息
> 3.消费者消费消息,如果队列有多个消费者则变成了work模式
1.声明一个队列
~~~
@Configuration
public class SimpleQueueConfig {
/**
* 定义简单队列名.
*/
private final String simpleQueue = "queue_simple";
@Bean
public Queue simpleQueue() {
return new Queue(simpleQueue);
}
}
~~~
# 自动创建队列
```
//1. 手动创建,需在RabbitMQ中手动创建myQueue1 队列,否则报错
@RabbitListener(queues = “myQueue1”)
public void process1(String message){
log.info(“MqReceiver1: {}”, message);
}
//2. 自动创建队列
@RabbitListener(queuesToDeclare = @Queue(“myQueue2”))
public void process2(String message){
log.info(“MqReceiver2: {}”, message);
}
//3. 自动创建队列,Exchange 与 Queue绑定
@RabbitListener(bindings = @QueueBinding(
value = @Queue(“myQueue3”),
exchange = @Exchange(“testExChange”)
))
public void process3(String message){
log.info(“MqReceiver3: {}”, message);
}
```
2.生产者向队列中发送消息
~~~
@Slf4j
@Component
public class SimpleProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
for (int i = 0; i < 5; i++) {
String message = "简单消息" + i;
log.info("我是生产信息:{}", message);
rabbitTemplate.convertAndSend( "queue_simple", message);
}
}
}
~~~
3.消费者从队列中取到消息
~~~
@Slf4j
@Component
public class SimpleConsumers {
@RabbitListener(queues = "queue_simple")
public void readMessage(Message message, Channel channel) throws IOException {
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消费信息:{}", new String(message.getBody()));
}
}
~~~
4.controller
~~~
@RequestMapping("/simple")
public String sendSimpleMsg(){
simpleProducer.sendMessage();
return "success";
}
~~~
访问接口,输出
```
2022-06-07 14:58:57.380 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息0
2022-06-07 14:58:57.387 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息1
2022-06-07 14:58:57.387 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息2
2022-06-07 14:58:57.387 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息3
2022-06-07 14:58:57.388 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息4
2022-06-07 14:58:57.397 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息0
2022-06-07 14:58:57.398 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息1
2022-06-07 14:58:57.398 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息2
2022-06-07 14:58:57.400 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息3
2022-06-07 14:58:57.400 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息4
```
# 2.work队列
就是简单模式(也没有交换机),多了一个消费者,来抢占消息
~~~
@Slf4j
@Component
public class WorkConsumers1 {
@RabbitListener(queues = "queue_simple")
public void readMessage(Message message, Channel channel) throws IOException {
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是work消费信息{}", new String(message.getBody()));
}
}
~~~
访问简单模式的controller,输出如下,两个消费者都在消费消息
```
2022-06-07 15:29:21.002 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息0
2022-06-07 15:29:21.006 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息1
2022-06-07 15:29:21.006 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息2
2022-06-07 15:29:21.007 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息3
2022-06-07 15:29:21.007 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生产信息:简单消息4
2022-06-07 15:29:21.015 INFO 16108 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息0
2022-06-07 15:29:21.015 INFO 16108 --- [ntContainer#1-1] c.t.m.r.service.work.WorkConsumers1 : 我是work消费信息简单消息1
2022-06-07 15:29:21.016 INFO 16108 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息2
2022-06-07 15:29:21.016 INFO 16108 --- [ntContainer#1-1] c.t.m.r.service.work.WorkConsumers1 : 我是work消费信息简单消息3
2022-06-07 15:29:21.016 INFO 16108 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消费信息:简单消息4
```
# 3. 发布订阅模式FanoutExchange
> **多了一个交换机**
> 1.创建队列
> 2.创建交换机FanoutExchange
> **3.将队列绑定到交换机**,没有route-key,只要绑定到交换机,就给发消息
> 4.生产者发送数据到交换机
> 5.交换机把消息广播到所有绑定的队列
> 6.队列的消费者抢占消费消息
## **3.1 配置队列和交换机**
如下 生命了三个队列,绑定到了同一个交换机
~~~
@Configuration
public class FanoutMQConfig {
@Bean
public Queue fanoutQueue1() {
return new Queue("queue1", true);
}
@Bean
public Queue fanoutQueue2() {
return new Queue("queue2", false);
}
@Bean
public Queue fanoutQueue3() {
return new Queue("queue3", true);
}
//创建扇形交换机,参数为交换机的名称
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("FANOUT_EXCHANGE");
}
//将三个队列都与该交换机绑定起来,无需binding_key
@Bean
public Binding bindingFanoutExchange1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding bindingFanoutExchange2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
@Bean
public Binding bindingFanoutExchange3() {
return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
}
}
~~~
## **3.2 生产者**
~~~
@Service
public class FanoutMQSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String message) {
//广播消息,无需指定routing_key
amqpTemplate.convertAndSend("FANOUT_EXCHANGE", "", message);
}
}
~~~
## **3.3 消费者**
controller
~~~
@GetMapping("/fanoutExchange/{message}")
public String fanoutExchange(@PathVariable("message") String message) {
fanoutMQSender.send(message);
return "success";
}
~~~
消费者代码
~~~
@Service
public class FanoutMQReceiver {
private static final Logger logger = LoggerFactory.getLogger(FanoutMQReceiver.class);
//queue1订阅者1
@RabbitListener(queues = "queue1")
public void receive1(String message) {
logger.info("queue1 receive : fanout message {}", message);
}
//queue1订阅者2
@RabbitListener(queues = "queue1")
public void receive1_1(String message) {
logger.info("queue1_1 receive : fanout message {}", message);
}
@RabbitListener(queues = "queue2")
public void receive2(String message) {
logger.info("queue2 receive : fanout message {}", message);
}
@RabbitListener(queues = "queue3")
public void receive3(String message) {
logger.info("queue3 receive : fanout message {}", message);
}
}
~~~
队列queue1,有两个消费者抢占消费,如下
```
2022-06-07 15:48:09.048 INFO 10784 --- [ntContainer#3-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue2 receive : fanout message 001
2022-06-07 15:48:09.048 INFO 10784 --- [ntContainer#2-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue3 receive : fanout message 001
2022-06-07 15:48:09.048 INFO 10784 --- [ntContainer#1-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue1 receive : fanout message 001
2022-06-07 15:48:11.904 INFO 10784 --- [ntContainer#3-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue2 receive : fanout message 002
2022-06-07 15:48:11.904 INFO 10784 --- [ntContainer#2-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue3 receive : fanout message 002
2022-06-07 15:48:11.904 INFO 10784 --- [ntContainer#0-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue1_1 receive : fanout message 002
2022-06-07 15:48:14.370 INFO 10784 --- [ntContainer#1-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue1 receive : fanout message 003
2022-06-07 15:48:14.370 INFO 10784 --- [ntContainer#3-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue2 receive : fanout message 003
2022-06-07 15:48:14.370 INFO 10784 --- [ntContainer#2-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue3 receive : fanout message 003
```
界面统计未被消费的消息
![](https://img.kancloud.cn/2d/3d/2d3da0a53958750a791fe65c9ae0951b_1176x380.png)
## 3.4 新应用接入
如果有新的应用也想订阅消息怎么办?如一个新的springboot项目
### 3.4.1 将自己创建的队列绑定到已有的交换机
1.应用自己生命一个队列
2.将队列绑定到上边的交换机
~~~
@Configuration
public class FanoutMQConfig {
@Bean
public Queue newQuene() {
return new Queue("new_queue", true);
}
//创建扇形交换机,参数为交换机的名称
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("FANOUT_EXCHANGE");
}
@Bean
public Binding bindingFanoutExchange3() {
return BindingBuilder.bind(newQuene()).to(fanoutExchange());
}
}
~~~
### 3.4.2 订阅自己的队列即可
~~~
@Configuration
public class FanoutMQConfig {
@Bean
public Queue newQuene() {
return new Queue("new_queue", true);
}
//创建扇形交换机,参数为交换机的名称
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("FANOUT_EXCHANGE");
}
@Bean
public Binding bindingFanoutExchange3() {
return BindingBuilder.bind(newQuene()).to(fanoutExchange());
}
}
~~~
原有项目
![](https://img.kancloud.cn/36/1d/361d2fc1fbedcbdc737156805393ae9c_1234x293.png)
新项目,接收到交换机发来的消息消息
![](https://img.kancloud.cn/e1/a9/e1a91bec7a7be74c9c75bc24a622c6b7_1366x221.png)
* [ ] 消费者自动创建队列,没有配置的情况下
项目中引入了RabbitMQ,但是在加了@bean配置交换机和queue,启动项目却没自动化创建队列
原因:RabbitMQ懒加载模式, 需要配置消费者监听才会创建
```
@RabbitListener(queues = "short_link.add.link.queue")
```
另外一种方式(若Mq中无相应名称的队列,会自动创建Queue)
```
@RabbitListener(queuesToDeclare = { @Queue("short_link.add.link.queue") })
```
# 4.路由模式DirectExchange
1.队列和交换机的绑定多了一个key
2.生产者发送消息到交换机带着一个key,交换机根据key,选择消息发送的队列
应用:可根据消息路由队列,如地区、不同应用
## 4.1 配置
~~~
@Configuration
public class DirectMQConfig {
@Bean
public Queue directQueue1() {
return new Queue("route-queue1", true);
}
@Bean
public Queue directQueue2() {
return new Queue("route-queue2", false);
}
//创建直连交换机,参数为交换机的名称
@Bean
public DirectExchange directExchange() {
return new DirectExchange("DIRECT_EXCHANGE");
}
@Bean
public Binding bindingDirectExchange1() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("key.1");
}
@Bean
public Binding bindingDirectExchange2() {
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("key.2");
}
}
~~~
## 4.2 producer
~~~
@Service
public class DirectMQSender {
//注入AmqpTemplate接口,该接口定义了发送和接收消息的基本操作
@Autowired
private AmqpTemplate amqpTemplate;
//直接路由
public void send(String message) {
//第一个参数指将消息发送到该名称的交换机,第二个参数为对应的routing_key,第三个参数为发送的具体消息
amqpTemplate.convertAndSend("DIRECT_EXCHANGE", "key.1", "key1:" + message);
amqpTemplate.convertAndSend("DIRECT_EXCHANGE", "key.2", "key2:" + message);
}
}
~~~
## 4.3 Consumer
~~~
@Service
public class DirectMQReceiver {
private static final Logger logger = LoggerFactory.getLogger(DirectMQReceiver.class);
//此注解表示监听某个队列,参数为队列名
@RabbitListener(queues = "route-queue1")
public void receive1(String message) {
logger.info("queue1 receive : route direct message {}", message);
}
@RabbitListener(queues = "route-queue2")
public void receive2(String message) {
logger.info("queue2 receive : route direct message {}", message);
}
}
~~~
控制台:
```
2022-06-07 16:58:48.546 INFO 4112 --- [ntContainer#4-1] c.t.m.r.s.routedirect.DirectMQReceiver : queue2 receive : route direct message key2:m1
2022-06-07 16:58:48.546 INFO 4112 --- [ntContainer#5-1] c.t.m.r.s.routedirect.DirectMQReceiver : queue1 receive : route direct message key1:m1
```
# 5. 主体模式TopicExchange
在key的基础上,提供交换机到队列的模式匹配
## 5.1 配置
~~~
topic.# 将发送到所有topic.开头的绑定队列
~~~
~~~
@Configuration
public class TopicMQConfig {
@Bean
public Queue topicQueue1() {
return new Queue("TOPIC_QUEUE1", true);
}
@Bean
public Queue topicQueue2() {
return new Queue("TOPIC_QUEUE2", true);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("TOPIC_EXCHANGE");
}
//将topicQueue1与topicExchange交换机绑定
@Bean
public Binding bindQueue1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1");
}
//将topicQueue2与topicExchange交换机绑定,队列可以接收topic. 开头的routing_key
@Bean
public Binding bindQueue2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
}
}
~~~
## 5.2 producer
~~~
@Service
public class TopicMQSender {
//注入AmqpTemplate接口,该接口定义了发送和接收消息的基本操作
@Autowired
private AmqpTemplate amqpTemplate;
//直接路由
public void send(String message) {
//第一个参数指将消息发送到该名称的交换机,第二个参数为对应的routing_key,第三个参数为发送的具体消息
amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.key1", message);
amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.#", "#号匹配消息:"+message);
}
}
~~~
## 5.3 Consumer
~~~
@Service
public class TopicMQReceiver {
private static final Logger logger = LoggerFactory.getLogger(TopicMQReceiver.class);
@RabbitListener(queues = "TOPIC_QUEUE1")
public void receiveQueue1(String message) {
logger.info("receive : TOPIC_QUEUE1 {}", message);
}
@RabbitListener(queues ="TOPIC_QUEUE2")
public void receiveQueue2(String message) {
logger.info("receive : TOPIC_QUEUE2 {}", message);
}
}
~~~
输出
```
2022-06-07 17:06:10.944 INFO 11812 --- [ntContainer#7-1] c.t.m.r.service.topic.TopicMQReceiver : receive : TOPIC_QUEUE2 m1
2022-06-07 17:06:10.944 INFO 11812 --- [ntContainer#7-1] c.t.m.r.service.topic.TopicMQReceiver : receive : TOPIC_QUEUE2 #号匹配消息:m1
2022-06-07 17:06:10.945 INFO 11812 --- [ntContainer#8-1] c.t.m.r.service.topic.TopicMQReceiver : receive : TOPIC_QUEUE1 m1
```
# 6. 参数设置
## 1. 消费限额
指每次获取多少条消息,设置为1时,指每次接受一条消息,然后接受下一条。如果数据多时,可以适量增加改配置
```
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # qos=1, 默认250
```
## 2. 消息积压
>当队列数据消息积压时,可以增加消费者的并发,同时调大上边的消费限额
concurrency min-max 表示并发数,表示有多少个消费者处理队列里的消息 最小-最大数
```
@RabbitListener(queues = “testDirectQueue”,concurrency=“5-10”)
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage){
System.out.println(Thread.currentThread().getName()+testMessage.toString());
}
}
```
# 7. 配置json序列化与反序列化
~~~
@Configuration
public class RabbitMQConfig implements RabbitListenerConfigurer {
//序列化 object -> json
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
@Bean
MessageHandlerMethodFactory messageHandlerMethodFactory(){
DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
messageHandlerMethodFactory.setMessageConverter(mappingJackson2MessageConverter());
return messageHandlerMethodFactory;
}
@Bean
public MappingJackson2MessageConverter mappingJackson2MessageConverter(){
return new MappingJackson2MessageConverter();
}
// 反序列化 json -> object
@Bean
public RabbitTemplate jacksonRabbitTemplate(final ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
}
~~~