[TOC]
# 1. routing_key规则
通过 Topic 交换机发送的消息的 routing_key 不能随意写,必须满足规则:<mark>它必须是一个单词列表,以点号`.`分隔开</mark>,这些单词可以是任意单词,比如:
```
stock.usd.nyse
nyse.vmw
quick.orange.rabbit
```
但这个单词列表最多不能超过 255 个字节。
<br/>
在这个规则列表中,其中有两个替换符是大家需要注意的:`*`可以代替一个单词;`#`可以替代零个或多个单词。
![](https://img.kancloud.cn/a0/bf/a0bf0b37cabc2b5eb73d1ad21567bd70_1304x269.jpg)
Q1 → 绑定的是中间带 orange,并且共有3个单词的key,`*.orange.*`。
Q2 → 绑定的是最后一个单词是 rabbit,并且共有3个单词的key,`*.*.rabbit`;还有就是第一个单词是 lazy 的多个单词的key,`lazy.#`。
<br/>
我们来看看上图队列绑定关系之间数据接收情况是怎么样的。
```
quick.orange.rabbit 被队列 Q1Q2 接收到
lazy.orange.elephant 被队列 Q1Q2 接收到
quick.orange.fox 被队列 Q1 接收到
lazy.brown.fox 被队列 Q2 接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配 Q2
```
<br/>
当队列绑定关系是下列这种情况时需要引起注意:
(1)当一个队列绑定键是`#`,那么这个队列将接收所有数据,就有点像 fanout 了。
(2)如果队列绑定键当中没有出现`#`和`*`,那么该队列绑定类型就是 direct 了。
<br/>
# 2. Topic交换机演示
**1. 生产者**
```java
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
/*
* 声明一个 exchange。
* exchangeDeclare(String var1, String var2)
* var1: exchange名称
* var2: exchange类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/*
* Q1-->绑定的是中间带 orange 并且共有3个单词的key(*.orange.*)
* Q2-->绑定的是最后一个单词是 rabbit 并且共有3个单词(*.*.rabbit) 和 第一个单词是 lazy 的多个单词(lazy.#) 的key
*/
Map<String, String> bindingKeyMap = new HashMap<>(16);
bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8"));
System.out.println(EmitLogTopic.class.getSimpleName() + "[生产者发出消息]: " + message);
}
}
}
}
```
**2. 两个消费者**
(1)*`ReceiveLogsTopic01`*
```java
public class ReceiveLogsTopic01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明 Q1 队列与绑定关系
String queueName = "Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println(ReceiveLogsTopic01.class.getSimpleName() + "[等待接收消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String newMsg = "{队列名称: " + queueName + ", routingKey: " + delivery.getEnvelope().getRoutingKey()
+ ", msg: " + message + "}";
System.out.println(ReceiveLogsTopic01.class.getSimpleName() + "[接收到的消息]: " + newMsg);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
```
(2)*`ReceiveLogsTopic02`*
```java
public class ReceiveLogsTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明 Q2 队列与绑定关系
String queueName = "Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println(ReceiveLogsTopic02.class.getSimpleName() + "[等待接收消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String newMsg = "{队列名称: " + queueName + ", routingKey: " + delivery.getEnvelope().getRoutingKey()
+ ", msg: " + message + "}";
System.out.println(ReceiveLogsTopic02.class.getSimpleName() + "[接收到的消息]: " + newMsg);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
```
**3. 测试**
先启动两个消费者,再启动生产者并生产多条消息,控制台输出如下。
```
-----------EmitLogTopic生产者生产了多条消息-----------
EmitLogTopic[生产者发出消息]: 是四个单词不匹配任何绑定会被丢弃
EmitLogTopic[生产者发出消息]: 不匹配任何绑定不会被任何队列接收到会被丢弃
EmitLogTopic[生产者发出消息]: 被队列 Q1Q2 接收到
EmitLogTopic[生产者发出消息]: 被队列 Q2 接收到
EmitLogTopic[生产者发出消息]: 被队列 Q1Q2 接收到
EmitLogTopic[生产者发出消息]: 被队列 Q1 接收到
EmitLogTopic[生产者发出消息]: 虽然满足两个绑定但只被队列 Q2 接收一次
EmitLogTopic[生产者发出消息]: 是四个单词但匹配 Q2
-----------ReceiveLogsTopic01消费者收到的消息-----------
ReceiveLogsTopic01[接收到的消息]: {队列名称: Q1, routingKey: lazy.orange.elephant, msg: 被队列 Q1Q2 接收到}
ReceiveLogsTopic01[接收到的消息]: {队列名称: Q1, routingKey: quick.orange.rabbit, msg: 被队列 Q1Q2 接收到}
ReceiveLogsTopic01[接收到的消息]: {队列名称: Q1, routingKey: quick.orange.fox, msg: 被队列 Q1 接收到}
-----------ReceiveLogsTopic02消费者收到的消息-----------
ReceiveLogsTopic02[接收到的消息]: {队列名称: Q2, routingKey: lazy.orange.elephant, msg: 被队列 Q1Q2 接收到}
ReceiveLogsTopic02[接收到的消息]: {队列名称: Q2, routingKey: lazy.brown.fox, msg: 被队列 Q2 接收到}
ReceiveLogsTopic02[接收到的消息]: {队列名称: Q2, routingKey: quick.orange.rabbit, msg: 被队列 Q1Q2 接收到}
ReceiveLogsTopic02[接收到的消息]: {队列名称: Q2, routingKey: lazy.pink.rabbit, msg: 虽然满足两个绑定但只被队列 Q2 接收一次}
ReceiveLogsTopic02[接收到的消息]: {队列名称: Q2, routingKey: lazy.orange.male.rabbit, msg: 是四个单词但匹配 Q2}
```
****
案例代码:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01
- 消息队列
- 什么是MQ
- MQ的作用
- MQ的分类
- MQ的选择
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 环境搭建
- windows系统下的搭建
- centos7系统下的搭建
- 常用命令
- 服务相关命令
- 管理用户命令
- 管理队列命令
- 第一个RabbitMQ程序
- 工作队列
- 轮询分发消息
- 消息应答
- 持久化
- 发布确认
- 发布确认原理
- 发布确认策略
- 交换机概念
- 交换机类型
- 无名交换机
- Fanout交换机
- Direct交换机
- Topic交换机
- 死信队列
- 死信概念
- 死信来源
- 死信实战
- 延迟队列
- 什么是延迟队列
- TTL设置方式
- 队列TTL延迟队列
- 消息TTL延迟队列
- 插件打造延迟队列
- 延迟队列总结
- 发布确认高级
- 代码实现
- 回退消息
- 备份交换机
- 幂等性
- 幂等性概念
- 消息重复消费
- 消费端幂等性保障
- 优先级队列
- 使用场景
- 设置优先级
- 惰性队列
- 什么是惰性队列
- 队列的两种模式
- 声明惰性队列
- RabbitMQ集群
- 为什么要搭建集群
- 集群搭建步骤
- 集群工作方式
- 脱离集群
- 镜像队列
- 高可用负载均衡