通过 cloud-stream-rabbitmq-provider8801 模块作为消息的生产者,而模块 cloud-stream-rabbitmq-consumer8802 作为消息的消费者。本次演示采用 RabbitMQ消息中间件。
<br/>
步骤如下:
[TOC]
# 1. 搭建RabbitMQ环境
关于 RabbitMQ 环境的搭建参考 https://www.kancloud.cn/king_om/x_1_mq/2483251 。
<br/>
# 2. 构建 8801 消息生产者模块
**1. 在 8801 模块的`pom.xml`中添加 stream-rabbit 依赖**
```xml
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
...
</dependencies>
```
**2. 在 8801 模块的`application.yml`添加相关配置**
```yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: #自此处配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称,用于binding整合
type: rabbit #消息组件类型
environment: #设置rabbitmq相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: #服务的整合处理
output: #消息推送通道
destination: studyExchange #表示要使用的exchange名称定义(相当于topic)
content-type: application/json #设置消息类型,本次为json
binder: defaultRabbit #设置要绑定的消息服务的具体设置
```
**3. 定义消息推送通道**
(1)*`com.atguigu.springcloud.service.IMessageService `*
```java
public interface IMessageService {
public String send();
}
```
(2)*`com.atguigu.springcloud.service.impl.MessageServiceImpl `*
```java
/**
* 添加注解 @EnableBinding(Source.class) 定义消息推送通道
*/
@EnableBinding(Source.class)
public class MessageServiceImpl implements IMessageService {
@Resource
private MessageChannel output; //消息发送通道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
// 将消息 serial 推送
// public static <T> MessageBuilder<T> withPayload(T payload) {
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("生产者:" + serial);
return serial;
}
}
```
**4. controller层调用消息推送通道**
```java
@RestController
public class SendMessageController {
@Resource
private MessageServiceImpl messageService;
@RequestMapping("/sendMessage")
public String sendMessage() {
String send = messageService.send();
return send;
}
}
```
**5. 测试**
(1)先启动RabbitMQ,再启动 8801 生产者模块,访问 http://localhost:8801/sendMessage ,多刷新几次页面生产者就会生产如下消息。
```
生产者:2aedf80b-2933-4dab-abe5-5e37a02b961e
生产者:acdc3451-1b34-431c-ad32-dc7ca47aa6c8
生产者:74ba6c4c-5195-4b72-a5cf-88652ac0f347
生产者:1e42e7f4-6a5f-4873-93d3-0948319361e7
生产者:b1507cb8-3a4f-4987-9fb8-ce1ce9b970c1
```
(2)访问RabbitMQ http://localhost:15672/ ,在RabbitMQ中会生成一个 `studyExchange` 主题。
![](https://img.kancloud.cn/4b/0b/4b0bb092bb2e795c028b3b0394a48195_1634x471.jpg)
(3)多刷新几次 http://localhost:8801/sendMessage ,你会看到 生产消息 的速率。
![](https://img.kancloud.cn/16/f6/16f60f643dedb478edc1a23d32ad8893_1328x408.jpg)
<br/>
# 3. 构建 8802 消息消费者模块
**1. 8802 模块的`pom.xml`中添加 stream-rabbit 依赖**
```xml
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
...
</dependencies>
```
**2. 在 8802 模块的`application.yml`添加相关配置**
```yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: #自此处配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称,用于binding整合
type: rabbit #消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: #服务的整合处理
input: #消息接收通道
destination: studyExchange #表示要使用的exchange名称定义(相当于一个topic)
content-type: application/json #设置消息类型,本次为json
binder: defaultRabbit #设置要绑定的消息服务的具体设置
```
**3. 在 8802 模块定义消息接收通道**
```java
/**
* 添加注解 @EnableBinding(Sink.class) 定义消息接收通道
*/
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
/**
* 添加 @StreamListener(Sink.INPUT) 监听消息队列
*/
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
// 通过 getPayload 方法接收消息
System.out.println("消费者:" + message.getPayload());
}
}
```
**4. 测试**
(1)先启动 RabbitMQ,后启动 8801 生成者模块,最后启动 8802 消费者模块。
(2)访问 8801 生产者 http://localhost:8801/sendMessage ,并多刷新几次页面,生产者将生产消息,并被消费者接收到消息。
```
生产者:dc6992fe-c6f4-4606-a3b0-63d2b9f9ac3c
生产者:d55519c0-19ba-4699-b2f2-c36dd7d14815
生产者:8b1a6fc4-1d3a-40a0-aea2-c201cc9f9912
生产者:e421c64d-b3de-4e74-b326-f5abe171fb4a
生产者:57e756e5-5630-41e7-8386-4d7dc891c080
消费者:dc6992fe-c6f4-4606-a3b0-63d2b9f9ac3c
消费者:d55519c0-19ba-4699-b2f2-c36dd7d14815
消费者:8b1a6fc4-1d3a-40a0-aea2-c201cc9f9912
消费者:e421c64d-b3de-4e74-b326-f5abe171fb4a
消费者:57e756e5-5630-41e7-8386-4d7dc891c080
```
- 微服务
- 微服务是什么?
- 微服务架构
- 微服务优缺点
- 微服务技术栈
- 微服务框架对比
- SpringCloud
- SpringCloud是什么
- SpringCloud与SpringBoot对比
- SpringCloud与Dubbo对比
- Rest微服务案例
- 总体介绍
- 父工程构建步骤
- 公共模块构建步骤
- 服务端模块构建步骤
- 消费端模块构建步骤
- Eureka服务注册与发现
- Eureka是什么
- Eureka原理
- Eureka注册服务中心构建
- 向Eureka注册已有微服务
- Eureka的自我保护机制
- Eureka服务发现
- Eureka集群配置
- Eureka与Zookeeper对比
- Ribbon负载均衡
- Ribbon是什么
- Ribbon负载均衡演示
- 构建服务端模块
- 构建消费端模块
- Ribbon核心组件IRule
- 自定义负载均衡策略
- Ribbon均衡策略优先级
- 轮询策略算法
- OpenFeign负载均衡
- OpenFeign是什么
- 负载均衡演示
- 日志打印功能
- 导出功能
- Hystrix断路器
- Hystrix是什么
- 服务熔断
- Hystrix服务端构建
- 服务熔断演示
- 服务熔断类型
- HystrixProperty配置汇总
- 服务降级
- Hystrix客户端构建
- 服务降级演示
- fallbackFactory
- 熔断与降级
- 服务监控
- 网关服务Zuul
- Zuul是什么
- Zuul路由服务构建
- 设置访问映射规则
- Config分布式配置中心
- Config分布式配置中心是什么
- Config服务端与Git通信
- Config客户端获取配置
- Config客户端动态刷新
- Bus消息总线
- Bus消息总线是什么
- Bus消息总线原理
- 广播通知设计思想
- 广播通知演示
- 定点通知演示
- Stream消息驱动
- 为什么要引入Stream
- Stream消息驱动是什么
- Stream设计思想
- Stream流程和注解
- Stream案例演示
- 重复消费问题
- 消息持久化
- Sleuth分布式链路跟踪
- Sleuth是什么
- 搭建链路监控
- SpringCloud Alibaba
- Nacos注册与配置中心
- Nacos是什么
- 安装并运行Nacos
- Nacos注册中心
- 服务端入住Nacos
- 消费端入住Nacos
- Nacos负载均衡演示
- 服务注册中心对比
- Nacos的AP和CP转化
- Nacos配置中心
- 基础配置演示
- Nacos分类配置
- Nacos集群搭建
- Sentinel实现熔断与限流
- Sentinel是什么
- Sentinel环境搭建
- Sentinel监控微服务演示
- Sentinel流控规则
- 流量监控的作用
- 设置流控规则
- Sentinel降级规则
- 熔断降级作用
- 设置降级规则
- Sentinel热点限流
- 什么是热点
- 设置热点限流
- Sentinel系统限流
- @SentinelResource
- @SentinelResource属性
- @SentinelResource限流演示
- @SentinelResource熔断演示
- 规则持久化
- 熔断框架比较
- Seata分布式事务
- 分布式事务问题
- Seata是什么
- Seata分布式事务过程
- Seata环境搭建
- 演示示例
- 业务说明
- 数据库环境准备
- 微服务环境准备
- 测试
- Consul服务注册与发现
- Consul是什么
- Consul能做什么
- 环境搭建
- Windows平台
- 服务端入住Consul
- 消费端入住Consul
- 注册中心对比
- Zookeeper服务注册与发现
- Zookeeper是什么
- 环境搭建
- 服务端入住Zookeeper
- 消费端入住Zookeeper
- 网关服务Gateway
- Gateway是什么
- Gateway能做什么
- Gateway对比Zuul
- 三大核心概念
- Gateway工作流
- 环境搭建
- 网关路由配置方式
- 配置文件配置
- 代码中配置
- 动态路由
- Predicate断言
- 断言是什么
- 常用断言
- Filter过滤器
- 过滤器是什么
- 过滤器种类
- 自定义过滤器