[TOC]
# 1. 消息可靠投递
1. 生产者准备好需要投递的消息。
2. 生产者与RabbitMQ服务器建立连接。
3. 生产者发送消息。
4. RabbitMQ服务器接收到消息,并将其路由到指定队列。
5. RabbitMQ服务器发起回调,告知生产者消息发送成功。
![](https://img.kancloud.cn/93/4f/934f37b6c1e8f1197b5d936713c564ec_606x234.png)
**所谓可靠投递,就是确保消息能够百分百从生产者发送到服务器。**
![](https://img.kancloud.cn/ec/42/ec42072c6177bfbffdbab0314cf7f3b6_981x575.png)
为了避免争议,补充说明一下,如果没有设置Mandatory参数,是不需要先路由消息才发起回调的,服务器收到消息后就会进行回调确认。
2、3、5步都是通过TCP连接进行交互,有网络调用的地方就会有事故,网络波动随时都有可能发生,不管是内部机房停电,还是外部光缆被切,网络事故无法预测,虽然这些都是小概率事件,但对于订单等敏感数据处理来说,这些情况下导致消息丢失都是不可接受的。
# 2. 实现可靠投递
默认情况下,发送消息的操作是不会返回任何信息给生产者的,也就是说,默认情况下生产者是不知道消息有没有正确地到达服务器。
那么如何解决这个问题呢?
对此,RabbitMQ中有一些相关的解决方案:
1. 使用事务机制来让生产者感知消息被成功投递到服务器。
2. 通过生产者确认机制实现。
在RabbitMQ中,所有确保消息可靠投递的机制都会对性能产生一定影响,如使用不当,可能会对吞吐量造成重大影响,只有通过执行性能基准测试,才能在确定性能与可靠投递之间的平衡。
在使用可靠投递前,需要先思考以下问题:
1. 消息发布时,保证消息进入队列的重要性有多高?
2. 如果消息无法进行路由,是否应该将该消息返回给发布者?
3. 如果消息无法被路由,是否应该将其发送到其他地方稍后再重新进行路由?
4. 如果RabbitMQ服务器崩溃了,是否可以接受消息丢失?
5. RabbitMQ在处理新消息时是否应该确认它已经为发布者执行了所有请求的路由和持久化?
6. 消息发布者是否可以批量投递消息?
7. 在可靠投递上是否有可以接受的平衡性?是否可以接受一部分的不可靠性来提升性能?
只考虑平衡性不考虑性能是不行的,至于这个平衡的度具体如何把握,就要具体情况具体分析了,比如像订单数据这样敏感的信息,对可靠性的要求自然要比一般的业务消息对可靠性的要求高的多,因为订单数据是跟钱直接相关的,可能会导致直接的经济损失。
## 2.1 RabbitMQ的事务机制
RabbitMQ是支持AMQP事务机制的,在生产者确认机制之前,事务是确保消息被成功投递的唯一方法。
在SpringBoot项目中,使用RabbitMQ事务其实很简单,只需要声明一个事务管理的Bean,并将RabbitTemplate的事务设置为true即可。
配置文件如下:
RabbitMQ是支持AMQP事务机制的,在生产者确认机制之前,事务是确保消息被成功投递的唯一方法。
在SpringBoot项目中,使用RabbitMQ事务其实很简单,只需要声明一个事务管理的Bean,并将RabbitTemplate的事务设置为true即可。
配置文件如下:
~~~
spring:
rabbitmq:
host: localhost
password: guest
username: guest
listener:
type: simple
simple:
default-requeue-rejected: false
acknowledge-mode: manual
~~~
先来配置一下交换机和队列,以及事务管理器。
~~~
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue";
// 声明业务Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 声明业务队列
@Bean("businessQueue")
public Queue businessQueue(){
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build();
}
// 声明业务队列绑定关系
@Bean
public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
/**
* 配置启用rabbitmq事务
* @param connectionFactory
* @return
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
~~~
然后创建一个消费者,来监听消息,用以判断消息是否成功发送。
~~~
@Slf4j
@Component
public class BusinessMsgConsumer {
@RabbitListener(queues = BUSINESS_QUEUEA_NAME)
public void receiveMsg(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("收到业务消息:{}", msg);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
~~~
然后是消息生产者:
~~~
@Slf4j
@Component
public class BusinessMsgProducer{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
rabbitTemplate.setChannelTransacted(true);
}
@Transactional
public void sendMsg(String msg) {
rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME, "key", msg);
log.info("msg:{}", msg);
if (msg != null && msg.contains("exception"))
throw new RuntimeException("surprise!");
log.info("消息已发送 {}" ,msg);
}
}
~~~
这里有两个注意的地方:
1. 在初始化方法里,通过使用`rabbitTemplate.setChannelTransacted(true);`来开启事务。
2. 在发送消息的方法上加上`@Transactional`注解,这样在该方法中发生异常时,消息将不会发送。
在controller中加一个接口来生产消息:
~~~
@RestController
public class BusinessController {
@Autowired
private BusinessMsgProducer producer;
@RequestMapping("send")
public void sendMsg(String msg){
producer.sendMsg(msg);
}
}
~~~
来验证一下:
~~~
msg:1
消息已发送 1
收到业务消息:1
msg:2
消息已发送 2
收到业务消息:2
msg:3
消息已发送 3
收到业务消息:3
msg:exception
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: surprise!] with root cause
java.lang.RuntimeException: surprise!
at com.mfrank.rabbitmqdemo.producer.BusinessMsgProducer.sendMsg(BusinessMsgProducer.java:30)
...
~~~
当`msg`的值为`exception`时, 在调用`rabbitTemplate.convertAndSend`方法之后,程序抛出了异常,消息并没有发送出去,而是被当前事务回滚了。
当然,你可以将事务管理器注释掉,或者将初始化方法的开启事务注释掉,这样事务就不会生效,即使在调用了发送消息方法之后,程序发生了异常,消息也会被正常发送和消费。
RabbitMQ中的事务使用起来虽然简单,但是对性能的影响是不可忽视的,因为每次事务的提交都是阻塞式的等待服务器处理返回结果,而默认模式下,客户端是不需要等待的,直接发送就完事了,除此之外,事务消息需要比普通消息多4次与服务器的交互,这就意味着会占用更多的处理时间,所以如果对消息处理速度有较高要求时,尽量不要采用事务机制。
## 2.2 RabbitMQ的生产者确认机制
1.发送确认:生产者-Broker
2.路由确认:交换机-队列
以上两个环节消息失败,都可以回调生产者
### 2.2.1 投递失败回调(confirm)
通过生产者确认机制,生产者可以在消息被服务器成功接收时得到反馈,并有机会处理未被成功接收的消息。
在Springboot中开启RabbitMQ的生产者确认模式也很简单,只多了一行配置:
~~~
spring:
rabbitmq:
host: localhost
password: guest
username: guest
listener:
type: simple
simple:
default-requeue-rejected: false
acknowledge-mode: manual
publisher-confirms: true
~~~
`publisher-confirms: true`即表示开启生产者确认模式。
然后将消息生产者的代表进行部分修改:
~~~
@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
// rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setConfirmCallback(this);
}
public void sendCustomMsg(String exchange, String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("消息确认成功, id:{}", id);
} else {
log.error("消息未成功投递, id:{}, cause:{}", id, s);
}
}
}
~~~
让生产者继承自`RabbitTemplate.ConfirmCallback`类,然后实现其`confirm`方法,即可用其接收服务器回调。
### 2.2.2 路由失败回调(return机制)
消息发送到Exchange后,没有找到绑定的队列,投递消息失败才执行return消息确认机制
1. yml配置增加`spring.rabbitmq.publisher-returns=true`
2. 实现接口,当有消息路由失败时,回调
![](https://img.kancloud.cn/23/f4/23f4f7c8e18eca164ca0cf7cfc7cc896_620x393.png)
## mandatory 参数
设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
当把 mandotory 参数设置为 true 时,如果交换机无法将消息进行路由时,会将该消息返回给生产者,而如果该参数设置为false,如果发现消息无法进行路由,则直接丢弃。
![](https://img.kancloud.cn/2a/62/2a62db40b23b7e2e54919b08a2e5665b_1234x642.png)那么如何设置这个参数呢?在发送消息的时候,只需要在初始化方法添加一行代码即可:
~~~
rabbitTemplate.setMandatory(true);
~~~
开启之后我们再重新运行前面的代码:
~~~
消息id:19729f33-15c4-4c1b-8d48-044c301e2a8e, msg:1
消息id:4aea5c57-3e71-4a7b-8a00-1595d2b568eb, msg:1
消息确认成功, id:19729f33-15c4-4c1b-8d48-044c301e2a8e
Returned message but no callback available
消息确认成功, id:4aea5c57-3e71-4a7b-8a00-1595d2b568eb
收到业务消息:1
~~~
我们看到中间多了一行提示`Returned message but no callback available`这是什么意思呢?
我们上面提到,设置 mandatory 参数后,如果消息无法被路由,则会返回给生产者,是通过回调的方式进行的,所以,生产者需要设置相应的回调函数才能接受该消息。
为了进行回调,我们需要实现一个接口`RabbitTemplate.ReturnCallback`。
~~~
@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(this);
}
public void sendCustomMsg(String exchange, String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("消息确认成功, id:{}", id);
} else {
log.error("消息未成功投递, id:{}, cause:{}", id, s);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息被服务器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}",
new String(message.getBody()), replyCode, replyText, exchange, routingKey);
}
}
~~~
然后我们再来重新运行一次:
~~~
消息id:2e5c336a-883a-474e-b40e-b6e3499088ef, msg:1
消息id:85c771cb-c88f-47dd-adea-f0da57138423, msg:1
消息确认成功, id:2e5c336a-883a-474e-b40e-b6e3499088ef
消息无法被路由,被服务器退回。msg:1, replyCode:312. replyText:NO_ROUTE, exchange:rabbitmq.tx.demo.simple.business.exchange, routingKey :key2
消息确认成功, id:85c771cb-c88f-47dd-adea-f0da57138423
收到业务消息:1
~~~
可以看到,我们接收到了被退回的消息,并带上了消息被退回的原因:`NO_ROUTE`。但是要注意的是, mandatory 参数仅仅是在当消息无法被路由的时候,让生产者可以感知到这一点,只要开启了生产者确认机制,无论是否设置了 mandatory 参数,都会在交换机接收到消息时进行消息确认回调,而且通常消息的退回回调会在消息的确认回调之前。
## ***0***|***1*****备份交换机**
有了 mandatory 参数,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。
而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?
前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。
不要慌,在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。
什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会将这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
听的不太明白?没关系,看个图就知道是怎么回事了。
![](https://img.kancloud.cn/a9/d6/a9d6da99f5fa5b8ff21e80eee6ef5334_1334x614.png)
接下来,我们就来设置一下备份交换机:
~~~
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.backup.test.exchange";
public static final String BUSINESS_QUEUE_NAME = "rabbitmq.backup.test.queue";
public static final String BUSINESS_BACKUP_EXCHANGE_NAME = "rabbitmq.backup.test.backup-exchange";
public static final String BUSINESS_BACKUP_QUEUE_NAME = "rabbitmq.backup.test.backup-queue";
public static final String BUSINESS_BACKUP_WARNING_QUEUE_NAME = "rabbitmq.backup.test.backup-warning-queue";
// 声明业务 Exchange
@Bean("businessExchange")
public DirectExchange businessExchange(){
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(BUSINESS_EXCHANGE_NAME)
.durable(true)
.withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);
return (DirectExchange)exchangeBuilder.build();
}
// 声明备份 Exchange
@Bean("backupExchange")
public FanoutExchange backupExchange(){
ExchangeBuilder exchangeBuilder = ExchangeBuilder.fanoutExchange(BUSINESS_BACKUP_EXCHANGE_NAME)
.durable(true);
return (FanoutExchange)exchangeBuilder.build();
}
// 声明业务队列
@Bean("businessQueue")
public Queue businessQueue(){
return QueueBuilder.durable(BUSINESS_QUEUE_NAME).build();
}
// 声明业务队列绑定关系
@Bean
public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
@Qualifier("businessExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key");
}
// 声明备份队列
@Bean("backupQueue")
public Queue backupQueue(){
return QueueBuilder.durable(BUSINESS_BACKUP_QUEUE_NAME).build();
}
// 声明报警队列
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(BUSINESS_BACKUP_WARNING_QUEUE_NAME).build();
}
// 声明备份队列绑定关系
@Bean
public Binding backupBinding(@Qualifier("backupQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 声明备份报警队列绑定关系
@Bean
public Binding backupWarningBinding(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
}
~~~
这里我们使用`ExchangeBuilder`来创建交换机,并为其设置备份交换机:
~~~
.withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);
~~~
为业务交换机绑定了一个队列,为备份交换机绑定了两个队列,一个用来存储不可投递消息,待之后人工处理,一个专门用来做报警用途。
接下来,分别为业务交换机和备份交换机创建消费者:
~~~
@Slf4j
@Component
public class BusinessMsgConsumer {
@RabbitListener(queues = BUSINESS_QUEUE_NAME)
public void receiveMsg(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("收到业务消息:{}", msg);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
~~~
~~~
@Slf4j
@Component
public class BusinessWaringConsumer {
@RabbitListener(queues = BUSINESS_BACKUP_WARNING_QUEUE_NAME)
public void receiveMsg(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.error("发现不可路由消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
~~~
接下来我们分别发送一条可路由消息和不可路由消息:
~~~
@Slf4j
@Component
public class BusinessMsgProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendCustomMsg(String exchange, String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
}
}
~~~
消息如下:
~~~
消息id:5c3a33c9-0764-4d1f-bf6a-a00d771dccb4, msg:1
消息id:42ac8c35-1d0a-4413-a1df-c26a85435354, msg:1
收到业务消息:1
发现不可路由消息:1
~~~
这里仅仅使用 error 日志配合日志系统进行报警,如果是敏感数据,可以使用邮件、钉钉、短信、电话等报警方式来提高时效性。
那么问题来了,mandatory 参数与备份交换机可以一起使用吗?设置 mandatory 参数会让交换机将不可路由消息退回给生产者,而备份交换机会让交换机将不可路由消息转发给它,那么如果两者同时开启,消息究竟何去何从??
emmm,想这么多干嘛,试试不就知道了。
修改一下生产者即可:
~~~
@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
// rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(this);
}
public void sendCustomMsg(String exchange, String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("消息确认成功, id:{}", id);
} else {
log.error("消息未成功投递, id:{}, cause:{}", id, s);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息被服务器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}",
new String(message.getBody()), replyCode, replyText, exchange, routingKey);
}
}
~~~
再来测试一下:
~~~
消息id:0a3eca1e-d937-418c-a7ce-bfb8ce25fdd4, msg:1
消息id:d8c9e010-e120-46da-a42e-1ba21026ff06, msg:1
消息确认成功, id:0a3eca1e-d937-418c-a7ce-bfb8ce25fdd4
消息确认成功, id:d8c9e010-e120-46da-a42e-1ba21026ff06
发现不可路由消息:1
收到业务消息:1
~~~
可以看到,两条消息都可以收到确认成功回调,但是不可路由消息不会被回退给生产者,而是直接转发给备份交换机。可见备份交换机的处理优先级更高。
## ***0***|***1*****总结**