ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
[TOC] # RabbitmqAdmin使用 ~~~xml <dependencies> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.3.RELEASE</version> </dependency> </dependencies> ~~~ 容器中纳入ConnectionFactory和RabbitAdmin管理 ~~~java @Configuration public class MQConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672"); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } } ~~~ 应用类,使用RabbitAdmin进行Exchange,Queue,Binding操作 ~~~java import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; import java.util.HashMap; import java.util.Map; @ComponentScan public class Application { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class); System.out.println(rabbitAdmin); //创建四种类型的Exchange,可重复执行 rabbitAdmin.declareExchange(new DirectExchange("zhihao.direct.exchange",true,false)); rabbitAdmin.declareExchange(new TopicExchange("zhihao.topic.exchange",true,false)); rabbitAdmin.declareExchange(new FanoutExchange("zhihao.fanout.exchange",true,false)); rabbitAdmin.declareExchange(new HeadersExchange("zhihao.header.exchange",true,false)); //删除Exchange //rabbitAdmin.deleteExchange("zhihao.header.exchange"); //定义队列 rabbitAdmin.declareQueue(new Queue("zhihao.debug",true)); rabbitAdmin.declareQueue(new Queue("zhihao.info",true)); rabbitAdmin.declareQueue(new Queue("zhihao.error",true)); //删除队列 //rabbitAdmin.deleteQueue("zhihao.debug"); //将队列中的消息全消费掉 rabbitAdmin.purgeQueue("zhihao.info",false); //绑定,指定要绑定的Exchange和Route key 
rabbitAdmin.declareBinding(new Binding("zhihao.debug",Binding.DestinationType.QUEUE, "zhihao.direct.exchange","zhihao.hehe",new HashMap())); rabbitAdmin.declareBinding(new Binding("zhihao.info",Binding.DestinationType.QUEUE, "zhihao.direct.exchange","zhihao.haha",new HashMap())); rabbitAdmin.declareBinding(new Binding("zhihao.error",Binding.DestinationType.QUEUE, "zhihao.direct.exchange","zhihao.welcome",new HashMap())); //绑定header exchange Map<String,Object> headerValues = new HashMap<>(); headerValues.put("type",1); headerValues.put("size",10); //whereAll指定了x-match: all参数 rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.debug")). to(new HeadersExchange("zhihao.header.exchange")).whereAll(headerValues).match()); //whereAny指定了x-match: any参数 rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.info")). to(new HeadersExchange("zhihao.header.exchange")).whereAny(headerValues).match()); //进行解绑 rabbitAdmin.removeBinding(BindingBuilder.bind(new Queue("zhihao.info")). to(new TopicExchange("zhihao.direct.exchange")).with("zhihao.info")); //声明topic类型的exchange rabbitAdmin.declareExchange(new TopicExchange("zhihao.hehe.exchange",true,false)); rabbitAdmin.declareExchange(new TopicExchange("zhihao.miao.exchange",true,false)); //exchange与exchange绑定 rabbitAdmin.declareBinding(new Binding("zhihao.hehe.exchange",Binding.DestinationType.EXCHANGE, "zhihao.miao.exchange","zhihao",new HashMap())); //使用BindingBuilder进行绑定 rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.debug")). to(new TopicExchange("zhihao.topic.exchange")).with("zhihao.miao")); //rabbitAdmin.declareBinding(new Binding("amq.rabbitmq.trace",Binding.DestinationType.EXCHANGE, //"amq.rabbitmq.log","zhihao",new HashMap())); context.close(); } } ~~~ **Exchange,Queue,Binding的自动声明** 1. 直接把要自动声明的组件Bean纳入到spring容器中管理即可。 自动声明发生在rabbitmq第一次连接创建的时候。如果系统从启动到停止没有创建任何连接,则不会自动创建。 2. 
自动声明支持单个和多个。 **自动声明Exchange**: ~~~java import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DeclareConfig { //声明direct类型的Exchange @Bean public Exchange directExchange(){ return new DirectExchange("zhihao.direct.exchange",true,false); } //声明topic类型的Exchange @Bean public Exchange topicExchange(){ return new TopicExchange("zhihao.topic.exchange",true,false); } //声明fanout类型的Exchange @Bean public Exchange fanoutExchange(){ return new FanoutExchange("zhihao.fanout.exchange",true,false); } //声明headers类型的Exchange @Bean public Exchange headersExchange(){ return new HeadersExchange("zhihao.header.exchange",true,false); } } ~~~ 配置类,在spring容器中纳入ConnectionFactory实例和RabbitAdmin实例 ~~~java import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672"); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } } ~~~ 启动应用类,自动声明发生在rabbitmq第一次连接创建的时候。如果系统从启动到停止没有创建任何连接,则不会自动创建。 ~~~java import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; @ComponentScan public class Application { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); //使得客户端第一次连接rabbitmq 
context.getBean(RabbitAdmin.class).getQueueProperties("**"); context.close(); } } ~~~ **队列的自动声明** ~~~java import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DeclareConfig { @Bean public Queue debugQueue(){ return new Queue("zhihao.debug",true); } @Bean public Queue infoQueue(){ return new Queue("zhihao.info",true); } @Bean public Queue errorQueue(){ return new Queue("zhihao.error",true); } } ~~~ 上面的Application和DeclareConfig不列举出来了,执行Application应用启动类,查看web管控台的队列生成 **绑定的自动生成** DeclareConfig类中, ~~~ import org.springframework.amqp.core.Binding; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @Configuration public class DeclareConfig { @Bean public Binding binding(){ return new Binding("zhihao.debug",Binding.DestinationType.QUEUE, "zhihao.direct.exchange","zhihao.debug",new HashMap()); } @Bean public Binding binding2(){ return new Binding("zhihao.info",Binding.DestinationType.QUEUE, "zhihao.direct.exchange","zhihao.info",new HashMap()); } @Bean public Binding binding3(){ return new Binding("zhihao.error",Binding.DestinationType.QUEUE, "zhihao.direct.exchange","zhihao.error",new HashMap()); } } ~~~ **一次性生成多个queue,exchange,binding** ~~~ import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @Configuration public class DeclareConfig { @Bean public List<Queue> queues(){ List<Queue> queueList = new ArrayList<>(); queueList.add(new Queue("chao.wang.debug",true)); queueList.add(new Queue("chao.wang.info",true)); queueList.add(new Queue("chao.wang.error",true)); return queueList; } @Bean public List<Exchange> exchanges(){ List<Exchange> exchangeList = new ArrayList<>(); exchangeList.add(new 
TopicExchange("chao.wang.debug.topic.exchange",true,false)); exchangeList.add(new TopicExchange("chao.wang.info.topic.exchange",true,false)); exchangeList.add(new TopicExchange("chao.wang.error.topic.exchange",true,false)); return exchangeList; } @Bean public List<Binding> bindings(){ List<Binding> bindingList = new ArrayList<>(); bindingList.add(BindingBuilder.bind(new Queue("chao.wang.debug")). to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.#")); bindingList.add(BindingBuilder.bind(new Queue("chao.wang.info")). to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.*")); bindingList.add(BindingBuilder.bind(new Queue("chao.wang.error")). to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.error.*")); return bindingList; } } ~~~ 上面的Application和DeclareConfig不列举出来了,执行Application应用启动类,查看web管控台Exchange,Queue,Binding都已经生成。 **注意** 当声明的队列名是以`amq.`开头的时候,队列是不能创建声明的。 ~~~java @Bean public Queue amqQueue(){ return new Queue("amq.log",true); } ~~~ # 总结 **自动声明的一些条件** > * 要有连接(对rabbitmq的连接) > * 容器中要有`org.springframework.amqp.rabbit.core.RabbitAdmin`的实例 > * `RabbitAdmin`的`autoStartup`属性必须为true。 > * 如果`ConnectionFactory`使用的是`CachingConnectionFactory`,则`cacheMode`必须是`CachingConnectionFactory.CacheMode.CHANNEL`(默认)。 > * 所要声明的组件(`Queue`,`Exchange`和`Binding`)的`shouldDeclare`必须是`true`(默认就是`true`) > * `Queue`队列的名字不能以`amq.`开头。 注意:`Queue`,`Exchange`和`Binding`都直接或者间接的继承`Declarable`,而`Declarable`中定义了`shouldDeclare`的方法。 # 自动声明源码分析 `org.springframework.amqp.rabbit.core.RabbitAdmin`实现`InitializingBean`接口,在`BeanFactory`设置完所有属性之后执行特定初始化(`afterPropertiesSet`方法) `RabbitAdmin`的`afterPropertiesSet`方法, ~~~ @Override public void afterPropertiesSet() { synchronized (this.lifecycleMonitor) { //autoStartup属性的值为false的时候,直接return if (this.running || !this.autoStartup) { return; } //connectionFactory实例如果是CachingConnectionFactory,并且CacheMode是CacheMode.CONNECTION也会return下面不执行了。 if (this.connectionFactory instanceof CachingConnectionFactory && 
((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) { this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION"); return; } //连接的监听器 this.connectionFactory.addConnectionListener(new ConnectionListener() { // Prevent stack overflow... private final AtomicBoolean initializing = new AtomicBoolean(false); @Override public void onCreate(Connection connection) { if (!initializing.compareAndSet(false, true)) { // If we are already initializing, we don't need to do it again... return; } try { //执行这个方法 initialize(); } finally { initializing.compareAndSet(true, false); } } @Override public void onClose(Connection connection) { } }); this.running = true; } } ~~~ `RabbitAdmin`的`initialize`方法,声明所有`exchanges`,`queues`和`bindings` ~~~ /** * Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe * (but unnecessary) to call this method more than once. */ public void initialize() { if (this.applicationContext == null) { this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings"); return; } this.logger.debug("Initializing declarations"); //得到容器中所有的Exchange Collection<Exchange> contextExchanges = new LinkedList<Exchange>( this.applicationContext.getBeansOfType(Exchange.class).values()); //得到容器中所有的Queue Collection<Queue> contextQueues = new LinkedList<Queue>( this.applicationContext.getBeansOfType(Queue.class).values()); //得到容器中所有的Binding Collection<Binding> contextBindings = new LinkedList<Binding>( this.applicationContext.getBeansOfType(Binding.class).values()); //获取容器中所有的Collection,如果容器中所有元素是Exchange,Queue或者Binding的时候将这些实例也加入到spring容器中。 @SuppressWarnings("rawtypes") Collection<Collection> collections = this.applicationContext.getBeansOfType(Collection.class, false, false) .values(); for (Collection<?> collection : collections) { if (collection.size() > 0 && collection.iterator().next() instanceof Declarable) 
{ for (Object declarable : collection) { if (declarable instanceof Exchange) { contextExchanges.add((Exchange) declarable); } else if (declarable instanceof Queue) { contextQueues.add((Queue) declarable); } else if (declarable instanceof Binding) { contextBindings.add((Binding) declarable); } } } } //进行了filter过滤, final Collection<Exchange> exchanges = filterDeclarables(contextExchanges); final Collection<Queue> queues = filterDeclarables(contextQueues); final Collection<Binding> bindings = filterDeclarables(contextBindings); for (Exchange exchange : exchanges) { if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) { this.logger.info("Auto-declaring a non-durable or auto-delete Exchange (" + exchange.getName() + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". " + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and " + "reopening the connection."); } } for (Queue queue : queues) { if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) { this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue (" + queue.getName() + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:" + queue.isExclusive() + ". 
" + "It will be redeclared if the broker stops and is restarted while the connection factory is " + "alive, but all messages will be lost."); } } this.rabbitTemplate.execute(new ChannelCallback<Object>() { @Override public Object doInRabbit(Channel channel) throws Exception { //声明exchange,如果exchange是默认的exchange那么也不会声明。 declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()])); //声明队列,如果队列名以amq.开头的也不会进行声明 declareQueues(channel, queues.toArray(new Queue[queues.size()])); declareBindings(channel, bindings.toArray(new Binding[bindings.size()])); return null; } }); this.logger.debug("Declarations finished"); } ~~~ `filterDeclarables`方法过滤一些`Exchange`,`Queue`,`Binding`,因为这三个类都是继承`Declarable这个类`, ~~~ private <T extends Declarable> Collection<T> filterDeclarables(Collection<T> declarables) { Collection<T> filtered = new ArrayList<T>(); for (T declarable : declarables) { Collection<?> adminsWithWhichToDeclare = declarable.getDeclaringAdmins(); //shouldDeclare属性必须是true,否则就会被过滤掉了 if (declarable.shouldDeclare() && (adminsWithWhichToDeclare.isEmpty() || adminsWithWhichToDeclare.contains(this))) { filtered.add(declarable); } } return filtered; } ~~~ 声明Exchanges ~~~ private void declareExchanges(final Channel channel, final Exchange... 
exchanges) throws IOException { for (final Exchange exchange : exchanges) { if (this.logger.isDebugEnabled()) { this.logger.debug("declaring Exchange '" + exchange.getName() + "'"); } //不是默认的Exchange if (!isDeclaringDefaultExchange(exchange)) { try { //是否是delayed类型的Exchange if (exchange.isDelayed()) { Map<String, Object> arguments = exchange.getArguments(); if (arguments == null) { arguments = new HashMap<String, Object>(); } else { arguments = new HashMap<String, Object>(arguments); } arguments.put("x-delayed-type", exchange.getType()); //调用exchangeDeclare进行声明 channel.exchangeDeclare(exchange.getName(), DELAYED_MESSAGE_EXCHANGE, exchange.isDurable(), exchange.isAutoDelete(), exchange.isInternal(), arguments); } else { //调用exchangeDeclare进行声明 channel.exchangeDeclare(exchange.getName(), exchange.getType(), exchange.isDurable(), exchange.isAutoDelete(), exchange.isInternal(), exchange.getArguments()); } } catch (IOException e) { logOrRethrowDeclarationException(exchange, "exchange", e); } } } } ~~~ 声明Queue队列 ~~~ private DeclareOk[] declareQueues(final Channel channel, final Queue... 
queues) throws IOException { List<DeclareOk> declareOks = new ArrayList<DeclareOk>(queues.length); for (int i = 0; i < queues.length; i++) { Queue queue = queues[i]; //队列不以amq.开头的队列才能进行声明 if (!queue.getName().startsWith("amq.")) { if (this.logger.isDebugEnabled()) { this.logger.debug("declaring Queue '" + queue.getName() + "'"); } try { try { //进行队列声明 DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), queue.getArguments()); declareOks.add(declareOk); } catch (IllegalArgumentException e) { if (this.logger.isDebugEnabled()) { this.logger.error("Exception while declaring queue: '" + queue.getName() + "'"); } try { if (channel instanceof ChannelProxy) { ((ChannelProxy) channel).getTargetChannel().close(); } } catch (TimeoutException e1) { } throw new IOException(e); } } catch (IOException e) { logOrRethrowDeclarationException(queue, "queue", e); } } this.logger.debug("Queue with name that starts with 'amq.' cannot be declared."); } return declareOks.toArray(new DeclareOk[declareOks.size()]); } ~~~ binding声明: ~~~java private void declareBindings(final Channel channel, final Binding... bindings) throws IOException { for (Binding binding : bindings) { if (this.logger.isDebugEnabled()) { this.logger.debug("Binding destination [" + binding.getDestination() + " (" + binding.getDestinationType() + ")] to exchange [" + binding.getExchange() + "] with routing key [" + binding.getRoutingKey() + "]"); } try { //QUEUE类型的绑定 if (binding.isDestinationQueue()) { //并且不是绑定到默认的Default Exchange if (!isDeclaringImplicitQueueBinding(binding)) { //绑定队列 channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(), binding.getArguments()); } } else { //Exchange类型的绑定 channel.exchangeBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(), binding.getArguments()); } } catch (IOException e) { logOrRethrowDeclarationException(binding, "binding", e); } } } ~~~