上一节,我们创建了一个工作队列,work queue 会把message 平均分发给每一个worker,一个message只会分发给一个worker。
这一节,我们做些改变,我们把一个message分发给多个consumer。这种模式叫“publish/subscribe” 发布、订阅模式。
为了阐明这种模式,我们建立一个日志系统。它包含两个程序——一个发射日志信息,另一个接收并打印日志信息。
在这个日志系统中,每一个receiver程序都会获得日志信息。因此我们可以设置一个receiver接收并保存日志信息到硬盘,另一个接收并打印日志信息。
也就是说,published log message 会广播给所有的receiver (订阅者)。
##Exchanges(交换机)
之前的教程里,我们的程序结构都是:
- A producer is a user application that sends messages.
- A queue is a buffer(缓存) that stores messages.
- A consumer is a user application that receives messages.
RabbitMQ的消息模型的核心思想是,生产者没有直接向队列发送任何消息。实际上,经常生产者甚至不知道一个消息将传递给任何队列。事实上,Producer只能发送message给**exchange**。exchange 很简单,一方面它从producers 接收messages,另一方面,它把messages 推送给queues。因此exchange要知道怎么处理接收到的message。是把message发给一个特定的队列?还是发给多个队列?或者丢弃?这个规则是由 exchange type 定义的。
![](http://www.rabbitmq.com/img/tutorials/exchanges.png)
exchange 有以下几种:`direct`, `topic`, `headers` 和 `fanout`。这一节,我们主要使用最后一种——**fanout**。
下面我们定义个 名字叫logs,类型为fanout的exchange
```
channel.ExchangeDeclare("logs", "fanout");
```
fanout exchange很简单。它会广播所有收到的message传递给它知道的queue。
**listing exchanges 列出exchange**
我们可以使用下面的命令 服务器上的exchanges
```
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.
```
**Nameless(无名) exchange**
在之前章节的教程中,我们不知道exchange,但是我们仍然能够把message传递给queue,这是因为我们使用了默认的exchange。
之前我们发布message,用的代码如下所示:
```
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
```
第一参数代表exchange的名字,这里使用空字符串表示用默认的或nameless exchange。message会被发送给routingKey指定的队列。
接下来,我们把它替换成发布到我们命名的exchange。
```
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
```
##Temporary(临时) queues
之前我们使用的queue都有一个特定的名字(hello 或者 task_queue?)。当你想在生产者和消费者之间共享队列,给queue命名是至关重要的。
但是 这种情况对于我们的日志系统是不适合的。我们想看到所有的日志信息,而不是仅仅是他的一个子集。我们也只对当前流动的感兴趣而不是旧的消息。
为了解决这个问题我们需要做两件事:
1、我们需要一个新的,空的队列,不管什么时候我们连接到Rabbit。这就需要,我们每次连接rabbti都要创建一个名字随机的队列,或者让服务器选择一个名字随机的队列给我们。
2、一旦我们consumer断开与queue的连接,queue应该自动删除。
在.NET client 我们提供了一个无参的 `queueDeclare()`方法,使用它,我们可以创建一个 不持久化,名字唯一,自动删除的队列。
```
var queueName = channel.QueueDeclare().QueueName;
```
这里queueName是随机生成的队列的名字。例如amq.gen-JzTY20BRgKO-HjmUJj0wLg
##Bindings 绑定
![](http://www.rabbitmq.com/img/tutorials/bindings.png)
我们已经创建了一个 fanout类型的exchange和所需的queue。现在,我们就需要告诉 exchange 发送messages 到我们指定的 queue。这里,exchange和queue的关系我们叫做binding(绑定)。
```
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
```
>**Listing bindings**
你可以通过`rabbitmqctl list_bindings`命令,查看已经存在的bingding。
##Putting it all together
![](http://www.rabbitmq.com/img/tutorials/python-three-overall.png)
EmitLog.cs
```
using System;
using RabbitMQ.Client;
using System.Text;
class EmitLog
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0)
? string.Join(" ", args)
: "info: Hello World!");
}
}
```
ReceiveLogs.cs
```
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class ReceiveLogs
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] {0}", message);
};
channel.BasicConsume(queue: queueName,
noAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
```
编译上面的代码
```
$ csc /r:"RabbitMQ.Client.dll" EmitLogs.cs
$ csc /r:"RabbitMQ.Client.dll" ReceiveLogs.cs
```
如果你想保存log到文件,
```
$ ReceiveLogs.exe > logs_from_rabbit.log
```
发送日志:
```
$ EmitLog.exe first log
```
使用 rabbitmqctl list_bindings命令 来查看我们创建的绑定。
```
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.
```