# RabbitMQ概述和运行机制
# RabbitMQ概述
**RabbitMQ概述**:RabbitMQ是使用最广泛的开源消息代理。RabbitMQ轻量级,易于在集群内部和云平台中部署。它支持多种消息传递协议。 它可以满足企业高规模,高可用性的要求。RabbitMQ使用Erlang语言开发的。
**MQ概述**:全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。
**MQ运行机制**: P表示生产者,C表示消费者,红色部分为消息队列
![](https://box.kancloud.cn/a02211c868ca3192b4f7f8d9af4bbd7c_536x86.png)
## MQ实战场景:
1.我们在双11的时候,当我们凌晨大量的秒杀和抢购商品,然后去结算的时候,就会发现,界面会提醒我们,让我们稍等,以及一些友好的图片文字提醒。而不是像前几年的时代,动不动就页面卡死,报错等来呈现给用户。在这个业务场景中,我们就可以采用队列的机制来处理,因为同时结算就只能达到这么多。
![](https://box.kancloud.cn/5534033654975dec4f1aa1f746452ee6_869x420.png)
![](https://box.kancloud.cn/a642a8065db7f6b250bbd8ba04073ab0_680x344.jpg)
2.在我们平时的超市中购物也是一样,当我们在结算的时候,并不会一窝蜂一样涌入收银台,而是排队结算。这也是队列机制。一个接着一个的处理,不能插队。
![](https://box.kancloud.cn/0df7fc413ae1f796d04d0f6f052c3a11_1024x683.jpg)
*****
**RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。**
RabbitMQ是AMQP服务器的一种。
**AMQP简介**:AMQP,即Advanced Message Queuing Protocol,**高级消息队列协议**,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,**并不受产品、开发语言等条件的限制。**
AMQP 里主要要说两个组件:Exchange 和 Queue (在 AMQP 1.0 里还会有变动),如下图所示,绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker ,这部分是 RabbitMQ 实现的,而蓝色的则是客户端,通常有 Producer(生产者) 和 Consumer(消费者) 两种类型:
![](https://box.kancloud.cn/d55ac336d3c298d60fcf5e26e6c6fdf1_600x450.png)
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Broker
表示消息队列服务器实体。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
* 可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
* 灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
* 消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
* 高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
* 多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
* 多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
* 管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
* 跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
* 插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
# RabbitMQ 安装
RabbitMQ是用erlang语言编写的,所以我们先安装erlang语言环境
配置erlang语言环境
```
vim /etc/yum.repos.d/rabbitmq-erlang.repo
# 在rabbitmq-erlang.repo 文件中加入下面的代码
[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/20/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
enabled=1
# 执行导入key
rpm --import https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
# 安装erlang
yum install erlang -y #安装erlang
```
**安装rabbitmq服务**
下载rabbitmq 地址:http://www.rabbitmq.com/download.html
```
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.8/rabbitmq-server-3.7.8-1.el7.noarch.rpm
yum install rabbitmq-server-3.7.8-1.el7.noarch.rpm
```
启用RabbitMQ的web插件 ,方便后期管理界面
```
rabbitmq-plugins enable rabbitmq_management
```
```
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@localhost...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
set 3 plugins.
Offline change; changes will take effect at broker restart.
```
设置开机启动
```
systemctl enable rabbitmq-server.service
```
启动服务
```
systemctl start rabbitmq-server
```
rabbitmq配置文件位置
```
ls /var/lib/rabbitmq/mnesia
```
访问控制台
默认用户名和密码: guest/guest 。guest用户仅允许从在服务器以localhost或127.0.0.1作为ip登录
如果远程登录,如:http://192.168.1.63:15672/, 则会提示错误,登录不了。
为RabbitMQ创建用户并赋权。
```
rabbitmqctl add_user root 123456 #添加用户
rabbitmqctl set_user_tags root administrator #设置用户权限为administrator
```
到此,已经搭建成功。
# RabbitMQ使用方法
**RabbitMQ查看相关的命令**
```
rabbitmqctl list_connections #用于查看当前的连接
rabbitmqctl list_queues #会列出所有队列名称,后边可能还会带着这个队列当前消息数
rabbitmqctl status #查看当前队列信息
```
## RabbitMQ的vhost管理
当我们在创建用户时,会指定用户能访问一个虚拟机,并且该用户只能访问该虚拟机下的队列和交换机,如果没有指定,默认的是”/”;一个rabbitmq服务器上可以运行多个vhost,以便于适用不同的业务需要,这样做既可以满足权限配置的要求,也可以避免不同业务之间队列、交换机的命名冲突问题,因为不同vhost之间是隔离的。
添加yang-web和yang-bbs两个虚拟机来管理网站和论坛的队列
```
rabbitmqctl add_vhost yang-web
rabbitmqctl add_vhost yang-bbs
```
查看创建的虚拟主机 网页查看
删除bbs虚拟机
```
rabbitmqctl delete_vhost yang-bbs
```
查看虚拟机列表
```
rabbitmqctl list_vhosts
```
**“/”是rabbitmq默认的虚拟机,之前默认连接的都是它**
# RabbitMQ管理用户、角色和权限管理
1、用户管理语法
添加用户:rabbitmqctl add_user {username} {password}
删除用户:rabbitmqctl delete_user {username}
修改密码:rabbitmqctl change_password {username} {newpassword}
2、角色权限分配
设置用户角色语法:rabbitmqctl set_user_tags {username} {tag}
RabbitMQ的tag用户角色分类:none、management、policymaker、monitoring、administrator
tag常用角色为:administrator, monitoring, management
### RabbitMQ各类角色描述:
(1)、none角色权限 :不能访问 management plugin
(2)、management角色权限:
列出自己可以通过AMQP登入的virtual hosts
查看自己的virtual hosts中的queues, exchanges 和 bindings
查看和关闭自己的channels 和 connections
查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。
(3)、policymaker角色权限 #policymaker ['pɒləsɪmeɪkə(r)] 决策者
拥有management的所有权限,还拥有查看、创建和删除自己的virtual hosts所属的policies(策略)和parameters([pəˈræmɪtə(r)] 参数 )
(4)、monitoring 角色权限
拥有management的所有权限,还拥有:
列出所有virtual hosts,包括他们不能登录的virtual hosts
查看其他用户的connections和channels
查看节点级别的数据如clustering和memory使用情况
查看真正的关于所有virtual hosts的全局的统计信息
(5)、administrator角色权限
拥有policymaker和monitoring的所有权限,还拥有:
创建和删除virtual hosts
查看、创建和删除users
查看创建和删除permissions
关闭其他用户的connections
TODO
# 使用python调用rabbitmq服务器
```
pip install pika #安装pika模块。python用pika模块调用rabbitmq。
```
**注:** rabbitmq本质是一个生产者和消费者的模型结构。生产者->rabbitmq->消费者,即生产者产生消息,给到rabbitmq存储,消费者从rabbitmq中读取数据。
创建生产者代码send.py
```
import pika
#启用对库进行阻塞,同步操作以进行简单的使用
# 连接队列到服务器
#这里可以连接远程IP,请记得打开远程端口
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
#声明一个管道
channel = connection.channel()
# 创建队列。有就不管,没有就自动创建
#声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue='hello')
# 使用默认的交换机发送信息。exchange为空就使用默认的
channel.basic_publish(exchange='',
routing_key='hello',
# queue名字 #路由键,写明将消息发往哪个队列,本例是将消息发往队列hello
body='Hello World!')
# 消息内容
print(" [x] Sent 'Hello World!'")
connection.close()
注:declare [dɪˈkleə(r)] 声明 ; consuming [kənˈsju:mɪŋ] 消费;publish [ˈpʌblɪʃ] 颁布
```
创建消耗者代码receive.py
```
import pika
# 连接服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
# rabbitmq消费端仍然使用此方法创建队列。这样做的意思是:若没有就创建,和发送端道理一样。目的是为了保证队列一定会有
channel.queue_declare(queue='hello')
# 收到消息后的回调
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
#回调函数。执行结束后立即执行另外一个函数返回给发送端是否执行完毕。
queue='hello',
no_ack=True)
#不会告知服务端我是否收到消息。一般注释。#如果注释掉,对方没有收到消息的话不会将消息丢失,始终在队列里等待下次发送。
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
```
## 开始测试队列
python send.py #此命令执行两次,产生两个名字叫hello的消息
rabbitmqctl list_queues #查看消息队列为2
在web界面查看消息队列,发现在2个消息队列等待处理:
python receive.py #消费或处理这2个消息
rabbitmqctl list_queues #查看队列,已经为0
在web界面,查看队列,也为0了
消息被消费后,队列就相应的移除
# 交换机
消息实际上投递到的是交换机,具体路由到那个队列由交换机根据路由键(routing key)完成。
当你发消息到代理服务器时,即便路由键是空的,RabbitMQ也会将其和使用的路由键进行匹配。如果路由的消息不匹配任何绑定模式,消息将会进入黑洞。
交换机在队列与消息中间起到了中间层的作用,有了交换机我们可以实现更灵活的功能,RabbitMQ中有三种常用的交换机类型:
direct: 如果路由键匹配,消息就投递到对应的队列
fanout:投递消息给所有绑定在当前交换机上面的队列
topic:允许实现有趣的消息通信场景,使得5不同源头的消息能够达到同一个队列。topic队列名称有两个特殊的关键字。
* 可以替换一个单词
可以替换所有的单词
:
可以理解,direct为1v1, fanout为1v所有,topic比较灵活,可以1v任意。
![](https://box.kancloud.cn/4cfe6f43c82a483a889652f5a4e9f1c1_720x438.png)
# 虚拟主机
每一个虚拟主机(vhost)相当于mini版的RabbitMQ服务器,拥有自己的队列,交换机和绑定,权限… 这使得一个RabbitMQ服务众多的应用程序,而不会互相冲突。
rabbitMQ默认的虚拟主机为: “/” ,一般我们在创建Rabbit的用户时会再给用户分配一个虚拟主机。
# 消息投递策略
默认情况下RabbitMQ的队列和交换机在RabbitMQ服务器重启之后会消失,原因在于队列和交换机的durable属性,该属性默认情况下为false.
能从AMQP服务器崩溃中恢复的消息称为持久化消息,如果想要从崩溃中恢复那么消息必须
* 投递模式设置2,来标记消息为持久化
* 发送到持久化的交换机
* 到到持久化的队列
缺点:消息写入磁盘性能差很多。除非特别关键的消息会使