虽然我们在上一节中描述的AMQP模型是通用的并且适用于所有实现,但当我们进入资源管理时,细节特定于代理实现。 因此,在本节中,我们将重点关注仅存在于“spring-rabbit”模块中的代码,因为此时,RabbitMQ是唯一受支持的实现。
管理与RabbitMQ代理的连接的核心组件是`ConnectionFactory`接口。 `ConnectionFactory`实现的职责是提供`org.springframework.amqp.rabbit.connection.Connection`的实例,它是`com.rabbitmq.client.Connection`的包装器。我们提供的唯一具体实现是`CachingConnectionFactory`,默认情况下,它建立一个可由应用程序共享的单个连接代理。共享连接是可能的,因为与AMQP进行消息传递的“工作单元”实际上是一个“通道”(在某些方面,这类似于JMS中Connection和Session之间的关系)。可以想象,连接实例提供了`createChannel`方法。 `CachingConnectionFactory`实现支持对这些通道进行缓存,并根据通道是否为事务性为通道维护单独的缓存。在创建`CachingConnectionFactory`的实例时,可以通过构造函数提供主机名。还应提供用户名和密码属性。如果要配置通道缓存的大小(默认值为25),也可以在此处调用`setChannelCacheSize()`方法。
从版本1.3开始,`CachingConnectionFactory`可以配置为缓存连接以及通道。 在这种情况下,每次调用`createConnection()`都会创建一个新连接(或从缓存中检索一个空闲连接)。 关闭连接会将其返回到缓存(如果尚未达到缓存大小)。 在此类连接上创建的通道也会被缓存。 在某些环境中使用单独的连接可能很有用,例如从HA集群中使用,与负载均衡器一起使用,以连接到不同的集群成员。 将`cacheMode`设置为`CacheMode.CONNECTION`。
>这不限制连接数,它指定允许的空闲打开连接数。
>
从1.5.5版开始,提供了一个新属性`connectionLimit`。 设置此选项后,它将限制允许的连接总数。 设置后,如果达到限制,则使用`channelCheckoutTimeLimit`等待连接变为空闲。 如果超过时间,则抛出`AmqpTimeoutException`。
>当缓存模式为CONNECTION时,不支持自动声明队列等(请参阅“自动声明交换,队列和绑定”一节)。
>此外,在编写本文时,rabbitmq-client库默认为每个连接(5个线程)创建一个固定的线程池。 使用大量连接时,应考虑在CachingConnectionFactory上设置自定义执行程序。 然后,所有连接都将使用相同的执行程序,并且可以共享其线程。 执行程序的线程池应该是无限制的,或者为预期的利用率设置适当的(通常,每个连接至少有一个线程)。 如果在每个连接上创建多个通道,则池大小将影响并发性,因此变量(或简单缓存)线程池执行程序将是最合适的。
>
重要的是要理解缓存大小(默认情况下)不是限制,而只是可以缓存的通道数。 如果缓存大小为10,则实际上可以使用任意数量的通道。 如果正在使用10个以上的通道并将它们全部返回到缓存中,则10将进入缓存; 其余的将在物理上关闭。
从版本1.6开始,默认通道缓存大小从1增加到25.在大容量,多线程环境中,小缓存意味着以高速率创建和关闭通道。 增加默认缓存大小将避免此开销。 您应该通过RabbitMQ管理UI监视正在使用的通道,如果您看到许多通道正在创建和关闭,请考虑进一步增加缓存大小。 缓存只会按需增长(以满足应用程序的并发要求),因此此更改不会影响现有的低容量应用程序。
从版本1.4.2开始,`CachingConnectionFactory`具有属性`channelCheckoutTimeout`。 当此属性大于零时,`channelCacheSize`将成为可以在连接上创建的通道数的限制。 如果达到限制,则调用线程将阻塞,直到通道可用或达到此超时,在这种情况下抛出`AmqpTimeoutException`。
>框架内使用的通道(例如`RabbitTemplate`)将可靠地返回到缓存。 如果在框架之外创建通道(例如,通过直接访问连接并调用`createChannel()`),则必须可靠地(通过关闭)返回它们(可能在`finally`块中),以避免耗尽通道。
>
~~~java
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
~~~
使用xml配置大概如下:
~~~xml
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
~~~
>还有一个SingleConnectionFactory实现,它只在框架的单元测试代码中可用。 它比CachingConnectionFactory更简单,因为它不缓存通道,但由于缺乏性能和弹性,它不适用于简单测试之外的实际使用。 如果由于某种原因发现需要实现自己的ConnectionFactory,则AbstractConnectionFactory基类可以提供一个很好的起点。
>
可以使用rabbit命名空间快速方便地创建ConnectionFactory:
~~~xml
<rabbit:connection-factory id="connectionFactory"/>
~~~
在大多数情况下,这将是更好的选择,因为框架可以为您选择最佳默认值。 创建的实例将是CachingConnectionFactory。 请记住,通道的默认缓存大小为25.如果要缓存更多通道,请通过channelCacheSize属性设置更大的值。 在XML中它看起来像这样:
~~~xml
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
~~~
使用命名空间,您只需添加channel-cache-size属性:
~~~
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
~~~
默认缓存模式是CHANNEL,但您可以将其配置为缓存连接; 在这种情况下,我们使用`connection-cache-size`:
~~~
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
~~~
可以使用命名空间提供主机和端口属性
~~~
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
~~~
或者,如果在集群环境中运行,请使用addresses属性。
~~~
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672"/>
~~~
这是一个自定义线程工厂的示例,它使用rabbitmq-作为线程名称的前缀。
~~~xml
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
~~~
>当应用程序配置单个CachingConnectionFactory时,默认情况下使用Spring Boot自动配置,当Broker阻止连接时,应用程序将停止工作。 当它被经纪人阻止时,其任何客户都会停止工作。 如果我们在同一个应用程序中有生产者和消费者,那么当生产者阻止连接时,我们可能会遇到死锁,因为Broker上没有资源,消费者因为连接被阻止而无法释放它们。 为了缓解这个问题,只需要一个具有相同选项的单独的CachingConnectionFactory实例 - 一个用于生产者,一个用于消费者。 不建议事务生产者使用单独的CachingConnectionFactory,因为它们应该重用与消费者事务关联的
>
从版本2.0.2开始,`RabbitTemplate`具有自动使用第二个连接工厂的配置选项,除非正在使用事务。 有关详细信息,请参阅“使用单独连接”一节。 发送者连接的`ConnectionNameStrategy`与调用该方法的结果附加`.publisher`的主策略相同。
从版本1.7.7开始,提供`AmqpResourceNotAvailableException`,当`SimpleConnection.createChannel()`无法创建`Channel`时,会抛出AmqpResourceNotAvailableException,例如,因为达到了`channelMax`限制且缓存中没有可用的通道。 可以在RetryPolicy中使用此异常来在某些退避后恢复操作。
- 1.前言
- 2.介绍
- 2.1 快速浏览
- 3.参考
- 3.1 使用spring amqp
- 3.1.1 AMQP抽象
- 3.1.2 资源的连接和管理
- 介绍
- 配置底层客户端连接工厂
- RabbitConnectionFactoryBean和配置SSL
- 路由连接工厂
- 队列亲和力和LocalizedQueueConnectionFactory
- 发送确认和返回
- 3.1.3 添加自定义客户端连接属性
- 3.1.4 AmqpTemplate
- 介绍
- 添加重试功能
- 发送消息是异步的 - 如何检测成功和失败
- 发布的确认和返回
- 3.1.5 发送消息
- 介绍
- 消息构建 API
- 发布的返回
- 3.1.6 接收消息
- 介绍
- 轮询消费者
- 异步消费者