企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 发布/订阅 一次向许多消费者发送消息 ### 任务 我们将建立一个简单的日志系统。它将包含两个程序 - 第一个将发送日志消息,第二个是接收并打印它们。 即:发布的日志消息将被广播给所有的接收者。 ### Exchanges(交换机) RabbitMQ中消息传递模型的核心思想是**生产者永远不会将任何消息直接发送到队列中**。 代替的操作是:**producer**发送消息给**Exchanges**, 交换是一件非常简单的事情。一方面它接收来自生产者的消息,另一方则推动他们排队。交易所必须知道如何处理收到的消息。是否应该附加到特定的队列?它应该附加到多少队列中?还是应该丢弃。这些规则是由**交换类型**定义的 。 ![](http://www.rabbitmq.com/img/tutorials/exchanges.png =400x120) #### 交换类型 direct, topic, headers and fanout。 这里使用fanout举例子: 创建一个这种类型的交换,并将其称为日志: ``` channel.exchangeDeclare("logs","fanout"); ``` 交换非常简单。只是将所有收到的消息广播到它所知道的所有队列中。这正是我们记录器所需要的。 ``` //第一个参数是交易所的名称。空字符串表示默认或无名交换 channel.basicPublish("logs","",null,message.getBytes()); ``` #### 临时队列 对于上面的日志,我们希望了解所有日志消息,而不仅仅是其中的一部分。也只对目前流动的消息感兴趣,而不是旧消息。要解决这个问题,我们需要两件事。 1. 首先,每当我们连接到mq,我们需要一个新的,空的队列。要做到这一点,我们可以创建一个随机名称的队列,或者,甚至更好 - 让服务器为我们选择一个随机队列名称。 2. 其次,一旦我们断开消费者,队列应该被自动删除。 在Java客户端中,当我们不给queueDeclare()提供参数时,我们 用一个生成的名称创建一个非持久的,独占的自动删除队列: 用一个生成的名称创建一个非持久的,独占的自动删除队列(此时queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。) `String queueName = channel.queueDeclare().getQueue();` ### 绑定 ![](http://www.rabbitmq.com/img/tutorials/bindings.png =400x120) 我们已经创建了一个**fanout**交换和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换和队列之间的关系被称为绑定。 ``` channel.queueBind(queueName, "logs", ""); //从现在起,日志交换将把消息附加到我们的队列中 ``` #### 列出绑定 `./rabbitmqctl list_bindings` ![](http://www.rabbitmq.com/img/tutorials/python-three-overall.png =400x150) 发出日志消息的生产者程序与前面的教程没有什么不同。最重要的变化是,我们现在要发布消息到我们的日志交换,而不是无名的。发送时我们需要提供一个路由密钥,但是对于fanout交换,它的**值将被忽略**。 ### 代码 **EmitLog.java** ``` public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个交换 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = "hello exchange"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } } ``` **ReceiveLogs** ``` public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 随机生成一个队列 String queueName = channel.queueDeclare().getQueue(); // 把队列和交换机绑定 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } } ```