🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
Work Queues(工作队列) 大体结构如下所示: ![](http://www.rabbitmq.com/img/tutorials/python-two.png) 在第一节的教程里,我们创建了一个程序,发送和接收消息,从一个named queue(命名队列 )。 本节,我们会创建一个 Work Queue(工作队列),用来分发耗时任务给多个Workers(工人)。 使用Work Queues(别名:Task Queue)是为了避免立即做一个资源密集型任务,而不得不等待它完成。我们可以把这个耗时的任务封装提取起来作为message,发送给一个queue。一个Worker 后台进程会获取task,然后执行他。当有多个Workers 时,他们平分这些task。 ##Preparation 准备 上一节的教程,我们发送已一条包含“Hello World”的消息。这一节我们要发送一个耗时的任务。为了简单起见,我们使用 `Thread.Sleep()`方法, ###发送端 为了简单起见,我们修改上一节的`Send.cs`的代码,从命令行获取message的参数。重命名文件为“NewTask.cs” ``` var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.SetPersistent(true); channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); ``` 其中 `GetMessage(string[] args)`方法定义如下: ``` private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } ``` ###接收端 接下来修改上衣节的`Receive.cs`文件,我们重命名为“Worker.cs” ``` var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer); ``` 然后编译代码。 ##Round-robin dispatching 循环调度 使用Task Queue的一个优点就是可以很容易的平均分配任务。如果queue里有堆积过多的任务,我们可以添加更多的Worker就行了。 接下来,我们运行两个Worker.cs的实例,分别为C1和C2 C1 ``` shell1$ Worker.exe Worker [*] Waiting for messages. To exit press CTRL+C ``` C2 ``` shell2$ Worker.exe Worker [*] Waiting for messages. To exit press CTRL+C ``` 在接下来我们运行 NewTask.exe 来发布任务 ``` shell3$ NewTask.exe First message. shell3$ NewTask.exe Second message.. shell3$ NewTask.exe Third message... shell3$ NewTask.exe Fourth message.... shell3$ NewTask.exe Fifth message..... ``` 我们会看到任务的分配情况 C1 ``` shell1$ Worker.exe [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....' ``` C2 ``` shell2$ Worker.exe [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....' ``` 默认,RabbitMQ会顺序的,平均的把任务发给每个consumer,到最后每个Consumer会得到相同数量的任务。 ##Message acknowledgment 执行一个耗时的任务,你可能会想知道任务的执行情况。是否有Consumer开始执行任务了?是否任务执行到一半死机了? 当前我们上面的代码,一旦RabbitMQ分发message给Custoerm,它就会立刻从内存删除。这种情况下,如果你关闭一个Worker,我们就会丢失他正在执行的消息。同样,我们也会丢失之前分发给他,还没有来的及执行的消息。 但是我们不想丢失任何 task。如果一个Worker死了,我们想把任务分发给其他的Worker。 为了确保message不丢失,RabbitMQ 提供了 message acknowledgments。Ack是consumer 发送给RabbitMQ的,告诉它,task 已经接受,并处理了,RabbitMQ 可以删除它了。 如果一个consumer死机了(channel closed,connection closed or Tcp connection lost),没有返回ack,RabbitMQ就会知道task 没有处理完,该task就会重新排队。如果这时候有另外一个Consumer在线,RabbitMQ 就会把它分发给他。 默认Message acknowLedgments 是打开的,之前的例子,我们是显式的关闭了(设置 noAck=true)。 下面修改代码,打开Message acknowLedgments。 ``` var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer); ``` 运行上面的代码,如果我们kill 一个Worker,message 不会丢失,他会被分发给其他Worker。 > **Forgotten acknowledgment 遗失acknowledgment** 丢失BasicAck是很常见的错误,尽管这个错很小,但后果很严重。当Client quit,Messages 会重新分发,但是RabbitMQ 由于不能释放掉那些unacked message ,所以会消耗越来越多的内存。 为了 调试这种错误, 你可以使用`rabbitmqctl`来打印出 `messages_unacknowledged` 的message信息 > ``` $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done. >``` ##Message durability 持久化 通过上面的ACK配置,当consumer 死亡的时候,task 不会丢失。但是如果RabbitMQ服务停了,task 仍然会丢失。 这里我们就要持久化 task的信息了。 首先,我们确保RabbitMQ不会丢失我们的queue ``` channel.QueueDeclare(queue: "hello", durable: true, //持久化 exclusive: false, autoDelete: false, arguments: null); ``` 尽管我们定义名字叫hello 队列要持久化,但是仍然不会生效。这是因为我们已经定义了一个没有持久化的名字叫hello 队列。RabbitMQ 不允许重新定义(用不同的参数)一个已经存在的队列,会报错。因此这里我们应该另外定义一个队列, 例如 task_queue ``` channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); ``` >NOTE:queue 持久化的修改,producer 和consumer的代码都要修改。 通过上面的代码设置我们的queue,即使RabbitMQ重启也不会丢失。 接下来,我们来持久化message。 ``` var properties = channel.CreateBasicProperties(); properties.Persistent=true; ``` >NOTE:尽管我们设置message持久化了,但是这也不能完全保证message不会丢失。 >这是由于RabbitMQ保存message到硬盘是需要时间的,如果再此期间RabbitMQ服务挂了,message就丢失了。不过对于一般的程序已经足够了。如果要一个更强壮的方案,你可是使用[publisher confirms](https://www.rabbitmq.com/confirms.html) ##Fair dispatch 公平调度 也许你已经主要到,上面代码实现的message的调度不是你想要的。例如,假设有两个Worker,所有的奇数的message都是耗时的操作,而偶数的message都是很简单的。你会发现一个Worker很空闲,而另一个Woker累死累活的。然而RabbitMQ不知道,还是不停的给他发任务。 这个情况的发生,是由于RabbitMQ 不看 the number of unacknowledged message,只要message进入队列就分发message。他只是盲目的分发message。 ![](http://www.rabbitmq.com/img/tutorials/prefetch-count.png) 为了解决上面的问题,我们可以使用 `basicQos`方法 设置 `prefetchCount=1`。这个设置会告诉RabbitMQ 每次给Workder只分配一个task,只有当task执行完了,才分发下一个任务。 ``` channel.BasicQos(0, 1, false); ``` >NOTE:注意queue的size > 如果所有的Worker都busy,你的queue会填满,因此你需要监测queue的情况,添加更多的worker 或者采用其他的策略 ##演示 全部代码 NewTask.cs ``` using System; using RabbitMQ.Client; using System.Text; class NewTask { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.SetPersistent(true); channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } } ``` Worker.cs ``` using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System.Threading; class Worker { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } ```