Work Queues(工作队列)
大体结构如下所示:
![](http://www.rabbitmq.com/img/tutorials/python-two.png)
在第一节的教程里,我们创建了一个程序,发送和接收消息,从一个named queue(命名队列 )。
本节,我们会创建一个 Work Queue(工作队列),用来分发耗时任务给多个Workers(工人)。
使用Work Queues(别名:Task Queue)是为了避免立即做一个资源密集型任务,而不得不等待它完成。我们可以把这个耗时的任务封装提取起来作为message,发送给一个queue。一个Worker 后台进程会获取task,然后执行他。当有多个Workers 时,他们平分这些task。
##Preparation 准备
上一节的教程,我们发送已一条包含“Hello World”的消息。这一节我们要发送一个耗时的任务。为了简单起见,我们使用
`Thread.Sleep()`方法,
###发送端
为了简单起见,我们修改上一节的`Send.cs`的代码,从命令行获取message的参数。重命名文件为“NewTask.cs”
```
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
```
其中 `GetMessage(string[] args)`方法定义如下:
```
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
```
###接收端
接下来修改上衣节的`Receive.cs`文件,我们重命名为“Worker.cs”
```
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
```
然后编译代码。
##Round-robin dispatching 循环调度
使用Task Queue的一个优点就是可以很容易的平均分配任务。如果queue里有堆积过多的任务,我们可以添加更多的Worker就行了。
接下来,我们运行两个Worker.cs的实例,分别为C1和C2
C1
```
shell1$ Worker.exe
Worker
[*] Waiting for messages. To exit press CTRL+C
```
C2
```
shell2$ Worker.exe
Worker
[*] Waiting for messages. To exit press CTRL+C
```
在接下来我们运行 NewTask.exe 来发布任务
```
shell3$ NewTask.exe First message.
shell3$ NewTask.exe Second message..
shell3$ NewTask.exe Third message...
shell3$ NewTask.exe Fourth message....
shell3$ NewTask.exe Fifth message.....
```
我们会看到任务的分配情况
C1
```
shell1$ Worker.exe
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
```
C2
```
shell2$ Worker.exe
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'
```
默认,RabbitMQ会顺序的,平均的把任务发给每个consumer,到最后每个Consumer会得到相同数量的任务。
##Message acknowledgment
执行一个耗时的任务,你可能会想知道任务的执行情况。是否有Consumer开始执行任务了?是否任务执行到一半死机了?
当前我们上面的代码,一旦RabbitMQ分发message给Custoerm,它就会立刻从内存删除。这种情况下,如果你关闭一个Worker,我们就会丢失他正在执行的消息。同样,我们也会丢失之前分发给他,还没有来的及执行的消息。
但是我们不想丢失任何 task。如果一个Worker死了,我们想把任务分发给其他的Worker。
为了确保message不丢失,RabbitMQ 提供了 message acknowledgments。Ack是consumer 发送给RabbitMQ的,告诉它,task 已经接受,并处理了,RabbitMQ 可以删除它了。
如果一个consumer死机了(channel closed,connection closed or Tcp connection lost),没有返回ack,RabbitMQ就会知道task 没有处理完,该task就会重新排队。如果这时候有另外一个Consumer在线,RabbitMQ 就会把它分发给他。
默认Message acknowLedgments 是打开的,之前的例子,我们是显式的关闭了(设置 noAck=true)。
下面修改代码,打开Message acknowLedgments。
```
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
```
运行上面的代码,如果我们kill 一个Worker,message 不会丢失,他会被分发给其他Worker。
> **Forgotten acknowledgment 遗失acknowledgment**
丢失BasicAck是很常见的错误,尽管这个错很小,但后果很严重。当Client quit,Messages 会重新分发,但是RabbitMQ 由于不能释放掉那些unacked message ,所以会消耗越来越多的内存。
为了 调试这种错误, 你可以使用`rabbitmqctl`来打印出 `messages_unacknowledged` 的message信息
> ```
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.
>```
##Message durability 持久化
通过上面的ACK配置,当consumer 死亡的时候,task 不会丢失。但是如果RabbitMQ服务停了,task 仍然会丢失。
这里我们就要持久化 task的信息了。
首先,我们确保RabbitMQ不会丢失我们的queue
```
channel.QueueDeclare(queue: "hello",
durable: true, //持久化
exclusive: false,
autoDelete: false,
arguments: null);
```
尽管我们定义名字叫hello 队列要持久化,但是仍然不会生效。这是因为我们已经定义了一个没有持久化的名字叫hello 队列。RabbitMQ 不允许重新定义(用不同的参数)一个已经存在的队列,会报错。因此这里我们应该另外定义一个队列,
例如 task_queue
```
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
```
>NOTE:queue 持久化的修改,producer 和consumer的代码都要修改。
通过上面的代码设置我们的queue,即使RabbitMQ重启也不会丢失。
接下来,我们来持久化message。
```
var properties = channel.CreateBasicProperties();
properties.Persistent=true;
```
>NOTE:尽管我们设置message持久化了,但是这也不能完全保证message不会丢失。
>这是由于RabbitMQ保存message到硬盘是需要时间的,如果再此期间RabbitMQ服务挂了,message就丢失了。不过对于一般的程序已经足够了。如果要一个更强壮的方案,你可是使用[publisher confirms](https://www.rabbitmq.com/confirms.html)
##Fair dispatch 公平调度
也许你已经主要到,上面代码实现的message的调度不是你想要的。例如,假设有两个Worker,所有的奇数的message都是耗时的操作,而偶数的message都是很简单的。你会发现一个Worker很空闲,而另一个Woker累死累活的。然而RabbitMQ不知道,还是不停的给他发任务。
这个情况的发生,是由于RabbitMQ 不看 the number of unacknowledged message,只要message进入队列就分发message。他只是盲目的分发message。
![](http://www.rabbitmq.com/img/tutorials/prefetch-count.png)
为了解决上面的问题,我们可以使用 `basicQos`方法 设置 `prefetchCount=1`。这个设置会告诉RabbitMQ 每次给Workder只分配一个task,只有当task执行完了,才分发下一个任务。
```
channel.BasicQos(0, 1, false);
```
>NOTE:注意queue的size
> 如果所有的Worker都busy,你的queue会填满,因此你需要监测queue的情况,添加更多的worker 或者采用其他的策略
##演示
全部代码
NewTask.cs
```
using System;
using RabbitMQ.Client;
using System.Text;
class NewTask
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
}
```
Worker.cs
```
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
class Worker
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
noAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
```