ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者。 <br/> 在下图中,P 是我们的生产者,C 是我们的消费者。中间的框是一个队列,在 RabbitMQ 代 表使用者保留的消息缓冲区。 ![](https://img.kancloud.cn/64/0e/640ea81d676e6c4205c1e1ee6ee67217_1387x143.jpg) <br/> 步骤如下: **1. `pom.xml`添加依赖** ```xml <dependencies> <!--rabbitmq 依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--操作文件流的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies> <!--指定 jdk 编译版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> ``` **2. 固定一个队列的名字** ```java public class QueueName { public final static String QUEUE_HELLO = "queue.hello"; } ``` **3. 生产者** ```java public class Producer { public static void main(String[] args) throws Exception { // 1. 连接rabbitmq服务器 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.107"); factory.setUsername("admin"); factory.setPassword("admin"); // 2. 注意:Channel 与 Connection 使用完后是需要关闭的,但这里使用了语句try-with-resources, // 该语句可以在资源使用完后自动关闭资源。 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { /** * 声明一个队列。 * queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) * var1: 队列名称 * var2: 队列里面的消息是否持久化。false(默认)不持久化、true持久化 * var3: 该队列是否只供一个消费者进行消费,即消息是否进行共享。false共享、true共享 * var4: 是否自动删除。即当最后一个消费者断开连接后该队列是否自动删除。true自动删除、false不删除 * var5:其他参数 */ channel.queueDeclare(QueueName.QUEUE_HELLO, false, false, false, null); String message = "hello world"; /** * 发送一个消息。 * basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) * var1: 发送到哪个交换机,""也是一个交换机类型。 * var2: 路由的 key 是哪个 * var3: 其他参数 * var4: 发送的消息 */ channel.basicPublish("", QueueName.QUEUE_HELLO, null, message.getBytes()); System.out.println("消息发送完毕."); } } } ``` **4. 消费者** ```java public class Consumer { public static void main(String[] args) throws Exception { // 1. 连接rabbitmq服务器 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.107"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); System.out.println("等待接收消息."); //推送的消息如何进行消费的接口回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("接收到的消息: " + message); }; //取消消费的一个回调接口,如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断."); }; /** * 2. 消费者消费消息。 * basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4) * var1: 消费哪个队列 * var2: 消费成功之后是否要自动应答。true自动应答, false手动应答 * var3: 进行消费的接口回调 * var4: 取消消费的接口回调 */ channel.basicConsume(QueueName.QUEUE_HELLO, true, deliverCallback, cancelCallback); } } ``` **5. 测试** (1)先启动生成者,控制台输出如下。 ``` 消息发送完毕. ``` (2)启动消费者,控制台输出如下。 ``` 等待接收消息. 接收到的消息: hello world ``` **** 案例代码:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01