企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
通过设置队列的 TTL 来打造延时队列,本章通过 SpringBoot 演示下图的延迟队列架构。 ![](https://img.kancloud.cn/bc/62/bc62dd079ec92e1c7d0aa043609164b5_1433x245.jpg) 创建两个队列 QA 和 QB,两个队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct。再创建一个死信队列 QD。 <br/> 步骤如下: **1. 创建SpringBoot项目** ```xml <dependencies> <!--RabbitMQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--RabbitMQ 测试依赖--> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> </dependencies> ``` **2. `resources/application.properties`** ```properties spring.rabbitmq.host=192.168.0.107 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin ``` **3. 配置类** ```java @Configuration public class TtlQueueConfig { public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD"; @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } /** * 队列 A 绑定到对应的死信交换机 */ @Bean("queueA") public Queue queueA() { Map<String, Object> args = new HashMap<>(16); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //声明队列的 TTL args.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(args).build(); } /** * 队列 A 绑定 X 交换机 */ @Bean public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { // with(routingKey) return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } /** * 队列 B 绑定到对应的死信交换机 */ @Bean("queueB") public Queue queueB() { Map<String, Object> args = new HashMap<>(3); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //声明队列的 TTL args.put("x-message-ttl", 40000); return QueueBuilder.durable(QUEUE_B).withArguments(args).build(); } /** * 队列 B 绑定 X 交换机 */ @Bean public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B, @Qualifier("xExchange") DirectExchange xExchange) { // with(routingKey) return BindingBuilder.bind(queue1B).to(xExchange).with("XB"); } @Bean("queueD") public Queue queueD() { return new Queue(DEAD_LETTER_QUEUE); } /** * 死信队列 QD 绑定死信交换机Y */ @Bean public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { // with(routingKey) return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } } ``` **4. 生产者** ```java @Slf4j @RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/ttl/send") public void ttlSend() { //convertAndSend(String exchange, String routingKey, Object object) rabbitTemplate.convertAndSend("X", "XA", "ttl为10S队列消息"); rabbitTemplate.convertAndSend("X", "XB", "ttl为40S队列消息"); } } ``` **5. 消费者** ```java @Slf4j @Component public class ConsumerService { @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody(), "UTF-8"); log.info(msg); } } ``` **6. 测试** 启动项目后访问 http://localhost:8080/ttl/send 生产者将生产两条消息。消费者消费的消息如下。 ``` 2022-10-27 21:58:26 : ttl为10S队列消息 2022-10-27 21:58:56 : ttl为40S队列消息 ``` 第一条消息在 10S 后变成了死信消息,然后被消费者消费掉。 第二条消息在 40S 之后变成了死信消息,然后被消费掉。 这样一个延时队列就打造完成了。