[TOC]
# 1. 死信队列
> 在发送异常或者网络波动时,消息不能被处理,导致业务异常,所以要把这种异常消息发送到死信队列处理这些异常消息,补救
## 1.1 死信和死信队列
一、死信
> 当一条消息在队列中出现以下三种情况的时候,该消息就会变成一条死信。
1. 消息被否定确认,使用`channel.basicNack`或`channel.basicReject`,并且此时`requeue`属性被设置为`false`。
2. 消息在队列的存活时间超过设置的TTL时间(队列ttl-所有消息都生效,单条消息ttl)。
3. 消息队列的消息数量已经超过最大队列长度。
二、死信队列
> **“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。**
三、配置死信队列
1. 配置业务队列,绑定到业务交换机上
2. 为业务队列配置死信交换机和路由key
3. 为死信交换机配置死信队列
4. 交换机可以为任何类型【Direct、Fanout、Topic】
## 1.2 实例
### 1.2.1 配置
**1.手动确认消息**
```
spring:
rabbitmq:
host: localhost
password: guest
username: guest
listener:
type: simple
simple:
default-requeue-rejected: false
acknowledge-mode: manual
```
**2.配置交换机和队列**
一个正常交换机绑定两个业务队列A和B(广播模式);A和B绑定一个死信交换机,由不同的key绑定,
A队列消费者产生一个死信消息
~~~
package com.tuna.mq.rabbitmq.config.deadletter;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";
// 声明业务Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange() {
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 声明业务队列A
@Bean("businessQueueA")
public Queue businessQueueA() {
Map<String, Object> args = new HashMap<>(2);
//x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
}
// 声明业务队列B
@Bean("businessQueueB")
public Queue businessQueueB() {
Map<String, Object> args = new HashMap<>(2);
//x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
}
// 声明死信队列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA() {
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 声明死信队列B
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB() {
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
// 声明业务队列A绑定关系
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
// 声明业务队列B绑定关系
@Bean
public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
// 声明死信队列A绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
// 声明死信队列B绑定关系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
}
~~~
### 1.2.2 消费者
1.正常消费者,Ack=false消息失败,A队列中的消息将会发到死信队列
~~~
@Slf4j
@Component
public class BusinessMessageReceiver {
@RabbitListener(queues = BUSINESS_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("收到业务消息A:{}", msg);
boolean ack = true;
Exception exception = null;
try {
if (msg.contains("deadletter")) {
throw new RuntimeException("dead letter exception");
}
} catch (Exception e) {
ack = false;
exception = e;
}
if (!ack) {
log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
@RabbitListener(queues = BUSINESS_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
log.info("收到业务消息B:{}", new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
~~~
2.死信消费者
~~~
@Component
public class DeadLetterMessageReceiver {
@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息A:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息B:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
~~~
### 1.2.3 生产者
~~~
@Component
public class BusinessMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String msg) {
rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
}
}
~~~
controller
~~~
@Autowired
private BusinessMessageSender sender;
@RequestMapping("sendmsg")
public void sendMsg(String msg) {
sender.sendMsg(msg);
}
~~~
1.发送正常消息
http://localhost:8080/mq/sendmsg?msg=msg
2.产生一个死信消息,如下A队列产生一个死信消息,他对应的死信队列会接收到这个消息
http://localhost:8080/mq/sendmsg?msg=deadletter
```
2022-06-13 15:09:43.350 INFO 3920 --- [ntContainer#1-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到业务消息B:msg
2022-06-13 15:09:43.350 INFO 3920 --- [ntContainer#0-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到业务消息A:msg
2022-06-13 15:12:23.739 INFO 3920 --- [nio-8080-exec-4] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-KnERgpHzUz4thORpw0BXIQ identity=566c4da9] started
2022-06-13 15:12:23.741 INFO 3920 --- [ntContainer#0-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到业务消息A:deadletter
2022-06-13 15:12:23.741 INFO 3920 --- [ntContainer#1-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到业务消息B:deadletter
2022-06-13 15:12:23.744 ERROR 3920 --- [ntContainer#0-1] c.t.m.r.s.d.BusinessMessageReceiver : 消息消费发生异常,error msg:dead letter exception
java.lang.RuntimeException: dead letter exception
at com.tuna.mq.rabbitmq.service.deadletter.BusinessMessageReceiver.receiveA(BusinessMessageReceiver.java:26)
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_311]
收到死信消息A:deadletter
```
# 2. 延时队列
1. 先进先出的有序队列
2. 用来存放需要在指定时间被处理的元素的队列
## 2.1 场景
可用于延时性较低、数据量大的定时任务:多少时间之后,做...
1. 订单在十分钟之内未支付则自动取消。
2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3. 账单在一周内未支付,则自动结算。
4. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
6. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
7. 这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?
如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。
但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所
有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。更重要的一点是,不!优!雅!没错,作为一名有追求的程序员,始终应该追求更优雅的架构和更优雅的代码风格,写代码要像写诗一样优美。
## 2.2 实例
给消息设置了ttl,死信即为延时
1.生产者发送消息到交换机--->业务队列
2.在ttl事件内没有消费者读取消息,消息进入死信(延时)队列
3.延时队列消费者消费延时消息
有三种实现方式:
一、队列上设置超时,不投灵活
二、消息上设置超时,在超市时间点上不一定能发送超时消息
三、插件的方式
### 2.2.1 插件的方式
1. 声明一个`x-delayed-message`类型的交换机
2. 发布带`x-delay`(指定延期时间)消息头的消息
3. 在超过制定时间后,消息发送到指定队列,被绑定这个队列的消费者消费
1.下载
打开官网下载: http://www.rabbitmq.com/community-plugins.html ,注意rabbitmq和插件的版本对应,3.x.x -3.x.x
2.将插件放入rabbitmq的plugins目录
```
docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez 38ada6e58bba:/plugins
```
3.启动插件
```
docker exec -it 38ada6e58bba /bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
```
代码示例
1.配置插件支持的交换机,并绑定队列
~~~
@Configuration
public class DelayedRabbitMQConfig {
public static final String DELAYED_QUEUE_NAME = "delay.queue";
public static final String DELAYED_EXCHANGE_NAME = "delay.exchange";
public static final String DELAYED_ROUTING_KEY = "delay.routingkey";
@Bean
public Queue immediateQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
@Bean
public CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
//指定低层交换机类型交换机类型
args.put("x-delayed-type", "direct");
//插件封装的交换机
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
@Bean
public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,
@Qualifier("customExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
~~~
args可以配置选项:
```
String name, 交换机名称
String type, 交换机消息类型(x-delayed-message)
boolean durable, 是否持久化
boolean autoDelete,是否删除
Map arguments
// 队列中的消息什么时候会自动被删除?
arguments.put("x-message-ttl",10000);
//设置过期时间
arguments.put("x-expires", 10000);
//x-expires用于当多长时间没有消费者访问该队列的时候,该队列会自动删除,
//x-max-length:
arguments.put("x-max-length", 4);
用于指定队列的长度,如果不指定,可以认为是无限长,例如指定队列的长度是4,当超过4条消息,前面的消息将被删除,给后面的消息腾位
```
2.生产者发送延时消息
~~~
@Slf4j
@Component
public class DelyMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDelayMsg(String msg, Integer delayTime) {
log.info("dely sender send a message: {}",msg);
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a -> {
a.getMessageProperties().setDelay(delayTime);
return a;
});
}
}
~~~
3. 消费者
~~~
@Component
public class DelyMessageConsumer {
private static final Logger logger = LoggerFactory.getLogger(DelyMessageConsumer.class);
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveHeadersQueue(String message) {
logger.info("dely consumer received message: {}", new String(message));
}
}
~~~
4. controller
~~~
@RequestMapping("dely/{message}/{timeOut}")
public String dely(@PathVariable("message") String message,@PathVariable("timeOut") Integer timeOut) {
delyMessageSender.sendDelayMsg(message, timeOut);
return "success";
}
~~~
http://localhost:8080/mq/dely/hello/30000
输出 时隔30秒
```
2022-07-04 09:40:34.033 INFO 6316 --- [io-8080-exec-10] c.t.m.r.service.dely.DelyMessageSender : dely sender send a message: hello
2022-07-04 09:41:04.036 INFO 6316 --- [ntContainer#2-1] c.t.m.r.s.dely.DelyMessageConsumer : dely consumer received message: hello
```