[TOC]
# RabbitMQ整体架构
* Server:又称为Broker。接收客户端连接,实现AMQP的服务器实体。
* Connection:连接,应用程序与Broker的网络连接。
* Channel:信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务。
* Message:消息。服务器和应用程序之间传递的数据,本质上就是一段数据,由Properties和Body组成。
![](https://img.kancloud.cn/7d/54/7d54d1f5401e497ca5324eeb218a624b_1598x910.png)
--
![](https://box.kancloud.cn/2d33bb527e9b23d56073660e66baea5d_1407x776.png)
* 左侧 P 代表⽣产者,也就是往 RabbitMQ 发消息的程序。
* 中间即是 RabbitMQ,其中包括了交换机和队列。
* 右侧 C 代表消费者,也就是往 RabbitMQ 拿消息的程序
那么,其中⽐较重要的概念有 4 个,分别为:虚拟主机、交换机、队列和绑定。
* 虚拟主机:⼀个虚拟主机持有⼀组交换机、队列和绑定,为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,**⽤户只能在虚拟主机的粒度进⾏权限控制**。因此,如果需要禁⽌ A 组访问 B 组的交换 机/队列/绑定,必须为 A 和 B 分别创建⼀个虚拟主机,每⼀个 RabbitMQ 服务器都有⼀个默认的虚拟主 机“/”。
* 交换机:Exchange ⽤于转发消息,但是它不会做存储,如果没有 Queue bind 到 Exchange 的话,它会 直接丢弃掉 Producer 发送过来的消息。
这⾥有⼀个⽐较重要的概念:**路由键** 。消息到交换机的时候,交互机会转发到对应的队列中,那么究 竟转发到哪个队列,就要根据该路由键。
* 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系
# 交换机(Exchange)
**特别的Exchange**
默认的Exchange(名字为空,AMQP default)
1. 默认的`Exchange`不能进行`Binding`操作
2. 任何发送到该`Exchange`的消息都会被转发到`Routing key`指定的`Queue`中
3. 如果`vhost`中不存在`Routing key`中指定的队列名,则该消息会被抛弃。
![](https://img.kancloud.cn/cf/b1/cfb1f73d6258d009d7a4e48d565114ec_295x305.png)
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启⽤ ack 模式后,交换机找不 到队列,会返回错误。交换机有四种类型:Direct、topic、Headers and Fanout
* Direct:其类型的⾏为是“先匹配、再投送”,即在绑定时设定⼀个 **routing\_key**,消息的 **routing\_key** 匹配时,才会被交换器投送到绑定的队列中去。
* Topic:按规则转发消息(最灵活)
* Headers:设置 header attribute 参数类型的交换机。
* Fanout:转发消息到所有绑定队列。
## Direct Exchange
Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据 key 全⽂匹配去寻找队列。**完全匹配**
![](https://img.kancloud.cn/a7/fe/a7fe40818139bee3c49c1776efe7060f_528x181.png)
## Topic Exchange
Topic Exchange 转发消息主要是根据通配符。在这种交换机下,队列和交换机的绑定会定义⼀种路由模式, 那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
在这种交换机模式下:
* 路由键必须是⼀串字符,⽤句号(`.`)隔开,⽐如 `agreements.us`,或者 `agreements.eu.stockholm`等
* 路由模式必须包含⼀个 星号(`* `),主要⽤于匹配路由键指定位置的⼀个单词,⽐如,⼀个路由模式是 这样⼦,`agreements..b.*`,那么就只能匹配路由键是这样⼦的,第⼀个单词是 agreements,第四个单 词是 b;井号(`#`)就表示相当于⼀个或者多个单词,例如⼀个匹配模式是 `agreements.eu.berlin.#`,那 么,以 `agreements.eu.berlin` 开头的路由键都是可以的。
具体代码发送的时候还是⼀样,第⼀个参数表示交换机,第⼆个参数表示 routing key,第三个参数即消息。 如下:
~~~
rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is itMQ!");
~~~
Topic 和 Direct 类似, 只是匹配上⽀持了“模式”,在“点分”的 routing_key 形式中, 可以使⽤两个通配符:
* `* `表示⼀个单词;
* `#`表示零个或多个词
单词和单词之间需要用.隔开。
`*,# `只能写在.号左右,且不能挨着字符
![](https://img.kancloud.cn/ac/b0/acb033e83041eb57107ce3ee66b8304e_619x286.png)
## Headers Exchange
Headers 也是根据规则匹配,相较于 Direct 和 Topic 固定地使⽤ routing_key,headers 则是⼀个⾃定义匹配 规则的类型,根据发送的消息内容中的headers属性进行匹配.
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue
将消息中的`headers`与该`Exchange`相关联的所有`Binging`中的参数进行匹配,如果匹配上了,则发送到该`Binding`对应的`Queue`中
**匹配规则**
如果`Binding`中的
`x-match = all`:表示所有的键值对都匹配才能转发到消息。
`x-match = any`: 表示只要有键值对匹配就能转发消息。
**注意**
1. `Binging`的时候,至少需要指定两个参数,其中的一个是`x-match = all`或`x-match = any`。
2. `Binging`的时候,不需要指定`Routing key`
3. 发送消息的时候,不需要指定`Routing key`
4. 转发消息的时候,忽略`Routing key`
5. 如果是`x-match = all`则发送的`headers`不能比`bingding`的参数少,否则匹配不上
## Fanout Exchange
`Fanout Exchange`这种`exchange`效率最高,`fanout > direct > topic`
Fanout Exchange 消息⼴播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果 配置了 `routing_key` 会被忽略.
**需要注意的是,如果将消息发送到一个没有队列绑定的exchange上面,那么该消息将会丢失,这是因为在rabbitMQ中exchange不具备存储消息的能力,只有队列具备存储消息的能力。**
![](https://img.kancloud.cn/45/40/4540afe8986c30ec3bd36166b83fa749_621x237.png)
# Queues属性
* Durability:是否持久化,Durable是,Transient是否。如果不持久化,那么在服务器宕机或重启之后Queue就会丢失。
* Auto delete:如果选择yes,当最后一个消费者不在监听Queue的时候,该Queue就会自动删除,一般选择false。
* Arguments:AMQP协议留给AMQP实现者扩展使用的。
x-message-ttl:一个消息推送到队列中的存活时间。设置的值之后还没消费就会被删除。
x-expires:在自动删除该队列的时候,可以使用该队列的时间。
x-max-length:在队列头部删除元素之前,队列可以包含多少个(就绪)消息,如果再次向队列中发送消息,会删除最早的那条消息,用来控制队列中消息的数量。
x-max-length-bytes:在队列头部删除元素之前,队列的总消息体的大小,用来控制队列中消息的总大小。
x-dead-letter-exchange:当消息被拒绝或者消息过期,消息重新发送到的交换机(Exchange)的可选名称。
x-dead-letter-routing-key:当消息被拒绝或者消息过期,消息重新发送到的交换机绑定的Route key的名称,如果没有设置则使用之前的Route key。
x-max-priority:队列支持的最大优先级数,如果没有设置则不支持消息优先级
x-queue-mode:将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少RAM使用; 如果未设置,队列将保持在内存中的缓存,以尽可能快地传递消息。
x-queue-master-locator:将队列设置为主位置模式,确定在节点集群上声明队列主节点所在的规则。
# Message详解
消息。服务器和应用程序之间传送的数据,本质上就是一段数据,由Properties和Payload(body)组成。
![](https://img.kancloud.cn/af/ee/afee2fc72da28adca018b3f58c5dd324_518x192.png)
Delivery mode:是否持久化,如果未设置持久化,转发到queue中并未消费则重启服务或者服务宕机则消息丢失。
Headers:头信息,是由一个或多个健值对组成的,当固定的Properties不满足我们需要的时候,可以自己扩展。
**Properties(属性)**
content\_type:传输协议
content\_encoding:编码方式
priority:优先级
correlation\_id:rpc属性,请求的唯一标识。
reply\_to:rpc属性,
expiration:消息的过期时间
message\_id:消息的id
timestamp:消息的时间戳
...
# 持久化
如何保证消息的不丢失,三个地方做到持久化。
1. Exchange需要持久化。
2. Queue需要持久化。
3. Message需要持久化。
# 消息流转
![](https://box.kancloud.cn/f7590053496249c71f4fce5492736748_1449x656.png)
把消息路由到Exchange的指定Message Queue
假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:
1. P1生产消息,发送给服务器端的Exchange
2. Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1
3. Queue1收到消息,将消息发送给订阅者C1
4. C1收到消息,发送ACK给队列确认收到消息
5. Queue1收到ACK,删除队列中缓存的此条消息
- 基础
- 编译和安装
- classpath到底是什么?
- 编译运行
- 安装
- sdkman多版本
- jabba多版本
- java字节码查看
- 数据类型
- 简介
- 整形
- char和int
- 变量和常量
- 大数值运算
- 基本类型包装类
- Math类
- 内存划分
- 位运算符
- 方法相关
- 方法重载
- 可变参数
- 方法引用
- 面向对象
- 定义
- 继承和覆盖
- 接口和抽象类
- 接口定义增强
- 内建函数式接口
- 多态
- 泛型
- final和static
- 内部类
- 包
- 修饰符
- 异常
- 枚举类
- 代码块
- 对象克隆
- BeanUtils
- java基础类
- scanner类
- Random类
- System类
- Runtime类
- Comparable接口
- Comparator接口
- MessageFormat类
- NumberFormat
- 数组相关
- 数组
- Arrays
- string相关
- String
- StringBuffer
- StringBuilder
- 正则
- 日期类
- Locale类
- Date
- DateFormat
- SimpleDateFormat
- Calendar
- 新时间日期API
- 简介
- LocalDate,LocalTime,LocalDateTime
- Instant时间点
- 带时区的日期,时间处理
- 时间间隔
- 日期时间校正器
- TimeUnit
- 用yyyy
- 集合
- 集合和迭代器
- ArrayList集合
- List
- Set
- 判断集合唯一
- Map和Entry
- stack类
- Collections集合工具类
- Stream数据流
- foreach不能修改内部元素
- of方法
- IO
- File类
- 字节流stream
- 字符流Reader
- IO流分类
- 转换流
- 缓冲流
- 流的操作规律
- properties
- 序列化流与反序列化流
- 打印流
- System类对IO支持
- commons-IO
- IO流总结
- NIO
- 异步与非阻塞
- IO通信
- Unix的IO模型
- epoll对于文件描述符操作模式
- 用户空间和内核空间
- NIO与普通IO的主要区别
- Paths,Path,Files
- Buffer
- Channel
- Selector
- Pipe
- Charset
- NIO代码
- 多线程
- 创建线程
- 线程常用方法
- 线程池相关
- 线程池概念
- ThreadPoolExecutor
- Runnable和Callable
- 常用的几种线程池
- 线程安全
- 线程同步的几种方法
- synchronized
- 死锁
- lock接口
- ThreadLoad
- ReentrantLock
- 读写锁
- 锁的相关概念
- volatile
- 释放锁和不释放锁的操作
- 等待唤醒机制
- 线程状态
- 守护线程和普通线程
- Lamda表达式
- 反射相关
- 类加载器
- 反射
- 注解
- junit注解
- 动态代理
- 网络编程相关
- 简介
- UDP
- TCP
- 多线程socket上传图片
- NIO
- JDBC相关
- JDBC
- 预处理
- 批处理
- 事务
- properties配置文件
- DBUtils
- DBCP连接池
- C3P0连接池
- 获得MySQL自动生成的主键
- Optional类
- Jigsaw模块化
- 日志相关
- JDK日志
- log4j
- logback
- xml
- tomcat
- maven
- 简介
- 仓库
- 目录结构
- 常用命令
- 生命周期
- idea配置
- jar包冲突
- 依赖范围
- 私服
- 插件
- git-commit-id-plugin
- maven-assembly-plugin
- maven-resources-plugin
- maven-compiler-plugin
- versions-maven-plugin
- maven-source-plugin
- tomcat-maven-plugin
- 多环境
- 自定义插件
- stream
- swing
- json
- jackson
- optional
- junit
- gradle
- servlet
- 配置
- ServletContext
- 生命周期
- HttpServlet
- request
- response
- 乱码
- session和cookie
- cookie
- session
- jsp
- 简介
- 注释
- 方法,成员变量
- 指令
- 动作标签
- 隐式对象
- EL
- JSTL
- javaBean
- listener监听器
- Filter过滤器
- 图片验证码
- HttpUrlConnection
- 国际化
- 文件上传
- 文件下载
- spring
- 简介
- Bean
- 获取和实例化
- 属性注入
- 自动装配
- 继承和依赖
- 作用域
- 使用外部属性文件
- spel
- 前后置处理器
- 生命周期
- 扫描规则
- 整合多个配置文件
- 注解
- 简介
- 注解分层
- 类注入
- 分层和作用域
- 初始化方法和销毁方法
- 属性
- 泛型注入
- Configuration配置文件
- aop
- aop的实现
- 动态代理实现
- cglib代理实现
- aop名词
- 简介
- aop-xml
- aop-注解
- 代理方式选择
- jdbc
- 简介
- JDBCTemplate
- 事务
- 整合
- junit整合
- hibernate
- 简介
- hibernate.properties
- 实体对象三种状态
- 检索方式
- 简介
- 导航对象图检索
- OID检索
- HQL
- Criteria(QBC)
- Query
- 缓存
- 事务管理
- 关系映射
- 注解
- 优化
- MyBatis
- 简介
- 入门程序
- Mapper动态代理开发
- 原始Dao开发
- Mapper接口开发
- SqlMapConfig.xml
- map映射文件
- 输出返回map
- 输入参数
- pojo包装类
- 多个输入参数
- resultMap
- 动态sql
- 关联
- 一对一
- 一对多
- 多对多
- 整合spring
- CURD
- 占位符和sql拼接以及参数处理
- 缓存
- 延迟加载
- 注解开发
- springMVC
- 简介
- RequestMapping
- 参数绑定
- 常用注解
- 响应
- 文件上传
- 异常处理
- 拦截器
- springBoot
- 配置
- 热更新
- java配置
- springboot配置
- yaml语法
- 运行
- Actuator 监控
- 多环境配置切换
- 日志
- 日志简介
- logback和access
- 日志文件配置属性
- 开机自启
- aop
- 整合
- 整合Redis
- 整合Spring Data JPA
- 基本查询
- 复杂查询
- 多数据源的支持
- Repository分析
- JpaSpecificationExecutor
- 整合Junit
- 整合mybatis
- 常用注解
- 基本操作
- 通用mapper
- 动态sql
- 关联映射
- 使用xml
- spring容器
- 整合druid
- 整合邮件
- 整合fastjson
- 整合swagger
- 整合JDBC
- 整合spingboot-cache
- 请求
- restful
- 拦截器
- 常用注解
- 参数校验
- 自定义filter
- websocket
- 响应
- 异常错误处理
- 文件下载
- 常用注解
- 页面
- Thymeleaf组件
- 基本对象
- 内嵌对象
- 上传文件
- 单元测试
- 模拟请求测试
- 集成测试
- 源码解析
- 自动配置原理
- 启动流程分析
- 源码相关链接
- Servlet,Filter,Listener
- springcloud
- 配置
- 父pom
- 创建子工程
- Eureka
- Hystrix
- Ribbon
- Feign
- Zuul
- kotlin
- 基本数据类型
- 函数
- 区间
- 区块链
- 简介
- linux
- ulimit修改
- 防止syn攻击
- centos7部署bbr
- debain9开启bbr
- mysql
- 隔离性
- sql执行加载顺序
- 7种join
- explain
- 索引失效和优化
- 表连接优化
- orderby的filesort问题
- 慢查询
- show profile
- 全局查询日志
- 死锁解决
- sql
- 主从
- IDEA
- mac快捷键
- 美化界面
- 断点调试
- 重构
- springboot-devtools热部署
- IDEA进行JAR打包
- 导入jar包
- ProjectStructure
- toString添加json模板
- 配置maven
- Lombok插件
- rest client
- 文档显示
- sftp文件同步
- 书签
- 代码查看和搜索
- postfix
- live template
- git
- 文件头注释
- JRebel
- 离线模式
- xRebel
- github
- 连接mysql
- 选项没有Java class的解决方法
- 扩展
- 项目配置和web部署
- 前端开发
- json和Inject language
- idea内存和cpu变高
- 相关设置
- 设计模式
- 单例模式
- 简介
- 责任链
- JUC
- 原子类
- 原子类简介
- 基本类型原子类
- 数组类型原子类
- 引用类型原子类
- JVM
- JVM规范内存解析
- 对象的创建和结构
- 垃圾回收
- 内存分配策略
- 备注
- 虚拟机工具
- 内存模型
- 同步八种操作
- 内存区域大小参数设置
- happens-before
- web service
- tomcat
- HTTPS
- nginx
- 变量
- 运算符
- 模块
- Rewrite规则
- Netty
- netty为什么没用AIO
- 基本组件
- 源码解读
- 简单的socket例子
- 准备netty
- netty服务端启动
- 案例一:发送字符串
- 案例二:发送对象
- websocket
- ActiveMQ
- JMS
- 安装
- 生产者-消费者代码
- 整合springboot
- kafka
- 简介
- 安装
- 图形化界面
- 生产过程分析
- 保存消息分析
- 消费过程分析
- 命令行
- 生产者
- 消费者
- 拦截器interceptor
- partition
- kafka为什么快
- kafka streams
- kafka与flume整合
- RabbitMQ
- AMQP
- 整体架构
- RabbitMQ安装
- rpm方式安装
- 命令行和管控页面
- 消息生产与消费
- 整合springboot
- 依赖和配置
- 简单测试
- 多方测试
- 对象支持
- Topic Exchange模式
- Fanout Exchange订阅
- 消息确认
- java client
- RabbitAdmin和RabbitTemplate
- 两者简介
- RabbitmqAdmin
- RabbitTemplate
- SimpleMessageListenerContainer
- MessageListenerAdapter
- MessageConverter
- 详解
- Jackson2JsonMessageConverter
- ContentTypeDelegatingMessageConverter
- lucene
- 简介
- 入门程序
- luke查看索引
- 分析器
- 索引库维护
- elasticsearch
- 配置
- 插件
- head插件
- ik分词插件
- 常用术语
- Mapping映射
- 数据类型
- 属性方法
- Dynamic Mapping
- Index Template 索引模板
- 管理映射
- 建立映射
- 索引操作
- 单模式下CURD
- mget多个文档
- 批量操作
- 版本控制
- 基本查询
- Filter过滤
- 组合查询
- 分析器
- redis
- String
- list
- hash
- set
- sortedset
- 发布订阅
- 事务
- 连接池
- 管道
- 分布式可重入锁
- 配置文件翻译
- 持久化
- RDB
- AOF
- 总结
- Lettuce
- zookeeper
- zookeeper简介
- 集群部署
- Observer模式
- 核心工作机制
- zk命令行操作
- zk客户端API
- 感知服务动态上下线
- 分布式共享锁
- 原理
- zab协议
- 两阶段提交协议
- 三阶段提交协议
- Paxos协议
- ZAB协议
- hadoop
- 简介
- hadoop安装
- 集群安装
- 单机安装
- linux编译hadoop
- 添加新节点
- 退役旧节点
- 集群间数据拷贝
- 归档
- 快照管理
- 回收站
- 检查hdfs健康状态
- 安全模式
- hdfs简介
- hdfs命令行操作
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案例-单词统计
- 局部聚合Combiner
- combiner流程
- combiner案例
- 自定义排序
- 自定义Bean对象
- 排序的分类
- 案例-按总量排序需求
- 一次性完成统计和排序
- 分区
- 分区简介
- 案例-结果分区
- 多表合并
- reducer端合并
- map端合并(分布式缓存)
- 分组
- groupingComparator
- 案例-求topN
- 全局计数器
- 合并小文件
- 小文件的弊端
- CombineTextInputFormat机制
- 自定义InputFormat
- 自定义outputFormat
- 多job串联
- 倒排索引
- 共同好友
- 串联
- 数据压缩
- InputFormat接口实现类
- yarn简介
- 推测执行算法
- 本地提交到yarn
- 框架运算全流程
- 数据倾斜问题
- mapreduce的优化方案
- HA机制
- 优化
- Hive
- 安装
- shell参数
- 数据类型
- 集合类型
- 数据库
- DDL操作
- 创建表
- 修改表
- 分区表
- 分桶表
- DML操作
- load
- insert
- select
- export,import
- Truncate
- 注意
- 严格模式
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transfrom实现
- having和where不同
- 压缩
- 存储
- 存储和压缩结合使用
- explain详解
- 调优
- Fetch抓取
- 本地模式
- 表的优化
- GroupBy
- count(Distinct)去重统计
- 行列过滤
- 动态分区调整
- 数据倾斜
- 并行执行
- JVM重用
- 推测执行
- reduce内存和个数
- sql查询结果作为变量(shell)
- youtube
- flume
- 简介
- 安装
- 常用组件
- 拦截器
- 案例
- 监听端口到控制台
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 单flume多channel,sink
- 自定义拦截器
- 高可用配置
- 使用注意
- 监控Ganglia
- sqoop
- 安装
- 常用命令
- 数据导入
- 准备数据
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 打包脚本
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- oozie
- 安装
- hbase
- 简介
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- 安装
- 命令行
- 基本CURD
- java api
- CURD
- CAS
- 过滤器查询
- 建表高级属性
- 与mapreduce结合
- 与sqoop结合
- 协处理器
- 参数配置优化
- 数据备份和恢复
- 节点管理
- 案例-点击流
- 简介
- HUE
- 安装
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 单词统计(接入kafka)
- 并行度和分组
- 启动流程分析
- ACK容错机制
- ACK简介
- BaseRichBolt简单使用
- BaseBasicBolt简单使用
- Ack工作机制
- 本地目录树
- zookeeper目录树
- 通信机制
- 案例
- 日志告警
- 工具
- YAPI
- chrome无法手动拖动安装插件
- 时间和空间复杂度
- jenkins
- 定位cpu 100%
- 常用脚本工具
- OOM问题定位
- scala
- 编译
- 基本语法
- 函数
- 数组常用方法
- 集合
- 并行集合
- 类
- 模式匹配
- 异常
- tuple元祖
- actor并发编程
- 柯里化
- 隐式转换
- 泛型
- 迭代器
- 流stream
- 视图view
- 控制抽象
- 注解
- spark
- 企业架构
- 安装
- api开发
- mycat
- Groovy
- 基础