企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 代码 ## 生产者 ~~~ package mq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; // private String url = "failover://tcp://localhost:61616"; //主题 private String subject = "myqueue"; //目标 private Queue destination = null; //连接 private Connection connection = null; //会话 private Session session = null; private MessageProducer producer = null; //初始化 private void initialize() throws JMSException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } //发送消息 public void produceMessage(String message) throws JMSException { initialize(); //创建文本消息,把消息变成他的格式 TextMessage msg = session.createTextMessage(message); connection.start(); System.out.println("Producer:->Sending message: " + message); producer.send(msg); System.out.println("Producer:->Message sent complete!"); } //关闭连接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } ~~~ 生产者内部是开启多个线程在生产的 **测试** ~~~ package mq; import javax.jms.JMSException; public class ProducerTest { public static void main(String[] args) throws JMSException { ProducerTool producerTool = new ProducerTool(); for (int i=0; i<10; i++) { producerTool.produceMessage("hello,world"+i); } producerTool.close(); } } ~~~ **网页** ![](https://box.kancloud.cn/ebb29ff1ec828c1b3278c4afa34a6a0b_1799x341.png) ## 消费者 ~~~ package mq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ConsumerTool implements MessageListener, ExceptionListener { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; // private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String url = "failover://tcp://localhost:41414"; private String queue = "myqueue"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; private ActiveMQConnectionFactory connectionFactory = null; public static Boolean isconnection = false; // 初始化 private void initialize() throws JMSException { connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(queue); consumer = session.createConsumer(destination); } public void consumeMessage() throws JMSException { initialize(); connection.start(); //activemq采用推送机制来消耗消息 consumer.setMessageListener(this); connection.setExceptionListener(this); System.out.println("Consumer " + Thread.currentThread().getName() + " :=>local listening..."); isconnection = true; //开始监听 Message message = consumer.receive(); System.out.println(message.getJMSMessageID()); } //如果是异常的话 @Override public void onException(JMSException e) { isconnection = false; } // 消息处理函数 @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); System.out.println("Consumer " + Thread.currentThread().getId() + " :=>Received: " + msg); } else { System.out.println("Consumer " + Thread.currentThread().getId() + " :=>Received: " + message); } } catch (Exception e) { e.printStackTrace(); } } // 关闭连接 public void close() throws JMSException { System.out.println("Consumer" + Thread.currentThread().getName() + ":->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } } ~~~ **测试** ~~~ package mq; public class ConsumerTest implements Runnable{ static Thread t1 = null; static Thread t2 = null; public static void main(String[] args) { //创建消费者 t1 = new Thread(new ConsumerTest()); t1.start(); t2 = new Thread(new ConsumerTest()); t2.start(); /* * while (true) { System.out.println(t1.isAlive()); if (!t1.isAlive()) { * t1 = new Thread(new ConsumerTest()); t1.start(); * System.out.println("重新启动"); } Thread.sleep(5000); } 延时500毫秒之后停止接受消息 * Thread.sleep(500); consumer.close(); */ } @Override public void run() { try{ ConsumerTool consumer = new ConsumerTool(); consumer.consumeMessage(); //如果断开连接,主线程就不走 while (ConsumerTool.isconnection) { } }catch (Exception e) { e.printStackTrace(); } } } ~~~