🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延迟队列。对此我们可以使用 rabbitmq_delayed_message_exchange 插件来实现一个通用的延迟队列。 <br/> 步骤如下: [TOC] # 1. 安装延迟队列插件 **1. 下载:https://www.rabbitmq.com/community-plugins.html** ![](https://img.kancloud.cn/8a/f2/8af27d6cb8515a34bd50b1639f4770c0_1260x213.jpg) ![](https://img.kancloud.cn/a7/1e/a71ee33270e6873b220eebb5b55923bd_1259x206.jpg) **2. 上传插件到 RabbitMQ 的插件目录** ```shell /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins ``` **3. 运行下面的命令让插件生效** ```shell rabbitmq-plugins enable rabbitmq_delayed_message_exchange ``` ![](https://img.kancloud.cn/28/3c/283ce57cf9e20c73fcbf3049927c96dd_1346x319.jpg) <br/> # 2. 打造延迟队列 :-: ![](https://img.kancloud.cn/ab/84/ab84268652333d0387545d66364d6189_1275x212.jpg) 架构图 delayed.exchange 是一种新的交换机类型,该类型消息支持延迟投递机制,消息传递后并不会立即投递到目标队列中,而是存储在 mnesia (一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。 <br/> 步骤如下: **1. 创建SpringBoot项目** ```xml <dependencies> <!--RabbitMQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--RabbitMQ 测试依赖--> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> </dependencies> ``` **2. `resources/application.properties`** ```properties spring.rabbitmq.host=192.168.0.107 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin ``` **3. 配置类** ```java @Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed.queue"; public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; /** * 声明队列 */ @Bean("delayedQueue") public Queue delayedQueue() { return new Queue(DELAYED_QUEUE_NAME); } /** * 声明延迟交换机 */ @Bean("delayedExchange") public CustomExchange delayedExchange() { Map<String, Object> args = new HashMap<>(16); //延迟交换机类型。x-delayed-type是固定写法,不可自定义 args.put("x-delayed-type", ExchangeTypes.DIRECT); //x-delayed-message是固定写法,不可自定义 return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } /** * 队列与交换机绑定 */ @Bean public Binding binding(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } } ``` **4. 生产者** ```java @Slf4j @RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @GetMapping("/plugins/send") public void pluginsSend() { Map<Integer, String> data = new HashMap<>(16); data.put(20000, "ttl为20000的消息"); data.put(2000, "ttl为2000的消息"); for (Map.Entry<Integer, String> entry : data.entrySet()) { //convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor processor) rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, entry.getValue(), processor -> { processor.getMessageProperties().setDelay(entry.getKey()); return processor; }); log.info(entry.getValue()); } } } ``` **5. 消费者** ```java @Slf4j @Component public class ConsumerService { public static final String DELAYED_QUEUE_NAME = "delayed.queue"; @RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveDelayedQueue(Message message) { String msg = new String(message.getBody()); log.info(msg); } } ``` **6. 测试** 启动项目后访问 http://localhost:8080/plugins/send ,生产者生产了两条消息,控制台输出如下。 ``` 2022-10-27 22:38:21.395 INFO 7604 --- [nr.ProducerController : ttl为20000的消息 2022-10-27 22:38:21.395 INFO 7604 --- [nr.ProducerController : ttl为2000的消息 2022-10-27 22:38:23.420 INFO 7604 --- [nservice.ConsumerService : ttl为2000的消息 2022-10-27 22:38:41.398 INFO 7604 --- [nservice.ConsumerService : ttl为20000的消息 ``` 第二个消息TTL时间短,所以先被消费掉,符合预期。