# RabbitMQ
基于AMQP协议,由Erlang语言开发,稳定性高,不丢失数据,与spring框架集成度高,使用非常广泛。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
【参考:https://www.jianshu.com/p/284b1b928ee1】
基于AMQP协议的消息队列的架构:
:-: ![](https://img.kancloud.cn/e2/9c/e29cf9b29532a57b9f291e31c7f8b0ae_898x521.png)
## 一、Windows系统安装RabbitMQ
1. 安装Erlang环境,并将bin目录添加进环境变量;下载地址:[https://www.erlang.org/downloads](https://links.jianshu.com/go?to=https%3A%2F%2Fwww.erlang.org%2Fdownloads)
2. 安装RabbitMQ,并将sbin目录添加进环境变量;下载地址:[https://www.rabbitmq.com/install-windows.html](https://links.jianshu.com/go?to=https%3A%2F%2Fwww.rabbitmq.com%2Finstall-windows.html)。
3. 激活可视化管理插件:
sbin目录下运行
~~~
rabbitmq-plugins.bat enable rabbitmq_management
~~~
4. 点击rabbitmq-server.bat启动服务,并在浏览器中访问http://localhost:15672,使用默认账号guest,密码guest登录。
## 二、消息队列通信模式
### 2.1 直连模式
:-: ![](https://img.kancloud.cn/ad/4f/ad4fa07d999e8a1cd8ba058cf18e53b1_344x85.png)
生成者生产的消息直接放在队列中,而不用通过交换机,消费者直接从队列中拿取数据。
### 2.2 任务模型
Work Queue,或者称为Task Queue, 任务模型。在某些场景下可能会导致生产的消息比消费的消息要多,这个时候就可以让多个消费者共享一个消息队列,每当一条消息被消费的时候就会消失,避免任务的重复执行。
:-: ![](https://img.kancloud.cn/3e/32/3e32100b76f55b63c14fc3d5aaa9e9f3_378x140.png)
在默认的情况下,会采用轮询的负载均衡方式给每个消费者平均分配消息。
### 2.3 广播模型
fanout 扇出或称广播,在这种模式中需要我们设置交换机,并且会为每个消费生成临时队列,这些临时队列为消费者所独有。这些临时队列中共享交换机中的数据。`即同个消息可以有多个消费者使用。`
:-: ![](https://img.kancloud.cn/09/aa/09aadd0925dc164dc39381d7735f8fff_626x196.png)
与任务模型不同的是每个消息被多个消费者消费。
### 2.4 订阅模型
routing 路由订阅模型(direct)
广播模型会将所有的消息都共享给所有的临时队列,有时候可能需要区分消费者能够消费的消息,这个时候就要用到订阅模型。例如在日志系统中,error级别的要持久化到磁盘中,而info,error,warning等级别的只要打印到控制台就行了,这个使用就可以使用订阅模型对不同的消费者能够消费的信息进行区分。**push推送模型**
:-: ![](https://img.kancloud.cn/58/c2/58c2461aef425bb226e8e0f9cad3a70e_737x218.png)
### 2.5 动态路由模型
Topic模型,与订阅模型相比,动态路由模型可以通过通配符的方式类配置routing key的名称。
这种模式下建议routing key的命名方式为:`name1.name2`
通配符的规则如下:
~~~
* 匹配一个单词
# 匹配零个或多个单词
~~~
## 三、与Spring Boot整合
1. 导入依赖
~~~
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.1</version>
</dependency>
~~~
2. 配置
~~~
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: demo2
username: demo2User
password: 123
~~~
3. 在生产者注入RabbitTemplate,调用API生产消息。
4. 在消费者中使用注解@RabbitListener表明是一个消费者。
### 3.1直连模式使用
1. 生产者
~~~
@SpringBootTest(classes = RabbitmqDemoApplication.class)
public class RabbitmqTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void hello() {
rabbitTemplate.convertAndSend("hello", "hello for rabbitmq!");
}
}
~~~
convertAndSend参数中第一个参数是队列名称,第二个参数是要传递的消息。
2. 消费者
~~~
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloConsumer {
@RabbitHandler
public void receive(String message) {
System.out.println("message = " + message);
}
}
~~~
在这种模式中等到有消费者的时候才会创建消息队列。
@RabbitListener中监听队列“hello”的内容,@RabbitHandler注解处理数据的方法。@Queue表示具体的队列,其主要的参数有:
```
name:队列名称、默认
durable:是否进行持久化
exclusive:是否独占
autoDelete:是否自动删除
```
### 3.2 任务模式
1. 生产者
~~~
@Test
public void workTest() {
for (int i = 0; i < 1000; i++) {
rabbitTemplate.convertAndSend("work", "test work queue for rabbitmq!");
}
}
~~~
2. 消费者
~~~
@Component
public class WorkConsumer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message) {
System.out.println("消费者1 = " + message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message) {
System.out.println("消费者2 = " + message);
}
}
~~~
在一个类中配置多个消费者。
### 3.3 广播模型
需要用到交换机。
1. 生产者
~~~
@Test
public void publicTest() {
for (int i = 0; i < 100; i++) {
// 交换机名称、路由、消息对象
rabbitTemplate.convertAndSend("logs", "", "message:" + i);
}
}
~~~
2. 消费者
~~~
@Component
public class PublishConsumer {
@RabbitListener(bindings = {@QueueBinding(value = @Queue,
exchange = @Exchange(value = "logs", type = "fanout"))})
public void receive1(String message) {
System.out.println("consumer1 = " + message);
}
@RabbitListener(bindings = {@QueueBinding(value = @Queue,
exchange = @Exchange(value = "logs", type = "fanout"))})
public void receive2(String message) {
System.out.println("consumer2 = " + message);
}
}
~~~
`bindings属性`用来将该消费和交换机进行绑定
`@QueueBinding`队列绑定;
`value属性`:要绑定的队列,直接@Queue为临时生成;
`exchange属性`:要绑定的交换机;
`@Exchange()`: value属性为交换机的名称,type为类型,这种模式中设置为“fanout”。
### 3.4 路由模型
在广播模式的基础上添加路由。
1. 生产者
~~~
@Test
public void routingTest() {
String routeErrorKey = "error";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend("routing", routeErrorKey,
"routing ["+ routeErrorKey +"] message for rabbitmq");
}
String routeInfoKey = "info";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend("routing", routeInfoKey,
"routing ["+ routeInfoKey +"] message for rabbitmq");
}
}
~~~
2. 消费者
~~~
@Component
public class RoutingConsumer {
@RabbitListener(bindings = {@QueueBinding(value = @Queue,
exchange = @Exchange(value = "routing", type = "direct"), key = {"info"})})
public void receive1(String message) {
System.out.println("info routing = " + message);
}
@RabbitListener( bindings = {@QueueBinding(value = @Queue,
exchange = @Exchange(value = "routing", type = "direct"), key = {"error"})})
public void receive2(String message) {
System.out.println("error routing = " + message);
}
}
~~~
设置交换机关注的key即可。
### 3.5 动态路由
key由通配符来设定:
1. 生产者
~~~
@Test
public void topicTest() {
String routKeyName = "user.save.delete";
rabbitTemplate.convertAndSend("topic", routKeyName, "topic [" + routKeyName + "] message for rabbitmq");
}
~~~
2. 消费者
~~~
@Component
public class TopicConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "topic", type = "topic"),
key = {"user.*"}
)
})
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "topic", type = "topic"),
key = {"user.#"}
)
})
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
~~~
## 四、应用场景
### 4.1 异步处理
`场景:用户注册后,可能需要发送注册邮件和短信,传统的方式有串行方式和并行方式`
* 串行方式: 将注册信息写入数据库后,发送邮件后再发送短信,三个任务完成后才返回给客户端。
:-: ![](https://img.kancloud.cn/54/5b/545bb203777c6b29f84020913889e097_660x131.png)
* 并行的方式:将注册信息写入数据库后,同时发送邮件和短信,以上三个任务完成后才返回给客户端。
:-: ![](https://img.kancloud.cn/0f/83/0f83e5746d9fc0a882dafc568eeb80b2_557x247.png)
~~~
发送邮件和短信并不是注册必须的,只是一种通知和确认,客户端不应该等待着两者的完成。
~~~
* 消息队列方式:将发邮件和短信的业务交给消息队列进行异步处理。
:-: ![](https://img.kancloud.cn/c1/e8/c1e8800bb0d2219fda4138fb976f3dba_805x224.png)
额外的时间为写入消息队列的时间。主程序并不进行等待。
`使用广播模型`
### 4.2 应用解耦
`场景:用户下订单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。`
:-: ![](https://img.kancloud.cn/5f/70/5f70c113192c8f37f01324431676a07b_377x103.png)
这样做的缺点:
当库存系统出现故障时,订单就会失败。订单系统和库存系统是高度耦合的存在。
引入消息队列后:
:-: ![](https://img.kancloud.cn/d6/59/d6593126a84263dbcae33659dabd514d_486x226.png)
* 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
* 库存系统:订阅下单信息,获取下单消息,进行库存操作。就是库存系统出现故障,订单也会保存在消息队列中,确保消息不丢失。
### 流量削峰
`场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。`
作用:
1. 可以控制活动人数,超过一定阈值时直接抛弃用户请求
2. 可以缓解短时间高流量压垮应用
:-: ![](https://img.kancloud.cn/45/d6/45d623881d53e34547b6f0c545f1416a_547x123.png)
- 第一章 Java基础
- ThreadLocal
- Java异常体系
- Java集合框架
- List接口及其实现类
- Queue接口及其实现类
- Set接口及其实现类
- Map接口及其实现类
- JDK1.8新特性
- Lambda表达式
- 常用函数式接口
- stream流
- 面试
- 第二章 Java虚拟机
- 第一节、运行时数据区
- 第二节、垃圾回收
- 第三节、类加载机制
- 第四节、类文件与字节码指令
- 第五节、语法糖
- 第六节、运行期优化
- 面试常见问题
- 第三章 并发编程
- 第一节、Java中的线程
- 第二节、Java中的锁
- 第三节、线程池
- 第四节、并发工具类
- AQS
- 第四章 网络编程
- WebSocket协议
- Netty
- Netty入门
- Netty-自定义协议
- 面试题
- IO
- 网络IO模型
- 第五章 操作系统
- IO
- 文件系统的相关概念
- Java几种文件读写方式性能对比
- Socket
- 内存管理
- 进程、线程、协程
- IO模型的演化过程
- 第六章 计算机网络
- 第七章 消息队列
- RabbitMQ
- 第八章 开发框架
- Spring
- Spring事务
- Spring MVC
- Spring Boot
- Mybatis
- Mybatis-Plus
- Shiro
- 第九章 数据库
- Mysql
- Mysql中的索引
- Mysql中的锁
- 面试常见问题
- Mysql中的日志
- InnoDB存储引擎
- 事务
- Redis
- redis的数据类型
- redis数据结构
- Redis主从复制
- 哨兵模式
- 面试题
- Spring Boot整合Lettuce+Redisson实现布隆过滤器
- 集群
- Redis网络IO模型
- 第十章 设计模式
- 设计模式-七大原则
- 设计模式-单例模式
- 设计模式-备忘录模式
- 设计模式-原型模式
- 设计模式-责任链模式
- 设计模式-过滤模式
- 设计模式-观察者模式
- 设计模式-工厂方法模式
- 设计模式-抽象工厂模式
- 设计模式-代理模式
- 第十一章 后端开发常用工具、库
- Docker
- Docker安装Mysql
- 第十二章 中间件
- ZooKeeper