# **1. 分析数据丢失的原因**
![](https://img.kancloud.cn/99/34/9934d9a1a9db8098cdbb72b079ee4a9e_681x180.png)
可以看出,一条消息整个过程要经历两次的网络传输:**从生产者发送到RabbitMQ服务器,从RabbitMQ服务器发送到消费者**。
**在消费者未消费前存储在队列(Queue)中**。
所以可以知道,有三个场景下是会发生消息丢失的:
* 存储在队列中,如果队列没有对消息持久化,RabbitMQ服务器宕机重启会丢失数据。
* 生产者发送消息到RabbitMQ服务器过程中,RabbitMQ服务器如果宕机停止服务,消息会丢失。
* 消费者从RabbitMQ服务器获取队列中存储的数据消费,但是消费者程序出错或者宕机而没有正确消费,导致数据丢失。
针对以上三种场景,RabbitMQ提供了三种解决的方式,分别是消息持久化,confirm机制,ACK事务机制。
![](https://img.kancloud.cn/00/dd/00ddeecbc84b2b44de389e59ef40efc9_685x204.png)
# 2. 防丢失策略
## 2.1 持久化
为了防止rabbitmq故障,导致数据丢失,详见消息持久化
## 2.2 生产者
**在生产者发送到RabbitMQ Server时有可能因为网络问题导致投递失败,从而丢失数据**。我们可以使用confirm模式防止数据丢失。工作流程是怎么样的呢,看以下图解:
![](https://img.kancloud.cn/95/f5/95f58764711980a333f7a66795bf7e50_658x303.png)
从上图中可以看到是通过两个回调函数**confirm()、returnedMessage()** 进行通知。
一条消息从生产者发送到RabbitMQ,首先会发送到Exchange,对应回调函数**confirm()**。第二步从Exchange路由分配到Queue中,对应回调函数则是**returnedMessage()**。
### 2.2.1 配置生产者消费回调
~~~text
spring:
rabbitmq:
publisher-confirms: true
# publisher-returns: true
template:
mandatory: true
# publisher-confirms:设置为true时。当消息投递到Exchange后,会回调confirm()方法进行通知生产者
# publisher-returns:设置为true时。当消息匹配到Queue并且失败时,会通过回调returnedMessage()方法返回消息
# spring.rabbitmq.template.mandatory: 设置为true时。指定消息在没有被队列接收时会通过回调returnedMessage()方法退回。
~~~
### 2.2.2 回调函数
~~~text
@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);
/**
* 监听消息是否到达Exchange
*
* @param correlationData 包含消息的唯一标识的对象
* @param ack true 标识 ack,false 标识 nack
* @param cause nack 投递失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("消息投递成功~消息Id:{}", correlationData.getId());
} else {
logger.error("消息投递失败,Id:{},错误提示:{}", correlationData.getId(), cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("消息没有路由到队列,获得返回的消息");
Map map = byteToObject(message.getBody(), Map.class);
logger.info("message body: {}", map == null ? "" : map.toString());
logger.info("replyCode: {}", replyCode);
logger.info("replyText: {}", replyText);
logger.info("exchange: {}", exchange);
logger.info("routingKey: {}", exchange);
logger.info("------------> end <------------");
}
@SuppressWarnings("unchecked")
private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
T t;
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis)) {
t = (T) ois.readObject();
} catch (Exception e) {
e.printStackTrace();
return null;
}
return t;
}
}
~~~
### 2.2.3 生产者实现
~~~text
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
@Resource
private RabbitmqConfirmCallback rabbitmqConfirmCallback;
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//指定 ConfirmCallback
rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
//指定 ReturnCallback
rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
}
@Override
public String sendMsg(String msg) throws Exception {
Map<String, Object> message = getMessage(msg);
try {
CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
private Map<String, Object> getMessage(String msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
CorrelationData correlationData = new CorrelationData(msgId);
String sendTime = sdf.format(new Date());
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("sendTime", sendTime);
map.put("msg", msg);
map.put("correlationData", correlationData);
return map;
}
}
~~~
大功告成!接下来我们进行测试,发送一条消息,我们可以控制台:
![](https://img.kancloud.cn/6b/d4/6bd440fe1b0ac605831950c5115988e2_1250x95.png)
**假设发送一条信息没有路由匹配到队列,可以看到如下信息:**
![](https://img.kancloud.cn/81/6f/816fa29a2db6cf68a1702abe994157ac_720x73.png)
## 2.3 消费端
消费者需要回复ack