多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
上一节,我们创建了一个工作队列,work queue 会把message 平均分发给每一个worker,一个message只会分发给一个worker。 这一节,我们做些改变,我们把一个message分发给多个consumer。这种模式叫“publish/subscribe” 发布、订阅模式。 为了阐明这种模式,我们建立一个日志系统。它包含两个程序——一个发射日志信息,另一个接收并打印日志信息。 在这个日志系统中,每一个receiver程序都会获得日志信息。因此我们可以设置一个receiver接收并保存日志信息到硬盘,另一个接收并打印日志信息。 也就是说,published log message 会广播给所有的receiver (订阅者)。 ##Exchanges(交换机) 之前的教程里,我们的程序结构都是: - A producer is a user application that sends messages. - A queue is a buffer(缓存) that stores messages. - A consumer is a user application that receives messages. RabbitMQ的消息模型的核心思想是,生产者没有直接向队列发送任何消息。实际上,经常生产者甚至不知道一个消息将传递给任何队列。事实上,Producer只能发送message给**exchange**。exchange 很简单,一方面它从producers 接收messages,另一方面,它把messages 推送给queues。因此exchange要知道怎么处理接收到的message。是把message发给一个特定的队列?还是发给多个队列?或者丢弃?这个规则是由 exchange type 定义的。 ![](http://www.rabbitmq.com/img/tutorials/exchanges.png) exchange 有以下几种:`direct`, `topic`, `headers` 和 `fanout`。这一节,我们主要使用最后一种——**fanout**。 下面我们定义个 名字叫logs,类型为fanout的exchange ``` channel.ExchangeDeclare("logs", "fanout"); ``` fanout exchange很简单。它会广播所有收到的message传递给它知道的queue。 **listing exchanges 列出exchange** 我们可以使用下面的命令 服务器上的exchanges ``` $ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done. ``` **Nameless(无名) exchange** 在之前章节的教程中,我们不知道exchange,但是我们仍然能够把message传递给queue,这是因为我们使用了默认的exchange。 之前我们发布message,用的代码如下所示: ``` var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); ``` 第一参数代表exchange的名字,这里使用空字符串表示用默认的或nameless exchange。message会被发送给routingKey指定的队列。 接下来,我们把它替换成发布到我们命名的exchange。 ``` var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); ``` ##Temporary(临时) queues 之前我们使用的queue都有一个特定的名字(hello 或者 task_queue?)。当你想在生产者和消费者之间共享队列,给queue命名是至关重要的。 但是 这种情况对于我们的日志系统是不适合的。我们想看到所有的日志信息,而不是仅仅是他的一个子集。我们也只对当前流动的感兴趣而不是旧的消息。 为了解决这个问题我们需要做两件事: 1、我们需要一个新的,空的队列,不管什么时候我们连接到Rabbit。这就需要,我们每次连接rabbti都要创建一个名字随机的队列,或者让服务器选择一个名字随机的队列给我们。 2、一旦我们consumer断开与queue的连接,queue应该自动删除。 在.NET client 我们提供了一个无参的 `queueDeclare()`方法,使用它,我们可以创建一个 不持久化,名字唯一,自动删除的队列。 ``` var queueName = channel.QueueDeclare().QueueName; ``` 这里queueName是随机生成的队列的名字。例如amq.gen-JzTY20BRgKO-HjmUJj0wLg ##Bindings 绑定 ![](http://www.rabbitmq.com/img/tutorials/bindings.png) 我们已经创建了一个 fanout类型的exchange和所需的queue。现在,我们就需要告诉 exchange 发送messages 到我们指定的 queue。这里,exchange和queue的关系我们叫做binding(绑定)。 ``` channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); ``` >**Listing bindings** 你可以通过`rabbitmqctl list_bindings`命令,查看已经存在的bingding。 ##Putting it all together ![](http://www.rabbitmq.com/img/tutorials/python-three-overall.png) EmitLog.cs ``` using System; using RabbitMQ.Client; using System.Text; class EmitLog { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello World!"); } } ``` ReceiveLogs.cs ``` using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class ReceiveLogs { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); Console.WriteLine(" [*] Waiting for logs."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } ``` 编译上面的代码 ``` $ csc /r:"RabbitMQ.Client.dll" EmitLogs.cs $ csc /r:"RabbitMQ.Client.dll" ReceiveLogs.cs ``` 如果你想保存log到文件, ``` $ ReceiveLogs.exe > logs_from_rabbit.log ``` 发送日志: ``` $ EmitLog.exe first log ``` 使用 rabbitmqctl list_bindings命令 来查看我们创建的绑定。 ``` $ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done. ```