通过设置每条消息的 TTL 来打造延迟队列,本章通过 SpringBoot 演示下图的延迟队列架构。
![](https://img.kancloud.cn/17/5f/175ffea84c32d2987062ade658bec026_1324x223.jpg)
<br/>
步骤如下:
**1. 创建SpringBoot项目**
```xml
<dependencies>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
</dependencies>
```
**3. `resources/application.properties`**
```properties
spring.rabbitmq.host=192.168.0.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
```
**4. 配置类**
```java
@Configuration
public class MsgTtlQueueConfig {
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_C = "QC";
/**
* 队列C绑定死信交换机Y
*/
@Bean("queueC")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(16);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
/**
* 队列C绑定X交换机
*/
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
//with(routingKey)
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
```
**5. 生产者**
```java
@Slf4j
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/msg/ttl/send")
public void msgTtlSend(@RequestParam Map<String, String> params) {
String message = "time: " + params.get("time") + ", message: " + params.get("message");
//convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor processor)
rabbitTemplate.convertAndSend("X", "XC", message, processor -> {
//设置消息的TTL
processor.getMessageProperties().setExpiration(params.get("time"));
return processor;
});
}
}
```
**6. 消费者**
```java
@Slf4j
@Component
public class ConsumerService {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
log.info(msg);
}
}
```
**7. 测试**
启动项目后访问 http://localhost:8080/msg/ttl/send?time=20000&message=C1 ,生产者将生产一条消息,该消息将在 20000ms 后被消费者接收到。
```
2022-10-27 22:18:09.908 INFO 3184 --- [ntConsumerService : time: 20000, message: C1
```
<br/>
如果在消息属性上设置 TTL 的方式,消息可能并不会按时死亡,因为 RabbitMQ <mark>只会检查第一个消息是否过期</mark>,如果过期则丢到死信队列。<mark>如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行</mark>。如果想打造不存在这个限制的延迟队列就需要 RabbitMQ 的[延迟插件](https://www.kancloud.cn/king_om/x_1_mq/2483280)来支持了。
<br/>
如果将队列 TTL 延迟队列和消息 TTL 延迟队列用在一个项目中,便可以优化为如下的架构。
![](https://img.kancloud.cn/20/f2/20f24f70beb14fe5c14d12d9a28d4cea_1399x342.jpg)
- 消息队列
- 什么是MQ
- MQ的作用
- MQ的分类
- MQ的选择
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 环境搭建
- windows系统下的搭建
- centos7系统下的搭建
- 常用命令
- 服务相关命令
- 管理用户命令
- 管理队列命令
- 第一个RabbitMQ程序
- 工作队列
- 轮询分发消息
- 消息应答
- 持久化
- 发布确认
- 发布确认原理
- 发布确认策略
- 交换机概念
- 交换机类型
- 无名交换机
- Fanout交换机
- Direct交换机
- Topic交换机
- 死信队列
- 死信概念
- 死信来源
- 死信实战
- 延迟队列
- 什么是延迟队列
- TTL设置方式
- 队列TTL延迟队列
- 消息TTL延迟队列
- 插件打造延迟队列
- 延迟队列总结
- 发布确认高级
- 代码实现
- 回退消息
- 备份交换机
- 幂等性
- 幂等性概念
- 消息重复消费
- 消费端幂等性保障
- 优先级队列
- 使用场景
- 设置优先级
- 惰性队列
- 什么是惰性队列
- 队列的两种模式
- 声明惰性队列
- RabbitMQ集群
- 为什么要搭建集群
- 集群搭建步骤
- 集群工作方式
- 脱离集群
- 镜像队列
- 高可用负载均衡