💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
# RabbitMQ 基于AMQP协议,由Erlang语言开发,稳定性高,不丢失数据,与spring框架集成度高,使用非常广泛。 AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。 【参考:https://www.jianshu.com/p/284b1b928ee1】 基于AMQP协议的消息队列的架构: :-: ![](https://img.kancloud.cn/e2/9c/e29cf9b29532a57b9f291e31c7f8b0ae_898x521.png) &nbsp; ## 一、Windows系统安装RabbitMQ 1. 安装Erlang环境,并将bin目录添加进环境变量;下载地址:[https://www.erlang.org/downloads](https://links.jianshu.com/go?to=https%3A%2F%2Fwww.erlang.org%2Fdownloads) 2. 安装RabbitMQ,并将sbin目录添加进环境变量;下载地址:[https://www.rabbitmq.com/install-windows.html](https://links.jianshu.com/go?to=https%3A%2F%2Fwww.rabbitmq.com%2Finstall-windows.html)。 3. 激活可视化管理插件: sbin目录下运行 ~~~ rabbitmq-plugins.bat enable rabbitmq_management ~~~ 4. 点击rabbitmq-server.bat启动服务,并在浏览器中访问http://localhost:15672,使用默认账号guest,密码guest登录。 &nbsp; ## 二、消息队列通信模式 ### 2.1 直连模式 :-: ![](https://img.kancloud.cn/ad/4f/ad4fa07d999e8a1cd8ba058cf18e53b1_344x85.png) 生成者生产的消息直接放在队列中,而不用通过交换机,消费者直接从队列中拿取数据。 ### 2.2 任务模型 Work Queue,或者称为Task Queue, 任务模型。在某些场景下可能会导致生产的消息比消费的消息要多,这个时候就可以让多个消费者共享一个消息队列,每当一条消息被消费的时候就会消失,避免任务的重复执行。 :-: ![](https://img.kancloud.cn/3e/32/3e32100b76f55b63c14fc3d5aaa9e9f3_378x140.png) 在默认的情况下,会采用轮询的负载均衡方式给每个消费者平均分配消息。 &nbsp; ### 2.3 广播模型 fanout 扇出或称广播,在这种模式中需要我们设置交换机,并且会为每个消费生成临时队列,这些临时队列为消费者所独有。这些临时队列中共享交换机中的数据。`即同个消息可以有多个消费者使用。` :-: ![](https://img.kancloud.cn/09/aa/09aadd0925dc164dc39381d7735f8fff_626x196.png) 与任务模型不同的是每个消息被多个消费者消费。 &nbsp; ### 2.4 订阅模型 routing 路由订阅模型(direct) 广播模型会将所有的消息都共享给所有的临时队列,有时候可能需要区分消费者能够消费的消息,这个时候就要用到订阅模型。例如在日志系统中,error级别的要持久化到磁盘中,而info,error,warning等级别的只要打印到控制台就行了,这个使用就可以使用订阅模型对不同的消费者能够消费的信息进行区分。**push推送模型** :-: ![](https://img.kancloud.cn/58/c2/58c2461aef425bb226e8e0f9cad3a70e_737x218.png) ### 2.5 动态路由模型 Topic模型,与订阅模型相比,动态路由模型可以通过通配符的方式类配置routing key的名称。 这种模式下建议routing key的命名方式为:`name1.name2` 通配符的规则如下: ~~~  * 匹配一个单词  # 匹配零个或多个单词 ~~~ &nbsp; ## 三、与Spring Boot整合 1. 导入依赖 ~~~ <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.4.1</version> </dependency> ~~~ 2. 配置 ~~~ spring: rabbitmq: host: localhost port: 5672 virtual-host: demo2 username: demo2User password: 123 ~~~ 3. 在生产者注入RabbitTemplate,调用API生产消息。 4. 在消费者中使用注解@RabbitListener表明是一个消费者。 &nbsp; ### 3.1直连模式使用 1. 生产者 ~~~ @SpringBootTest(classes = RabbitmqDemoApplication.class) public class RabbitmqTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void hello() { rabbitTemplate.convertAndSend("hello", "hello for rabbitmq!"); } } ~~~ convertAndSend参数中第一个参数是队列名称,第二个参数是要传递的消息。 2. 消费者 ~~~ @Component @RabbitListener(queuesToDeclare = @Queue("hello")) public class HelloConsumer { @RabbitHandler public void receive(String message) { System.out.println("message = " + message); } } ~~~ 在这种模式中等到有消费者的时候才会创建消息队列。 @RabbitListener中监听队列“hello”的内容,@RabbitHandler注解处理数据的方法。@Queue表示具体的队列,其主要的参数有: ``` name:队列名称、默认 durable:是否进行持久化 exclusive:是否独占 autoDelete:是否自动删除 ``` &nbsp; ### 3.2 任务模式 1. 生产者 ~~~ @Test public void workTest() { for (int i = 0; i < 1000; i++) { rabbitTemplate.convertAndSend("work", "test work queue for rabbitmq!"); } } ~~~ 2. 消费者 ~~~ @Component public class WorkConsumer { @RabbitListener(queuesToDeclare = @Queue("work")) public void receive1(String message) { System.out.println("消费者1 = " + message); } @RabbitListener(queuesToDeclare = @Queue("work")) public void receive2(String message) { System.out.println("消费者2 = " + message); } } ~~~ 在一个类中配置多个消费者。 &nbsp; ### 3.3 广播模型 需要用到交换机。 1. 生产者 ~~~ @Test public void publicTest() { for (int i = 0; i < 100; i++) { // 交换机名称、路由、消息对象 rabbitTemplate.convertAndSend("logs", "", "message:" + i); } } ~~~ 2. 消费者 ~~~ @Component public class PublishConsumer { @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "logs", type = "fanout"))}) public void receive1(String message) { System.out.println("consumer1 = " + message); } @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "logs", type = "fanout"))}) public void receive2(String message) { System.out.println("consumer2 = " + message); } } ~~~ `bindings属性`用来将该消费和交换机进行绑定 `@QueueBinding`队列绑定; `value属性`:要绑定的队列,直接@Queue为临时生成; `exchange属性`:要绑定的交换机; `@Exchange()`: value属性为交换机的名称,type为类型,这种模式中设置为“fanout”。 &nbsp; ### 3.4 路由模型 在广播模式的基础上添加路由。 1. 生产者 ~~~ @Test public void routingTest() { String routeErrorKey = "error"; for (int i = 0; i < 50; i++) { rabbitTemplate.convertAndSend("routing", routeErrorKey, "routing ["+ routeErrorKey +"] message for rabbitmq"); } String routeInfoKey = "info"; for (int i = 0; i < 50; i++) { rabbitTemplate.convertAndSend("routing", routeInfoKey, "routing ["+ routeInfoKey +"] message for rabbitmq"); } } ~~~ 2. 消费者 ~~~ @Component public class RoutingConsumer { @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "routing", type = "direct"), key = {"info"})}) public void receive1(String message) { System.out.println("info routing = " + message); } @RabbitListener( bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "routing", type = "direct"), key = {"error"})}) public void receive2(String message) { System.out.println("error routing = " + message); } } ~~~ 设置交换机关注的key即可。 &nbsp; ### 3.5 动态路由 key由通配符来设定: 1. 生产者 ~~~ @Test public void topicTest() { String routKeyName = "user.save.delete"; rabbitTemplate.convertAndSend("topic", routKeyName, "topic [" + routKeyName + "] message for rabbitmq"); } ~~~ 2. 消费者 ~~~ @Component public class TopicConsumer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(value = "topic", type = "topic"), key = {"user.*"} ) }) public void receive1(String message) { System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(value = "topic", type = "topic"), key = {"user.#"} ) }) public void receive2(String message) { System.out.println("message2 = " + message); } } ~~~ &nbsp; ## 四、应用场景 ### 4.1 异步处理 `场景:用户注册后,可能需要发送注册邮件和短信,传统的方式有串行方式和并行方式` * 串行方式: 将注册信息写入数据库后,发送邮件后再发送短信,三个任务完成后才返回给客户端。 :-: ![](https://img.kancloud.cn/54/5b/545bb203777c6b29f84020913889e097_660x131.png) * 并行的方式:将注册信息写入数据库后,同时发送邮件和短信,以上三个任务完成后才返回给客户端。 :-: ![](https://img.kancloud.cn/0f/83/0f83e5746d9fc0a882dafc568eeb80b2_557x247.png) ~~~  发送邮件和短信并不是注册必须的,只是一种通知和确认,客户端不应该等待着两者的完成。 ~~~ * 消息队列方式:将发邮件和短信的业务交给消息队列进行异步处理。 :-: ![](https://img.kancloud.cn/c1/e8/c1e8800bb0d2219fda4138fb976f3dba_805x224.png) 额外的时间为写入消息队列的时间。主程序并不进行等待。 `使用广播模型` &nbsp; ### 4.2 应用解耦 `场景:用户下订单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。` :-: ![](https://img.kancloud.cn/5f/70/5f70c113192c8f37f01324431676a07b_377x103.png) 这样做的缺点: 当库存系统出现故障时,订单就会失败。订单系统和库存系统是高度耦合的存在。 引入消息队列后: :-: ![](https://img.kancloud.cn/d6/59/d6593126a84263dbcae33659dabd514d_486x226.png) * 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 * 库存系统:订阅下单信息,获取下单消息,进行库存操作。就是库存系统出现故障,订单也会保存在消息队列中,确保消息不丢失。 &nbsp; ### 流量削峰 `场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。` 作用: 1. 可以控制活动人数,超过一定阈值时直接抛弃用户请求 2. 可以缓解短时间高流量压垮应用 :-: ![](https://img.kancloud.cn/45/d6/45d623881d53e34547b6f0c545f1416a_547x123.png)