上一节,我们创建了一个日志系统,我们能够分发log信息给每个订阅者。
这一节,我们在其上添加额外的功能——只订阅log信息的一个子集。例如:我们只把至关重要的错误日志信息,记录到文件,而所有的日志信息都可以在控制带输出。
##Bindings
上一节,我们已经定义了绑定,你可以回忆代码如下:
```
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
```
binding是exchange与queue之间的关系,简单的来说就是。queue对message来自指定的exchange的感兴趣。
Binding可以指定routingKey参数。为了避免和BasicPublish参数疑惑,我们可以叫它 binding key。因此我们可以创建一个带key的bingding
```
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: "black");
```
bingding key的意义取决于 exchange的类型。fanout类型的exchange会忽略这个值。
##Direct exchange
之前的日志系统只能给分发message给所有的consumer。我们想根据message的log lever来过滤message。例如,我们只想记录重要的错误日志信息到FILE,其他的不记录到文件。
但是fanout类型exchange 不够灵活,它只能盲目的分发。
因此这里我们使用 direct类型的exchange来替代。direct 类型exchange背后的算法很简单——一个消息只会发送给queue的bingding key 完全匹配message的routing key的队列。
大体结构如下所示:
![](http://www.rabbitmq.com/img/tutorials/direct-exchange.png)
我们看到 direct类型的exchange X 有两个queue绑定到它。第一个 bingding key是orange。第二个有两个bingding Key:black和green。
因此,如果一个message的routing key是orange会发送给Q1队列,如果是blcak或green则会发送给Q2,其他的消息则会被丢弃掉。
##Multiple bindings
![](http://www.rabbitmq.com/img/tutorials/direct-exchange-multiple.png)
多个队列绑定同样的key是合法的。在这种情况下,direct 类型的exchage的行为和fanout表现的一样。
##Emitting logs
首先创建 exchange
```
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
```
接下来发送 message
```
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
```
为了简化,我们的serverity 可以是"info","warning","error"。
##Subscribing 订阅
接收消息的工作就像在前面的教程中,有一个例外,我们要创建一个新绑定,绑定到每个严重性我们感兴趣的。
```
var queueName = channel.QueueDeclare().QueueName;
foreach(var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}
```
##putting ti all together
![](http://www.rabbitmq.com/img/tutorials/python-four.png)
EmitLogDirect.cs
```
using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;
class EmitLogDirect
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "direct_logs",
type: "direct");
var severity = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1)
? string.Join(" ", args.Skip( 1 ).ToArray())
: "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
```
ReceiveLogsDirect.cs
```
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class ReceiveLogsDirect
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "direct_logs",
type: "direct");
var queueName = channel.QueueDeclare().QueueName;
if(args.Length < 1)
{
Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
Environment.GetCommandLineArgs()[0]);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
foreach(var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'",
routingKey, message);
};
channel.BasicConsume(queue: queueName,
noAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
```
编译他们。
如果你想保存warning,error的log到file。
```
$ ReceiveLogsDirect.exe warning error > logs_from_rabbit.log
```
如果你想 发射一条 error log message
```
$ EmitLogDirect.exe error "Run. Run. Or it will explode."
[x] Sent 'error':'Run. Run. Or it will explode.'
```