[TOC]
# 介绍
RabbitMQ 是一个消息代理:它接收并转发消息。你可以把它当成一个邮局,当你把你想发送的邮件投进邮箱时,你可以确定邮递员最终会把邮件送到你的收件人。在上面的比喻中,我们把 RabbitMQ 比作 邮箱、邮局、邮递员。
RabbitMQ 和 邮局主要的不同是RabbitMQ不处理真实的信件,它接收、存储和转发二进制的数据–消息(message)。
RabbitMQ和消息传递(messaging)中一些常使用术语:
* Producing 仅仅指发送消息(message)。发送消息的程序就称为生产者 producer
![](https://box.kancloud.cn/51a6c4d6048c67e0fa189460ec75350b_186x130.jpg)
* queue 就像是RabbitMQ内部的邮箱。虽然消息在RabbitMQ和你的应用程序之间流动,但它们只能存储在队列中。队列的大小只受主机的内存和磁盘的限制,他本质上是一个打的消息缓冲区。多个生产者可以向一个队列发送消息,多个消费者也可以尝试从一个队列接收数据,这就是队列。
![](https://box.kancloud.cn/15a632277e1b3e589c9b37e89cad1344_268x170.jpg)
* 消费和接收邮件相似,consumer 是一个主要等待接收消息的程序。
![](https://box.kancloud.cn/4a6834299d78f68f0c7b16be4fe9df16_226x158.jpg)
> 请注意, producer, consumer, broker 并不需要在同一个主机上,尽管它们在大多数应用程序中是存在一个主机上的。
# hello world
在这一章节我们将使用PHP来写两段程序: 一个生产者负责发送一条消息,还有一个消费者负责接收消息并打印出来。在这里简单说一下 php-amqlib API, 主要集中精力来完成上面的例子。
在下面的图中, P 代表生产者,C 代表消费者。中间的盒子代表一个队列 - RabbitMQ提供给消费者的一个消息缓冲区。
![](https://box.kancloud.cn/e131d9140b4e5360b1133d2ebb4c74e7_848x182.jpg)
在你的项目目录下添加 composer.json (通过composer安装)
~~~
{
"require": {
"php-amqplib/php-amqplib": ">=2.6.1"
}
}
~~~
安装
~~~
composer install
~~~
现在我们已经安装了 php-amqplib 类库, 接下来我们仅需要写少量的代码。
# 生产者
![](https://box.kancloud.cn/3f3e9e5758ca2310678e105828fec247_416x180.jpg)
我们将调用我们的消息发布者 sender.php 和我们的消息接受者 receive.php。消息发布者将会连接 RabbitMQ、发送一条消息、然后退出。
在 send.php 中。我们需要引入 php-amqplib 类库并且 使用必要的类:
~~~
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
~~~
然后我们可以创建一个 RabbitMQ 的连接:
~~~
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
~~~
上面创建的连接是socket连接的抽象,并且完成了协议版本的协商和认证等等。在这里我们连接到一个本地机器的代理 - 因此使用 localhost 。如果我们想要连接不同机器上的代理,我们可以使用它的域名或者IP地址来替换localhost。
接下来我们新建一个 channel, 这是大部分用于完成任务的API所在的地方。
要发送消息,我们必须先声明一个队列给我们发送消息使用;然后我们就可以将消息发送到队列中:
~~~
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
~~~
声明一个队列是幂等的 - 只有当它不存在时才会被创建。消息的内容是一个字节数组,所以你可以编码你喜欢的任何东西。
最后,我们关闭channel 和 connection:
~~~
$channel->close();
$connection->close();
~~~
完整的 send.php:
~~~
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
~~~
>不能正常发送消息
>可能是RabbitMQ 代理没有足够的的硬盘空间(默认至少需要 200MB)
# 消费者
我们的消费者是从 RabbitMQ 中获取消息,因此与发布单个消息的生产者不同,消费者一直运行以监听消息并将它们打印出来。
receive.php 代码几乎和 sender.php 有相同的依赖:
~~~
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
~~~
设置和生产者相同;我们首先打开一个连接和创建channel,并声明我们要消费的消息队列。注意: 和生产者发送的队列名一致。
~~~
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
~~~
>请注意,我们在这里也声明了队列。这是因为我们可能在生产者之前启动消费者,所以我们要确保队列一定存在,然后再尝试从中消费信息。
在我们将要告诉服务器将队列中的消息传递给我们之前,我们需要定义一个可以接收服务器发送的消息的回调函数。请注意,消息是从服务器异步发送到客户端。、
~~~
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
~~~
receive.php 完整代码:
~~~
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
~~~
整合代码
现在我们可以运行一下两个脚本。在命令行下,运行消费者(receive.php)
~~~
php receive.php
~~~
然后,运行生产者(send.php)
~~~
php send.php
~~~
消费者将会打印 生产者通过RabbitMQ 发送的消息。receive.php 将会一直运行,等待接收消息(使用 Ctrl+C 停止),因此尝试从另一个终端运行 send.php。
>
>列出RabbitMQ中队列
>你可能希望看到RabbitMQ中有什么队列和其中有多少消息。你可以使用您可以使用rabbitmqctl 工具
>sudo rabbitmqctl list_queues
>在windows上省略 sudo
>rabbitmqctl list_queues
- OAuth
- 简介
- 步骤
- 单点登录
- .user.ini
- 时间转换为今天昨天前天几天前
- 获取ip接口
- 协程
- 概念
- yield-from && return-values
- 协程与阻塞的思考
- 中间件
- mysqli异步与php的协程
- 代码片段
- pdo 执行的sql语句
- 二进制安全
- 捕捉异常中断
- global
- 利用cookie模拟登陆
- 解析非正常json
- 简单的对称加密算法
- RSA 加密
- 过滤掉emoji表情
- 判断远程图片是否存在
- 一分钟限制请求100次
- 文件处理
- 多文件上传
- 显示所有文件
- 文件下载和上面显示所有文件配合
- 文件的删除,统计,存数组等
- 图片处理
- 简介
- 验证码
- 图片等比缩放
- 批量添加水印
- beanstalkd
- 安装
- 使用
- RabbitMQ
- 简介
- debain安装
- centos安装
- 常用方法
- 入门
- 工作队列
- 订阅,发布
- 路由
- 主题
- 远程调用RPC
- 消息中间件的选型
- .htaccess
- isset、empty、if区别以及0、‘’、null
- php各版本
- php7.2 不向后兼容的改动
- php中的各种坑
- php7改变
- php慢日志
- 邮件
- PHPMailer实现发邮件
- 验证邮件地址真实性
- 文件下载
- FastCgi 与 PHP-fpm 之间的关系
- openssl 加解密
- 反射
- 钩子方法
- 查找插件
- opcode
- opcache使用
- opcache优化
- 分布式一致性hash算法
- 概念
- 哈希算法好坏的四个定义
- php实现
- java实现
- 数组
- jwt
- jwt简介
- 单点登录
- phpize
- GeoIP扩展
- php无法获得https网页内容的解决方案
- homestead运行的脚本
- Unicode和Utf-8转换
- php优化
- kafka
- fpm配置
- configure配置详解