🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
[TOC] # 1. 死信队列 > 在发送异常或者网络波动时,消息不能被处理,导致业务异常,所以要把这种异常消息发送到死信队列处理这些异常消息,补救 ## 1.1 死信和死信队列 一、死信 > 当一条消息在队列中出现以下三种情况的时候,该消息就会变成一条死信。 1. 消息被否定确认,使用`channel.basicNack`或`channel.basicReject`,并且此时`requeue`属性被设置为`false`。 2. 消息在队列的存活时间超过设置的TTL时间(队列ttl-所有消息都生效,单条消息ttl)。 3. 消息队列的消息数量已经超过最大队列长度。 二、死信队列 > **“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。** 三、配置死信队列 1. 配置业务队列,绑定到业务交换机上 2. 为业务队列配置死信交换机和路由key 3. 为死信交换机配置死信队列 4. 交换机可以为任何类型【Direct、Fanout、Topic】 ## 1.2 实例 ### 1.2.1 配置 **1.手动确认消息** ``` spring: rabbitmq: host: localhost password: guest username: guest listener: type: simple simple: default-requeue-rejected: false acknowledge-mode: manual ``` **2.配置交换机和队列** 一个正常交换机绑定两个业务队列A和B(广播模式);A和B绑定一个死信交换机,由不同的key绑定, A队列消费者产生一个死信消息 ~~~ package com.tuna.mq.rabbitmq.config.deadletter; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange"; public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea"; public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb"; public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey"; public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey"; public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea"; public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb"; // 声明业务Exchange @Bean("businessExchange") public FanoutExchange businessExchange() { return new FanoutExchange(BUSINESS_EXCHANGE_NAME); } // 声明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 声明业务队列A @Bean("businessQueueA") public Queue businessQueueA() { Map<String, Object> args = new HashMap<>(2); //x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); //x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY); return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build(); } // 声明业务队列B @Bean("businessQueueB") public Queue businessQueueB() { Map<String, Object> args = new HashMap<>(2); //x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); //x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY); return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build(); } // 声明死信队列A @Bean("deadLetterQueueA") public Queue deadLetterQueueA() { return new Queue(DEAD_LETTER_QUEUEA_NAME); } // 声明死信队列B @Bean("deadLetterQueueB") public Queue deadLetterQueueB() { return new Queue(DEAD_LETTER_QUEUEB_NAME); } // 声明业务队列A绑定关系 @Bean public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } // 声明业务队列B绑定关系 @Bean public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } // 声明死信队列A绑定关系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); } // 声明死信队列B绑定关系 @Bean public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY); } } ~~~ ### 1.2.2 消费者 1.正常消费者,Ack=false消息失败,A队列中的消息将会发到死信队列 ~~~ @Slf4j @Component public class BusinessMessageReceiver { @RabbitListener(queues = BUSINESS_QUEUEA_NAME) public void receiveA(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("收到业务消息A:{}", msg); boolean ack = true; Exception exception = null; try { if (msg.contains("deadletter")) { throw new RuntimeException("dead letter exception"); } } catch (Exception e) { ack = false; exception = e; } if (!ack) { log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } else { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } @RabbitListener(queues = BUSINESS_QUEUEB_NAME) public void receiveB(Message message, Channel channel) throws IOException { log.info("收到业务消息B:{}", new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } ~~~ 2.死信消费者 ~~~ @Component public class DeadLetterMessageReceiver { @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME) public void receiveA(Message message, Channel channel) throws IOException { System.out.println("收到死信消息A:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME) public void receiveB(Message message, Channel channel) throws IOException { System.out.println("收到死信消息B:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } ~~~ ### 1.2.3 生产者 ~~~ @Component public class BusinessMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String msg) { rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg); } } ~~~ controller ~~~ @Autowired private BusinessMessageSender sender; @RequestMapping("sendmsg") public void sendMsg(String msg) { sender.sendMsg(msg); } ~~~ 1.发送正常消息 http://localhost:8080/mq/sendmsg?msg=msg 2.产生一个死信消息,如下A队列产生一个死信消息,他对应的死信队列会接收到这个消息 http://localhost:8080/mq/sendmsg?msg=deadletter ``` 2022-06-13 15:09:43.350 INFO 3920 --- [ntContainer#1-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到业务消息B:msg 2022-06-13 15:09:43.350 INFO 3920 --- [ntContainer#0-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到业务消息A:msg 2022-06-13 15:12:23.739 INFO 3920 --- [nio-8080-exec-4] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-KnERgpHzUz4thORpw0BXIQ identity=566c4da9] started 2022-06-13 15:12:23.741 INFO 3920 --- [ntContainer#0-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到业务消息A:deadletter 2022-06-13 15:12:23.741 INFO 3920 --- [ntContainer#1-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到业务消息B:deadletter 2022-06-13 15:12:23.744 ERROR 3920 --- [ntContainer#0-1] c.t.m.r.s.d.BusinessMessageReceiver : 消息消费发生异常,error msg:dead letter exception java.lang.RuntimeException: dead letter exception at com.tuna.mq.rabbitmq.service.deadletter.BusinessMessageReceiver.receiveA(BusinessMessageReceiver.java:26) at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_311] 收到死信消息A:deadletter ``` # 2. 延时队列 1. 先进先出的有序队列 2. 用来存放需要在指定时间被处理的元素的队列 ## 2.1 场景 可用于延时性较低、数据量大的定时任务:多少时间之后,做... 1. 订单在十分钟之内未支付则自动取消。 2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。 3. 账单在一周内未支付,则自动结算。 4. 用户注册成功后,如果三天内没有登陆则进行短信提醒。 5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。 6. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。 7. 这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗? 如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。 但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所 有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。更重要的一点是,不!优!雅!没错,作为一名有追求的程序员,始终应该追求更优雅的架构和更优雅的代码风格,写代码要像写诗一样优美。 ## 2.2 实例 给消息设置了ttl,死信即为延时 1.生产者发送消息到交换机--->业务队列 2.在ttl事件内没有消费者读取消息,消息进入死信(延时)队列 3.延时队列消费者消费延时消息 有三种实现方式: 一、队列上设置超时,不投灵活 二、消息上设置超时,在超市时间点上不一定能发送超时消息 三、插件的方式 ### 2.2.1 插件的方式 1. 声明一个`x-delayed-message`类型的交换机 2. 发布带`x-delay`(指定延期时间)消息头的消息 3. 在超过制定时间后,消息发送到指定队列,被绑定这个队列的消费者消费 1.下载 打开官网下载:​ ​http://www.rabbitmq.com/community-plugins.html​ ,注意rabbitmq和插件的版本对应,3.x.x -3.x.x 2.将插件放入rabbitmq的plugins目录 ``` docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez 38ada6e58bba:/plugins ``` 3.启动插件 ``` docker exec -it 38ada6e58bba /bin/bash rabbitmq-plugins enable rabbitmq_delayed_message_exchange ``` 代码示例 1.配置插件支持的交换机,并绑定队列 ~~~ @Configuration public class DelayedRabbitMQConfig { public static final String DELAYED_QUEUE_NAME = "delay.queue"; public static final String DELAYED_EXCHANGE_NAME = "delay.exchange"; public static final String DELAYED_ROUTING_KEY = "delay.routingkey"; @Bean public Queue immediateQueue() { return new Queue(DELAYED_QUEUE_NAME); } @Bean public CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); //指定低层交换机类型交换机类型 args.put("x-delayed-type", "direct"); //插件封装的交换机 return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue, @Qualifier("customExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs(); } } ~~~ args可以配置选项: ``` String name, 交换机名称 String type, 交换机消息类型(x-delayed-message) boolean durable, 是否持久化 boolean autoDelete,是否删除 Map arguments // 队列中的消息什么时候会自动被删除? arguments.put("x-message-ttl",10000); //设置过期时间 arguments.put("x-expires", 10000); //x-expires用于当多长时间没有消费者访问该队列的时候,该队列会自动删除, //x-max-length: arguments.put("x-max-length", 4); 用于指定队列的长度,如果不指定,可以认为是无限长,例如指定队列的长度是4,当超过4条消息,前面的消息将被删除,给后面的消息腾位 ``` 2.生产者发送延时消息 ~~~ @Slf4j @Component public class DelyMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendDelayMsg(String msg, Integer delayTime) { log.info("dely sender send a message: {}",msg); rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a -> { a.getMessageProperties().setDelay(delayTime); return a; }); } } ~~~ 3. 消费者 ~~~ @Component public class DelyMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(DelyMessageConsumer.class); @RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveHeadersQueue(String message) { logger.info("dely consumer received message: {}", new String(message)); } } ~~~ 4. controller ~~~ @RequestMapping("dely/{message}/{timeOut}") public String dely(@PathVariable("message") String message,@PathVariable("timeOut") Integer timeOut) { delyMessageSender.sendDelayMsg(message, timeOut); return "success"; } ~~~ http://localhost:8080/mq/dely/hello/30000 输出 时隔30秒 ``` 2022-07-04 09:40:34.033 INFO 6316 --- [io-8080-exec-10] c.t.m.r.service.dely.DelyMessageSender : dely sender send a message: hello 2022-07-04 09:41:04.036 INFO 6316 --- [ntContainer#2-1] c.t.m.r.s.dely.DelyMessageConsumer : dely consumer received message: hello ```