### 消费者
```
我们此处假设消费者1只接收两种类型的消息:更新商品和删除商品
```
~~~
public class Recv {
private final static String QUEUE_NAME = "topic_exchange_queue_1";
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。需要 update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
~~~
### 消费者2
```
我们此处假设消费者2接收所有类型的消息:新增商品,更新商品和删除商品。
```
~~~
public class Recv2 {
private final static String QUEUE_NAME = "topic_exchange_queue_2";
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
~~~
- JMS vs AMQP
- ActiveMQ
- 安装
- 简介
- 知识点
- 点对点
- 发布订阅
- 对比
- 安全认证
- 持久化
- Api
- Productor
- 发送消息
- 消息有效期
- 消息优先级
- 开启
- 严格顺序
- 强顺序
- Consumer
- 消息确认
- 消息的过滤
- 客户端
- java
- 点对点
- 生产者
- 消费者
- 发布订阅
- 生产者
- Springboot
- 配置
- QueueConfig
- 生产者
- 消费者
- 集群
- RabbitMQ
- 安装
- 主要概念
- 消息模型
- 基本消息模型
- 简介
- java
- 消费者
- 生产者
- 工具类
- work消息模型
- 简介
- java
- 消费者
- 生产者
- 订阅模型-Fanout
- 简介
- java
- 生产者
- 消费者
- 订阅模型-Direct
- 简介
- java
- 生产者
- 消费者
- 订阅模型-Topic
- 简介
- java
- 生产者
- 消费者
- 持久化
- Spring-AMQP
- 消费者
- 生产者