:-: ![](https://img.kancloud.cn/fe/32/fe320e4910513d1d958fbc5c12e4de27_1357x187.jpg)
图1:架构图
![](https://img.kancloud.cn/5e/4e/5e4e24e553bae2d459812007c2931295_1396x341.jpg)
图2:确认机制
下面演示图1的消费过程,图2是发布确认的确认机制。
<br/>
步骤如下:
**1. 先创建一个SpringBoot项目**
```xml
<dependencies>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
```
**2. `resources/application.properties`**
```properties
spring.rabbitmq.host=192.168.0.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.publisher-confirm-type=correlated
```
**`spring.rabbitmq.publisher-confirm-type`** 属性的取值如下:
* `NONE`:禁用发布确认模式,是默认值。
* `CORRELATED`:发布消息成功到交换器后会触发回调方法。
* `SIMPLE`:经测试有两种效果,其一和 CORRELATED 值一样会触发回调方法;其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。
**3. 配置类**
```java
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String KEY_1 = "key1";
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
/**
* 交换机与队列绑定
*/
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(KEY_1);
}
}
```
**4. 生产者**
```java
@Slf4j
@RestController
public class ProducerController {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private CustomCallBack customCallBack;
/**
* @PostConstruct 注解标记的方法可以在项目启动时时执行一次,这里在项目启动时
* 注入CustomCallBack对象到RabbitTemplate中去
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(customCallBack);
}
@GetMapping("/confirm/send")
public void confirmSend() {
//指定消息id为1
CorrelationData corId = new CorrelationData("1");
//convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData)
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "key1", "message#key1", corId);
log.info("生产消息: {}", "message#key1");
CorrelationData corId2 = new CorrelationData("2");
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "key2", "message#key2", corId2);
log.info("生产消息: {}", "message#key2");
}
}
```
**5. 回调对象**
```java
@Slf4j
@Component
public class CustomCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 不论交换机是否收到消息该方法都会被调用
*
* @param corData 消息相关数据
* @param ack 交换机是否收到消息。true收到、false没收到
* @param cause 收到或没收到的理由
*/
@Override
public void confirm(CorrelationData corData, boolean ack, String cause) {
if (ack) {
log.info("交换机已收到id={}的消息", corData.getId());
} else {
log.info("交换机未收到id={}的消息,原因是:{}", corData.getId(), cause);
}
}
}
```
**6. 消费者**
```java
@Slf4j
@Component
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
log.info("收到消息: {}", msg);
}
}
```
**7. 测试**
启动项目后访问 http://localhost:8080/confirm/send 生产了两条消息,控制台输出如下。
```
...] c.l.r.controller.ProducerController : 生产消息: message#key1
...] c.l.r.controller.ProducerController : 生产消息: message#key2
...] c.l.rabbitmq03.service.ConfirmConsumer : 收到消息: message#key1
...] c.l.rabbitmq03.callback.CustomCallBack : 交换机已收到id=2的消息
...] c.l.rabbitmq03.callback.CustomCallBack : 交换机已收到id=1的消息
```
可以看到,生产者发送了两条消息,第一条消息的 RoutingKey 为 `key1`,第二条消息的 RoutingKey 为`key2`,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所以第二条消息被直接丢弃了。
- 消息队列
- 什么是MQ
- MQ的作用
- MQ的分类
- MQ的选择
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 环境搭建
- windows系统下的搭建
- centos7系统下的搭建
- 常用命令
- 服务相关命令
- 管理用户命令
- 管理队列命令
- 第一个RabbitMQ程序
- 工作队列
- 轮询分发消息
- 消息应答
- 持久化
- 发布确认
- 发布确认原理
- 发布确认策略
- 交换机概念
- 交换机类型
- 无名交换机
- Fanout交换机
- Direct交换机
- Topic交换机
- 死信队列
- 死信概念
- 死信来源
- 死信实战
- 延迟队列
- 什么是延迟队列
- TTL设置方式
- 队列TTL延迟队列
- 消息TTL延迟队列
- 插件打造延迟队列
- 延迟队列总结
- 发布确认高级
- 代码实现
- 回退消息
- 备份交换机
- 幂等性
- 幂等性概念
- 消息重复消费
- 消费端幂等性保障
- 优先级队列
- 使用场景
- 设置优先级
- 惰性队列
- 什么是惰性队列
- 队列的两种模式
- 声明惰性队列
- RabbitMQ集群
- 为什么要搭建集群
- 集群搭建步骤
- 集群工作方式
- 脱离集群
- 镜像队列
- 高可用负载均衡