[TOC]
## 1. 什么是RocketMQ
2011年阿里巴巴中间件团队自主研发了RocketMQ消息中间件,具有单机亿级消息堆积能力,且能支持严格的消息顺序。凭借其高性能、低延时和高可靠的特性承载了近年来双十一交易峰值(2016年为17万笔/秒),在整个生产链路上都有着稳定和出色的表现。阿里在2016年将RocketMQ贡献给Apache,并成为了Apache的顶级开源项目。现在RocketMQ有开源和商用两个版本。
> 1. 队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
> 2. 支持持久化
> 3. 需要良好的硬件支持
> 4. 亿级消息堆积能力
**消息队列主要的应用场景**:
> 异步处理,应用解耦,流量削峰,消息通讯。
天然支持负载均衡,消费者组里的消费者实例平均分摊消费消息
## 2. 概念解读
一个RocketMQ集群架构如下:
![](https://box.kancloud.cn/bbdf66b6b3c6aa71e26ea0ac23940d4b_905x523.png)
运行原理:
1,启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。
2,Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。
3,收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
4,Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建长连接,直接向Broker发消息。
5,Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker,然后直接跟Broker建立连接通道,开始消费消息。
### 2.1 nameserver
> 1. 几乎无状态节点、可部署多个,节点之间无信息同步。
> 2. nameserver接收broker的请求,注册broker路由信息;
> 3. 保存topic路由信息(producer和consumer都需要从它这里获取路由信息)
### 2.2 brokerserver
#### 2.2.1 功能描述
> 1. broker负责消息的接收、暂存和推送
> 2. broker节点分为两类:Master和slave,BrokerId为0表示Master,非0表示Slave
> 3. 一个Master可以对应多个Slave,但是一个Slave只可以对应一个Master。
> 4. Master和Slave的绑定是依靠相同的BrokerName和不同的BrokerId
> 5. Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,而Producer的发送机制保证消息尽量平均分布到 所有队列中,最终效果就是所有消息都平均落在每个Broker上。
> Broker是把消息持久化到磁盘文件的,同步刷盘就是写入后才告知producer成功;异步刷盘是收到消息后就告知producer成功了,之后异步地将消息从内存(PageCache)写入到磁盘上。
>6. 每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server,并且定时发送心跳包
### 2.3 producer
> 1. 生产者:发送消息,将消息推送给brokerserver;
> 2. Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息
> 3. 向提供Topic服务的Master建立长连接,且定时向Master发送心跳。
> 4. Producer完全无状态,可集群部署。
**producer消息发送队列选择方式**
> RocketMQ采用**轮询**所有队列的方式确定消息发送到哪一个队列,RocketMQ提供了三种消息队列轮询(MessageQueueSelector)方式:
> 1. 根据Hash值进行轮询:
> 即SelectMessageQueueByHash implements MessageQueueSelector)
> 2. 随机方式:
> 即SelectMessageQueueByRandoom implements MessageQueueSelector)
> 3. 自定义方式:
> 继承MessageQueueSelector接口,重写select方法,返回选择的队列
### 2.4 consumer
> 1. Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息
> 2. 向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。
> 3. Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
#### 2.4.1 consumer概念
> 1. 消费者:接收消息,从brokerserver上获取消息。
> 2. 在rocketmq里,consumer被分为2类:
> MQPullConsumer和MQPushConsumer,其实本质都是拉模式(pull),即consumer轮询从broker拉取消息。
> 3. Consumer 如果做广播消费,则一个 consumer实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 topic 对应的队列集合。
> 4. 消费者必须有返回状态,否则rocketmq会重发这条消息
> 5. consumer与所有关联的broker保持长连接(包括主从),每隔30s发送心跳,可配置,可以通过heartbeatBrokerInterval配置。broker每隔10s扫描连接,发现2分钟内没有心跳,则关闭连接,并通知该consumer组内其他实例,过来继续消费该topic。
#### 2.4.1 consumer(pull、push)
> pull 与 push对比:
> 慢消费无疑是push模型最大的致命伤,如果消费者的速度比发送者的速度慢很多,势必造成消息在broker的堆积。假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。当然这还不是最致命的,最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后来回踢皮球。
> 反观pull模式,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。在阿里的RocketMq里,有一种优化的做法-长轮询,来平衡推拉模型各自的缺点。但海量的长连接block对系统的开销还是不容小觑的,还是要合理的评估时间间隔,给wait加一个时间上限比较好~
> 二者区别是:
> push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
> pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
3.Push Consumer
应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法。所以,所谓Push指的是客户端内部的回调机制,并不是与服务端之间的机制。
4.Pull Consumer
应用通常主动调用Consumer从服务端拉消息,然后处理。这用的就是短轮询方式了,在不同情况下,与长轮询各有优点。
### 2.5 topic
> 1. 用来表示一类应用(所以一类应用用一个topic最佳)
> 2. 一个topic默认有四个队列
> 3. RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。后果就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。
> 线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false。
所以基于目前RocketMQ的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。
### 2.6 tag
> topic的子型,tags 可以由应用自由设置。只有收送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。
### 2.7 key
消息中的key有哪些作用:
> 服务器会为每个消息创建索引(哈希索引),应用可以通过 topic,key 来查询返回消息内容,以及消息被谁消费。
> 如果一个消息包含key值的话,会使用IndexFile存储消息索引,查询
> 自定义Key,可以用于去重
索引文件主要用于根据key来查询消息的,流程主要是:
> 根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000)
> 根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项)
> 遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)
## 3. 集群模式
### 3.1 主从关系
#### 3.1.1 确立主从关系
broker有三个描述信息:
> 1. brokerClusterName:指定broker所属集群(同一集群下,名称自然要相同)
> 2. brokerName: broker名称,用来表示broker
> 3. brokerId:broker编号
主从关系确立有两点:
> 1. 处于同一集群下(保证brokerClusterName相同即可)
> 2. 相同的brokerName
> 3. 不同的brokerId,0表示该节点是master,大于0表示该节点是slave
#### 3.1.1 主从关系说明
1. master与slave的数据同步:同步和异步
> 同步指:
> 消息到达master,master将数据同步给slave,完成后才确认消息被安全的保存(安全,效率较异步差)
> 异步指:
> 消息到达master后,直接返回一个成功应答,同时异步同步消息给slave。(安全性差一些,性能较高)
2. 一旦master宕机,开源的rockmet不支持自动主从切换,从节点支持读不支持写
rocket结构模式
### 3.2 单点master
> 不安全,不实用
### 3.3 双master
> 全是Master,没有Slave。
> 一个broker宕机了,应用是无影响的,缺点在于宕机的Master上未被消费的消息在Master没有恢复之前不可以订阅。
### 3.4 多Master多Slave模式(异步复制):
> 多对Master-Slave,高可用!采用异步复制的方式,主备之间短暂延迟,MS级别。
> Master宕机,消费者可以从Slave上进行消费,不受影响,但是Master的宕机,会导致丢失掉极少量的消息。但是该从broker不支持写。
### 3.5 多Master多Slave模式(同步双写):
> 数据同步方式的是同步方式,也就是在Master/Slave都写成功的前提下,向应用返回成功,可见不论是数据,还是服务都没有单点,都非常可靠!缺点在于同步的性能比异步稍低。但是该从broker不支持写。
### 3.6 RocketMQ与ActiveMQ对比
RocketMQ在功能和性能上都超过了ActiveMQ!
| | RocketMQ | ActiveMQ |说明|
| --- | --- | --- |--- |
| 消息过滤 | 仅支持客户端过滤 | 支持客户端和broker端 |RocketMQ在broker端进行过滤可以减少大量的网络传输是否会有消息重发造成的重复消费:RocketMQ可以保证,ActiveMQ无法保证|
| 消息有序性 | 支持 | 支持|
| 回溯消费 | 支持 | 不支持 |RocketMQ队列可以将数据持久化到硬盘的,但是需要定期进行清除|
| 定时消费 | 支持 | 不支持|
| 分布式 | 原生支持 | 原生不支持 |ActiveMQ需要做额外的工作来达到分布式|
| 事务 | 支持 | 支持|
| 持久化 | 支持 | 支持|
## 4. 回溯消费
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。
RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。
## 5. 消息存储
1. 消息的存储是一直存在于CommitLog中的,由于CommitLog是以文件为单位(而非消息)存在的,而且CommitLog的设计是只允许顺序写,且每个消息大小不定长,所以这决定了消息文件几乎不可能按照消息为单位删除(否则性能会极具下降,逻辑也非常复杂)。
- Docker
- 什么是docker
- Docker安装、组件启动
- docker网络
- docker命令
- docker swarm
- dockerfile
- mesos
- 运维
- Linux
- Linux基础
- Linux常用命令_1
- Linux常用命令_2
- ip命令
- 什么是Linux
- SELinux
- Linux GCC编译警告:Clock skew detected. 错误解决办法
- 文件描述符
- find
- 资源统计
- LVM
- Linux相关配置
- 服务自启动
- 服务器安全
- 字符集
- shell脚本
- shell命令
- 实用脚本
- shell 数组
- 循环与判断
- 系统级别进程开启和停止
- 函数
- java调用shell脚本
- 发送邮件
- Linux网络配置
- Ubuntu
- Ubuntu发送邮件
- 更换apt-get源
- centos
- 防火墙
- 虚拟机下配置网络
- yum重新安装
- 安装mysql5.7
- 配置本地yum源
- 安装telnet
- 忘记root密码
- rsync+ crontab
- Zabbix
- Zabbix监控
- Zabbix安装
- 自动报警
- 自动发现主机
- 监控MySQL
- 安装PHP常见错误
- 基于nginx安装zabbix
- 监控Tomcat
- 监控redis
- web监控
- 监控进程和端口号
- zabbix自定义监控
- 触发器函数
- zabbix监控mysql主从同步状态
- Jenkins
- 安装Jenkins
- jenkins+svn+maven
- jenkins执行shell脚本
- 参数化构建
- maven区分环境打包
- jenkins使用注意事项
- nginx
- nginx认证功能
- ubuntu下编译安装Nginx
- 编译安装
- Nginx搭建本地yum源
- 文件共享
- Haproxy
- 初识Haproxy
- haproxy安装
- haproxy配置
- virtualbox
- virtualbox 复制新的虚拟机
- ubuntu下vitrualbox安装redhat
- centos配置双网卡
- 配置存储
- Windows
- Windows安装curl
- VMware vSphere
- 磁盘管理
- 增加磁盘
- gitlab
- 安装
- tomcat
- Squid
- bigdata
- FastDFS
- FastFDS基础
- FastFDS安装及简单实用
- api介绍
- 数据存储
- FastDFS防盗链
- python脚本
- ELK
- logstash
- 安装使用
- kibana
- 安准配置
- elasticsearch
- elasticsearch基础_1
- elasticsearch基础_2
- 安装
- 操作
- java api
- 中文分词器
- term vector
- 并发控制
- 对text字段排序
- 倒排和正排索引
- 自定义分词器
- 自定义dynamic策略
- 进阶练习
- 共享锁和排它锁
- nested object
- 父子关系模型
- 高亮
- 搜索提示
- Redis
- redis部署
- redis基础
- redis运维
- redis-cluster的使用
- redis哨兵
- redis脚本备份还原
- rabbitMQ
- rabbitMQ安装使用
- rpc
- RocketMQ
- 架构概念
- 安装
- 实例
- 好文引用
- 知乎
- ACK
- postgresql
- 存储过程
- 编程语言
- 计算机网络
- 基础_01
- tcp/ip
- http转https
- Let's Encrypt免费ssl证书(基于haproxy负载)
- what's the http?
- 网关
- 网络IO
- http
- 无状态网络协议
- Python
- python基础
- 基础数据类型
- String
- List
- 遍历
- Python基础_01
- python基础_02
- python基础03
- python基础_04
- python基础_05
- 函数
- 网络编程
- 系统编程
- 类
- Python正则表达式
- pymysql
- java调用python脚本
- python操作fastdfs
- 模块导入和sys.path
- 编码
- 安装pip
- python进阶
- python之setup.py构建工具
- 模块动态导入
- 内置函数
- 内置变量
- path
- python模块
- 内置模块_01
- 内置模块_02
- log模块
- collections
- Twisted
- Twisted基础
- 异步编程初探与reactor模式
- yield-inlineCallbacks
- 系统编程
- 爬虫
- urllib
- xpath
- scrapy
- 爬虫基础
- 爬虫种类
- 入门基础
- Rules
- 反反爬虫策略
- 模拟登陆
- problem
- 分布式爬虫
- 快代理整站爬取
- 与es整合
- 爬取APP数据
- 爬虫部署
- collection for ban of web
- crawlstyle
- API
- 多次请求
- 向调度器发送请求
- 源码学习
- LinkExtractor源码分析
- 构建工具-setup.py
- selenium
- 基础01
- 与scrapy整合
- Django
- Django开发入门
- Django与MySQL
- java
- 设计模式
- 单例模式
- 工厂模式
- java基础
- java位移
- java反射
- base64
- java内部类
- java高级
- 多线程
- springmvc-restful
- pfx数字证书
- 生成二维码
- 项目中使用log4j
- 自定义注解
- java发送post请求
- Date时间操作
- spring
- 基础
- spring事务控制
- springMVC
- 注解
- 参数绑定
- springmvc+spring+mybatis+dubbo
- MVC模型
- SpringBoot
- java配置入门
- SpringBoot基础入门
- SpringBoot web
- 整合
- SpringBoot注解
- shiro权限控制
- CommandLineRunner
- mybatis
- 静态资源
- SSM整合
- Aware
- Spring API使用
- Aware接口
- mybatis
- 入门
- mybatis属性自动映射、扫描
- 问题
- @Param 注解在Mybatis中的使用 以及传递参数的三种方式
- mybatis-SQL
- 逆向生成dao、model层代码
- 反向工程中Example的使用
- 自增id回显
- SqlSessionDaoSupport
- invalid bound statement(not found)
- 脉络
- beetl
- beetl是什么
- 与SpringBoot整合
- shiro
- 什么是shiro
- springboot+shrio+mybatis
- 拦截url
- 枚举
- 图片操作
- restful
- java项目中日志处理
- JSON
- 文件工具类
- KeyTool生成证书
- 兼容性问题
- 开发规范
- 工具类开发规范
- 压缩图片
- 异常处理
- web
- JavaScript
- 基础语法
- 创建对象
- BOM
- window对象
- DOM
- 闭包
- form提交-文件上传
- td中内容过长
- 问题1
- js高级
- js文件操作
- 函数_01
- session
- jQuery
- 函数01
- data()
- siblings
- index()与eq()
- select2
- 动态样式
- bootstrap
- 表单验证
- 表格
- MUI
- HTML
- iframe
- label标签
- 规范编程
- layer
- sss
- 微信小程序
- 基础知识
- 实践
- 自定义组件
- 修改自定义组件的样式
- 基础概念
- appid
- 跳转
- 小程序发送ajax
- 微信小程序上下拉刷新
- if
- 工具
- idea
- Git
- maven
- svn
- Netty
- 基础概念
- Handler
- SimpleChannelInboundHandler 与 ChannelInboundHandler
- 网络编程
- 网络I/O
- database
- oracle
- 游标
- PLSQL Developer
- mysql
- MySQL基准测试
- mysql备份
- mysql主从不同步
- mysql安装
- mysql函数大全
- SQL语句
- 修改配置
- 关键字
- 主从搭建
- centos下用rpm包安装mysql
- 常用sql
- information_scheme数据库
- 值得学的博客
- mysql学习
- 运维
- mysql权限
- 配置信息
- 好文mark
- jsp
- jsp EL表达式
- C
- test