企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
>[info] 感谢Google翻译 [TOC] # "Hello World!" ## 简介 RabbitMQ是一个消息代理:它接受和转发消息。您可以将其视为邮局:当您将要发布的邮件放在邮箱中时,您可以确定邮件先生或Mailperson女士最终会将邮件发送给您的收件人。在这个比喻中,RabbitMQ是邮箱,邮局和邮递员。 RabbitMQ和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制blob数据 - 消息。 RabbitMQ和一般的消息传递使用了一些术语。 制作只不过是发送。发送消息的程序是生产者: ![](http://www.rabbitmq.com/img/tutorials/producer.png) 队列是RabbitMQ中的邮箱的名称。虽然消息流经RabbitMQ和您的应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,它本质上是一个大的消息缓冲区。许多生产者可以发送到一个队列的消息,并且许多消费者可以尝试从一个队列接收数据。这就是我们代表队列的方式: ![](http://www.rabbitmq.com/img/tutorials/queue.png) 消费与接受有类似的意义。消费者是一个主要等待接收消息的程序: ![](http://www.rabbitmq.com/img/tutorials/consumer.png) 请注意,生产者,消费者和代理不必驻留在同一主机上;实际上在大多数应用中他们没有。 ## "Hello World" ### (使用 the php-amqplib 客户端) 在本教程的这一部分中,我们将用PHP编写两个程序;发送单个消息的生产者,以及接收消息并将其打印出来的消费者。我们将掩盖[php-amqplib](https://github.com/php-amqplib/php-amqplib) API中的一些细节,专注于这个非常简单的事情才开始。它是消息传递的“Hello World”。 在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列 - RabbitMQ代表消费者保留的消息缓冲区。 ![](http://www.rabbitmq.com/img/tutorials/python-one.png) >[danger]php-amqplib客户端库 RabbitMQ说多种协议。本教程介绍AMQP 0-9-1,它是一种开放的,通用的消息传递协议。RabbitMQ有许多不同语言的客户端。我们将在本教程中使用php-amqplib,并使用Composer进行依赖关系管理。 将composer.json文件添加到项目中: ```json { "require": { "php-amqplib/php-amqplib": ">=2.6.1" } } ``` 如果已安装Composer并且功能正常,则可以运行以下命令: ```shell composer.phar install ``` 现在我们安装了php-amqplib库,我们可以编写一些代码。 ### 发送 ![](http://www.rabbitmq.com/img/tutorials/sending.png) 我们将调用我们的消息发布者(发送者)`send.php` 和我们的消息接收者 `receive.php` 。发布者将连接到RabbitMQ,发送单个消息,然后退出。 在 `send.php` 中,我们需要包含库并使用必要的类: ```php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; ``` 然后我们可以创建到服务器的连接: ```php $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); ``` 连接抽象套接字连接,并为我们负责协议版本协商和身份验证等。这里我们连接到本地机器上的代理 - 因此是本地主机。如果我们想要连接到不同机器上的代理,我们只需在此处指定其名称或IP地址。 接下来,我们创建一个频道,这是完成任务的大部分API所在的位置。 要发送,我们必须声明一个队列供我们发送;然后我们可以向队列发布消息: ```php $channel->queue_declare('hello', false, false, false, false); $msg = new AMQPMessage('Hello World!'); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent 'Hello World!'\n"; ``` 声明队列是幂等的 - 只有在它不存在的情况下才会创建它。消息内容是一个字节数组,因此您可以编码任何您喜欢的内容。 最后,我们关闭了频道和连接; ```php $channel->close(); $connection->close(); ``` >[danger] 发送不起作用! 如果这是您第一次使用RabbitMQ并且没有看到“已发送”消息,那么您可能会感到头疼,想知道可能出现的问题。也许代理是在没有足够的可用磁盘空间的情况下启动的(默认情况下它至少需要200 MB空闲),因此拒绝接受消息。检查代理日志文件以确认并在必要时减少限制。配置文件文档将向您展示如何设置`disk_free_limit`。 ### 接收 那是我们的发布者。我们的接收器是来自RabbitMQ的推送消息,因此与发布单个消息的发布者不同,我们将保持其运行以侦听消息并将其打印出来。 ![](http://www.rabbitmq.com/img/tutorials/receiving.png) 代码(在`receive.php`中)具有几乎相同的include和用作send: ```php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; ``` 设置与发布者相同;我们打开一个连接和一个通道,并声明我们将要消耗的队列。请注意,这与发送到的队列匹配。 ```php $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; ``` 请注意,我们也在这里声明队列。因为我们可能会在发布者之前启动消费者,所以我们希望在尝试使用消息之前确保队列存在。 我们即将告诉服务器从队列中传递消息。我们将定义一个PHP回调函数,它将接收服务器发送的消息。请记住,消息是从服务器异步发送到客户端的。 ```php $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } ``` 当`$channel`有回调,我们的代码将阻止。每当我们收到消息时,我们的`$callback`函数将传递收到的消息。 ### 归纳 现在我们可以运行这两个脚本。在终端中,运行消费者(接收者): ```php php receive.php ``` 然后,运行发布者(发件人): ```php php send.php ``` 消费者将通过RabbitMQ打印从发件人处获得的消息。接收器将继续运行,等待消息(使用Ctrl-C停止它),因此尝试从另一个终端运行发送器。 >[danger]列出队列 您可能希望看到RabbitMQ有哪些队列以及它们中有多少消息。您可以使用`rabbitmqctl`工具(作为特权用户)执行此操作: ```shell sudo rabbitmqctl list_queues ``` 在Windows上,省略sudo: ```shell rabbitmqctl.bat list_queues ``` # 工作队列 ## 工作队列 ![](http://www.rabbitmq.com/img/tutorials/python-two.png) 在第一篇教程中,我们编写了程序来发送和接收来自命名队列的消息。在这个中,我们将创建一个工作队列,用于在多个工作程序分配耗时的任务。 工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们安排任务稍后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工作程序时,它们之间将共享任务。 这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。 ## 准备工作 在本教程的前一部分中,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们没有现实世界的任务,比如要调整大小的图像或要渲染的pdf文件,所以让我们通过假装我们很忙来伪造它 - 使用`sleep()`函数。我们将字符串中的点数作为其复杂性;每个点都会占据“工作”的一秒钟。例如,Hello ...描述的假任务将花费三秒钟。 我们将稍微修改前一个示例中的send.php代码,以允许从命令行发送任意消息。该程序将任务安排到我们的工作队列,所以我们将其命名为`new_task.php`: ```php $data = implode(' ', array_slice($argv, 1)); if (empty($data)) { $data = "Hello World!"; } $msg = new AMQPMessage($data); $channel->basic_publish($msg, '', 'hello'); echo ' [x] Sent ', $data, "\n"; ``` 我们旧的`receive.php`脚本还需要进行一些更改:它需要为消息体中的每个点伪造一秒钟的工作。它将从队列中弹出消息并执行任务,所以我们称之为`worker.php`: ```php $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); ``` 请注意,我们的假任务模拟执行时间。 像在教程一中那样运行它们: ```shell # shell 1 php worker.php ``` ```php # shell 2 php new_task.php "A very hard task which takes two seconds.." ``` ## 循环调度 使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作,我们可以添加更多进程,这样就可以轻松扩展。 首先,让我们尝试同时运行两个`worker.php`脚本。他们都会从队列中获取消息,但究竟如何呢?让我们来看看。 你需要打开三个控制台。两个将运行`worker.php`脚本。这些控制台将成为我们的两个消费者 - C1和C2。 ```shell # shell 1 php worker.php # => [*] Waiting for messages. To exit press CTRL+C ``` ```shell # shell 2 php worker.php # => [*] Waiting for messages. To exit press CTRL+C ``` 在第三个中,我们将发布新任务。启动消费者后,您可以发布一些消息: ```shell # shell 3 php new_task.php First message. php new_task.php Second message.. php new_task.php Third message... php new_task.php Fourth message.... php new_task.php Fifth message..... ``` 让我们看看交给我们进程的是什么: ```shell # shell 1 php worker.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....' ``` ```shell # shell 2 php worker.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....' ``` 默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为循环法。与三个或更多进程一起尝试。 ## 消息确认 执行任务可能需要几秒钟。您可能想知道如果其中一个消费者开始执行长任务并且仅在部分完成时死亡会发生什么。使用我们当前的代码,一旦RabbitMQ向客户发送消息,它立即将其标记为删除。在这种情况下,如果你杀死一个进程,我们将丢失它刚刚处理的消息。我们还将丢失分发给这个特定进程但尚未处理的所有消息。 但我们不想失去任何任务。如果进程死亡,我们希望将任务交付给另一个进程。 为了确保消息永不丢失,RabbitMQ支持消息确认。消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它。 如果消费者死亡(其通道关闭,连接关闭或TCP连接丢失)而不发送确认,RabbitMQ将理解消息未完全处理并将重入队列。如果其他消费者同时在线,则会迅速将其重新发送给其他消费者。这样你就可以确保没有消息丢失,即使进程偶尔会死亡。 没有任何消息超时;当消费者死亡时,RabbitMQ将重新发送消息。即使处理消息需要非常长的时间,也没关系。 默认情况下,消息确认已关闭。是时候通过basic_consume将第四个参数设置为false来打开它们(true表示没有确认),并在完成任务后从进程发送适当的确认。 ```php $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_consume('task_queue', '', false, false, false, false, $callback); ``` 使用此代码,我们可以确定即使您在处理消息时使用CTRL + C杀死一个进程,也不会丢失任何内容。进程死后不久,所有未经确认的消息将被重新传递。 确认必须在收到的交付的同一信道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。 >[danger]被遗忘的确认 错过ack是一个常见的错误。这是一个简单的错误,但后果是严重的。当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经处理的消息。 为了调试这种错误,您可以使用`rabbitmqctl`来打印`messages_unacknowledged`字段: ```shell sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged ``` 在Windows上,删除sudo: ```powershell rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged ``` ## 消息持久性 我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。 当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非你告诉它不要。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久。 首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的。为此,我们将第三个参数传递给`queue_declare`为`true`: ```php $channel->queue_declare('hello', false, true, false, false); ``` 虽然此命令本身是正确的,但它在我们当前的设置中不起作用。那是因为我们已经定义了一个名为hello的队列,这个队列不耐用。RabbitMQ不允许您使用不同的参数重新定义现有队列,并将向尝试执行此操作的任何程序返回错误。但是有一个快速的解决方法 - 让我们声明一个具有不同名称的队列,例如`task_queue`: ```php $channel->queue_declare('task_queue', false, true, false, false); ``` 此标志设置为`true`需要应用于生产者和消费者代码。 此时我们确信即使RabbitMQ重新启动,`task_queue`队列也不会丢失。现在我们需要将消息标记为持久性 - 通过设置`delivery_mode = 2`消息属性,AMQPMessage将其作为属性数组的一部分。 ```php $msg = new AMQPMessage( $data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); ``` >[danger]有关消息持久性的注释 将消息标记为持久性并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且尚未保存消息时,仍然有一个短时间窗口。此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。持久性保证不强,但对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,那么您可以使用发布者确认。 ## 公平派遣 您可能已经注意到调度仍然无法完全按照我们的意愿运行。例如,在有两个工作进程的情况下,当所有奇数消息都很重或者消息很轻量时,一个进程将经常忙碌而另一个工作进程几乎不会做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。 发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。它不会查看消费者未确认消息的数量。它只是盲目地向第n个消费者发送每个第n个消息。 ![](http://www.rabbitmq.com/img/tutorials/prefetch-count.png) 为了解决弊端我们可以使用`basic_qos`方法和`prefetch_count = 1`设置。这告诉RabbitMQ不要一次向一个worker发送一条消息。或者,换句话说,在处理并确认前一个消息之前,不要向工作人员发送新消息。相反,它会将它发送给下一个仍然不忙的进程。 ```php $channel->basic_qos(null, 1, null); ``` >[danger]关于队列大小的说明 如果所有woker都很忙,您的队列就会填满。您将需要密切关注这一点,并可能添加更多woker,或者采取其他策略。 ## 归纳 `new_task.php`文件的最终代码: ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); $data = implode(' ', array_slice($argv, 1)); if (empty($data)) { $data = "Hello World!"; } $msg = new AMQPMessage( $data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); $channel->basic_publish($msg, '', 'task_queue'); echo ' [x] Sent ', $data, "\n"; $channel->close(); $connection->close(); ``` `worker.php`: ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('task_queue', '', false, false, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ``` 您设置工作队列可以使用消息确认和预取。即使RabbitMQ重新启动,持久性选项也可以使任务生效。 # 发布/订阅 ## 发布/订阅 在上一个教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务都交付给一个进程。在这一部分,我们将做一些完全不同的事情 - 我们将向多个消费者传递信息。此模式称为“发布/订阅”。 为了说明这种模式,我们将构建一个简单的日志记录系统。它将包含两个程序 - 第一个将发出日志消息,第二个将接收和打印它们。 在我们的日志记录系统中,接收程序的每个运行副本都将获取消息。这样我们就可以运行一个接收器并将日志定向到磁盘;同时我们将能够运行另一个接收器并在屏幕上看到日志。 基本上,发布的日志消息将被广播给所有接收者。 ## 交换机 在本教程的前几部分中,我们向队列发送消息和从队列接收消息。现在是时候在Rabbit中引入完整的消息传递模型了。 让我们快速浏览前面教程中介绍的内容: - 生产者是发送消息的用户应用程序。 - 队列是存储消息的缓冲区。 - 消费者是接收消息的用户应用程序。 RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。实际上,生产者通常甚至不知道消息是否会被传递到任何队列。 相反,生产者只能向交换机发送消息。交换机是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面将它们推送到队列。交换机必须确切知道如何处理收到的消息。它应该附加到特定队列吗?它应该附加到许多队列吗?或者它应该被丢弃。其规则由交换机类型定义。 ![](http://www.rabbitmq.com/img/tutorials/exchanges.png) 有几种交换类型可供选择:直连,主题,headers和扇形。我们将专注于最后一个 - 扇形。让我们创建一个这种类型的交换机,并将其称为日志: ```php $channel->exchange_declare('logs', 'fanout', false, false, false); ``` 扇形交换机非常简单。正如您可能从名称中猜到的那样,它只是将收到的所有消息广播到它知道的所有队列中。而这正是我们记录器所需要的。 >[danger]列出交换机 要列出服务器上的交换,您可以运行有用的rabbitmqctl: ```shell sudo rabbitmqctl list_exchanges ``` 在此列表中将有一些amq。*交换和默认(未命名)交换。这些是默认创建的,但目前您不太可能需要使用它们。 >[danger]默认交换机 在本教程的前几部分中,我们对交换机一无所知,但仍能够向队列发送消息。这是可能的,因为我们使用的是默认交换机,我们通过空字符串(“”)来识别。 回想一下我们之前是如何发布消息的: ```php $channel->basic_publish($msg, '', 'hello'); ``` 这里我们使用默认或匿名交换机:消息被路由到具有routing_key指定的名称的队列(如果存在)。路由键是basic_publish的第三个参数。 现在,我们可以发布到我们的命名交换机: ```php $channel->exchange_declare('logs', 'fanout', false, false, false); $channel->basic_publish($msg, 'logs'); ``` ## 临时队列 您可能还记得以前我们使用的是具有指定名称的队列(请记住`hello`和`task_queue`)。能够命名队列对我们来说至关重要 - 我们需要将worker指向同一个队列。当您想要在生产者和消费者之间共享队列时,为队列命名很重要。 但我们的记录器并非如此。我们希望了解所有日志消息,而不仅仅是它们的一部分。我们也只对目前流动的消息感兴趣,而不是旧消息。要解决这个问题,我们需要两件事。 首先,每当我们连接到Rabbit时,我们都需要一个新的空队列。为此,我们可以使用随机名称创建队列,或者更好 - 让服务器为我们选择随机队列名称。 其次,一旦我们断开消费者,就应该自动删除队列。 在php-amqplib客户端中,当我们将队列名称作为空字符串提供时,我们使用生成的名称创建一个非持久队列: ```php list($queue_name, ,) = $channel->queue_declare(""); ``` 方法返回时,`$queue_name`变量包含RabbitMQ生成的随机队列名称。例如,它可能看起来像`amq.gen-JzTY20BRgKO-HjmUJj0wLg`。 当声明它的连接关闭时,队列将被删除,因为它被声明为独占。您可以在队列指南中了解有关独占标志和其他队列属性的更多信息。 ## 绑定 ![](http://www.rabbitmq.com/img/tutorials/bindings.png) 我们已经创建了一个扇形交换机和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的关系称为绑定。 ```php $channel->queue_bind($queue_name, 'logs'); ``` 从现在开始,日志交换机会将消息附加到我们的队列中。 >[danger]列出绑定 您可以使用,您猜对了,列出现有绑定 ```shell rabbitmqctl list_bindings ``` ## 归纳 ![](http://www.rabbitmq.com/img/tutorials/python-three-overall.png) 生成日志消息的生产者程序与前一个教程没有太大的不同。最重要的变化是我们现在想要将消息发布到我们的日志交换机而不是匿名交换机。这里是`emit_log.php`脚本的代码: ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('logs', 'fanout', false, false, false); $data = implode(' ', array_slice($argv, 1)); if (empty($data)) { $data = "info: Hello World!"; } $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'logs'); echo ' [x] Sent ', $data, "\n"; $channel->close(); $connection->close(); ``` 如您所见,在建立连接后我们声明了交换机。此步骤是必要的,因为禁止发布到不存在的交换机。 如果没有队列绑定到交换机,消息将会丢失,但这对我们没有问题;如果没有消费者在听,我们可以安全地丢弃该消息。 `receive_logs.php`的代码: ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('logs', 'fanout', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, 'logs'); echo " [*] Waiting for logs. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] ', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ``` 如果要将日志保存到文件,只需打开控制台并键入: ```shell php receive_logs.php > logs_from_rabbit.log ``` 如果您希望在屏幕上看到日志,请生成一个新终端并运行: ```shell php receive_logs.php ``` 当然,要发出日志类型: ```shell php emit_log.php ``` 使用`rabbitmqctl list_bindings`,您可以验证代码是否实际创建了我们想要的绑定和队列。运行两个`receive_logs.php`程序时,您应该看到类似的内容: ```shell sudo rabbitmqctl list_bindings # => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # => ...done. ``` 结果的解释很简单:来自交换日志的数据转到两个具有服务器分配名称的队列。而这正是我们的意图。 # 路由 ## 路由 在上一个教程中,我们构建了一个简单的日志系统我们能够向许多接收者广播日志消息。 在本教程中,我们将为其添加一个功能 - 我们将只能订阅一部分消息。例如,我们只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。 ## 绑定 在前面的例子中,我们已经在创建绑定。您可能会记得以下代码: ```shell $channel->queue_bind($queue_name, 'logs'); ``` 绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自此交换机的消息感兴趣。 绑定可以采用额外的`routing_key`参数。为了避免与`$channel :: basic_publish`参数混淆,我们将其称为绑定密钥。这就是我们如何使用键创建绑定: ```php $binding_key = 'black'; $channel->queue_bind($queue_name, $exchange_name, $binding_key); ``` 绑定密钥的含义取决于交换类型。我们之前使用的扇形交换只是忽略了它的价值。 ## 直连交换机 我们上一个教程中的日志记录系统向所有消费者广播所有消息。我们希望扩展它以允许根据消息的严重性过滤消息。例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不是在警告或信息日志消息上浪费磁盘空间。 我们使用的是扇形交换机,它没有给我们太大的灵活性 - 它只能进行无意识的广播。 我们将使用直连交换机。直连交换机背后的路由算法很简单 - 消息进入队列,其绑定密钥与消息的路由密钥完全匹配。 为了说明这一点,请考虑以下设置: ![](http://www.rabbitmq.com/img/tutorials/direct-exchange.png) 在此设置中,我们可以看到直接交换机X与两个绑定到它的队列。第一个队列绑定`orange`,第二个绑定有两个绑定,一个绑定密钥为`black`,另一个绑定为`green`。 在这样的设置中,使用路由密钥`orange`发布到交换机的消息将被路由到队列Q1。路由键为`balck`或`green`的消息将转到Q2。所有其他消息将被丢弃。 ## 多个绑定 ![](http://www.rabbitmq.com/img/tutorials/direct-exchange-multiple.png) 使用相同的绑定密钥绑定多个队列是完全合法的。在我们的例子中,我们可以在X和Q1之间添加绑定键黑色的绑定。在这种情况下,直连交换机将表现得像扇形交换机一样,并将消息广播到所有匹配的队列。路由密钥为黑色的消息将传送到Q1和Q2。 ## 发送日志 我们将此模型用于我们的日志系统。我们会将消息发送给直连交换机,而不是扇形。我们将提供日志严重性作为路由密钥。这样接收脚本将能够选择它想要接收的严重性。让我们首先关注发送日志。 一如既往,我们需要先创建一个交换: ```php $channel->exchange_declare('direct_logs', 'direct', false, false, false); ``` 我们已准备好发送消息: ```php $channel->exchange_declare('direct_logs', 'direct', false, false, false); $channel->basic_publish($msg, 'direct_logs', $severity); ``` 为简化起见,我们假设“严重性”可以是“info”,“warning”,“error”之一。 ## 订阅 接收消息将像上一个教程一样工作,但有一个例外 - 我们将为我们感兴趣的每个严重性创建一个新的绑定。 ```php foreach ($severities as $severity) { $channel->queue_bind($queue_name, 'direct_logs', $severity); } ``` ## 归纳 ![](http://www.rabbitmq.com/img/tutorials/python-four.png) `emit_log_direct.php`类的代码: ````php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info'; $data = implode(' ', array_slice($argv, 2)); if (empty($data)) { $data = "Hello World!"; } $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'direct_logs', $severity); echo ' [x] Sent ', $severity, ':', $data, "\n"; $channel->close(); $connection->close(); ```` `receive_logs_direct.php`的代码: ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $severities = array_slice($argv, 1); if (empty($severities)) { file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n"); exit(1); } foreach ($severities as $severity) { $channel->queue_bind($queue_name, 'direct_logs', $severity); } echo " [*] Waiting for logs. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ``` 如果您只想将“warning”和“error”(而非“info”)日志消息保存到文件,只需打开控制台并键入: ```shell php receive_logs_direct.php warning error > logs_from_rabbit.log ``` 如果您想在屏幕上看到所有日志消息,请打开一个新终端并执行以下操作: ```shell php receive_logs_direct.php info warning error # => [*] Waiting for logs. To exit press CTRL+C ``` 并且,例如,要发出错误日志消息,只需键入: ```php php emit_log_direct.php error "Run. Run. Or it will explode." # => [x] Sent 'error':'Run. Run. Or it will explode.' ``` # 主题 ## 主题 在上一个教程中,我们改进了日志系统。我们使用的是直连的,而不是使用只能进行虚假广播的扇形交换机,并且有可能选择性地接收日志。 虽然使用直连交换机改进了我们的系统,但它仍然有局限性 - 它不能基于多个标准进行路由。 在我们的日志系统中,我们可能不仅要根据严重性订阅日志,还要根据发出日志的源来订阅日志。您可能从syslog unix工具中了解这个概念,该工具根据严重性(info / warn / crit ...)和facility(auth / cron / kern ...)来路由日志。 这会给我们带来很大的灵活性 - 我们可能想听听来自'cron'的关键错误以及来自'kern'的所有日志。 要在我们的日志系统中实现这一点,我们需要了解更复杂的主题交换机。 ## 主题交换机 发送到主题交换机的消息不能具有任意routing_key - 它必须是由`点`分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由密钥示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由密钥中可以包含任意数量的单词,最多可达255个字节。 绑定密钥也必须采用相同的形式。主题交换机背后的逻辑类似于直连交换 - 使用特定路由密钥发送的消息将被传递到与匹配绑定密钥绑定的所有队列。但是,绑定键有两个重要的特殊情况: - *(星号)可以替代一个单词。 - #(hash)可以替换零个或多个单词。 最简单的示例如下: ![](http://www.rabbitmq.com/img/tutorials/python-five.png) 在这个例子中,我们将发送所有描述动物的消息。消息将与包含三个单词(两个点)的路由键一起发送。路由键中的第一个单词将描述速度,第二个是颜色,第三个是物种:“<speed>.<color>.<species>”。 这些绑定可以概括为: Q1对所有橙色动物感兴趣。 Q2希望听到关于兔子的一切,以及关于懒惰动物的一切。 路由密钥设置为“quick.orange.rabbit”的消息将传递到两个队列。消息“lazy.orange.elephant”也将同时发送给他们。另一方面,“quick.orange.fox”只会进入第一个队列,而“lazy.brown.fox”只会进入第二个队列。“lazy.pink.rabbit”将仅传递到第二个队列一次,即使它匹配两个绑定。“quick.brown.fox”与任何绑定都不匹配,因此它将被丢弃。 如果我们违反约定并发送带有一个或四个单词的消息,例如“orange”或“quick.orange.male.rabbit”,会发生什么?好吧,这些消息将不匹配任何绑定,将丢失。 另一方面,“lazy.orange.male.rabbit”,即使它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。 >[danger]主题交换机 主题交换机功能强大,可以像其他交换机一样。 当队列与“#”(哈希)绑定密钥绑定时 - 它将接收所有消息,而不管路由密钥 - 如扇形交换机。 当特殊字符“*”(星号)和“#”(哈希)未在绑定中使用时,主题交换的行为就像直连交换机一样。 ## 归纳 我们将在日志记录系统中使用主题交换。我们将首先假设日志的路由键有两个词:“<facility>.<severity>”。 代码与上一个教程中的代码几乎相同。 `emit_log_topic.php`的代码: ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('topic_logs', 'topic', false, false, false); $routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info'; $data = implode(' ', array_slice($argv, 2)); if (empty($data)) { $data = "Hello World!"; } $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'topic_logs', $routing_key); echo ' [x] Sent ', $routing_key, ':', $data, "\n"; $channel->close(); $connection->close(); ``` `receive_logs_topic.php`的代码: ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('topic_logs', 'topic', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $binding_keys = array_slice($argv, 1); if (empty($binding_keys)) { file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n"); exit(1); } foreach ($binding_keys as $binding_key) { $channel->queue_bind($queue_name, 'topic_logs', $binding_key); } echo " [*] Waiting for logs. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ``` 要接收所有日志: ```shell php receive_logs_topic.php "#" ``` 要从设施“kern”接收所有日志: ```php php receive_logs_topic.php "kern.*" ``` 或者,如果您只想听到“critical”日志: ```shell php receive_logs_topic.php "*.critical" ``` 您可以创建多个绑定: ```shell php receive_logs_topic.php "kern.*" "*.critical" ``` 并使用路由键“kern.critical”类型发出日志: ```shell php emit_log_topic.php "kern.critical" "A critical kernel error" ``` 玩这些程序玩得开心。请注意,代码不会对路由或绑定密钥做出任何假设,您可能希望使用两个以上的路由密钥参数。 # RPC ## 远程过程调用 (RPC) 在第二篇教程中,我们学习了如何使用工作队列在多个worker之间分配耗时的任务。 但是如果我们需要在远程计算机上运行一个函数并等待结果呢?嗯,这是一个不同的故事。此模式通常称为远程过程调用或RPC。 在本教程中,我们将使用RabbitMQ构建RPC系统:客户端和可伸缩的RPC服务器。由于我们没有任何值得分发的耗时任务,我们将创建一个返回Fibonacci数字的虚拟RPC服务。 ### 客户端界面 为了说明如何使用RPC服务,我们将创建一个简单的客户端类。它将公开一个名为call的方法,该方法发送一个RPC请求并阻塞,直到收到答案为止: ```php $fibonacci_rpc = new FibonacciRpcClient(); $response = $fibonacci_rpc->call(30); echo ' [.] Got ', $response, "\n"; ``` >[danger]关于RPC的说明 尽管RPC在计算中是一种非常常见的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是慢的RPC时,会出现问题。这样的混淆导致系统不可预测,并增加了调试的不必要的复杂性。错误使用RPC可以导致不可维护的意大利面条代码,而不是简化软件。 考虑到这一点,请考虑以下建议: - 确保显而易见哪个函数调用是本地的,哪个是远程的。 - 记录您的系统。使组件之间的依赖关系变得清晰。 - 处理错误案例。当RPC服务器长时间停机时,客户端应该如何反应? 如有疑问,请避免使用RPC。如果可以,您应该使用异步管道 - 而不是类似RPC的阻塞,将结果异步推送到下一个计算阶段。 ### 回调队列 一般来说,通过RabbitMQ进行RPC很容易。客户端发送请求消息,服务器回复响应消息。为了接收响应,我们需要发送带有请求的“回调”队列地址。我们可以使用默认队列。我们来试试吧: ```php list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $msg = new AMQPMessage( $payload, array('reply_to' => $queue_name) ); $channel->basic_publish($msg, '', 'rpc_queue'); # ... then code to read a response message from the callback_queue ... ``` >[danger]消息属性 AMQP 0-9-1协议预定义了一组带有消息的14个属性。大多数属性很少使用,但以下情况除外: `delivery_mode`:将消息标记为持久性(值为2)或瞬态(1)。您可能会记住第二个教程中的这个属性。 `content_type`:用于描述编码的mime类型。例如,对于经常使用的JSON编码,将此属性设置为:`application / json`是一种很好的做法。 `reply_to`:通常用于命名回调队列。 `correlation_id`:用于将RPC响应与请求相关联。 ### 相关ID 在上面介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这是非常低效的,但幸运的是有更好的方法 - 让我们为每个客户端创建一个回调队列。 这引发了一个新问题,在该队列中收到响应后,不清楚响应属于哪个请求。那是在使用`correlation_id`属性的时候。我们将为每个请求将其设置为唯一值。稍后,当我们在回调队列中收到一条消息时,我们将查看此属性,并根据该属性,我们将能够将响应与请求进行匹配。如果我们看到未知的`correlation_id`值,我们可以安全地丢弃该消息 - 它不属于我们的请求。 您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是失败并出现错误?这是由于服务器端可能存在竞争条件。虽然不太可能,但RPC服务器可能会在向我们发送答案之后,但在发送请求的确认消息之前死亡。如果发生这种情况,重新启动的RPC服务器将再次处理请求。这就是为什么在客户端上我们必须优雅地处理重复的响应,理想情况下RPC应该是幂等的。 ### 总结 ![](http://www.rabbitmq.com/img/tutorials/python-six.png) 我们的RPC将这样工作: 客户端启动时,会创建一个匿名的独占回调队列。 对于RPC请求,客户端发送带有两个属性的消息:`reply_to`(设置为回调队列)和`correlation_id`(设置为每个请求的唯一值)。 请求被发送到`rpc_queue`队列。 RPC worker(aka:server)正在等待该队列上的请求。当出现请求时,它会执行该作业,并使用`reply_to`字段中的队列将带有结果的消息发送回客户端。 客户端等待回调队列上的数据。出现消息时,它会检查`correlation_id`属性。如果它与请求中的值匹配,则返回对应用程序的响应。 ## 归纳 斐波纳契任务: ```php function fib($n) { if ($n == 0) { return 0; } if ($n == 1) { return 1; } return fib($n-1) + fib($n-2); } ``` 我们声明我们的斐波那契函数。它假定只有有效的正整数输入。(不要指望这个适用于大数字,它可能是最慢的递归实现)。 我们的RPC服务器`rpc_server.php`的代码如下所示: ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('rpc_queue', false, false, false, false); function fib($n) { if ($n == 0) { return 0; } if ($n == 1) { return 1; } return fib($n-1) + fib($n-2); } echo " [x] Awaiting RPC requests\n"; $callback = function ($req) { $n = intval($req->body); echo ' [.] fib(', $n, ")\n"; $msg = new AMQPMessage( (string) fib($n), array('correlation_id' => $req->get('correlation_id')) ); $req->delivery_info['channel']->basic_publish( $msg, '', $req->get('reply_to') ); $req->delivery_info['channel']->basic_ack( $req->delivery_info['delivery_tag'] ); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ``` 服务器代码非常简单: - 像往常一样,我们首先建立连接,通道和声明队列。 - 我们可能希望运行多个服务器进程。为了在多个服务器上平均分配负载,我们需要在`$channel.basic_qos`中设置`prefetch_count`设置。 - 我们使用`basic_consume`来访问队列。然后我们进入`while`循环,在其中我们等待请求消息,完成工作并发回响应。 我们的RPC客户端rpc_client.php的代码: ```php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class FibonacciRpcClient { private $connection; private $channel; private $callback_queue; private $response; private $corr_id; public function __construct() { $this->connection = new AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest' ); $this->channel = $this->connection->channel(); list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, false, true, false ); $this->channel->basic_consume( $this->callback_queue, '', false, false, false, false, array( $this, 'onResponse' ) ); } public function onResponse($rep) { if ($rep->get('correlation_id') == $this->corr_id) { $this->response = $rep->body; } } public function call($n) { $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( (string) $n, array( 'correlation_id' => $this->corr_id, 'reply_to' => $this->callback_queue ) ); $this->channel->basic_publish($msg, '', 'rpc_queue'); while (!$this->response) { $this->channel->wait(); } return intval($this->response); } } $fibonacci_rpc = new FibonacciRpcClient(); $response = $fibonacci_rpc->call(30); echo ' [.] Got ', $response, "\n"; ``` 现在是查看`rpc_client.php`和`rpc_server.php`的完整示例源代码的好时机。 我们的RPC服务现已准备就绪。我们可以启动服务器: ```shell php rpc_server.php # => [x] Awaiting RPC requests ``` 要请求斐波纳契数,请运行客户端: ```shell php rpc_client.php # => [x] Requesting fib(30) ``` 此处介绍的设计并不是RPC服务的唯一可能实现,但它具有一些重要优势: - 如果RPC服务器太慢,您可以通过运行另一个服务器来扩展。尝试在新控制台中运行第二个`rpc_server.php`。 - 在客户端,RPC只需要发送和接收一条消息。不需要像`queue_declare`这样的同步调用。因此,对于单个RPC请求,RPC客户端只需要一次网络往返。 我们的代码仍然相当简单,并不试图解决更复杂(但重要)的问题,例如: - 如果没有运行服务器,客户应该如何反应? - 客户端是否应该为RPC设置某种超时? - 如果服务器出现故障并引发异常,是否应将其转发给客户端? - 在处理之前防止无效的传入消息(例如检查边界,键入)。