企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 1. 多重绑定 Direct 交换机可以选择将哪些消息发送到哪些队列上,不将哪些消息发送到哪些队列上。 ![](https://img.kancloud.cn/02/17/0217f71ead5745dd4ba519354104b0ab_1405x340.jpg) 上图中,我们可以看到 X 绑定了两个队列,交换机类型是 direct。队列 Q1 绑定键为 orange,队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green。 <br/> 在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black、green 的消息会被发布到队列 Q2,其他没有被绑定的消息将被丢弃。 <br/> :-: ![](https://img.kancloud.cn/a9/c1/a9c1de04f8ce241d4b45d32e1463192b_1359x345.jpg) 多重绑定 当 exchange 类型是 direct,但是它绑定的多个队列的 key 都相同,在这种情况下虽然 exchange 类型是 direct,但是它表现的就和 fanout 交换机有点类似了,就跟广播差不多。 <br/> # 2. Direct交换机演示 演示下图的 Direct 交换机。 ![](https://img.kancloud.cn/6f/2e/6f2e5866315ddb14ae99353971b37c9d_1434x438.jpg) 步骤如下: **1. 生产者** ```java public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { //声明exchange类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //创建多个 bindingKey Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("info", "info信息"); bindingKeyMap.put("warning", "warning信息"); bindingKeyMap.put("error", "error信息"); bindingKeyMap.put("debug", "debug信息"); for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); /* * 发送消息。 * basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) * var1: 交换机名称 * var2: bindingKey名称 * var3: 其他参数 * var4: 发送的消息 */ channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8")); System.out.println(EmitLogDirect.class.getSimpleName() + "[生产者发出消息]: " + message); } } } } ``` **2. 两个消费者** (1)*`ReceiveLogsDirect01`* ```java public class ReceiveLogsDirect01 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "queue.error"; //声明一个队列 channel.queueDeclare(queueName, false, false, false, null); //队列与交换机绑定,绑定的key=error channel.queueBind(queueName, EXCHANGE_NAME, "error"); System.out.println(ReceiveLogsDirect01.class.getSimpleName() + "[等待接收消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); message = "bindKey=" + delivery.getEnvelope().getRoutingKey() + ", msg=" + message; System.out.println(ReceiveLogsDirect01.class.getSimpleName() + "[接收到的消息]: " + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } ``` (2)*`ReceiveLogsDirect02`* ```java public class ReceiveLogsDirect02 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "queue.console"; channel.queueDeclare(queueName, false, false, false, null); //注意:这里绑定了两个key channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "warning"); System.out.println(ReceiveLogsDirect02.class.getSimpleName() + "[等待接收消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); message = "bindKey=" + delivery.getEnvelope().getRoutingKey() + ", msg=" + message; System.out.println(ReceiveLogsDirect02.class.getSimpleName() + "[接收到的消息]: " + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } ``` **3. 测试** 先启动两个消费者,再启动生产者并生产4条消息,控制台输出如下。 ``` -----------EmitLogDirect生产者生产了4条消息----------- EmitLogDirect[生产者发出消息]: debug信息 EmitLogDirect[生产者发出消息]: warning信息 EmitLogDirect[生产者发出消息]: error信息 EmitLogDirect[生产者发出消息]: info信息 -----------ReceiveLogsDirect01消费者收到了error消息----------- ReceiveLogsDirect01[接收到的消息]: bindKey=error, msg=error信息 -----------ReceiveLogsDirect02消费者收到warning与info共两条消息----------- ReceiveLogsDirect02[接收到的消息]: bindKey=warning, msg=warning信息 ReceiveLogsDirect02[接收到的消息]: bindKey=info, msg=info信息 ``` 由于`debug`消息没有与任何队列绑定,所以被丢弃了。 **** 案例代码:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01