用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者。
<br/>
在下图中,P 是我们的生产者,C 是我们的消费者。中间的框是一个队列,在 RabbitMQ 代
表使用者保留的消息缓冲区。
![](https://img.kancloud.cn/64/0e/640ea81d676e6c4205c1e1ee6ee67217_1387x143.jpg)
<br/>
步骤如下:
**1. `pom.xml`添加依赖**
```xml
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
<!--指定 jdk 编译版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
```
**2. 固定一个队列的名字**
```java
public class QueueName {
public final static String QUEUE_HELLO = "queue.hello";
}
```
**3. 生产者**
```java
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 连接rabbitmq服务器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.0.107");
factory.setUsername("admin");
factory.setPassword("admin");
// 2. 注意:Channel 与 Connection 使用完后是需要关闭的,但这里使用了语句try-with-resources,
// 该语句可以在资源使用完后自动关闭资源。
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
/**
* 声明一个队列。
* queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)
* var1: 队列名称
* var2: 队列里面的消息是否持久化。false(默认)不持久化、true持久化
* var3: 该队列是否只供一个消费者进行消费,即消息是否进行共享。false共享、true共享
* var4: 是否自动删除。即当最后一个消费者断开连接后该队列是否自动删除。true自动删除、false不删除
* var5:其他参数
*/
channel.queueDeclare(QueueName.QUEUE_HELLO, false, false, false, null);
String message = "hello world";
/**
* 发送一个消息。
* basicPublish(String var1, String var2, BasicProperties var3, byte[] var4)
* var1: 发送到哪个交换机,""也是一个交换机类型。
* var2: 路由的 key 是哪个
* var3: 其他参数
* var4: 发送的消息
*/
channel.basicPublish("", QueueName.QUEUE_HELLO, null, message.getBytes());
System.out.println("消息发送完毕.");
}
}
}
```
**4. 消费者**
```java
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 连接rabbitmq服务器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.0.107");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息.");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("接收到的消息: " + message);
};
//取消消费的一个回调接口,如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断.");
};
/**
* 2. 消费者消费消息。
* basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4)
* var1: 消费哪个队列
* var2: 消费成功之后是否要自动应答。true自动应答, false手动应答
* var3: 进行消费的接口回调
* var4: 取消消费的接口回调
*/
channel.basicConsume(QueueName.QUEUE_HELLO, true, deliverCallback, cancelCallback);
}
}
```
**5. 测试**
(1)先启动生成者,控制台输出如下。
```
消息发送完毕.
```
(2)启动消费者,控制台输出如下。
```
等待接收消息.
接收到的消息: hello world
```
****
案例代码:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01
- 消息队列
- 什么是MQ
- MQ的作用
- MQ的分类
- MQ的选择
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 环境搭建
- windows系统下的搭建
- centos7系统下的搭建
- 常用命令
- 服务相关命令
- 管理用户命令
- 管理队列命令
- 第一个RabbitMQ程序
- 工作队列
- 轮询分发消息
- 消息应答
- 持久化
- 发布确认
- 发布确认原理
- 发布确认策略
- 交换机概念
- 交换机类型
- 无名交换机
- Fanout交换机
- Direct交换机
- Topic交换机
- 死信队列
- 死信概念
- 死信来源
- 死信实战
- 延迟队列
- 什么是延迟队列
- TTL设置方式
- 队列TTL延迟队列
- 消息TTL延迟队列
- 插件打造延迟队列
- 延迟队列总结
- 发布确认高级
- 代码实现
- 回退消息
- 备份交换机
- 幂等性
- 幂等性概念
- 消息重复消费
- 消费端幂等性保障
- 优先级队列
- 使用场景
- 设置优先级
- 惰性队列
- 什么是惰性队列
- 队列的两种模式
- 声明惰性队列
- RabbitMQ集群
- 为什么要搭建集群
- 集群搭建步骤
- 集群工作方式
- 脱离集群
- 镜像队列
- 高可用负载均衡