合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
# activeMQ 对于消息的传递有两种类型: **一种是点对点的**,即一个生产者和一个消费者一一对应; **另一种是发布/** **订阅模式**,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。 JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。 - StreamMessage -- Java原始值的数据流 - MapMessage--一套名称\-值对 - TextMessage--一个字符串对象 - ObjectMessage--一个序列化的 Java对象 - BytesMessage--一个字节的数据流 其中在开发中常用到的是TextMessage,即消息通过json数据格式传输,而接收数据时通过字符串对象(TextMessage)获取。 > 使用 ``` /* * 生产者(MessageProducer) */ @Test public void queueProducerTest() throws Exception { // 创建一个ActiveMQConnectionFactory对象并传入与activeMq通信协议地址与端口 // 用ConnectionFactory接受创建ActiveMQConnectionFactory的对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.151:61616"); // 通过connectionFactory创建connection Connection connection = connectionFactory.createConnection(); // 开启connection,调用start()方法 connection.start(); // connection创建一个session对象 // param1 是否开启分布式事务 一般不开启(false)如果开启则第二个参数无效 // param2 设置应答类型 手动 自动 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // session创建destination对象 (queue,topic) Queue queue = session.createQueue("igeek_test"); // 队列名称 // session创建MessageProvider对象 MessageProducer producer = session.createProducer(queue); // session创建TextMessage对象 TextMessage textMessage = session.createTextMessage("信息:我是测试发送信息!"); // 使用MessageProvider发送消息 producer.send(textMessage); // 关闭资源 producer.close(); session.close(); connection.close(); } /* * 消费者(consumer) */ @Test public void queueConsumerTest() throws Exception { // 创建一个ActiveMQConnectionFactory对象并传入与activeMq通信协议地址与端口 // 用ConnectionFactory接受创建ActiveMQConnectionFactory的对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.151:61616"); // 通过connectionFactory创建connection Connection connection = connectionFactory.createConnection(); // 开启connection,调用start()方法 connection.start(); // connection创建一个session对象 // param1 是否开启分布式事务 一般不开启(false)如果开启则第二个参数无效 // param2 设置应答类型 手动 自动 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // session创建destination对象 (queue,topic) Queue queue = session.createQueue("igeek_test"); // 队列名称 // session创建一个消费者 MessageConsumer consumer = session.createConsumer(queue); // consumer设置MessageListener consumer.setMessageListener(new MessageListener() { // params message 监听到的消息 @Override public void onMessage(Message message) { try { // 强转为TextMessage类型 TextMessage textMessage = (TextMessage) message; //获得消息内容 System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); System.in.read(); consumer.close(); session.close(); connection.close(); } ``` > 与Spring框架结合使用 - 生产者(producer) ``` <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.0.151:61616" /> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 配置生产者 --> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="connectionFactory" /> </bean> <!--这个是队列目的地,点对点的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--这个是主题目的地,一对多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> ``` - 消费者(consumer) ``` <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.0.151:61616" /> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!--这个是队列目的地,点对点的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--这个是主题目的地,一对多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="spring-topic" /> </bean> <!-- 消费者实现类 测试 --> <bean id="myMessageListener" class="com.igeek.esgobuy.search.message.MyMessageListener"></bean> <!-- 消息监听容器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean> ``` 消费者监听实现类 ``` public class MyMessageListener implements MessageListener { /* * 处理接受到的消息 */ @Override public void onMessage(Message message) { try { // 将Message强转为TextMessage 只有当Message对象接受的是TextMessage才能成功 TextMessage textMessage = (TextMessage) message; System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } ``` * [ ] 创建消费者和生产者的消息命名应该一致 * [ ] 一对一方式(queue),生产者的消息会等待消费者接收,再出栈。广播方式(topic),当消费者没有及时监听,会错过生产者的消息。