企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 介绍 ![](https://box.kancloud.cn/80ad9a9135797cc8087318888e5bf942_2054x1192.jpg) ![](https://box.kancloud.cn/f5ab2a22712caf9afb4c1d101361074d_706x294.jpg) * 生产者( producer ) 在图中为 P,表示消息的发送者。 * 交换机( exchanges ) 在图中为 X, 生产者发过来的消息需要经过交换机,交换机将决定将消息放到哪些队列当中。 * 队列(queue) 队列在图 1 中由红色矩形阵列表示,负责保存消息和发放消息。 * 消费者(consumer) 在图中为 C,代表等待接收消息的程序。 ## 信息流 消息是怎么从生产者传递到消费者的呢? 首先,生产者发送消息到交换机,同时发送一个 key,通过这个 key,交换机就知道该把消息发到哪个队列。随后交换机把消息发送到相应的队列中。由队列将消息发送给消费者。消费者监听某些队列,当有消息过来时,就立即处理消息。 提问 交换机是如何根据 key 来分配消息到队列? 队列怎样将消息发送给消费者? ### 第一个问题 RabbitMQ 的交换机有四种类型:direct, topic, headers, fanout * fanout fanout 交换机就跟广播一样,对消息不作选择地发给所有绑定的队列。两个队列都将收到消息 将所有收到的消息广播到所有已知的队列。 * direct ![](https://box.kancloud.cn/4cc717537642ce114a0037464aae0df7_866x324.jpg) 在 direct 模式里,交换机和队列之间绑定了一个 key,只有消息的 key 与绑定了的 key 相同时,交换机才会把消息发给该队列。消息的 key 为 orange 时,消息将进入队列 Q1 ; key 为 black 或者 green 时,消息将进入队列 Q2。若消息的 key 是其他字符串,被交换机直接遗弃 ![](https://box.kancloud.cn/6ee25282a3716cd23ae2861317ac66ce_824x280.jpg) 多重绑定 同时,交换机支持多重绑定,多个队列可以以相同的 key 与交换机绑定。当消息的 key 为 black 时,消息将进入 Q1 和 Q2 * topic topic 模式可以理解为主题模式,当 key 包涵某个主题时,即可进入该主题的队列。topic 模式的 key 必须具有固定的格式:以 . 作为间隔的一串单词;比如:quick.orange.rabbit,key 最多不能超过 255byte。 交换机和队列的key可以以类似正则表达式的方式存在,有两种语法: 1. "*" 可以替代一个单词 2. "#" 可以替代 0 个或多个单词 ![](https://box.kancloud.cn/4f08c2677cb8cc4b795da4e9e397ba4c_864x286.jpg) 图中,Q1 与交换机绑定的 kye 为:“*.orange.*”,故当消息的 key 为三个单词,且中间的单词为 orange 时,消息将进入 Q1。Q2 与exchange 绑定的 key 为 ”rabbit.#”,当消息的 key 以 rabbit 开头时,消息将进入 Q2 。 * headers 根据发送的消息内容中的headers属性进行匹配。 ### 第二个问题 ![](https://box.kancloud.cn/14506f87c78a9a5630be1c362db4a43a_670x314.jpg) Round-robin Dispatching * 循环发放(Round-robin dispatching) 队列分发消息给消费者的方式采用循环发放。举例来说,若队列里有四个消息 w, x, y, z,则 C1 将得到消息 z 和 x , C2 将得到消息 y 和 w 。即每个消费者按顺序每人发一个消息。 注意,在这种分配方式下,消息其实在刚进入队列的时候就已经内定好将要被分发的消费者。即 z, x 一定是给 C1 . y, w 一定是给 C2 。 这种方式存在一些隐患,如果 z 和 x 都是耗时的命令、y , z 都是简单的命令,C1 将不停地工作,而 C2 就比较空闲,造成资源浪费。 * 公平发放(fair dispatching) 公平发放解决了上述问题。这种方式下,队列只会把消息给空闲的消费者。如果它看到某个消费者正忙,就查找下一个空闲消费者。 * 消息的确认(Message acknowledgment) 若没有特别设定,消息一旦被队列分发给消费者,就被 Rabbitmq 从内存中删除。 在这种情况下,如果将一个正在处理消息的消费者强行关闭,那么,消息将未被完全处理,且 RabbitMQ 完全不知情。 为了解决上述问题,可以配置使得消息处理完后,向 RabbitMQ 返回一个 Acknowledgment。RabbitMQ 直到收到Acknowledgment 后,才将消息删除。 当消费者死亡时(its channel is closed, connection is closed, or TCP connection is lost),RabbitMQ 会知道这个消费者发生问题了,将重新发送消息给空闲的消费者。 消息没有 TimeOut,即使消费者处理很长很长时间,乃至无穷无尽,RabbmitMQ 也认为消费者正在处理。 > 其实,消息的确认是默认开启的,不需要特地设置。 # 概念 **Queue(消息队列)** queue是mq内部对象,用于存储未被customer消费的消息。相同属性的queue可以重复定义,每个消息都会被投入到一个或多个队列。 **Binding(绑定)** binding是将exchange和queue按照路由规则绑定起来。可以理解为binding是exchange和queue之间的关系 **Connection(连接)** 消息tcp连接 **Channel(信道)** 每个connection里,可建立多个channel,每个channel代表一个会话任务。做到尽量共用connection # 简单例子 send.php: ~~~ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 创建连接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 创建channel,多个channel可以共用连接 $channel = $connection->channel(); // 创建交换机以及队列(如果已经存在,不需要重新再次创建并且绑定) // 创建直连的交换机 $channel->exchange_declare('direct_logs', 'direct', false, false, false); // 创建队列 $channel->queue_declare('hello', false, false, false, false); // 交换机跟队列的绑定, $channel->queue_bind('hello', 'direct_logs', 'routigKey'); // 设置消息bady传送字符串logs(消息只能为字符串,建议消息均json格式) $msg = new AMQPMessage('logs'); // 发送数据到对应的交换机direct_logs并设置对应的routigKey $channel->basic_publish($msg, 'direct_logs', 'routigKey'); ~~~ receive.php: ~~~ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 创建连接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 创建channel,多个channel可以共用连接 $channel = $connection->channel(); // 可能会在数据发布之前启动消费者,所以我们要确保队列存在,然后再尝试从中消费消息。 // 创建直连的交换机 $channel->exchange_declare('direct_logs', 'direct', false, false, false); // 创建队列 $channel->queue_declare('hello', false, false, false, false); // 交换机跟队列的绑定, $channel->queue_bind('hello', 'direct_logs', 'routigKey'); // 回调函数 $callback = function ($msg) { echo $msg->body; }; // 启动队列消费者 $channel->basic_consume('hello3', '', false, true, false, false, $callback); // 判断是否存在回调函数 while(count($channel->callbacks)) { // 此处为执行回调函数 $channel->wait(); } ~~~ # 安装 先把rabbitmq安装上 https://github.com/php-amqplib/php-amqplib 这个需要 mbstring bcmath dom curl这些插件 安装完rabbitermq之后,我们还要下载对应php的composer包 # RabbitMQ备注 * 非持久化会导致,队列重启,数据丢失 * exchange持久化,在声明durable参数时指定为true * queue持久化,在声明durable参数时指定true * 消息持久化,实例化AMQPMessage类时指定delivery_mode为2 * exchange和queue是否持久化需要一致才能绑定 * 消费者设置手动ack,在声明no_ack参数时指定false * 队列消息异常需要将消息删除并再次发送同样的消息置于末尾并手动记录日志 # 基本概念 来了解下RabbitMQ的成员组件: broker:消息队列服务器的实体,是一个中间件应用,负责接收生产者的消息,然后将消息发送到消息接受者或者其他broker exchange:消息交换机,是消息到达的第一个地方,消息通过它指定的路由规则,分发到不同的消息队列中 queue:消息队列,消息通过发送和路由之后最后到达的地方,到达queue的消息即进入逻辑上的等待消费状态。每个消息都会被发送到一个或者多个队列 binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来,也就是exchange和queue之间的虚拟连接 virtual host:虚拟主机,他是对broker的虚拟划分,将消费者、生产者和他们依赖的AMQP相关结构进行隔离,一般都是为了安全考虑,比如我们在一个broker中设置多个虚拟主机,对不同用户进行权限的分离 routing key:路由关键字,exchange根据这个关键字进行消息的投递 connection:连接,代表生产者、消费者、broker之间进行通信的物理网络 channel:消息通道,用于连接生产者和消费者的逻辑结构。在客户端的每个链接里,可可建立多个channel,每个channel代表一个会话任务,通过channel可以隔离连接中的不同交互内容 看下客户端使用rabbitmq的基本流程: 1.消费者连接到消息队列服务器,打开一个channel 2.消费者声明一个exchange,并设置相关属性 3.消费者声明一个queue,并设置相关属性 4.消费者使用routing key,在exchange和queue之间建立绑定 5.生产者投递消息到exchange 6.exchange接收到消息后,根据消息的key和已经设置好的binding,进行消息路由,将消息投递到一个或者多个queue里 # 命令行使用 查看rabbitmq的安装目录:whereis rabbitmq cd到rabbitmq根目录下的sbin文件夹下 后台开启rabbitmq:./rabbitmq-server start -detached 关闭:./rabbitmqctl stop 查看交换机、绑定、队列: ~~~ ./rabbitmqctl list_exchanges ./rabbitmqctl list_bindings ./rabbitmqctl list_queues ~~~ 用户管理:(以下操作生效需要重启rabbitmq) 新建用户:./rabbitmqctl add_user username password 删除用户:./rabbitmqctl delete_user username 改密码:./rabbimqctl change_password username newpassword 设置用户角色:./rabbitmqctl set_user_tags username tag (Tag可以为 administrator,monitoring, management) 开启web管理界面:./rabbitmq-plugins enable rabbitmq_management (在浏览器地址栏输入:http://127.0.0.1:15672进入) # 介绍 RabbitMQ其他小介绍 RabbitMQ端口问题,RabbitMQ是默认霸占了5672,15672,25672这三个端口的 5672端口是用于AMQP协议连接 15672端口是用于http协议连接(不信可以试试web访问5672看行不行) RabbitMQ数据持久化 RabbitMQ有三种可设置的持久化,分别为Exchange(交换机)持久化,Queue(队列)持久化,信息持久化,如果设置了交换机和队列持久化,路由也会自动的持久化 # 参考 文档http://previous.rabbitmq.com/v3_5_7/getstarted.html http://www.cnblogs.com/bluebirds/p/6069623.html https://www.cnblogs.com/chunguang/p/5634342.html http://blog.csdn.net/sinat_21125451/article/details/53422648 https://www.phpxy.com/article/115.html 重点 http://blog.csdn.net/demon3182/article/details/77335206 (**超级重点,里面还有laragon**) http://blog.csdn.net/super_rd/article/details/70241007?utm_source=itdadao&utm_medium=referral http://rabbitmq.org.cn/ (中文) http://rabbitmq.mr-ping.com (python版) laragon http://blog.csdn.net/demon3182/article/details/76423340 http://blog.csdn.net/demon3182/article/details/76528612