<mark>在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的</mark>。
<br/>
那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过在生产者设置`mandatory` 参数可以在当消息传递过程中不可到达目的地时将消息返回给生产者。
```java
public class RabbitTemplate extends ... {
//mandatory参数
private Expression mandatoryExpression = new ValueExpression(false);
```
<br/>
**1. 生产者**
```java
@Slf4j
@RestController
public class ReturnController {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private CustomCallBack customCallBack;
/**
* @PostConstruct 注解标记的方法可以在项目启动时时执行一次,这里在项目启动时
* 注入CustomCallBack对象到RabbitTemplate中去
*/
@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(customCallBack);
//setMandatory(boolean mandatory)
//mandatory=true: 交换机无法将消息进行路由时,会将该消息返回给生产者
//mandatory=false: 如果发现消息无法进行路由,则直接丢弃
rabbitTemplate.setMandatory(true);
//设置回退消息交给谁处理
rabbitTemplate.setReturnCallback(customCallBack);
}
@GetMapping("/return/send")
public void sendMessage() {
//指定消息id为1
CorrelationData corId = new CorrelationData("1");
//convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData)
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "key1", "message#key1", corId);
log.info("生产消息: {}", "message#key1");
CorrelationData corId2 = new CorrelationData("2");
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "key2", "message#key2", corId2);
log.info("生产消息: {}", "message#key2");
}
}
```
**2. 回调对象**
```java
@Slf4j
@Component
public class CustomCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
/**
* 不论交换机是否收到消息该方法都会被调用
*
* @param corData 消息相关数据
* @param ack 交换机是否收到消息。true收到、false没收到
* @param cause 收到或没收到的理由
*/
@Override
public void confirm(CorrelationData corData, boolean ack, String cause) {
if (ack) {
log.info("交换机已收到id={}的消息", corData.getId());
} else {
log.info("交换机未收到id={}的消息,原因是:{}", corData.getId(), cause);
}
}
/**
* 当消息无法路由到交换机时被调用
*
* @param message 消息
* @param replyCode 消息退回代码
* @param replyText 消息退回原因
* @param exchange 交换机名称
* @param routingKey 路由key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText
, String exchange, String routingKey) {
log.info("消息被退回 -> message={}, replyCode={}, replyText={}, exchange={}, routingKey={}"
, new String(message.getBody()), replyCode, replyText, exchange, routingKey);
}
}
```
**3. 测试**
启动项目后访问 http://localhost:8080/return/send 生产了两条消息,控制台输出如下。
```
...] c.l.r.controller.ReturnController : 生产消息: message#key1
...] c.l.r.controller.ReturnController : 生产消息: message#key2
...] c.l.rabbitmq03.callback.CustomCallBack : 消息被退回 -> message=message#key2, replyCode=312, replyText=NO_ROUTE
, exchange=confirm.exchange, routingKey=key2
...] c.l.rabbitmq03.callback.CustomCallBack : 交换机已收到id=2的消息
...] c.l.rabbitmq03.service.ConfirmConsumer : 收到消息: message#key1
...] c.l.rabbitmq03.callback.CustomCallBack : 交换机已收到id=1的消息
```
可见消息 `message#key2` 因为没有队列与之绑定无法路由,所以被退回给了生产者。
- 消息队列
- 什么是MQ
- MQ的作用
- MQ的分类
- MQ的选择
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 环境搭建
- windows系统下的搭建
- centos7系统下的搭建
- 常用命令
- 服务相关命令
- 管理用户命令
- 管理队列命令
- 第一个RabbitMQ程序
- 工作队列
- 轮询分发消息
- 消息应答
- 持久化
- 发布确认
- 发布确认原理
- 发布确认策略
- 交换机概念
- 交换机类型
- 无名交换机
- Fanout交换机
- Direct交换机
- Topic交换机
- 死信队列
- 死信概念
- 死信来源
- 死信实战
- 延迟队列
- 什么是延迟队列
- TTL设置方式
- 队列TTL延迟队列
- 消息TTL延迟队列
- 插件打造延迟队列
- 延迟队列总结
- 发布确认高级
- 代码实现
- 回退消息
- 备份交换机
- 幂等性
- 幂等性概念
- 消息重复消费
- 消费端幂等性保障
- 优先级队列
- 使用场景
- 设置优先级
- 惰性队列
- 什么是惰性队列
- 队列的两种模式
- 声明惰性队列
- RabbitMQ集群
- 为什么要搭建集群
- 集群搭建步骤
- 集群工作方式
- 脱离集群
- 镜像队列
- 高可用负载均衡