💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
:-: ![](https://img.kancloud.cn/fe/32/fe320e4910513d1d958fbc5c12e4de27_1357x187.jpg) 图1:架构图 ![](https://img.kancloud.cn/5e/4e/5e4e24e553bae2d459812007c2931295_1396x341.jpg) 图2:确认机制 下面演示图1的消费过程,图2是发布确认的确认机制。 <br/> 步骤如下: **1. 先创建一个SpringBoot项目** ```xml <dependencies> <!--RabbitMQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--RabbitMQ 测试依赖--> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> ``` **2. `resources/application.properties`** ```properties spring.rabbitmq.host=192.168.0.107 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin spring.rabbitmq.publisher-confirm-type=correlated ``` **`spring.rabbitmq.publisher-confirm-type`** 属性的取值如下: * `NONE`:禁用发布确认模式,是默认值。 * `CORRELATED`:发布消息成功到交换器后会触发回调方法。 * `SIMPLE`:经测试有两种效果,其一和 CORRELATED 值一样会触发回调方法;其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。 **3. 配置类** ```java @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; public static final String KEY_1 = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE_NAME); } @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } /** * 交换机与队列绑定 */ @Bean public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(KEY_1); } } ``` **4. 生产者** ```java @Slf4j @RestController public class ProducerController { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private CustomCallBack customCallBack; /** * @PostConstruct 注解标记的方法可以在项目启动时时执行一次,这里在项目启动时 * 注入CustomCallBack对象到RabbitTemplate中去 */ @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(customCallBack); } @GetMapping("/confirm/send") public void confirmSend() { //指定消息id为1 CorrelationData corId = new CorrelationData("1"); //convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "key1", "message#key1", corId); log.info("生产消息: {}", "message#key1"); CorrelationData corId2 = new CorrelationData("2"); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "key2", "message#key2", corId2); log.info("生产消息: {}", "message#key2"); } } ``` **5. 回调对象** ```java @Slf4j @Component public class CustomCallBack implements RabbitTemplate.ConfirmCallback { /** * 不论交换机是否收到消息该方法都会被调用 * * @param corData 消息相关数据 * @param ack 交换机是否收到消息。true收到、false没收到 * @param cause 收到或没收到的理由 */ @Override public void confirm(CorrelationData corData, boolean ack, String cause) { if (ack) { log.info("交换机已收到id={}的消息", corData.getId()); } else { log.info("交换机未收到id={}的消息,原因是:{}", corData.getId(), cause); } } } ``` **6. 消费者** ```java @Slf4j @Component public class ConfirmConsumer { public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; @RabbitListener(queues = CONFIRM_QUEUE_NAME) public void receiveMsg(Message message) { String msg = new String(message.getBody()); log.info("收到消息: {}", msg); } } ``` **7. 测试** 启动项目后访问 http://localhost:8080/confirm/send 生产了两条消息,控制台输出如下。 ``` ...] c.l.r.controller.ProducerController : 生产消息: message#key1 ...] c.l.r.controller.ProducerController : 生产消息: message#key2 ...] c.l.rabbitmq03.service.ConfirmConsumer : 收到消息: message#key1 ...] c.l.rabbitmq03.callback.CustomCallBack : 交换机已收到id=2的消息 ...] c.l.rabbitmq03.callback.CustomCallBack : 交换机已收到id=1的消息 ``` 可以看到,生产者发送了两条消息,第一条消息的 RoutingKey 为 `key1`,第二条消息的 RoutingKey 为`key2`,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所以第二条消息被直接丢弃了。