企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
:-: ![](https://img.kancloud.cn/7e/42/7e422750a554ad297414aaf334d22e05_1519x244.png) 生产消费过程 **1. 交换机与队列绑定** ```java @Configuration public class RabbitConfig { public final static String DELAYED_EXCHANGE = "delayed.exchange"; public final static String DELAYED_QUEUE = "delayed.queue"; public final static String DELAYED_KEY = "delayed.key"; /** * 创建延迟交换机 */ @Bean("delayedExchange") public CustomExchange delayedExchange() { Map<String, Object> args = new HashMap<>() {{ //声明延迟交换机类型 //x-delayed-type 是固定写法,不可自定义 put("x-delayed-type", ExchangeTypes.DIRECT); }}; //x-delayed-message是固定写法,不可自定义 return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args); } /** * 创建队列 */ @Bean("delayedQueue") public Queue delayedQueue() { return new Queue(DELAYED_QUEUE); } /** * 队列与交换机绑定 */ @Bean public Binding binding(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_KEY).noargs(); } } ``` **2. 生产者生产消息** ```java @Slf4j @RestController @RequiredArgsConstructor public class ProducerController { final RabbitTemplate rabbitTemplate; @GetMapping("/delayed/produce") public void delayedProduce() { //延迟队列的单位是:毫秒 rabbitTemplate.convertAndSend(RabbitConfig.DELAYED_EXCHANGE, RabbitConfig.DELAYED_KEY, "1min后处理", processor -> { processor.getMessageProperties().setDelay(1 * 60 * 1000); return processor; }); log.info("[delayedProduce|生产了消息]: 1min后处理"); rabbitTemplate.convertAndSend(RabbitConfig.DELAYED_EXCHANGE, RabbitConfig.DELAYED_KEY, "2min后处理", processor -> { processor.getMessageProperties().setDelay(2 * 60 * 1000); return processor; }); log.info("[delayedProduce|生产了消息]: 2min后处理"); } } ``` **3. 消费者监听队列** ```java @Slf4j @Component public class RabbitMQListener { @RabbitListener(queues = RabbitConfig.DELAYED_QUEUE) public void delayedConsume(Message<String> message, Channel channel) { log.info("[delayedConsume|收到了消息]: {}", message.getPayload()); } } ``` **4. 测试结果** ``` 2023-11-17T21:04:15 : [delayedProduce|生产了消息]: 1min后处理 2023-11-17T21:04:15 : [delayedProduce|生产了消息]: 2min后处理 2023-11-17T21:05:15 : [delayedConsume|收到了消息]: 1min后处理 2023-11-17T21:06:15 : [delayedConsume|收到了消息]: 2min后处理 ```