有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。
<br/>
但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?
<br/>
前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。
<br/>
在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的备胎,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎。
<br/>
当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
<br/>
:-: ![](https://img.kancloud.cn/8a/a0/8aa0e9638055d4fec91fc2134d0b268a_1377x401.jpg)
图1:架构图
演示图1的消息过程步骤如下:
**1. 配置类**
```java
@Configuration
public class BackupConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
public static final String BACKUP_QUEUE_NAME = "backup.queue";
public static final String WARNING_QUEUE_NAME = "warning.queue";
public static final String KEY_1 = "key1";
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
/**
* backup.exchange交换机作为confirm.exchange交换机的备份交换机
*/
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
//设置该交换机的备份交换机
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
return (DirectExchange) exchangeBuilder.build();
}
@Bean
public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(KEY_1);
}
@Bean("backupExchange")
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
@Bean("warningQueue")
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
@Bean
public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(queue).to(backupExchange);
}
@Bean("backQueue")
public Queue backQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
@Bean
public Binding backupBinding(@Qualifier("backQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(queue).to(backupExchange);
}
}
}
```
**2. 报警消费者**
```java
@Slf4j
@Component
public class WarningConsumer {
public static final String WARNING_QUEUE_NAME = "warning.queue";
@RabbitListener(queues = WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message) {
String msg = new String(message.getBody());
log.info("报警发现不可路由消息:{}", msg);
}
}
```
**3. 测试**
(1)因为改变了 `confirm.exchange`的绑定属性,所以先将该交换机删除。
![](https://img.kancloud.cn/85/57/85577978e5db56c6e8a33068516f247e_1459x233.jpg)
(2)启动项目后访问 http://localhost:8080/return/send 生产了两条消息,控制台输出如下。
```
...] c.l.r.controller.ReturnController : 生产消息: message#key1
...] c.l.r.controller.ReturnController : 生产消息: message#key2
...] c.l.rabbitmq03.service.ConfirmConsumer : 收到消息: message#key1
...] c.l.rabbitmq03.service.WarningConsumer : 报警发现不可路由消息:message#key2
...] c.l.rabbitmq03.callback.CustomCallBack : 交换机已收到id=1的消息
...] c.l.rabbitmq03.callback.CustomCallBack : 交换机已收到id=2的消息
```
可见不可路由消息`message#key2`被转发到备份交换机并被报警消费者消费了。
<br/>
>[info]如果 mandatory 参数与备份交换机一起使用则备份交换机优先级高。
- 消息队列
- 什么是MQ
- MQ的作用
- MQ的分类
- MQ的选择
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 环境搭建
- windows系统下的搭建
- centos7系统下的搭建
- 常用命令
- 服务相关命令
- 管理用户命令
- 管理队列命令
- 第一个RabbitMQ程序
- 工作队列
- 轮询分发消息
- 消息应答
- 持久化
- 发布确认
- 发布确认原理
- 发布确认策略
- 交换机概念
- 交换机类型
- 无名交换机
- Fanout交换机
- Direct交换机
- Topic交换机
- 死信队列
- 死信概念
- 死信来源
- 死信实战
- 延迟队列
- 什么是延迟队列
- TTL设置方式
- 队列TTL延迟队列
- 消息TTL延迟队列
- 插件打造延迟队列
- 延迟队列总结
- 发布确认高级
- 代码实现
- 回退消息
- 备份交换机
- 幂等性
- 幂等性概念
- 消息重复消费
- 消费端幂等性保障
- 优先级队列
- 使用场景
- 设置优先级
- 惰性队列
- 什么是惰性队列
- 队列的两种模式
- 声明惰性队列
- RabbitMQ集群
- 为什么要搭建集群
- 集群搭建步骤
- 集群工作方式
- 脱离集群
- 镜像队列
- 高可用负载均衡