如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延迟队列。对此我们可以使用 rabbitmq_delayed_message_exchange 插件来实现一个通用的延迟队列。
<br/>
步骤如下:
[TOC]
# 1. 安装延迟队列插件
**1. 下载:https://www.rabbitmq.com/community-plugins.html**
![](https://img.kancloud.cn/8a/f2/8af27d6cb8515a34bd50b1639f4770c0_1260x213.jpg)
![](https://img.kancloud.cn/a7/1e/a71ee33270e6873b220eebb5b55923bd_1259x206.jpg)
**2. 上传插件到 RabbitMQ 的插件目录**
```shell
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
```
**3. 运行下面的命令让插件生效**
```shell
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
```
![](https://img.kancloud.cn/28/3c/283ce57cf9e20c73fcbf3049927c96dd_1346x319.jpg)
<br/>
# 2. 打造延迟队列
:-: ![](https://img.kancloud.cn/ab/84/ab84268652333d0387545d66364d6189_1275x212.jpg)
架构图
delayed.exchange 是一种新的交换机类型,该类型消息支持延迟投递机制,消息传递后并不会立即投递到目标队列中,而是存储在 mnesia (一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
<br/>
步骤如下:
**1. 创建SpringBoot项目**
```xml
<dependencies>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
</dependencies>
```
**2. `resources/application.properties`**
```properties
spring.rabbitmq.host=192.168.0.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
```
**3. 配置类**
```java
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
/**
* 声明队列
*/
@Bean("delayedQueue")
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
/**
* 声明延迟交换机
*/
@Bean("delayedExchange")
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>(16);
//延迟交换机类型。x-delayed-type是固定写法,不可自定义
args.put("x-delayed-type", ExchangeTypes.DIRECT);
//x-delayed-message是固定写法,不可自定义
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
/**
* 队列与交换机绑定
*/
@Bean
public Binding binding(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
```
**4. 生产者**
```java
@Slf4j
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("/plugins/send")
public void pluginsSend() {
Map<Integer, String> data = new HashMap<>(16);
data.put(20000, "ttl为20000的消息");
data.put(2000, "ttl为2000的消息");
for (Map.Entry<Integer, String> entry : data.entrySet()) {
//convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor processor)
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, entry.getValue(), processor -> {
processor.getMessageProperties().setDelay(entry.getKey());
return processor;
});
log.info(entry.getValue());
}
}
}
```
**5. 消费者**
```java
@Slf4j
@Component
public class ConsumerService {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody());
log.info(msg);
}
}
```
**6. 测试**
启动项目后访问 http://localhost:8080/plugins/send ,生产者生产了两条消息,控制台输出如下。
```
2022-10-27 22:38:21.395 INFO 7604 --- [nr.ProducerController : ttl为20000的消息
2022-10-27 22:38:21.395 INFO 7604 --- [nr.ProducerController : ttl为2000的消息
2022-10-27 22:38:23.420 INFO 7604 --- [nservice.ConsumerService : ttl为2000的消息
2022-10-27 22:38:41.398 INFO 7604 --- [nservice.ConsumerService : ttl为20000的消息
```
第二个消息TTL时间短,所以先被消费掉,符合预期。
- 消息队列
- 什么是MQ
- MQ的作用
- MQ的分类
- MQ的选择
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 环境搭建
- windows系统下的搭建
- centos7系统下的搭建
- 常用命令
- 服务相关命令
- 管理用户命令
- 管理队列命令
- 第一个RabbitMQ程序
- 工作队列
- 轮询分发消息
- 消息应答
- 持久化
- 发布确认
- 发布确认原理
- 发布确认策略
- 交换机概念
- 交换机类型
- 无名交换机
- Fanout交换机
- Direct交换机
- Topic交换机
- 死信队列
- 死信概念
- 死信来源
- 死信实战
- 延迟队列
- 什么是延迟队列
- TTL设置方式
- 队列TTL延迟队列
- 消息TTL延迟队列
- 插件打造延迟队列
- 延迟队列总结
- 发布确认高级
- 代码实现
- 回退消息
- 备份交换机
- 幂等性
- 幂等性概念
- 消息重复消费
- 消费端幂等性保障
- 优先级队列
- 使用场景
- 设置优先级
- 惰性队列
- 什么是惰性队列
- 队列的两种模式
- 声明惰性队列
- RabbitMQ集群
- 为什么要搭建集群
- 集群搭建步骤
- 集群工作方式
- 脱离集群
- 镜像队列
- 高可用负载均衡