ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
上一节,我们创建了一个日志系统,我们能够分发log信息给每个订阅者。 这一节,我们在其上添加额外的功能——只订阅log信息的一个子集。例如:我们只把至关重要的错误日志信息,记录到文件,而所有的日志信息都可以在控制带输出。 ##Bindings 上一节,我们已经定义了绑定,你可以回忆代码如下: ``` channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); ``` binding是exchange与queue之间的关系,简单的来说就是。queue对message来自指定的exchange的感兴趣。 Binding可以指定routingKey参数。为了避免和BasicPublish参数疑惑,我们可以叫它 binding key。因此我们可以创建一个带key的bingding ``` channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "black"); ``` bingding key的意义取决于 exchange的类型。fanout类型的exchange会忽略这个值。 ##Direct exchange 之前的日志系统只能给分发message给所有的consumer。我们想根据message的log lever来过滤message。例如,我们只想记录重要的错误日志信息到FILE,其他的不记录到文件。 但是fanout类型exchange 不够灵活,它只能盲目的分发。 因此这里我们使用 direct类型的exchange来替代。direct 类型exchange背后的算法很简单——一个消息只会发送给queue的bingding key 完全匹配message的routing key的队列。 大体结构如下所示: ![](http://www.rabbitmq.com/img/tutorials/direct-exchange.png) 我们看到 direct类型的exchange X 有两个queue绑定到它。第一个 bingding key是orange。第二个有两个bingding Key:black和green。 因此,如果一个message的routing key是orange会发送给Q1队列,如果是blcak或green则会发送给Q2,其他的消息则会被丢弃掉。 ##Multiple bindings ![](http://www.rabbitmq.com/img/tutorials/direct-exchange-multiple.png) 多个队列绑定同样的key是合法的。在这种情况下,direct 类型的exchage的行为和fanout表现的一样。 ##Emitting logs 首先创建 exchange ``` channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); ``` 接下来发送 message ``` var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body); ``` 为了简化,我们的serverity 可以是"info","warning","error"。 ##Subscribing 订阅 接收消息的工作就像在前面的教程中,有一个例外,我们要创建一个新绑定,绑定到每个严重性我们感兴趣的。 ``` var queueName = channel.QueueDeclare().QueueName; foreach(var severity in args) { channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); } ``` ##putting ti all together ![](http://www.rabbitmq.com/img/tutorials/python-four.png) EmitLogDirect.cs ``` using System; using System.Linq; using RabbitMQ.Client; using System.Text; class EmitLogDirect { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); var severity = (args.Length > 0) ? args[0] : "info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip( 1 ).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body); Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } ``` ReceiveLogsDirect.cs ``` using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class ReceiveLogsDirect { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); var queueName = channel.QueueDeclare().QueueName; if(args.Length < 1) { Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; } foreach(var severity in args) { channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); } Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } ``` 编译他们。 如果你想保存warning,error的log到file。 ``` $ ReceiveLogsDirect.exe warning error > logs_from_rabbit.log ``` 如果你想 发射一条 error log message ``` $ EmitLogDirect.exe error "Run. Run. Or it will explode." [x] Sent 'error':'Run. Run. Or it will explode.' ```