>[danger] ## **RabbitMQ 是一个消息代理:它接收并转发消息。** RabbitMQ 你可以把它当成一个邮箱,当你把你想发送的邮件投进邮箱时,你可以确定邮递员最终会把邮件送到你的收件人。 RabbitMQ 它接收、存储和转发二进制的数据–即消息(message)。 RabbitMQ中一些常使用术语: * Publisher:生产者,消息的发送方。 * Connection:网络连接。 * Channel:信道,多路复用连接中的一条独立的双向数据流通道。 * Exchange:交换机(路由器),负责消息的路由到相应队列。 * Binding:队列与交换器间的关联绑定。消费者将关注的队列绑定到指定交换器上,以便Exchange能准确分发消息到指定队列。 * Queue:队列,消息的缓冲存储区。 * Virtual Host:虚拟主机,虚拟主机提供资源的逻辑分组和分离。包含连接,交换,队列,绑定,用户权限,策略等。 * Broker:消息队列的服务器实体。 * Consumer:消费者,消息的接收方。 RabbitMQ 消息结构: ![](https://img.kancloud.cn/2b/57/2b5734d2b2a2095c452562205cdaadc4_693x212.png) # **hello world** 现在我们来实现一个生产者负责发送5条消息,一个消费者负责接收消息并打印出来 ### **1、生产者** 生产者将会连接 RabbitMQ、发送5条消息、然后关闭连接。 ![](https://img.kancloud.cn/4b/2c/4b2c48d511e099bee5315fc10bd89b05_216x100.png) ``` <?php // +---------------------------------------------------------------------- // | najing [ 通用后台管理系统 ] // +---------------------------------------------------------------------- // | Copyright (c) 2020 http://www.najingquan.com All rights reserved. // +---------------------------------------------------------------------- // | Author: 救火队队长 // +---------------------------------------------------------------------- namespace app\controller; use app\BaseController; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; class Mq extends BaseController { /** * 功能描述: 生产者,负责发送消息 * @author 救火队队长 * @return string */ public function send() { //队列名 消息队列载体,每个消息都会被投入到一个或多个队列。 $queue = 'hello'; //建立连接 $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'zq', '123456', '/'); //获取信道 $channel = $connection->channel(); //声明创建队列 $channel->queue_declare($queue, false, false, false, false); for ($i=0; $i < 5; ++$i) { sleep(1);//休眠1秒 //消息内容 $messageBody = "Hello,Zq Now Time:".date("h:i:s"); //将我们需要的消息标记为持久化 - 通过设置AMQPMessage的参数delivery_mode = AMQPMessage::DELIVERY_MODE_PERSISTENT $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); //发送消息 $channel->basic_publish($message, '', $routing); echo "Send Message:". $i."\n"; } //关闭信道 $channel->close(); //关闭连接 $connection->close(); return 'Send Success'; } } ``` 在浏览器访问http://mq.najingquan.com/mq/send,发送5条消息 ![](https://img.kancloud.cn/e9/e0/e9e0bd5fd981717af5919da0e7ee6158_1007x196.png) 登陆RabbitMQ Web 管理界面 ![](https://img.kancloud.cn/2f/23/2f2372e8369c3b894b647a3a23cc2f80_1887x660.png) ### **2、消费者** 消费者从 RabbitMQ 中获取消息,与发布消息的生产者不同,消费者一直运行以监听消息,有新消息就立即处理 ![](https://img.kancloud.cn/73/80/73803ab521a14d996cadaf4ac4bdcd41_216x100.png) 利用TP6自定义命令行实现消费者 ``` <?php // +---------------------------------------------------------------------- // | najing [ 通用后台管理系统 ] // +---------------------------------------------------------------------- // | Copyright (c) 2020 http://www.najingquan.com All rights reserved. // +---------------------------------------------------------------------- // | Author: 救火队队长 // +---------------------------------------------------------------------- declare (strict_types = 1); namespace app\command; use think\console\Command; use think\console\Input; use think\console\input\Argument; use think\console\input\Option; use think\console\Output; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; class Mq extends Command { protected function configure() { // 指令配置 $this->setName('mq') ->setDescription('the mq command'); } protected function execute(Input $input, Output $output) { //队列名 消息队列载体,每个消息都会被投入到一个或多个队列。 $queue = 'hello'; //建立连接 $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'zq', '123456', '/'); //获取信道 $channel = $connection->channel(); //声明创建队列 $channel->queue_declare($queue, false, false, false, false); //消息消费 $channel->basic_consume($queue, '', false, true, false, false, function ($msg) use ($output) { $output->writeln(" Received " . $msg->body . PHP_EOL); }); while (count($channel->callbacks)) { $channel->wait(); } //关闭信道 $channel->close(); //关闭连接 $connection->close(); } } ``` 在控制台启动消费者: ![](https://img.kancloud.cn/51/6e/516eba962a44d56cdd287282705ff311_496x179.png) 切换到RabbitMQ Web 管理界面,消息已经被消费: ![](https://img.kancloud.cn/53/1e/531e37c43f2304b0cef7880d8f8c5bf5_1891x708.png)