🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
通过 cloud-stream-rabbitmq-provider8801 模块作为消息的生产者,而模块 cloud-stream-rabbitmq-consumer8802 作为消息的消费者。本次演示采用 RabbitMQ消息中间件。 <br/> 步骤如下: [TOC] # 1. 搭建RabbitMQ环境 关于 RabbitMQ 环境的搭建参考 https://www.kancloud.cn/king_om/x_1_mq/2483251 。 <br/> # 2. 构建 8801 消息生产者模块 **1. 在 8801 模块的`pom.xml`中添加 stream-rabbit 依赖** ```xml <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> ... </dependencies> ``` **2. 在 8801 模块的`application.yml`添加相关配置** ```yml server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: #自此处配置要绑定的rabbitmq的服务信息 defaultRabbit: #表示定义的名称,用于binding整合 type: rabbit #消息组件类型 environment: #设置rabbitmq相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: #服务的整合处理 output: #消息推送通道 destination: studyExchange #表示要使用的exchange名称定义(相当于topic) content-type: application/json #设置消息类型,本次为json binder: defaultRabbit #设置要绑定的消息服务的具体设置 ``` **3. 定义消息推送通道** (1)*`com.atguigu.springcloud.service.IMessageService `* ```java public interface IMessageService { public String send(); } ``` (2)*`com.atguigu.springcloud.service.impl.MessageServiceImpl `* ```java /** * 添加注解 @EnableBinding(Source.class) 定义消息推送通道 */ @EnableBinding(Source.class) public class MessageServiceImpl implements IMessageService { @Resource private MessageChannel output; //消息发送通道 @Override public String send() { String serial = UUID.randomUUID().toString(); // 将消息 serial 推送 // public static <T> MessageBuilder<T> withPayload(T payload) { output.send(MessageBuilder.withPayload(serial).build()); System.out.println("生产者:" + serial); return serial; } } ``` **4. controller层调用消息推送通道** ```java @RestController public class SendMessageController { @Resource private MessageServiceImpl messageService; @RequestMapping("/sendMessage") public String sendMessage() { String send = messageService.send(); return send; } } ``` **5. 测试** (1)先启动RabbitMQ,再启动 8801 生产者模块,访问 http://localhost:8801/sendMessage ,多刷新几次页面生产者就会生产如下消息。 ``` 生产者:2aedf80b-2933-4dab-abe5-5e37a02b961e 生产者:acdc3451-1b34-431c-ad32-dc7ca47aa6c8 生产者:74ba6c4c-5195-4b72-a5cf-88652ac0f347 生产者:1e42e7f4-6a5f-4873-93d3-0948319361e7 生产者:b1507cb8-3a4f-4987-9fb8-ce1ce9b970c1 ``` (2)访问RabbitMQ http://localhost:15672/ ,在RabbitMQ中会生成一个 `studyExchange` 主题。 ![](https://img.kancloud.cn/4b/0b/4b0bb092bb2e795c028b3b0394a48195_1634x471.jpg) (3)多刷新几次 http://localhost:8801/sendMessage ,你会看到 生产消息 的速率。 ![](https://img.kancloud.cn/16/f6/16f60f643dedb478edc1a23d32ad8893_1328x408.jpg) <br/> # 3. 构建 8802 消息消费者模块 **1. 8802 模块的`pom.xml`中添加 stream-rabbit 依赖** ```xml <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> ... </dependencies> ``` **2. 在 8802 模块的`application.yml`添加相关配置** ```yml server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: #自此处配置要绑定的rabbitmq的服务信息 defaultRabbit: #表示定义的名称,用于binding整合 type: rabbit #消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: #服务的整合处理 input: #消息接收通道 destination: studyExchange #表示要使用的exchange名称定义(相当于一个topic) content-type: application/json #设置消息类型,本次为json binder: defaultRabbit #设置要绑定的消息服务的具体设置 ``` **3. 在 8802 模块定义消息接收通道** ```java /** * 添加注解 @EnableBinding(Sink.class) 定义消息接收通道 */ @Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController { /** * 添加 @StreamListener(Sink.INPUT) 监听消息队列 */ @StreamListener(Sink.INPUT) public void input(Message<String> message) { // 通过 getPayload 方法接收消息 System.out.println("消费者:" + message.getPayload()); } } ``` **4. 测试** (1)先启动 RabbitMQ,后启动 8801 生成者模块,最后启动 8802 消费者模块。 (2)访问 8801 生产者 http://localhost:8801/sendMessage ,并多刷新几次页面,生产者将生产消息,并被消费者接收到消息。 ``` 生产者:dc6992fe-c6f4-4606-a3b0-63d2b9f9ac3c 生产者:d55519c0-19ba-4699-b2f2-c36dd7d14815 生产者:8b1a6fc4-1d3a-40a0-aea2-c201cc9f9912 生产者:e421c64d-b3de-4e74-b326-f5abe171fb4a 生产者:57e756e5-5630-41e7-8386-4d7dc891c080 消费者:dc6992fe-c6f4-4606-a3b0-63d2b9f9ac3c 消费者:d55519c0-19ba-4699-b2f2-c36dd7d14815 消费者:8b1a6fc4-1d3a-40a0-aea2-c201cc9f9912 消费者:e421c64d-b3de-4e74-b326-f5abe171fb4a 消费者:57e756e5-5630-41e7-8386-4d7dc891c080 ```