ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认就有些 Fanout exchange 类型。 ![](https://img.kancloud.cn/78/73/7873426d7cbd11af1d524806c6ff00d5_1572x538.jpg) <br/> 演示下图的 Fanout 交换机。 ![](https://img.kancloud.cn/32/b2/32b28242cd01a895acf84c2555bee45f_1345x267.jpg) 步骤如下: **1. 生产者** ```java public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { /* * 声明一个 exchange。 * exchangeDeclare(String var1, String var2) * var1: exchange名称 * var2: exchange类型 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); Scanner sc = new Scanner(System.in); System.out.println("请输入信息."); while (sc.hasNext()) { String message = sc.nextLine(); /* * 发送一个消息。 * basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) * var1: 发送到那个交换机 * var2: 队列名称(这里不用指定) * var3: 其他的参数信息 * var4: 发送的消息 */ channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(EmitLog.class.getSimpleName() + "[生产者发出消息]: " + message); } } } } ``` **2. 两个消费者** (1)*`ReceiveLogs01`* ```java public class ReceiveLogs01 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //临时队列 com.rabbitmq.client.AMQP.Queue.DeclareOk queue = channel.queueDeclare(); //临时队列名 String queueName = queue.getQueue(); //将队列与交互机绑定 //queueBind(String var1, String var2, String var3) //var3参数是交换机与队列绑定的标识,称为routingkey(或binding key) channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(ReceiveLogs01.class.getSimpleName() + "[等待接收消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(ReceiveLogs01.class.getSimpleName() + "[接收到的消息]: " + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } ``` (2)*`ReceiveLogs02`* ```java public class ReceiveLogs02 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); com.rabbitmq.client.AMQP.Queue.DeclareOk queue = channel.queueDeclare(); String queueName = queue.getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(ReceiveLogs02.class.getSimpleName() + "[等待接收消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(ReceiveLogs02.class.getSimpleName() + "[接收到的消息]: " + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } ``` **3. 测试** 先启动两个消费者,然后启动生产者并生产4条消息,控制台输出如下。 ``` -----------EmitLog生产者生产了4条消息----------- EmitLog[生产者发出消息]: 消息C1 EmitLog[生产者发出消息]: 消息C2 EmitLog[生产者发出消息]: 消息C3 EmitLog[生产者发出消息]: 消息C4 -----------ReceiveLogs01消费者收到了4条消息----------- ReceiveLogs01[接收到的消息]: 消息C1 ReceiveLogs01[接收到的消息]: 消息C2 ReceiveLogs01[接收到的消息]: 消息C3 ReceiveLogs01[接收到的消息]: 消息C4 -----------ReceiveLogs02消费者也收到了4条一模一样消息----------- ReceiveLogs02[接收到的消息]: 消息C1 ReceiveLogs02[接收到的消息]: 消息C2 ReceiveLogs02[接收到的消息]: 消息C3 ReceiveLogs02[接收到的消息]: 消息C4 ``` **** 案例代码:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01