多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
案例代码:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01 **** ![](https://img.kancloud.cn/2e/67/2e6750ce18e53e926d773feefc21ba2f_1391x536.jpg) 将通过不同的死信来源演示上图的消息消费过程。 [TOC] # 1. 消息 TTL 过期 **1. 生产者** ```java public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //设置消息的 TTL 时间为 10000 ms AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); //生产者生产10条消息 for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes()); System.out.println(Producer.class.getSimpleName() + "[生产者发出消息]: " + message); } } } } ``` **2. 两个消费者** (1)C1消费者 ```java public class Consumer01 { //普通交换机 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); //声明普通交换机和死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信队列绑定死信交换机 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(16); //正常队列设置死信交换机。x-dead-letter-exchange是固定值,不可以随便写 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信routing-key。x-dead-letter-routing-key是固定值,不可以随便写 params.put("x-dead-letter-routing-key", "lisi"); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println(Consumer01.class.getSimpleName() + "[等待接收消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); } } ``` (2)C2消费者 ```java public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println(Consumer02.class.getSimpleName() + "[等待接收死信队列消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(Consumer02.class.getSimpleName() + "[接收到死信队列的消息]: " + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); } } ``` **3. 测试** (1)先启动C1消费者,然后将其关闭。 (2)启动生产者生产10条消息,就得到如下信息。 ![](https://img.kancloud.cn/e6/b5/e6b5f292d5c347c2a4ddffd8d459260b_1194x619.jpg) (3)启动C2消费者,C2消费了死信队列的消息。 ``` -----------Consumer02消费者收到了死信队列的消息----------- Consumer02[接收到死信队列的消息]: info1 Consumer02[接收到死信队列的消息]: info2 Consumer02[接收到死信队列的消息]: info3 Consumer02[接收到死信队列的消息]: info4 Consumer02[接收到死信队列的消息]: info5 Consumer02[接收到死信队列的消息]: info6 Consumer02[接收到死信队列的消息]: info7 Consumer02[接收到死信队列的消息]: info8 Consumer02[接收到死信队列的消息]: info9 Consumer02[接收到死信队列的消息]: info10 ``` <br/> # 2. 队列达到最大长度 **1. 生产者** ```java public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //生产者生产10条消息 for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes()); System.out.println(Producer.class.getSimpleName() + "[生产者发出消息]: " + message); } } } } ``` **2. 两个消费者** (1)C1消费者 ```java public class Consumer01 { //普通交换机名称 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机名称 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); //声明死信和普通交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信队列绑定死信交换机 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(16); //正常队列设置死信交换机。x-dead-letter-exchange是固定写法,不可以自定义 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信routing-key。x-dead-letter-routing-key是固定写法,不可以自定义 params.put("x-dead-letter-routing-key", "lisi"); //设置正常队列长度为6 params.put("x-max-length", 6); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println(Consumer01.class.getSimpleName() + "[等待接收消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); } } ``` (2)C2消费者 ```java public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println(Consumer02.class.getSimpleName() + "[等待接收死信队列消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(Consumer02.class.getSimpleName() + "[接收到死信队列的消息]: " + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); } } ``` **3. 测试** (1)删除队列。 ```shell [root@localhost ~]# rabbitmqctl delete_queue dead-queue [root@localhost ~]# rabbitmqctl delete_queue normal-queue ``` (2)先启动C1消费者,然后将其关闭。 (3)启动生产者生产10条消息,就得到如下信息。 ![](https://img.kancloud.cn/51/98/5198102296c93acef9f6c98af8cdab36_1458x138.jpg) 可以看到因为在C1中设置队列最大长度为6(`params.put("x-max-length", 6)`),所以正常队列只能存放6条消息,超过6条消息的被存储到了死信队列中。 (4)启动C2消费者,C2消费了死信队列的消息。 ``` -----------Consumer02消费者收到了死信队列的消息----------- Consumer02[接收到死信队列的消息]: info1 Consumer02[接收到死信队列的消息]: info2 Consumer02[接收到死信队列的消息]: info3 Consumer02[接收到死信队列的消息]: info4 ``` (5)启动C1消费者,消费正常队列中的6条消息。 ``` -----------Consumer01消费者收到了死信队列的消息----------- Consumer01[接收到的消息]: info5 Consumer01[接收到的消息]: info6 Consumer01[接收到的消息]: info7 Consumer01[接收到的消息]: info8 Consumer01[接收到的消息]: info9 Consumer01[接收到的消息]: info10 ``` <br/> # 3. 消息被拒 消息被拒只需要在消费端调用下面的方法即可。 ```java channel.basicReject(long var1, boolean var3) ``` **1. 生产者** ```java public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //生产者生产10条消息 for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes()); System.out.println(Producer.class.getSimpleName() + "[生产者发出消息]: " + message); } } } } ``` **2. 两个消费者** (1)C1消费者 ```java public class Consumer01 { //普通交换机名称 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机名称 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); //声明死信和普通交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信队列绑定死信交换机与 routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(); //正常队列设置死信交换机。x-dead-letter-exchange是固定写法,不可以随便定义 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信。x-dead-letter-routing-key是固定写法,不可以随便定义。 params.put("x-dead-letter-routing-key", "lisi"); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println(Consumer01.class.getSimpleName() + "[等待接收消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); if (message.equals("info5")) { System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message + ", 但拒绝签收该消息."); //拒绝消费info5消息 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); } else { System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> { }); } } ``` (2)C2消费者 ```java public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println(Consumer02.class.getSimpleName() + "[等待接收死信队列消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(Consumer02.class.getSimpleName() + "[接收到死信队列的消息]: " + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); } } ``` **3. 测试** (1)删除队列。 ```shell [root@localhost ~]# rabbitmqctl delete_queue dead-queue [root@localhost ~]# rabbitmqctl delete_queue normal-queue ``` (2)启动C1消费者,C2消费者,最后启动生产者。 ``` -----------Producer生产者生产了10条消息----------- Producer[生产者发出消息]: info1 Producer[生产者发出消息]: info2 Producer[生产者发出消息]: info3 Producer[生产者发出消息]: info4 Producer[生产者发出消息]: info5 Producer[生产者发出消息]: info6 Producer[生产者发出消息]: info7 Producer[生产者发出消息]: info8 Producer[生产者发出消息]: info9 Producer[生产者发出消息]: info10 -----------Consumer01消费者拒绝签收info5消息----------- Consumer01[接收到的消息]: info1 Consumer01[接收到的消息]: info2 Consumer01[接收到的消息]: info3 Consumer01[接收到的消息]: info4 Consumer01[接收到的消息]: info5, 但拒绝签收该消息. Consumer01[接收到的消息]: info6 Consumer01[接收到的消息]: info7 Consumer01[接收到的消息]: info8 Consumer01[接收到的消息]: info9 Consumer01[接收到的消息]: info10 -----------Consumer02消费者收到了死信队列的消息----------- Consumer02[接收到死信队列的消息]: info5 ```