多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
[TOC] # 1. routing_key规则 通过 Topic 交换机发送的消息的 routing_key 不能随意写,必须满足规则:<mark>它必须是一个单词列表,以点号`.`分隔开</mark>,这些单词可以是任意单词,比如: ``` stock.usd.nyse nyse.vmw quick.orange.rabbit ``` 但这个单词列表最多不能超过 255 个字节。 <br/> 在这个规则列表中,其中有两个替换符是大家需要注意的:`*`可以代替一个单词;`#`可以替代零个或多个单词。 ![](https://img.kancloud.cn/a0/bf/a0bf0b37cabc2b5eb73d1ad21567bd70_1304x269.jpg) Q1 → 绑定的是中间带 orange,并且共有3个单词的key,`*.orange.*`。 Q2 → 绑定的是最后一个单词是 rabbit,并且共有3个单词的key,`*.*.rabbit`;还有就是第一个单词是 lazy 的多个单词的key,`lazy.#`。 <br/> 我们来看看上图队列绑定关系之间数据接收情况是怎么样的。 ``` quick.orange.rabbit 被队列 Q1Q2 接收到 lazy.orange.elephant 被队列 Q1Q2 接收到 quick.orange.fox 被队列 Q1 接收到 lazy.brown.fox 被队列 Q2 接收到 lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次 quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃 quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃 lazy.orange.male.rabbit 是四个单词但匹配 Q2 ``` <br/> 当队列绑定关系是下列这种情况时需要引起注意: (1)当一个队列绑定键是`#`,那么这个队列将接收所有数据,就有点像 fanout 了。 (2)如果队列绑定键当中没有出现`#`和`*`,那么该队列绑定类型就是 direct 了。 <br/> # 2. Topic交换机演示 **1. 生产者** ```java public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { /* * 声明一个 exchange。 * exchangeDeclare(String var1, String var2) * var1: exchange名称 * var2: exchange类型 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); /* * Q1-->绑定的是中间带 orange 并且共有3个单词的key(*.orange.*) * Q2-->绑定的是最后一个单词是 rabbit 并且共有3个单词(*.*.rabbit) 和 第一个单词是 lazy 的多个单词(lazy.#) 的key */ Map<String, String> bindingKeyMap = new HashMap<>(16); bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到"); bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到"); bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到"); bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到"); bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次"); bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2"); for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8")); System.out.println(EmitLogTopic.class.getSimpleName() + "[生产者发出消息]: " + message); } } } } ``` **2. 两个消费者** (1)*`ReceiveLogsTopic01`* ```java public class ReceiveLogsTopic01 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //声明 Q1 队列与绑定关系 String queueName = "Q1"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); System.out.println(ReceiveLogsTopic01.class.getSimpleName() + "[等待接收消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); String newMsg = "{队列名称: " + queueName + ", routingKey: " + delivery.getEnvelope().getRoutingKey() + ", msg: " + message + "}"; System.out.println(ReceiveLogsTopic01.class.getSimpleName() + "[接收到的消息]: " + newMsg); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } ``` (2)*`ReceiveLogsTopic02`* ```java public class ReceiveLogsTopic02 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //声明 Q2 队列与绑定关系 String queueName = "Q2"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); System.out.println(ReceiveLogsTopic02.class.getSimpleName() + "[等待接收消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); String newMsg = "{队列名称: " + queueName + ", routingKey: " + delivery.getEnvelope().getRoutingKey() + ", msg: " + message + "}"; System.out.println(ReceiveLogsTopic02.class.getSimpleName() + "[接收到的消息]: " + newMsg); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } ``` **3. 测试** 先启动两个消费者,再启动生产者并生产多条消息,控制台输出如下。 ``` -----------EmitLogTopic生产者生产了多条消息----------- EmitLogTopic[生产者发出消息]: 是四个单词不匹配任何绑定会被丢弃 EmitLogTopic[生产者发出消息]: 不匹配任何绑定不会被任何队列接收到会被丢弃 EmitLogTopic[生产者发出消息]: 被队列 Q1Q2 接收到 EmitLogTopic[生产者发出消息]: 被队列 Q2 接收到 EmitLogTopic[生产者发出消息]: 被队列 Q1Q2 接收到 EmitLogTopic[生产者发出消息]: 被队列 Q1 接收到 EmitLogTopic[生产者发出消息]: 虽然满足两个绑定但只被队列 Q2 接收一次 EmitLogTopic[生产者发出消息]: 是四个单词但匹配 Q2 -----------ReceiveLogsTopic01消费者收到的消息----------- ReceiveLogsTopic01[接收到的消息]: {队列名称: Q1, routingKey: lazy.orange.elephant, msg: 被队列 Q1Q2 接收到} ReceiveLogsTopic01[接收到的消息]: {队列名称: Q1, routingKey: quick.orange.rabbit, msg: 被队列 Q1Q2 接收到} ReceiveLogsTopic01[接收到的消息]: {队列名称: Q1, routingKey: quick.orange.fox, msg: 被队列 Q1 接收到} -----------ReceiveLogsTopic02消费者收到的消息----------- ReceiveLogsTopic02[接收到的消息]: {队列名称: Q2, routingKey: lazy.orange.elephant, msg: 被队列 Q1Q2 接收到} ReceiveLogsTopic02[接收到的消息]: {队列名称: Q2, routingKey: lazy.brown.fox, msg: 被队列 Q2 接收到} ReceiveLogsTopic02[接收到的消息]: {队列名称: Q2, routingKey: quick.orange.rabbit, msg: 被队列 Q1Q2 接收到} ReceiveLogsTopic02[接收到的消息]: {队列名称: Q2, routingKey: lazy.pink.rabbit, msg: 虽然满足两个绑定但只被队列 Q2 接收一次} ReceiveLogsTopic02[接收到的消息]: {队列名称: Q2, routingKey: lazy.orange.male.rabbit, msg: 是四个单词但匹配 Q2} ``` **** 案例代码:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01