>[danger] ## **RabbitMQ 是一个消息代理:它接收并转发消息。**
RabbitMQ 你可以把它当成一个邮箱,当你把你想发送的邮件投进邮箱时,你可以确定邮递员最终会把邮件送到你的收件人。
RabbitMQ 它接收、存储和转发二进制的数据–即消息(message)。
RabbitMQ中一些常使用术语:
* Publisher:生产者,消息的发送方。
* Connection:网络连接。
* Channel:信道,多路复用连接中的一条独立的双向数据流通道。
* Exchange:交换机(路由器),负责消息的路由到相应队列。
* Binding:队列与交换器间的关联绑定。消费者将关注的队列绑定到指定交换器上,以便Exchange能准确分发消息到指定队列。
* Queue:队列,消息的缓冲存储区。
* Virtual Host:虚拟主机,虚拟主机提供资源的逻辑分组和分离。包含连接,交换,队列,绑定,用户权限,策略等。
* Broker:消息队列的服务器实体。
* Consumer:消费者,消息的接收方。
RabbitMQ 消息结构:
![](https://img.kancloud.cn/2b/57/2b5734d2b2a2095c452562205cdaadc4_693x212.png)
# **hello world**
现在我们来实现一个生产者负责发送5条消息,一个消费者负责接收消息并打印出来
### **1、生产者**
生产者将会连接 RabbitMQ、发送5条消息、然后关闭连接。
![](https://img.kancloud.cn/4b/2c/4b2c48d511e099bee5315fc10bd89b05_216x100.png)
```
<?php
// +----------------------------------------------------------------------
// | najing [ 通用后台管理系统 ]
// +----------------------------------------------------------------------
// | Copyright (c) 2020 http://www.najingquan.com All rights reserved.
// +----------------------------------------------------------------------
// | Author: 救火队队长
// +----------------------------------------------------------------------
namespace app\controller;
use app\BaseController;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Mq extends BaseController
{
/**
* 功能描述: 生产者,负责发送消息
* @author 救火队队长
* @return string
*/
public function send()
{
//队列名 消息队列载体,每个消息都会被投入到一个或多个队列。
$queue = 'hello';
//建立连接
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'zq', '123456', '/');
//获取信道
$channel = $connection->channel();
//声明创建队列
$channel->queue_declare($queue, false, false, false, false);
for ($i=0; $i < 5; ++$i) {
sleep(1);//休眠1秒
//消息内容
$messageBody = "Hello,Zq Now Time:".date("h:i:s");
//将我们需要的消息标记为持久化 - 通过设置AMQPMessage的参数delivery_mode = AMQPMessage::DELIVERY_MODE_PERSISTENT
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
//发送消息
$channel->basic_publish($message, '', $routing);
echo "Send Message:". $i."\n";
}
//关闭信道
$channel->close();
//关闭连接
$connection->close();
return 'Send Success';
}
}
```
在浏览器访问http://mq.najingquan.com/mq/send,发送5条消息
![](https://img.kancloud.cn/e9/e0/e9e0bd5fd981717af5919da0e7ee6158_1007x196.png)
登陆RabbitMQ Web 管理界面
![](https://img.kancloud.cn/2f/23/2f2372e8369c3b894b647a3a23cc2f80_1887x660.png)
### **2、消费者**
消费者从 RabbitMQ 中获取消息,与发布消息的生产者不同,消费者一直运行以监听消息,有新消息就立即处理
![](https://img.kancloud.cn/73/80/73803ab521a14d996cadaf4ac4bdcd41_216x100.png)
利用TP6自定义命令行实现消费者
```
<?php
// +----------------------------------------------------------------------
// | najing [ 通用后台管理系统 ]
// +----------------------------------------------------------------------
// | Copyright (c) 2020 http://www.najingquan.com All rights reserved.
// +----------------------------------------------------------------------
// | Author: 救火队队长
// +----------------------------------------------------------------------
declare (strict_types = 1);
namespace app\command;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Mq extends Command
{
protected function configure()
{
// 指令配置
$this->setName('mq')
->setDescription('the mq command');
}
protected function execute(Input $input, Output $output)
{
//队列名 消息队列载体,每个消息都会被投入到一个或多个队列。
$queue = 'hello';
//建立连接
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'zq', '123456', '/');
//获取信道
$channel = $connection->channel();
//声明创建队列
$channel->queue_declare($queue, false, false, false, false);
//消息消费
$channel->basic_consume($queue, '', false, true, false, false, function ($msg) use ($output) {
$output->writeln(" Received " . $msg->body . PHP_EOL);
});
while (count($channel->callbacks)) {
$channel->wait();
}
//关闭信道
$channel->close();
//关闭连接
$connection->close();
}
}
```
在控制台启动消费者:
![](https://img.kancloud.cn/51/6e/516eba962a44d56cdd287282705ff311_496x179.png)
切换到RabbitMQ Web 管理界面,消息已经被消费:
![](https://img.kancloud.cn/53/1e/531e37c43f2304b0cef7880d8f8c5bf5_1891x708.png)
- 消息队列中间件-前言
- RabbitMQ安装
- PHP安装rabbitmq、php-amqplib扩展
- RabbitMQ入门
- 工作队列(Work Queues)
- 发布/订阅(Publish/Subscribe)
- 直接交换机 (Direct exchange)
- 通配符交换机(Topic exchange)
- 远程调用(RPC)
- 延迟队列、死信队列
- 重试队列(可靠性投递,重试超过3次,入库告警)
- 消费幂等
- RabbitMQ + think-swoole + Redis秒杀高并发实战
- redis商品库存预减
- 秒杀请求入队,可靠性投递
- 秒杀请求出队,生成秒杀订单,减少商品库存
- 性能测试 - 单机(2核4G)2000并发,抢购100个商品