Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认就有些 Fanout exchange 类型。
![](https://img.kancloud.cn/78/73/7873426d7cbd11af1d524806c6ff00d5_1572x538.jpg)
<br/>
演示下图的 Fanout 交换机。
![](https://img.kancloud.cn/32/b2/32b28242cd01a895acf84c2555bee45f_1345x267.jpg)
步骤如下:
**1. 生产者**
```java
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
/*
* 声明一个 exchange。
* exchangeDeclare(String var1, String var2)
* var1: exchange名称
* var2: exchange类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息.");
while (sc.hasNext()) {
String message = sc.nextLine();
/*
* 发送一个消息。
* basicPublish(String var1, String var2, BasicProperties var3, byte[] var4)
* var1: 发送到那个交换机
* var2: 队列名称(这里不用指定)
* var3: 其他的参数信息
* var4: 发送的消息
*/
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(EmitLog.class.getSimpleName() + "[生产者发出消息]: " + message);
}
}
}
}
```
**2. 两个消费者**
(1)*`ReceiveLogs01`*
```java
public class ReceiveLogs01 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//临时队列
com.rabbitmq.client.AMQP.Queue.DeclareOk queue = channel.queueDeclare();
//临时队列名
String queueName = queue.getQueue();
//将队列与交互机绑定
//queueBind(String var1, String var2, String var3)
//var3参数是交换机与队列绑定的标识,称为routingkey(或binding key)
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(ReceiveLogs01.class.getSimpleName() + "[等待接收消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(ReceiveLogs01.class.getSimpleName() + "[接收到的消息]: " + message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
```
(2)*`ReceiveLogs02`*
```java
public class ReceiveLogs02 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
com.rabbitmq.client.AMQP.Queue.DeclareOk queue = channel.queueDeclare();
String queueName = queue.getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(ReceiveLogs02.class.getSimpleName() + "[等待接收消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(ReceiveLogs02.class.getSimpleName() + "[接收到的消息]: " + message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
```
**3. 测试**
先启动两个消费者,然后启动生产者并生产4条消息,控制台输出如下。
```
-----------EmitLog生产者生产了4条消息-----------
EmitLog[生产者发出消息]: 消息C1
EmitLog[生产者发出消息]: 消息C2
EmitLog[生产者发出消息]: 消息C3
EmitLog[生产者发出消息]: 消息C4
-----------ReceiveLogs01消费者收到了4条消息-----------
ReceiveLogs01[接收到的消息]: 消息C1
ReceiveLogs01[接收到的消息]: 消息C2
ReceiveLogs01[接收到的消息]: 消息C3
ReceiveLogs01[接收到的消息]: 消息C4
-----------ReceiveLogs02消费者也收到了4条一模一样消息-----------
ReceiveLogs02[接收到的消息]: 消息C1
ReceiveLogs02[接收到的消息]: 消息C2
ReceiveLogs02[接收到的消息]: 消息C3
ReceiveLogs02[接收到的消息]: 消息C4
```
****
案例代码:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01
- 消息队列
- 什么是MQ
- MQ的作用
- MQ的分类
- MQ的选择
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 环境搭建
- windows系统下的搭建
- centos7系统下的搭建
- 常用命令
- 服务相关命令
- 管理用户命令
- 管理队列命令
- 第一个RabbitMQ程序
- 工作队列
- 轮询分发消息
- 消息应答
- 持久化
- 发布确认
- 发布确认原理
- 发布确认策略
- 交换机概念
- 交换机类型
- 无名交换机
- Fanout交换机
- Direct交换机
- Topic交换机
- 死信队列
- 死信概念
- 死信来源
- 死信实战
- 延迟队列
- 什么是延迟队列
- TTL设置方式
- 队列TTL延迟队列
- 消息TTL延迟队列
- 插件打造延迟队列
- 延迟队列总结
- 发布确认高级
- 代码实现
- 回退消息
- 备份交换机
- 幂等性
- 幂等性概念
- 消息重复消费
- 消费端幂等性保障
- 优先级队列
- 使用场景
- 设置优先级
- 惰性队列
- 什么是惰性队列
- 队列的两种模式
- 声明惰性队列
- RabbitMQ集群
- 为什么要搭建集群
- 集群搭建步骤
- 集群工作方式
- 脱离集群
- 镜像队列
- 高可用负载均衡