案例代码:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01
****
![](https://img.kancloud.cn/2e/67/2e6750ce18e53e926d773feefc21ba2f_1391x536.jpg)
将通过不同的死信来源演示上图的消息消费过程。
[TOC]
# 1. 消息 TTL 过期
**1. 生产者**
```java
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//设置消息的 TTL 时间为 10000 ms
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//生产者生产10条消息
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
System.out.println(Producer.class.getSimpleName() + "[生产者发出消息]: " + message);
}
}
}
}
```
**2. 两个消费者**
(1)C1消费者
```java
public class Consumer01 {
//普通交换机
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//声明普通交换机和死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定死信交换机
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>(16);
//正常队列设置死信交换机。x-dead-letter-exchange是固定值,不可以随便写
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信routing-key。x-dead-letter-routing-key是固定值,不可以随便写
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println(Consumer01.class.getSimpleName() + "[等待接收消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
```
(2)C2消费者
```java
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println(Consumer02.class.getSimpleName() + "[等待接收死信队列消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(Consumer02.class.getSimpleName() + "[接收到死信队列的消息]: " + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
```
**3. 测试**
(1)先启动C1消费者,然后将其关闭。
(2)启动生产者生产10条消息,就得到如下信息。
![](https://img.kancloud.cn/e6/b5/e6b5f292d5c347c2a4ddffd8d459260b_1194x619.jpg)
(3)启动C2消费者,C2消费了死信队列的消息。
```
-----------Consumer02消费者收到了死信队列的消息-----------
Consumer02[接收到死信队列的消息]: info1
Consumer02[接收到死信队列的消息]: info2
Consumer02[接收到死信队列的消息]: info3
Consumer02[接收到死信队列的消息]: info4
Consumer02[接收到死信队列的消息]: info5
Consumer02[接收到死信队列的消息]: info6
Consumer02[接收到死信队列的消息]: info7
Consumer02[接收到死信队列的消息]: info8
Consumer02[接收到死信队列的消息]: info9
Consumer02[接收到死信队列的消息]: info10
```
<br/>
# 2. 队列达到最大长度
**1. 生产者**
```java
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//生产者生产10条消息
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
System.out.println(Producer.class.getSimpleName() + "[生产者发出消息]: " + message);
}
}
}
}
```
**2. 两个消费者**
(1)C1消费者
```java
public class Consumer01 {
//普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//声明死信和普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定死信交换机
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>(16);
//正常队列设置死信交换机。x-dead-letter-exchange是固定写法,不可以自定义
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信routing-key。x-dead-letter-routing-key是固定写法,不可以自定义
params.put("x-dead-letter-routing-key", "lisi");
//设置正常队列长度为6
params.put("x-max-length", 6);
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println(Consumer01.class.getSimpleName() + "[等待接收消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
```
(2)C2消费者
```java
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println(Consumer02.class.getSimpleName() + "[等待接收死信队列消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(Consumer02.class.getSimpleName() + "[接收到死信队列的消息]: " + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
```
**3. 测试**
(1)删除队列。
```shell
[root@localhost ~]# rabbitmqctl delete_queue dead-queue
[root@localhost ~]# rabbitmqctl delete_queue normal-queue
```
(2)先启动C1消费者,然后将其关闭。
(3)启动生产者生产10条消息,就得到如下信息。
![](https://img.kancloud.cn/51/98/5198102296c93acef9f6c98af8cdab36_1458x138.jpg)
可以看到因为在C1中设置队列最大长度为6(`params.put("x-max-length", 6)`),所以正常队列只能存放6条消息,超过6条消息的被存储到了死信队列中。
(4)启动C2消费者,C2消费了死信队列的消息。
```
-----------Consumer02消费者收到了死信队列的消息-----------
Consumer02[接收到死信队列的消息]: info1
Consumer02[接收到死信队列的消息]: info2
Consumer02[接收到死信队列的消息]: info3
Consumer02[接收到死信队列的消息]: info4
```
(5)启动C1消费者,消费正常队列中的6条消息。
```
-----------Consumer01消费者收到了死信队列的消息-----------
Consumer01[接收到的消息]: info5
Consumer01[接收到的消息]: info6
Consumer01[接收到的消息]: info7
Consumer01[接收到的消息]: info8
Consumer01[接收到的消息]: info9
Consumer01[接收到的消息]: info10
```
<br/>
# 3. 消息被拒
消息被拒只需要在消费端调用下面的方法即可。
```java
channel.basicReject(long var1, boolean var3)
```
**1. 生产者**
```java
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//生产者生产10条消息
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
System.out.println(Producer.class.getSimpleName() + "[生产者发出消息]: " + message);
}
}
}
}
```
**2. 两个消费者**
(1)C1消费者
```java
public class Consumer01 {
//普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//声明死信和普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定死信交换机与 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
//正常队列设置死信交换机。x-dead-letter-exchange是固定写法,不可以随便定义
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信。x-dead-letter-routing-key是固定写法,不可以随便定义。
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println(Consumer01.class.getSimpleName() + "[等待接收消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
if (message.equals("info5")) {
System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message + ", 但拒绝签收该消息.");
//拒绝消费info5消息
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
});
}
}
```
(2)C2消费者
```java
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println(Consumer02.class.getSimpleName() + "[等待接收死信队列消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(Consumer02.class.getSimpleName() + "[接收到死信队列的消息]: " + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
```
**3. 测试**
(1)删除队列。
```shell
[root@localhost ~]# rabbitmqctl delete_queue dead-queue
[root@localhost ~]# rabbitmqctl delete_queue normal-queue
```
(2)启动C1消费者,C2消费者,最后启动生产者。
```
-----------Producer生产者生产了10条消息-----------
Producer[生产者发出消息]: info1
Producer[生产者发出消息]: info2
Producer[生产者发出消息]: info3
Producer[生产者发出消息]: info4
Producer[生产者发出消息]: info5
Producer[生产者发出消息]: info6
Producer[生产者发出消息]: info7
Producer[生产者发出消息]: info8
Producer[生产者发出消息]: info9
Producer[生产者发出消息]: info10
-----------Consumer01消费者拒绝签收info5消息-----------
Consumer01[接收到的消息]: info1
Consumer01[接收到的消息]: info2
Consumer01[接收到的消息]: info3
Consumer01[接收到的消息]: info4
Consumer01[接收到的消息]: info5, 但拒绝签收该消息.
Consumer01[接收到的消息]: info6
Consumer01[接收到的消息]: info7
Consumer01[接收到的消息]: info8
Consumer01[接收到的消息]: info9
Consumer01[接收到的消息]: info10
-----------Consumer02消费者收到了死信队列的消息-----------
Consumer02[接收到死信队列的消息]: info5
```
- 消息队列
- 什么是MQ
- MQ的作用
- MQ的分类
- MQ的选择
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 环境搭建
- windows系统下的搭建
- centos7系统下的搭建
- 常用命令
- 服务相关命令
- 管理用户命令
- 管理队列命令
- 第一个RabbitMQ程序
- 工作队列
- 轮询分发消息
- 消息应答
- 持久化
- 发布确认
- 发布确认原理
- 发布确认策略
- 交换机概念
- 交换机类型
- 无名交换机
- Fanout交换机
- Direct交换机
- Topic交换机
- 死信队列
- 死信概念
- 死信来源
- 死信实战
- 延迟队列
- 什么是延迟队列
- TTL设置方式
- 队列TTL延迟队列
- 消息TTL延迟队列
- 插件打造延迟队列
- 延迟队列总结
- 发布确认高级
- 代码实现
- 回退消息
- 备份交换机
- 幂等性
- 幂等性概念
- 消息重复消费
- 消费端幂等性保障
- 优先级队列
- 使用场景
- 设置优先级
- 惰性队列
- 什么是惰性队列
- 队列的两种模式
- 声明惰性队列
- RabbitMQ集群
- 为什么要搭建集群
- 集群搭建步骤
- 集群工作方式
- 脱离集群
- 镜像队列
- 高可用负载均衡