案例代码:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01
****
[TOC]
# 1. 开启发布确认
发布确认默认是没有开启的,如果要开启需要在生产者调用方法 `channel.confirmSelect`,每当你想开启要发布确认,都需要在 channel 上调用该方法。
<br/>
# 2. 发布确认策略
发布确认策略共有三种:单个确认发布、批量确认发布、异步确认发布。
## 2.1 单个确认发布
这是一种简单的确认方式,它是一种<mark>同步确认发布</mark>的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布。
```java
public static void publishMessageIndividually() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
}
}
```
<br/>
这种确认方式有一个最大的缺点就是:**发布速度特别的慢**,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。
<br/>
## 2.2 批量确认发布
上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:**当发生故障导致发布出现问题时,不知道是哪个消息出现问题了**,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是**同步的**,也一样阻塞消息的发布。
```java
public static void publishMessageBatch() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
//批量确认消息大小
int batchSize = 100;
//未确认消息个数
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}
}
```
<br/>
## 2.3 异步确认发布
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。
![](https://img.kancloud.cn/19/40/19409e173c27234d97c50ec3dabb9ca2_1397x597.jpg)
```java
public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
/*
* 线程安全有序的一个哈希表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目,只要给到序列号
* 3.支持并发访问
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
/*
* 确认收到消息的一个回调
* sequenceNumber: 消息序列号
* multiple: 为true则可以确认小于等于当前序列号的消息、false只确认当前序列号消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于当前序列号的未确认消息
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
//清除该部分未确认消息
confirmed.clear();
} else {
//只清除当前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
};
/*
* 没有被确认消息的一个回调
*/
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
};
/*
* 添加一个异步确认的监听器
*/
channel.addConfirmListener(ackCallback, nackCallback);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/*
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 通过序列号与消息体进行一个关联
* 全部都是未确认的消息体
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
}
}
```
<br/>
## 2.4 如何处理异步未确认消息
最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 `ConcurrentLinkedQueue` 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。
<br/>
## 2.5 3种发布确认速度对比
* 单独发布消息:同步等待确认,简单,但吞吐量非常有限。
* 批量发布消息: 批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。
* 异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些。
```java
public static void main(String[] args) throws Exception {
publishMessageIndividually();
publishMessageBatch();
publishMessageAsync();
// 运行结果
// 发布1000个单独确认消息,耗时457ms
// 发布1000个批量确认消息,耗时172ms
// 发布1000个异步确认消息,耗时64ms
}
```
- 消息队列
- 什么是MQ
- MQ的作用
- MQ的分类
- MQ的选择
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 环境搭建
- windows系统下的搭建
- centos7系统下的搭建
- 常用命令
- 服务相关命令
- 管理用户命令
- 管理队列命令
- 第一个RabbitMQ程序
- 工作队列
- 轮询分发消息
- 消息应答
- 持久化
- 发布确认
- 发布确认原理
- 发布确认策略
- 交换机概念
- 交换机类型
- 无名交换机
- Fanout交换机
- Direct交换机
- Topic交换机
- 死信队列
- 死信概念
- 死信来源
- 死信实战
- 延迟队列
- 什么是延迟队列
- TTL设置方式
- 队列TTL延迟队列
- 消息TTL延迟队列
- 插件打造延迟队列
- 延迟队列总结
- 发布确认高级
- 代码实现
- 回退消息
- 备份交换机
- 幂等性
- 幂等性概念
- 消息重复消费
- 消费端幂等性保障
- 优先级队列
- 使用场景
- 设置优先级
- 惰性队列
- 什么是惰性队列
- 队列的两种模式
- 声明惰性队列
- RabbitMQ集群
- 为什么要搭建集群
- 集群搭建步骤
- 集群工作方式
- 脱离集群
- 镜像队列
- 高可用负载均衡