企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
Queue AMQP 中的 队列 的概念和其他消息队列中 队列 的概念类似, 它有如下几个重要的概念: Name, 名字 Durable, 是否是持久的. 当为真时, 即使 broker 重启时, 此 queue 也不会被删除. Exclusive, 是否是独占的, 当为真时, 表示此 queue 只能有一个消费者, 并且当此消费者的连接断开时, 此 queue 会被删除. Auto-delete, 当为真时, 此 队列 会在最后一个消费者取消订阅时被删除. 在使用一个 队列 时, 需要先进行声明. 如果我们声明的队列不存在, 那么 broker 就会自动创建它. 不过如果此队列已经存在时, 我们就需要注意了, 若我们声明的队列的属性和已存在的队列的属性一致, 则不会有任何的问题, 但是如果先后两次声明的队列的属性不一致, 则会有 PRECONDITION_FAILED 错误(错误码为406). 关于队列名 AMQP 的队列名不能以 “amq.” 开头, 因为这样的队列名是 AMQP broker 内部所使用的. 当我们使用了这样的队列名时, 那么会有一个 ACCESS_REFUSED 错误(错误码为 403) 关于持久队列 持久队列会被持久化到磁盘中, 因此即使 broker 重启了, 持久队列也依然存在.不过需要注意的是, 不要将持久队列和消息的持久化混淆. 当 broker 重启时, 持久队列会自动重新声明, 然而只有队列中的持久化消息(persistent message)才会被恢复. 队列的绑定 队列的绑定关系是 exchagne 用于消息路由的规则, 即一个 exchange 能够将消息路由到某个队列的前提是此队列已经绑定到这个 exchange 中了. 当队列绑定到一个 exchange 中时, 我们还可以设置一个额外的参数, 即 route key, 这个 key 会被 direct exchange 和 topic exchange 作为额外的路由信息而使用, 换句话说, route key 扮演着过滤器的角色.当一个消息没有被路由到任意的队列时(例如此 exchange 没有任何的 queue 绑定着), 那么此时会根据消息的属性来决定是将此消息丢弃还是返回给生产者. 消费者 AMQP 0-9-1 支持两种消息分发模式: push 模式, 即 broker 主动推送消息给消费者 pull 模式, 即消费者主动从 broker 中拉取消息. 在 push 模式时, 应用程序需要告知 broker 它对哪些消息感兴趣, 即也就是我们所说的订阅一个消息主题. 每个消费者都有一个惟一的标识符, 即consumer tag, 我们可以用这个 tag 来取消一个消费者对某个主题的订阅(unsubscribe). 消息的 ACK AMQP 0-9-1 有两种消息 ACK 模式: 自动 ACK 模式 手动 ACK 模式 在自动 ACK 模式下, 当 broker 发送消息成功后, 会立即将此消息从消息队列中删除, 而不会等待消费者的 ACK 回复. 而在手动 ACK 模式下, 当 broker 发送消息给消费者时, 不会立即将此消息删除, 而是需要等待消费者的 ACK 回复后才会删除消息. 因此在手动 ACK 模式下, 当消费者收到消息并处理完成后, 需要向 broker 显示地发送 ACK 指令.在手动 ACK 模式下, 如果消费者因为意外的 crash 而没有发送 ACK 给 broker, 那么此时 broker 会将此消息转发给其他的消费者(如果此时没有消费者了, 那么 broker 会缓存此消息, 直到有新的消费者注册). 拒绝消息 当一个消费者处理消息失败或此时不能处理消息时, 那么可以给 broker 发送一个拒绝消息的指令, 并且可以要求 broker 丢弃或重新分发此消息.不过需要注意的是, 如果此时只有一个消费者, 那么当此消费者拒收消息并要求 broker 重新分发此消息时, 那么就会造成了此消息不断地分发和拒收, 形成了死循环. 预取消息 通过预取消息机制, 消费者可以一次性批量取出消息, 然后在处理后对这些批量消息进行统一的 ACK 回复, 这样可以提高消息的吞吐量.不过, 需要注意的时, RabbitMQ 仅支持 channel 级别的预取消息的数量配置, 不支持基于连接的预取消息数量配置. 连接 AMQP 的连接是长连接, 它是一个使用 TCP 作为可靠传输的应用层协议.