ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
## 一、概述 基于rockemq-client集成; ## 二、方案 ### **引入客户端库依赖** ``` <!-- rocketmq --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.1</version> </dependency> ``` ### **生产者生产消息例子(同步)** ``` public static void main(String... strings) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("211.149.232.36:9876"); producer.start(); for (int i = 0; i < 100; i++) { Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); } ``` ### **消费者消费消息例子** ``` public static void main(String... strings) throws Exception { // Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // Specify name server addresses. consumer.setNamesrvAddr("211.149.232.36:9876"); // Subscribe one more more topics to consume. consumer.subscribe("TopicTest", "*"); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); } ```