ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
[TOC] # 1. 什么是持久化 使用手动应答可以处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后生产者发送过来的消息不丢失。 <br/> 默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它会忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要<mark>将队列和消息都标记为持久化</mark>。 <br/> # 2. 队列持久化 之前我们创建的队列都是非持久化的,rabbitmq 如果重启该队列就会被删除掉,如果要队列实现持久化,需要在声明队列的时候将`var2=true`。 ```java try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { /** * 声明一个队列。 * queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) * var1: 队列名称 * var2: 队列里面的消息是否持久化。false(默认)不持久化、true持久化 * var3: 该队列是否只供一个消费者进行消费,即消息是否进行共享。false共享、true共享 * var4: 是否自动删除。即当最后一个消费者断开连接后该队列是否自动删除。true自动删除、false不删除 * var5:其他参数 */ channel.queueDeclare("queue.hello", true, false, false, null); ... ``` <br/> >[info]注意:如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会报出下面的错误。 ![](https://img.kancloud.cn/99/2a/992a4c93ba3c544fff5e503338e9007b_1556x50.jpg) ```shell -- 删除queueName队列的命令 rabbitmqctl delete_queue queueName ``` <br/> 以下为控制台中持久化与非持久化队列的 UI 显示区、这个时候即使重启 rabbitmq 队列也依然存在。 ![](https://img.kancloud.cn/21/bf/21bf58ff7ca5efc18e6daf187c89358c_1468x286.jpg) <br/> # 3. 消息持久化 要想让消息持久化需要在生产者添加属性`MessageProperties.PERSISTENT_TEXT_PLAIN` 。 ```java /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); ``` 将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。 <br/> 持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考后边的[发布确认](https://www.kancloud.cn/king_om/x_1_mq/2483259)章节。 <br/> # 4. 不公平分发 在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮询分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的话就会让处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。 <br/> 为了避免这种情况,我们可以在消费者设置参数 `channel.basicQos(1)`。 ```java Channel channel = RabbitMQUtils.getChannel(); //消息消费的时候如何处理消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); channel.basicQos(1); ... }; ``` <br/> 消费者设置 `channel.basicQos(1)`后,可以看到 channel 的 Prefetch 的数量增加。 ![](https://img.kancloud.cn/c0/e8/c0e8f86330409ce45cc53badc8bbd9f0_1467x562.jpg) ![](https://img.kancloud.cn/29/e5/29e552724548706c3388a9de3fe0098c_1444x284.jpg) 意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配其它任务给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。 <br/> # 5. 预取值 RabbitMQ 消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息。另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能<mark>限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题<mark>。 <br/> 这个时候就可以在消费者通过使用 `channel.basicQos` 方法设置<mark>预取计数</mark>值来完成。<mark>该值定义通道上允许的未确认消息的最大数量</mark>。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知到这个情况并再发送一条消息。 ![](https://img.kancloud.cn/ac/55/ac559bb619c714a8cadc4a54b4d45010_1300x260.jpg) <br/> 消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然<mark>自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗</mark>(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同, 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。 <br/> 预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。