企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
交换机根据消息header做出判断,是否投递给消费者 ## 1.配置 ~~~ @Configuration public class HeaderMQConfig { @Bean public Queue headersQueue() { return new Queue("HEADERS_QUEUE"); } @Bean public HeadersExchange headersExchange() { return new HeadersExchange("HEADERS_EXCHANGE"); } @Bean public Binding bingHeadersQueue() { //map为绑定的规则 Map<String, Object> map = new HashMap<>(); map.put("headers1", "value1"); map.put("headers2", "value2"); //whereAll表示需要满足所有条件 return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match(); } } ~~~ whereAll:要求有这两个header消息,才可以投递,还有whereAny和where ## 2.producer 生产者要带这两个请求头,否则消息不会被交换机投递,少一个都不行,根据上边的配置要求 ~~~ @Service public class HeaderMQSender { @Autowired private AmqpTemplate amqpTemplate; public void send(String message) { MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("headers1", "value1"); messageProperties.setHeader("headers2", "value2"); //要发送的消息,第一个参数为具体的消息字节数组,第二个参数为消息规则 Message msg = new Message(message.getBytes(), messageProperties); //加入类似header的信息 amqpTemplate.convertAndSend("HEADERS_EXCHANGE", "", msg); } } ~~~ ## 3.consumer controller ~~~ @GetMapping("/headerExchange/{message}") public String HeaderExchange(@PathVariable("message") String message) { headerMQSender.send(message); return "success"; } ~~~ 消费者 ~~~ @Service public class HeaderMQReceiver { private static final Logger logger = LoggerFactory.getLogger(HeaderMQReceiver.class); @RabbitListener(queues = "HEADERS_QUEUE") public void receiveHeadersQueue(byte[] message) { logger.info("receive : HeadersQueue {}", new String(message)); } } ~~~ 输出: ``` 2022-06-07 17:55:15.647 INFO 3736 --- [ntContainer#0-1] c.t.m.r.service.header.HeaderMQReceiver : receive : HeadersQueue m1 ```