ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
[TOC] # 1. 消息应答概念 消费者完成一个任务可能需要一段时间,如果其中一个消费者在处理一个长的任务时只完成了部分突然挂掉了,会发生什么情况。 <br/> RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。 <br/> 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:<mark>消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了</mark>。 <br/> 消息应答方式有:自动应答和手动应答两种方式。 <br/> # 2. 自动应答 消息发送后立即被认为已经传送成功,这种模式需要在<mark>高吞吐量和数据传输安全性方面做权衡</mark>,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。 <br/> 另一方面这种模式可能导致消费者过载,<mark>因为没有对传递的消息数量进行限制</mark>,有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,<mark>所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用</mark>。 <br/> # 3. 手动应答 ## 3.1 消息自动重新入队 如果消费者由于某些原因失去连接(如通道已关闭、连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 了解到消息未完全处理,将对消息重新排队。 <br/> 如果此时其他消费者可以处理,该条消息将会很快被重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。 <br/> ## 3.2 手动应答实现 消息默认采用的是自动应答,改用手动应答有如下好处。 (1)消息消费过程中数据不丢失。 (2)可以批量应答并且减少网络拥堵。 <br/> 实现手动应答的 API 如下。 ```java public interface Channel extends ShutdownNotifier, AutoCloseable { // 用于肯定确认 // RabbitMQ 已知道该消息被处理成功,可以将其丢弃了 void basicAck(long var1, boolean var3) throws IOException; // 消费者拒绝消费 void basicNack(long var1, boolean var3, boolean var4) throws IOException; // 消费者拒绝消费 // 与 basicNack 相比少一个参数var4,当消息不被处理时直接丢弃 void basicReject(long var1, boolean var3) throws IOException; } ``` 这三个方法都有一个参数 var3,是 multiple 域。 当`var3=true`代表批量应答 channel 上未应答的消息。比如 channel 上有正在传送 tag 为 5、6、7、8 的消息。假设当前 tag=8,那么此时 5 — 8 的这些还未应答的消息都会被确认收到消息应答。 <br/> 当`var3=false`时,只会应答 tag=8 的消息, 5、6、7 这三个消息依然不会被确认收到消息应答。 <br/> 实现手动应答代码演示如下: **1. 工具类** ```java public class RabbitMQUtils { /** * 连接RabbitMQ服务器 */ public static Channel getChannel() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.107"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } } ``` ```java public class SleepUtils { public static void sleep(int second) { try { Thread.sleep(1000 * second); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } ``` **2. 生产者** ```java public class Task02 { private static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { //声明队列 channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); Scanner sc = new Scanner(System.in); System.out.println("请输入信息"); while (sc.hasNext()) { String message = sc.nextLine(); //发送消息 channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息:" + message); } } } } ``` **3. 两个消费者** 在消费端调用手动应答 API 实现手动应答。 ```java public class Worker03 { private static final String ACK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); System.out.println("C1 等待接收消息处理时间较短"); //消息消费的时候如何处理消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); SleepUtils.sleep(1); System.out.println("接收到消息:" + message); /** * 消息应答。 * basicAck(long var1, boolean var3) * var1: 消息标记tag * var3: 是否批量应答未应答消息 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; /** * basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4) * var1: 队列名称 * var2: 应答方式。true自动应答、false手动应答 */ channel.basicConsume(ACK_QUEUE_NAME, false, deliverCallback, (consumerTag) -> { System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); }); } } ``` ```java public class Worker04 { private static final String ACK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); System.out.println("C2 等待接收消息处理时间较长"); //消息消费的时候如何处理消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); SleepUtils.sleep(30); System.out.println("接收到消息:" + message); /** * 消息应答。 * basicAck(long var1, boolean var3) * var1: 消息标记tag * var3: 是否批量应答未应答消息 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; /** * basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4) * var1: 队列名称 * var2: 应答方式。true自动应答、false手动应答 */ channel.basicConsume(ACK_QUEUE_NAME, false, deliverCallback, (consumerTag) -> { System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); }); } } ``` **4. 测试** (1)启动生产者 Task02,生产4条消息。 ![](https://img.kancloud.cn/89/d2/89d247ff917157dda9d042719ca7f6b1_1452x266.jpg) (2)启动两个消费者。 正常情况下两个消费者按照轮询机制分别获取两条消息。 ![](https://img.kancloud.cn/26/a8/26a8fbcfb011890c574d5b293f4ed739_1284x108.jpg) ![](https://img.kancloud.cn/91/14/9114ebde2740af04a190958c19be521f_1484x100.jpg) <br/> 演示消息自动重新入队:当生产者 Task02 发送消息 C4 的时候,立即把消费者 Worker04 停掉,就会发现本来应该由 Worker04 消费的消息 C4,被 Worker03 消费了,实现了消息自动重新入队。 ![](https://img.kancloud.cn/59/34/59347c41922ca3f6e08d7b81618ad3fa_1350x135.jpg) ![](https://img.kancloud.cn/4d/14/4d14649cd4d283d6c7ba84f023eb29e3_1368x84.jpg)