## 好文 * [扪心自问,你真的熟练掌握MQ了吗](https://mp.weixin.qq.com/s/gqFVeZIE6nfYQd57pQkfaA) ## 使用Docker构建RabbitMQ 查找镜像 ``` docker search rabbitmq ``` 拉取镜像 ``` docker pull rabbitmq:3.7.16-management ``` > 默认情况下,会拉取rabbitmq的latest版本。 > 这里拉取 Web浏览器管理页面的tag `3.7.16-management`【管理插件】 启动镜像 ``` docker run -p 15672:15672 -p 5672:5672 -d --hostname dnmp-rabbitmq --name dnmp-rabbitmq --network dnmp_backend -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.7.16-management ``` > 参数解释 * `15672` :表示 RabbitMQ 控制台端口号,可以在浏览器中通过控制台来执行 RabbitMQ 的相关操作。 * `5672 `: 表示 RabbitMQ 所监听的 TCP 端口号,应用程序可通过该端口与 RabbitMQ 建立 TCP 连接,完成后续的异步消息通信 * `RABBITMQ_DEFAULT_USER`:用于设置登陆控制台的用户名,这里我设置 `admin ` * `RABBITMQ_DEFAULT_PASS`:用于设置登陆控制台的密码,这里我设置 `admin `容器启动成功后,可以在浏览器输入地址:`http://ip:15672/ `访问控制台 ## PHP 客户端库 > 以下以ThinkPHP5.1 框架为测试环境 #### 安装扩展库 [php-amqplib](https://github.com/php-amqplib/php-amqplib) composer installed ``` docker run --rm --interactive --tty -v e:/dnmp/www/iot.tinywan.com:/app composer require php-amqplib/php-amqplib v2.9.0 --ignore-platform-reqs ``` ## 第一章 开始代码 ##### 消息发布者(发送者) > `mq_send.php` 脚本 ``` #!/usr/bin/env php <?php namespace think; define('APP_PATH', __DIR__ . '/application/'); require __DIR__ . '/thinkphp/base.php'; Container::get('app',[APP_PATH])->bind('http/RabbitMq/send')->run()->send(); ``` >[warning] 业务代码 ``` public function send() { $connection = new AMQPStreamConnection('dnmp-rabbitmq', 5672, 'admin', 'admin'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); $msg = new AMQPMessage('Hello World!'); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close(); } ``` >[danger] 注意: > 1、 `user `和 `password `就是docker启动时候的 `RABBITMQ_DEFAULT_USER=admin`和 `RABBITMQ_DEFAULT_PASS=admin`。前面我们设置的 `admin`和`admin` > 2、连接主机`host`是`dnmp-rabbitmq`,由于是在docker容器之内 >[info] 在终端中,运行消费者(接收者) ``` > docker exec -it dnmp-php72 sh -c "php /var/www/iot.tinywan.com/mq_receive.php" [*] Waiting for messages. To exit press CTRL+C [x] Received Hello World! [x] Received Hello World! [x] Received Hello World! [x] Received Hello World! ``` ##### 消息接收者(发送者) `mq_receive.php`脚本 ``` #!/usr/bin/env php <?php namespace think; define('APP_PATH', __DIR__ . '/application/'); require __DIR__ . '/thinkphp/base.php'; Container::get('app',[APP_PATH])->bind('http/RabbitMq/receive')->run()->send(); ``` >[warning] 业务代码 ``` public function receive() { $connection = new AMQPStreamConnection('dnmp-rabbitmq', 5672, 'admin', 'admin'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } } ``` >[info] 在终端中,运行发布者(发件人) ``` > docker exec -it dnmp-php72 sh -c "php /var/www/iot.tinywan.com/mq_send.php" [x] Sent 'Hello World!' ``` ## 第二章 队列 您可能希望看到RabbitMQ有哪些队列以及它们中有多少消息。您可以使用rabbitmqctl工具(作为特权用户)执行此操作: ``` docker exec -it dnmp-rabbitmq sh -c "rabbitmqctl list_queues" Timeout: 60.0 seconds ... Listing queues for vhost / ... name messages hello 0 ``` ### 循环调度 使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作积压,我们可以添加更多工人,这样就可以轻松扩展。 首先,让我们尝试同时运行两个`mq_worker.php`脚本。他们都会从队列中获取消息,但究竟如何呢?让我们来看看。 你需要打开三个控制台。两个将运行`mq_worker.php`脚本。这些游戏机将成为我们的两个消费者 - `C1`和`C2`。 > 第一个终端 ``` PS E:\dnmp> echo C1 C1 PS E:\dnmp> docker exec -it dnmp-php72 sh -c "php /var/www/iot.tinywan.com/mq_work.php" [*] Waiting for messages. To exit press CTRL+C [x] Received First message [x] Done [x] Received Third message [x] Done [x] Received Fifth message [x] Done ``` > 第二个终端 ``` PS E:\dnmp> echo C2 C2 PS E:\dnmp> docker exec -it dnmp-php72 sh -c "php /var/www/iot.tinywan.com/mq_work.php" [*] Waiting for messages. To exit press CTRL+C [x] Received First message [x] Done [x] Received Third message [x] Done [x] Received Fifth message [x] Done ``` >[warning] 第三个终端 我们将发布新任务。启动消费者后,您可以发布一些消息: ``` docker exec -it dnmp-php72 sh -c "php /var/www/iot.tinywan.com/mq_new_task.php First message. " docker exec -it dnmp-php72 sh -c "php /var/www/iot.tinywan.com/mq_new_task.php Second message. " docker exec -it dnmp-php72 sh -c "php /var/www/iot.tinywan.com/mq_new_task.php Third message. " docker exec -it dnmp-php72 sh -c "php /var/www/iot.tinywan.com/mq_new_task.php Fourth message. " docker exec -it dnmp-php72 sh -c "php /var/www/iot.tinywan.com/mq_new_task.php Fifth message. " ``` #### 查看其他两个worker是如何接受消息的 ``` > docker exec -it dnmp-php72 sh -c "php /var/www/iot.tinywan.com/mq_work.php" [*] Waiting for messages. To exit press CTRL+C [x] Received First message [x] Done [x] Received Third message [x] Done [x] Received Fifth message [x] Done ``` ``` > docker exec -it dnmp-php72 sh -c "php /var/www/iot.tinywan.com/mq_work.php" [*] Waiting for messages. To exit press CTRL+C [x] Received Second message [x] Done [x] Received Fourth message [x] Done ``` > 默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为循环法。与三个或更多工人一起尝试。 ### 消息确认 执行任务可能需要几秒钟。你可能想知道如果其中一个消费者开始一项长期任务并且只是部分完成而死亡会发生什么。使用我们当前的代码,一旦RabbitMQ向消费者发送消息,它立即将其标记为删除。在这种情况下,如果你杀死一个工人,我们将丢失它刚刚处理的消息。我们还将丢失分发给这个特定工作者但尚未处理的所有消息。 但我们不想失去任何任务。如果工人死亡,我们希望将任务交付给另一名工人。 为了确保消息永不丢失,RabbitMQ支持[消息*确认*](https://www.rabbitmq.com/confirms.html)。消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它。 如果消费者死亡(其通道关闭,连接关闭或TCP连接丢失)而不发送确认,RabbitMQ将理解消息未完全处理并将重新排队。如果同时有其他在线消费者,则会迅速将其重新发送给其他消费者。这样你就可以确保没有消息丢失,即使工人偶尔会死亡。 没有任何消息超时;当消费者死亡时,RabbitMQ将重新发送消息。即使处理消息需要非常长的时间,也没关系。 默认情况下,消息确认已关闭。现在是时候通过设置第四个参数来打开它们`basic_consume`到假(`true`表示*`没有ACK*`),并从工作人员发送适当的确认,一旦我们有一个任务来完成。 #### `即将被消费`和`未被消费`的消息查看 ``` > docker exec -it dnmp-rabbitmq sh -c "rabbitmqctl list_queues name messages_ready messages_unacknowledged" Timeout: 60.0 seconds ... Listing queues for vhost / ... name messages_ready messages_unacknowledged task_queue 4 0 hello 0 0 ``` > `messages_ready `:即将被消费 > `messages_unacknowledged `:消费但是未被确认 > 这时候如果开启一个work,则会全部被消费掉 ``` > docker exec -it dnmp-php72 sh -c "php /var/www/iot.tinywan.com/mq_work.php" [*] Waiting for messages. To exit press CTRL+C [x] Received Fifth message [x] Done [x] Received Fifth message [x] Done [x] Received Fifth message [x] Done [x] Received Fifth message [x] Done ``` 再次查看,准备的消息已经全部被消费完了 ``` > docker exec -it dnmp-rabbitmq sh -c "rabbitmqctl list_queues name messages_ready messages_unacknowledged" Timeout: 60.0 seconds ... Listing queues for vhost / ... name messages_ready messages_unacknowledged task_queue 0 0 hello 0 0 ``` ### 消息持久性 我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。 当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非你告诉它不要。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久。 首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的。为此,我们将第三个参数传递给`queue_declare`为`true`: ``` $channel->queue_declare('hello', false, true, false, false); ``` 这里如果实验 1、关闭RabbitMQ服务器:`docker stop dnmp-rabbitmq` 2、重启RabbitMQ服务器:`docker start dnmp-rabbitmq` 3、重新查看消息同时消费消息,消息是否已经持久化在数据文件中`rabbitmqctl list_queues name messages_ready messages_unacknowledged` 4、开启一个worker 消费消息,看看消息是否还在 ## 第三章 发布订阅 #### 交易所 让我们快速浏览前面教程中介绍的内容: * 甲*生产者*是发送消息的用户的应用程序。 * 甲*队列*是存储消息的缓冲器。 * 甲*消费者*是接收消息的用户的应用程序。 RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。实际上,生产者通常甚至不知道消息是否会被传递到任何队列。