## 工作队列
在工作中分配任务(竞争消费者模式)。
创建一个工作队列,用于在多个工作人员之间分配耗时的任务。
![](http://www.rabbitmq.com/img/tutorials/python-two.png =400x120)
#### 需求
工作队列:工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,安排稍后完成任务。把一个任务封装成一个消息并发送给一个队列。在后台运行的工作进程将弹出任务并最终执行作业。当你运行许多工人时,任务将在他们之间共享。
#### 举例
把字符串中的点数作为它的复杂度。每一个点将占到“工作”的一秒钟。例如,Hello ...描述的假任务 将需要三秒钟。
修改Send.java代码,以允许从命令行发送任意消息。
##### 生产者代码
```
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
String message = getMessage(argv);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
private static String getMessage(String[] argv) {
if (argv.length < 1)
return "Hello World!";
return joinStrings(argv, ".");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
```
##### 消费者代码
```
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
//伪造每个点有一个任务要执行。它将处理交付的消息并执行任务.
private static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
```
### 循环调度
使用任务队列的优点之一是能够轻松地平行工作。如果我们积压工作,我们可以增加更多的工人,这样可以轻松扩展。
我们尝试同时运行两个工作者实例。他们都会从队列中得到消息。
**做法**
将消费者的main方法执行二次,得到二个执行中的运行任务。并开始执行生产者的main方法。结果:默认情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为循环法(round-robin)。
### 消息确认
当消息发送出去,到达其中一个消费者时,如果恰巧这个时候,消费者死亡,那么就会导致该条消息不会被处理。
为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发回确认(告知)告诉RabbitMQ已经收到,处理了一个特定的消息,并且RabbitMQ可以自由删除它。
如果消费者死亡(其通道关闭,连接关闭或TCP连接丢失),RabbitMQ将理解消息未被完全处理,并将重新排队。如果有其他消费者同时上网,则会迅速重新发送给其他消费者。
手动消息确认默认打开。
`channel.basicConsume(QUEUE_NAME, true, consumer);`//默认为false为打开状态
即:第二个参数为false时-》即使在处理消息的时候使用CTRL + C来杀死一个工作者,也不会丢失任何东西。工人死后不久,所有未确认的消息将被重新发送。
### 消息持久性
如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,此时需要将队列和消息标记为持久以确保消息不会丢失
#### 1.持久队列
注意 生产者消费者的队列定义都要设置为true,当生产者不同发送时,一旦启动了消费者,就会自动的进行消息的消费。
即使RabbitMQ重新启动,task_queue队列也不会丢失
**send**
```
boolean durable = true ; //代表队列持久化
channel.queueDeclare("task_queue",durable,假,假,空);
```
#### 2.消息持久性
现在我们需要将消息标记为持久的 - 通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN。
```
channel.basicPublish("","task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
```
### 公平派遣
当任务的难度不一致时,一个工人可能会一直做务比较重的任务,而另一个工人几乎不会做什么事,RabbitMQ不知道任何关于这个,并将仍然均匀地发送消息。
发生这种情况是因为RabbitMQ只在消息进入队列时调度消息。它没有考虑消费者未确认消息的数量。它只是盲目地把第n条消息分发给第n个消费者。
![](http://www.rabbitmq.com/img/tutorials/prefetch-count.png =400x120)
#### 解决方案
我们可以使用basicQos方法`prefetchCount = 1`设置。
这告诉RabbitMQ一次不能给一个工作者多个消息。或者换句话说,不要向工作人员发送新消息,直到处理并确认了前一个消息。
```
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
int prefetchCount = 1 ;
channel.basicQos(prefetchCount);
```