ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
<mark>在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的</mark>。 <br/> 那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过在生产者设置`mandatory` 参数可以在当消息传递过程中不可到达目的地时将消息返回给生产者。 ```java public class RabbitTemplate extends ... { //mandatory参数 private Expression mandatoryExpression = new ValueExpression(false); ``` <br/> **1. 生产者** ```java @Slf4j @RestController public class ReturnController { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private CustomCallBack customCallBack; /** * @PostConstruct 注解标记的方法可以在项目启动时时执行一次,这里在项目启动时 * 注入CustomCallBack对象到RabbitTemplate中去 */ @PostConstruct private void init() { rabbitTemplate.setConfirmCallback(customCallBack); //setMandatory(boolean mandatory) //mandatory=true: 交换机无法将消息进行路由时,会将该消息返回给生产者 //mandatory=false: 如果发现消息无法进行路由,则直接丢弃 rabbitTemplate.setMandatory(true); //设置回退消息交给谁处理 rabbitTemplate.setReturnCallback(customCallBack); } @GetMapping("/return/send") public void sendMessage() { //指定消息id为1 CorrelationData corId = new CorrelationData("1"); //convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "key1", "message#key1", corId); log.info("生产消息: {}", "message#key1"); CorrelationData corId2 = new CorrelationData("2"); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "key2", "message#key2", corId2); log.info("生产消息: {}", "message#key2"); } } ``` **2. 回调对象** ```java @Slf4j @Component public class CustomCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { /** * 不论交换机是否收到消息该方法都会被调用 * * @param corData 消息相关数据 * @param ack 交换机是否收到消息。true收到、false没收到 * @param cause 收到或没收到的理由 */ @Override public void confirm(CorrelationData corData, boolean ack, String cause) { if (ack) { log.info("交换机已收到id={}的消息", corData.getId()); } else { log.info("交换机未收到id={}的消息,原因是:{}", corData.getId(), cause); } } /** * 当消息无法路由到交换机时被调用 * * @param message 消息 * @param replyCode 消息退回代码 * @param replyText 消息退回原因 * @param exchange 交换机名称 * @param routingKey 路由key */ @Override public void returnedMessage(Message message, int replyCode, String replyText , String exchange, String routingKey) { log.info("消息被退回 -> message={}, replyCode={}, replyText={}, exchange={}, routingKey={}" , new String(message.getBody()), replyCode, replyText, exchange, routingKey); } } ``` **3. 测试** 启动项目后访问 http://localhost:8080/return/send 生产了两条消息,控制台输出如下。 ``` ...] c.l.r.controller.ReturnController : 生产消息: message#key1 ...] c.l.r.controller.ReturnController : 生产消息: message#key2 ...] c.l.rabbitmq03.callback.CustomCallBack : 消息被退回 -> message=message#key2, replyCode=312, replyText=NO_ROUTE , exchange=confirm.exchange, routingKey=key2 ...] c.l.rabbitmq03.callback.CustomCallBack : 交换机已收到id=2的消息 ...] c.l.rabbitmq03.service.ConfirmConsumer : 收到消息: message#key1 ...] c.l.rabbitmq03.callback.CustomCallBack : 交换机已收到id=1的消息 ``` 可见消息 `message#key2` 因为没有队列与之绑定无法路由,所以被退回给了生产者。