企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 工作队列 在工作中分配任务(竞争消费者模式)。 创建一个工作队列,用于在多个工作人员之间分配耗时的任务。 ![](http://www.rabbitmq.com/img/tutorials/python-two.png =400x120) #### 需求 工作队列:工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,安排稍后完成任务。把一个任务封装成一个消息并发送给一个队列。在后台运行的工作进程将弹出任务并最终执行作业。当你运行许多工人时,任务将在他们之间共享。 #### 举例 把字符串中的点数作为它的复杂度。每一个点将占到“工作”的一秒钟。例如,Hello ...描述的假任务 将需要三秒钟。 修改Send.java代码,以允许从命令行发送任意消息。 ##### 生产者代码 ``` public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { String message = getMessage(argv); ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } private static String getMessage(String[] argv) { if (argv.length < 1) return "Hello World!"; return joinStrings(argv, "."); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } ``` ##### 消费者代码 ``` Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { doWork(message); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(" [x] Done"); } } }; channel.basicConsume(QUEUE_NAME, true, consumer); //伪造每个点有一个任务要执行。它将处理交付的消息并执行任务. private static void doWork(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } ``` ### 循环调度 使用任务队列的优点之一是能够轻松地平行工作。如果我们积压工作,我们可以增加更多的工人,这样可以轻松扩展。 我们尝试同时运行两个工作者实例。他们都会从队列中得到消息。 **做法** 将消费者的main方法执行二次,得到二个执行中的运行任务。并开始执行生产者的main方法。结果:默认情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为循环法(round-robin)。 ### 消息确认 当消息发送出去,到达其中一个消费者时,如果恰巧这个时候,消费者死亡,那么就会导致该条消息不会被处理。 为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发回确认(告知)告诉RabbitMQ已经收到,处理了一个特定的消息,并且RabbitMQ可以自由删除它。 如果消费者死亡(其通道关闭,连接关闭或TCP连接丢失),RabbitMQ将理解消息未被完全处理,并将重新排队。如果有其他消费者同时上网,则会迅速重新发送给其他消费者。 手动消息确认默认打开。 `channel.basicConsume(QUEUE_NAME, true, consumer);`//默认为false为打开状态 即:第二个参数为false时-》即使在处理消息的时候使用CTRL + C来杀死一个工作者,也不会丢失任何东西。工人死后不久,所有未确认的消息将被重新发送。 ### 消息持久性 如果RabbitMQ服务器停止,我们的任务仍然会丢失。 当RabbitMQ退出或崩溃时,它会忘记队列和消息,此时需要将队列和消息标记为持久以确保消息不会丢失 #### 1.持久队列 注意 生产者消费者的队列定义都要设置为true,当生产者不同发送时,一旦启动了消费者,就会自动的进行消息的消费。 即使RabbitMQ重新启动,task_queue队列也不会丢失 **send** ``` boolean durable = true ; //代表队列持久化 channel.queueDeclare("task_queue",durable,假,假,空); ``` #### 2.消息持久性 现在我们需要将消息标记为持久的 - 通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN。 ``` channel.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); ``` ### 公平派遣 当任务的难度不一致时,一个工人可能会一直做务比较重的任务,而另一个工人几乎不会做什么事,RabbitMQ不知道任何关于这个,并将仍然均匀地发送消息。 发生这种情况是因为RabbitMQ只在消息进入队列时调度消息。它没有考虑消费者未确认消息的数量。它只是盲目地把第n条消息分发给第n个消费者。 ![](http://www.rabbitmq.com/img/tutorials/prefetch-count.png =400x120) #### 解决方案 我们可以使用basicQos方法`prefetchCount = 1`设置。 这告诉RabbitMQ一次不能给一个工作者多个消息。或者换句话说,不要向工作人员发送新消息,直到处理并确认了前一个消息。 ``` System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); int prefetchCount = 1 ; channel.basicQos(prefetchCount); ```