多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
通过设置每条消息的 TTL 来打造延迟队列,本章通过 SpringBoot 演示下图的延迟队列架构。 ![](https://img.kancloud.cn/17/5f/175ffea84c32d2987062ade658bec026_1324x223.jpg) <br/> 步骤如下: **1. 创建SpringBoot项目** ```xml <dependencies> <!--RabbitMQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--RabbitMQ 测试依赖--> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> </dependencies> ``` **3. `resources/application.properties`** ```properties spring.rabbitmq.host=192.168.0.107 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin ``` **4. 配置类** ```java @Configuration public class MsgTtlQueueConfig { public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String QUEUE_C = "QC"; /** * 队列C绑定死信交换机Y */ @Bean("queueC") public Queue queueB() { Map<String, Object> args = new HashMap<>(16); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由key args.put("x-dead-letter-routing-key", "YD"); return QueueBuilder.durable(QUEUE_C).withArguments(args).build(); } /** * 队列C绑定X交换机 */ @Bean public Binding queuecBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) { //with(routingKey) return BindingBuilder.bind(queueC).to(xExchange).with("XC"); } } ``` **5. 生产者** ```java @Slf4j @RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/msg/ttl/send") public void msgTtlSend(@RequestParam Map<String, String> params) { String message = "time: " + params.get("time") + ", message: " + params.get("message"); //convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor processor) rabbitTemplate.convertAndSend("X", "XC", message, processor -> { //设置消息的TTL processor.getMessageProperties().setExpiration(params.get("time")); return processor; }); } } ``` **6. 消费者** ```java @Slf4j @Component public class ConsumerService { @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody(), "UTF-8"); log.info(msg); } } ``` **7. 测试** 启动项目后访问 http://localhost:8080/msg/ttl/send?time=20000&message=C1 ,生产者将生产一条消息,该消息将在 20000ms 后被消费者接收到。 ``` 2022-10-27 22:18:09.908 INFO 3184 --- [ntConsumerService : time: 20000, message: C1 ``` <br/> 如果在消息属性上设置 TTL 的方式,消息可能并不会按时死亡,因为 RabbitMQ <mark>只会检查第一个消息是否过期</mark>,如果过期则丢到死信队列。<mark>如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行</mark>。如果想打造不存在这个限制的延迟队列就需要 RabbitMQ 的[延迟插件](https://www.kancloud.cn/king_om/x_1_mq/2483280)来支持了。 <br/> 如果将队列 TTL 延迟队列和消息 TTL 延迟队列用在一个项目中,便可以优化为如下的架构。 ![](https://img.kancloud.cn/20/f2/20f24f70beb14fe5c14d12d9a28d4cea_1399x342.jpg)