企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
:-: ![](https://img.kancloud.cn/24/9a/249a8de6f0d9d9ee86d59fc2c9ce0421_1434x357.jpg) 生产消费过程 **1. 声明队列与交换机的绑定关系** ```java @Configuration public class RabbitConfig { public final static String SECOND_EXCHAGE = "second.exchange"; public final static String MARS_QUEUE = "mars.queue"; public final static String MERCURY_QUEUE = "mercury.queue"; public final static String MARS_SECOND_KEY = "mars.second.key"; public final static String MERCURY_SECOND01_KEY = "mercury.second01.key"; public final static String MERCURY_SECOND02_KEY = "mercury.second02.key"; /** * 创建Direct交换机 */ @Bean("secondExchange") public DirectExchange secondExchange() { return new DirectExchange(SECOND_EXCHAGE); } /** * 创建mars队列 */ @Bean("marsQueue") public Queue marsQueue() { return new Queue(MARS_QUEUE); } /** * 创建mercury队列 */ @Bean("mercuryQueue") public Queue mercuryQueue() { return new Queue(MERCURY_QUEUE); } /** * mars队列与交换机绑定 */ @Bean public Binding marsBinding(@Qualifier("marsQueue") Queue queue, @Qualifier("secondExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(MARS_SECOND_KEY); } /** * mercury队列与交换机绑定,key=MERCURY_SECOND01_KEY */ @Bean public Binding mercury01Binding(@Qualifier("mercuryQueue") Queue queue, @Qualifier("secondExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(MERCURY_SECOND01_KEY); } /** * mercury队列与交换机绑定,key=MERCURY_SECOND02_KEY */ @Bean public Binding mercury02Binding(@Qualifier("mercuryQueue") Queue queue, @Qualifier("secondExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(MERCURY_SECOND02_KEY); } } ``` **2. 生产者** ```java @Slf4j @RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/direct/produce") public void directProduce() { Map<String, Object> msgBody = new HashMap<>(16); msgBody.put("name", "zhangsan"); msgBody.put("age", 25); msgBody.put("key", RabbitConfig.MARS_SECOND_KEY); //convertAndSend(String exchange, String routingKey, Object object) rabbitTemplate.convertAndSend(RabbitConfig.SECOND_EXCHAGE, RabbitConfig.MARS_SECOND_KEY, msgBody); log.info("directProduce[生产了消息]:{}", msgBody); msgBody.put("key", RabbitConfig.MERCURY_SECOND01_KEY); rabbitTemplate.convertAndSend(RabbitConfig.SECOND_EXCHAGE, RabbitConfig.MERCURY_SECOND01_KEY, msgBody); log.info("directProduce[生产了消息]:{}", msgBody); msgBody.put("key", RabbitConfig.MERCURY_SECOND02_KEY); rabbitTemplate.convertAndSend(RabbitConfig.SECOND_EXCHAGE, RabbitConfig.MERCURY_SECOND02_KEY, msgBody); log.info("directProduce[生产了消息]:{}", msgBody); } } ``` **3. 消费者** ```java @Slf4j @Service public class ConsumerService { /** * 监听mars队列 */ @RabbitListener(queues = RabbitConfig.MARS_QUEUE) public void marsConsume(Message<Map<String, Object>> message, Channel channel) { Map<String, Object> msgBody = message.getPayload(); log.info("marsConsume[收到了消息]:{}", msgBody); } /** * 监听mercury队列 */ @RabbitListener(queues = RabbitConfig.MERCURY_QUEUE) public void mercuryConsume(Message<Map<String, Object>> message, Channel channel) { Map<String, Object> msgBody = message.getPayload(); log.info("mercuryConsume[收到了消息]:{}", msgBody); } } ``` **4. 测试结果** ``` : directProduce[生产了消息]:{name=zhangsan, age=25, key=mars.second.key} : directProduce[生产了消息]:{name=zhangsan, age=25, key=mercury.second01.key} : directProduce[生产了消息]:{name=zhangsan, age=25, key=mercury.second02.key} : marsConsume[收到了消息]:{name=zhangsan, age=25, key=mars.second.key} : mercuryConsume[收到了消息]:{name=zhangsan, age=25, key=mercury.second01.key} : mercuryConsume[收到了消息]:{name=zhangsan, age=25, key=mercury.second02.key} ```