[TOC]
英文文档:http://www.rabbitmq.com/getstarted.html
中文文档:https://rabbitmq.shujuwajue.com/tutorials_with_php/[2]Work_Queues.md.html
总结:官方文档看起来有点乱,总结了一下,基本都是互通的,以下为简单基本步骤,仅供参考,NO BB。
# 生产者:
## 1、创建连接
主要参数说明:
~~~
$host: RabbitMQ服务器主机IP地址
$port: RabbitMQ服务器端口
$user: 连接RabbitMQ服务器的用户名
$password: 连接RabbitMQ服务器的用户密码
$vhost: 连接RabbitMQ服务器的vhost(服务器可以有多个vhost,虚拟主机,类似nginx的vhost)
~~~
~~~
$connection = new AMQPStreamConnection($host,$port,$user,$password,$vhost);
~~~
## 2、获取信道
~~~
// $channel_id 信道id,不传则获取$channel[“”]信道,再无则循环$this->channle数组,下标从1到最大信道数找第一个不是AMQPChannel对象的下标,实例化并返回AMQPChannel对象,无则抛出异常No free channel ids
~~~
~~~
$channel = $connection->channel($channel_id);
~~~
## 3、在信道里创建交换器
~~~
# $exhcange_name 交换器名字
# $type 交换器类型:
’’ 默认交换机 匿名交换器 未显示声明类型都是该类型
fanout 扇形交换器 会发送消息到它所知道的所有队列,每个消费者获取的消息都是一致的
headers 头部交换器
direct 直连交换器,该交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配
topic 话题交换器 该交换机会对路由键正则匹配,必须是*(一个单词)、#(多个单词,以.分割) 、 user.key .abc.* 类型的key
rpc
#$passive false
#durable false
#auto_detlete false
$channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete);
//常用设置 $passive=>false $durable=>false $auto_delete->false
~~~
## 4、创建要发送的信息 ,可以创建多个消息
~~~
#$data string类型 要发送的消息
#$properties array类型 设置的属性,比如设置该消息持久化[‘delivery_mode’=>2]
$msg = new AMQPMessage($data,$properties)
~~~
## 5、发送消息
~~~
#$msg object AMQPMessage对象
#$exchange string 交换机名字
#$routing_key string 路由键 如果交换机类型
fanout: 该值会被忽略,因为该类型的交换机会把所有它知道的队列发消息,无差别区别
direct 只有精确匹配该路由键的队列,才会发送消息到该队列
topic 只有正则匹配到的路由键的队列,才会发送到该队列
$channel->basic_publish($msg,$exchange,$routing_key);
~~~
## 6、关闭信道和链接
~~~
$channel->close();
$connection->close();
~~~
# 消费者:
## 1、创建连接
主要参数说明:
~~~
$host: RabbitMQ服务器主机IP地址
$port: RabbitMQ服务器端口
$user: 连接RabbitMQ服务器的用户名
$password: 连接RabbitMQ服务器的用户密码
$vhost: 连接RabbitMQ服务器的vhost(服务器可以有多个vhost,虚拟主机,类似nginx的vhost)
~~~
~~~
$connection = new AMQPStreamConnection($host,$port,$user,$password,$vhost);
~~~
## 2、获取信道
~~~
// $channel_id 信道id,不传则获取$channel[“”]信道,再无则循环$this->channle数组,下标从1到最大信道数找第一个不是AMQPChannel对象的下标,实例化并返回AMQPChannel对象,无则抛出异常No free channel ids
$channel = $connection->channel($channel_id);
~~~
## 3、在信道里创建交换器,未显式声明交换机都是使用匿名交换机
~~~
# $exhcange_name 交换器名字
# $type 交换器类型:
’’ 默认交换机 匿名交换器 未显示声明类型都是该类型
fanout 扇形交换器 会发送消息到它所知道的所有队列,每个消费者获取的消息都是一致的
headers 头部交换器
direct 直连交换器,该交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配
topic 话题交换器 该交换机会对路由键正则匹配,必须是*(一个单词)、#(多个单词,以.分割) 、 user.key .abc.* 类型的key
rpc
#$passive false
#durable false
#auto_detlete false
$channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete);
//常用设置 $passive=>false $durable=>false $auto_delete->false
~~~
## 4、声明消费者队列
(1) 非持久化队列,RabbitMQ退出或者崩溃时,该队列就不存在
~~~
list($queue_name, ,) = $channel->queue_declare("", false, false, false, false)
~~~
(2) 持久化队列(需要显示声明,第三个参数要设置为true),保存到磁盘,但不一定完全保证不丢失信息,因为保存总是要有时间的。
~~~
list($queue_name, ,) = $channel->queue_declare("ex_queue", false, false, true, false)
~~~
## 5、绑定交换机,当未显示绑定交换机时,默认是绑定匿名交换机
~~~
#绑定:交换机与队列的关系,如下面这句代码意思是,$queue_name队列对logs交换机数据感兴趣,该队列就消费该交换机传过来的数据:这个队列(queue)对这个交换机(exchange)的消息感兴趣. $binding_key默认为空,表示对该交换机所有消息感兴趣,如果值不为空,则该队列只对该类型的消息感兴趣(除了fanout交换机以外)
$channel->queue_bind($queue_name, 'logs', $binding_key);
~~~
## 6、消费消息
~~~
#该代码表示使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker),轮询、负载均衡配置
#$channel->basic_qos(null, 1, null);
#第四个参数 no_ack = false 时,表示进行ack应答,确保消息已经处理
#$callback 表示回调函数,传入消息参数
$channel->basic_consume('ex_queue', '', false, false, false, false, $callback);
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
#当no_ack=false时, 需要写下行代码,否则可能出现内存不足情况#$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
#监听消息,一有消息,立马就处理
while(count($channel->callbacks)) {
$channel->wait();
}
~~~
- OAuth
- 简介
- 步骤
- 单点登录
- .user.ini
- 时间转换为今天昨天前天几天前
- 获取ip接口
- 协程
- 概念
- yield-from && return-values
- 协程与阻塞的思考
- 中间件
- mysqli异步与php的协程
- 代码片段
- pdo 执行的sql语句
- 二进制安全
- 捕捉异常中断
- global
- 利用cookie模拟登陆
- 解析非正常json
- 简单的对称加密算法
- RSA 加密
- 过滤掉emoji表情
- 判断远程图片是否存在
- 一分钟限制请求100次
- 文件处理
- 多文件上传
- 显示所有文件
- 文件下载和上面显示所有文件配合
- 文件的删除,统计,存数组等
- 图片处理
- 简介
- 验证码
- 图片等比缩放
- 批量添加水印
- beanstalkd
- 安装
- 使用
- RabbitMQ
- 简介
- debain安装
- centos安装
- 常用方法
- 入门
- 工作队列
- 订阅,发布
- 路由
- 主题
- 远程调用RPC
- 消息中间件的选型
- .htaccess
- isset、empty、if区别以及0、‘’、null
- php各版本
- php7.2 不向后兼容的改动
- php中的各种坑
- php7改变
- php慢日志
- 邮件
- PHPMailer实现发邮件
- 验证邮件地址真实性
- 文件下载
- FastCgi 与 PHP-fpm 之间的关系
- openssl 加解密
- 反射
- 钩子方法
- 查找插件
- opcode
- opcache使用
- opcache优化
- 分布式一致性hash算法
- 概念
- 哈希算法好坏的四个定义
- php实现
- java实现
- 数组
- jwt
- jwt简介
- 单点登录
- phpize
- GeoIP扩展
- php无法获得https网页内容的解决方案
- homestead运行的脚本
- Unicode和Utf-8转换
- php优化
- kafka
- fpm配置
- configure配置详解