多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
[TOC] # RabbitMQ整体架构 * Server:又称为Broker。接收客户端连接,实现AMQP的服务器实体。 * Connection:连接,应用程序与Broker的网络连接。 * Channel:信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务。 * Message:消息。服务器和应用程序之间传递的数据,本质上就是一段数据,由Properties和Body组成。 ![](https://img.kancloud.cn/7d/54/7d54d1f5401e497ca5324eeb218a624b_1598x910.png) -- ![](https://box.kancloud.cn/2d33bb527e9b23d56073660e66baea5d_1407x776.png) * 左侧 P 代表⽣产者,也就是往 RabbitMQ 发消息的程序。 * 中间即是 RabbitMQ,其中包括了交换机和队列。 * 右侧 C 代表消费者,也就是往 RabbitMQ 拿消息的程序 那么,其中⽐较重要的概念有 4 个,分别为:虚拟主机、交换机、队列和绑定。 * 虚拟主机:⼀个虚拟主机持有⼀组交换机、队列和绑定,为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,**⽤户只能在虚拟主机的粒度进⾏权限控制**。因此,如果需要禁⽌ A 组访问 B 组的交换 机/队列/绑定,必须为 A 和 B 分别创建⼀个虚拟主机,每⼀个 RabbitMQ 服务器都有⼀个默认的虚拟主 机“/”。 * 交换机:Exchange ⽤于转发消息,但是它不会做存储,如果没有 Queue bind 到 Exchange 的话,它会 直接丢弃掉 Producer 发送过来的消息。 这⾥有⼀个⽐较重要的概念:**路由键** 。消息到交换机的时候,交互机会转发到对应的队列中,那么究 竟转发到哪个队列,就要根据该路由键。 * 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系 # 交换机(Exchange) **特别的Exchange** 默认的Exchange(名字为空,AMQP default) 1. 默认的`Exchange`不能进行`Binding`操作 2. 任何发送到该`Exchange`的消息都会被转发到`Routing key`指定的`Queue`中 3. 如果`vhost`中不存在`Routing key`中指定的队列名,则该消息会被抛弃。 ![](https://img.kancloud.cn/cf/b1/cfb1f73d6258d009d7a4e48d565114ec_295x305.png) 交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启⽤ ack 模式后,交换机找不 到队列,会返回错误。交换机有四种类型:Direct、topic、Headers and Fanout * Direct:其类型的⾏为是“先匹配、再投送”,即在绑定时设定⼀个 **routing\_key**,消息的 **routing\_key** 匹配时,才会被交换器投送到绑定的队列中去。 * Topic:按规则转发消息(最灵活) * Headers:设置 header attribute 参数类型的交换机。 * Fanout:转发消息到所有绑定队列。 ## Direct Exchange Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据 key 全⽂匹配去寻找队列。**完全匹配** ![](https://img.kancloud.cn/a7/fe/a7fe40818139bee3c49c1776efe7060f_528x181.png) ## Topic Exchange Topic Exchange 转发消息主要是根据通配符。在这种交换机下,队列和交换机的绑定会定义⼀种路由模式, 那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。 在这种交换机模式下: * 路由键必须是⼀串字符,⽤句号(`.`)隔开,⽐如 `agreements.us`,或者 `agreements.eu.stockholm`等 * 路由模式必须包含⼀个 星号(`* `),主要⽤于匹配路由键指定位置的⼀个单词,⽐如,⼀个路由模式是 这样⼦,`agreements..b.*`,那么就只能匹配路由键是这样⼦的,第⼀个单词是 agreements,第四个单 词是 b;井号(`#`)就表示相当于⼀个或者多个单词,例如⼀个匹配模式是 `agreements.eu.berlin.#`,那 么,以 `agreements.eu.berlin` 开头的路由键都是可以的。 具体代码发送的时候还是⼀样,第⼀个参数表示交换机,第⼆个参数表示 routing key,第三个参数即消息。 如下: ~~~ rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is itMQ!"); ~~~ Topic 和 Direct 类似, 只是匹配上⽀持了“模式”,在“点分”的 routing_key 形式中, 可以使⽤两个通配符: * `* `表示⼀个单词; * `#`表示零个或多个词 单词和单词之间需要用.隔开。 `*,# `只能写在.号左右,且不能挨着字符 ![](https://img.kancloud.cn/ac/b0/acb033e83041eb57107ce3ee66b8304e_619x286.png) ## Headers Exchange Headers 也是根据规则匹配,相较于 Direct 和 Topic 固定地使⽤ routing_key,headers 则是⼀个⾃定义匹配 规则的类型,根据发送的消息内容中的headers属性进行匹配. 在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue 将消息中的`headers`与该`Exchange`相关联的所有`Binging`中的参数进行匹配,如果匹配上了,则发送到该`Binding`对应的`Queue`中 **匹配规则** 如果`Binding`中的 `x-match = all`:表示所有的键值对都匹配才能转发到消息。 `x-match = any`: 表示只要有键值对匹配就能转发消息。 **注意** 1. `Binging`的时候,至少需要指定两个参数,其中的一个是`x-match = all`或`x-match = any`。 2. `Binging`的时候,不需要指定`Routing key` 3. 发送消息的时候,不需要指定`Routing key` 4. 转发消息的时候,忽略`Routing key` 5. 如果是`x-match = all`则发送的`headers`不能比`bingding`的参数少,否则匹配不上 ## Fanout Exchange `Fanout Exchange`这种`exchange`效率最高,`fanout > direct > topic` Fanout Exchange 消息⼴播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果 配置了 `routing_key` 会被忽略. **需要注意的是,如果将消息发送到一个没有队列绑定的exchange上面,那么该消息将会丢失,这是因为在rabbitMQ中exchange不具备存储消息的能力,只有队列具备存储消息的能力。** ![](https://img.kancloud.cn/45/40/4540afe8986c30ec3bd36166b83fa749_621x237.png) # Queues属性 * Durability:是否持久化,Durable是,Transient是否。如果不持久化,那么在服务器宕机或重启之后Queue就会丢失。 * Auto delete:如果选择yes,当最后一个消费者不在监听Queue的时候,该Queue就会自动删除,一般选择false。 * Arguments:AMQP协议留给AMQP实现者扩展使用的。 x-message-ttl:一个消息推送到队列中的存活时间。设置的值之后还没消费就会被删除。 x-expires:在自动删除该队列的时候,可以使用该队列的时间。 x-max-length:在队列头部删除元素之前,队列可以包含多少个(就绪)消息,如果再次向队列中发送消息,会删除最早的那条消息,用来控制队列中消息的数量。 x-max-length-bytes:在队列头部删除元素之前,队列的总消息体的大小,用来控制队列中消息的总大小。 x-dead-letter-exchange:当消息被拒绝或者消息过期,消息重新发送到的交换机(Exchange)的可选名称。 x-dead-letter-routing-key:当消息被拒绝或者消息过期,消息重新发送到的交换机绑定的Route key的名称,如果没有设置则使用之前的Route key。 x-max-priority:队列支持的最大优先级数,如果没有设置则不支持消息优先级 x-queue-mode:将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少RAM使用; 如果未设置,队列将保持在内存中的缓存,以尽可能快地传递消息。 x-queue-master-locator:将队列设置为主位置模式,确定在节点集群上声明队列主节点所在的规则。 # Message详解 消息。服务器和应用程序之间传送的数据,本质上就是一段数据,由Properties和Payload(body)组成。 ![](https://img.kancloud.cn/af/ee/afee2fc72da28adca018b3f58c5dd324_518x192.png) Delivery mode:是否持久化,如果未设置持久化,转发到queue中并未消费则重启服务或者服务宕机则消息丢失。 Headers:头信息,是由一个或多个健值对组成的,当固定的Properties不满足我们需要的时候,可以自己扩展。 **Properties(属性)** content\_type:传输协议 content\_encoding:编码方式 priority:优先级 correlation\_id:rpc属性,请求的唯一标识。 reply\_to:rpc属性, expiration:消息的过期时间 message\_id:消息的id timestamp:消息的时间戳 ... # 持久化 如何保证消息的不丢失,三个地方做到持久化。 1. Exchange需要持久化。 2. Queue需要持久化。 3. Message需要持久化。 # 消息流转 ![](https://box.kancloud.cn/f7590053496249c71f4fce5492736748_1449x656.png) 把消息路由到Exchange的指定Message Queue 假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示: 1. P1生产消息,发送给服务器端的Exchange 2. Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1 3. Queue1收到消息,将消息发送给订阅者C1 4. C1收到消息,发送ACK给队列确认收到消息 5. Queue1收到ACK,删除队列中缓存的此条消息