## 发布/订阅
一次向许多消费者发送消息
### 任务
我们将建立一个简单的日志系统。它将包含两个程序 - 第一个将发送日志消息,第二个是接收并打印它们。
即:发布的日志消息将被广播给所有的接收者。
### Exchanges(交换机)
RabbitMQ中消息传递模型的核心思想是**生产者永远不会将任何消息直接发送到队列中**。
代替的操作是:**producer**发送消息给**Exchanges**,
交换是一件非常简单的事情。一方面它接收来自生产者的消息,另一方则推动他们排队。交易所必须知道如何处理收到的消息。是否应该附加到特定的队列?它应该附加到多少队列中?还是应该丢弃。这些规则是由**交换类型**定义的 。
![](http://www.rabbitmq.com/img/tutorials/exchanges.png =400x120)
#### 交换类型
direct, topic, headers and fanout。
这里使用fanout举例子:
创建一个这种类型的交换,并将其称为日志:
```
channel.exchangeDeclare("logs","fanout");
```
交换非常简单。只是将所有收到的消息广播到它所知道的所有队列中。这正是我们记录器所需要的。
```
//第一个参数是交易所的名称。空字符串表示默认或无名交换
channel.basicPublish("logs","",null,message.getBytes());
```
#### 临时队列
对于上面的日志,我们希望了解所有日志消息,而不仅仅是其中的一部分。也只对目前流动的消息感兴趣,而不是旧消息。要解决这个问题,我们需要两件事。
1. 首先,每当我们连接到mq,我们需要一个新的,空的队列。要做到这一点,我们可以创建一个随机名称的队列,或者,甚至更好 - 让服务器为我们选择一个随机队列名称。
2. 其次,一旦我们断开消费者,队列应该被自动删除。
在Java客户端中,当我们不给queueDeclare()提供参数时,我们 用一个生成的名称创建一个非持久的,独占的自动删除队列:
用一个生成的名称创建一个非持久的,独占的自动删除队列(此时queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。)
`String queueName = channel.queueDeclare().getQueue();`
### 绑定
![](http://www.rabbitmq.com/img/tutorials/bindings.png =400x120)
我们已经创建了一个**fanout**交换和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换和队列之间的关系被称为绑定。
```
channel.queueBind(queueName, "logs", "");
//从现在起,日志交换将把消息附加到我们的队列中
```
#### 列出绑定
`./rabbitmqctl list_bindings`
![](http://www.rabbitmq.com/img/tutorials/python-three-overall.png =400x150)
发出日志消息的生产者程序与前面的教程没有什么不同。最重要的变化是,我们现在要发布消息到我们的日志交换,而不是无名的。发送时我们需要提供一个路由密钥,但是对于fanout交换,它的**值将被忽略**。
### 代码
**EmitLog.java**
```
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个交换
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "hello exchange";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
```
**ReceiveLogs**
```
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 随机生成一个队列
String queueName = channel.queueDeclare().getQueue();
// 把队列和交换机绑定
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
```