## RabbitMQ
[RabbitMQ](https://www.rabbitmq.com/) 是一个开源的轻量级消息代理,支持多种消息协议。它可以通过分布式部署、联合配置来满足高弹性、高可用性的需求。此外,它是部署最广泛的开源消息代理,在全球范围内从初创企业到大企业都在使用。
### 安装
在开始之前,我们必须安装所需的包:
```bash
$ npm i --save amqplib amqp-connection-manager
```
### 概述
为了使用 **RabbitMQ** 传输器,传递以下选项对象到 `createMicroservice()` 方法。
> main.ts
```typescript
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
});
```
> `Transport` 需要从 `@nestjs/microservices` 包导入。
### 选项
`options`对象和选择的传输器有关,`RabbitMQ`传输器暴露了一些属性:
-| -
---|---
urls|连接urls
queue|服务器要监听的队列名称
prefetchCount|频道预读取的数量
isGlobalPrefetchCount|使能预读取的频道
noAck|设置为`false`以启用手动确认模式
queueOptions|额外的队列选项([更多](https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue))
socketOptions|额外的socket选项([更多](https://www.squaremobius.net/amqp.node/channel_api.html#socket-options))
### 客户端
像其他微服务传输器一样,你可以在创建`ClientProxy`实例时传输[一些选项](https://docs.nestjs.com/microservices/basics#client)。
一种来创建实例的方法是使用`ClientsModule`。要使用`ClientsModule`创建一个客户端实例,引入并使用`register()`方法并传递一个 `options` 对象,该对象具有与前面在 `createMicroservice()` 方法具有相同的属性。`name`属性被用于注入`token`,更多关于`ClientsModule`内容参见[这里](https://docs.nestjs.com/microservices/basics#client)。
```typescript
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
},
]),
]
...
})
```
也可以使用其他创建客户端的实例( `ClientProxyFactory` 或 `@Client()` )。
### 上下文
在更复杂的场景中,您可能希望访问关于传入请求的更多信息。在`RabbitMQ` 中,您可以访问 `RmqContext`对象。
```typescript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(`Pattern: ${context.getPattern()}`);
}
```
?> `@Payload()`, `@Ctx()` 和 `RedisContext` 需要从 `@nestjs/microservices` 包导入.
要实用原生的`RabbitMQ`消息(包含`properties`, `fields`, 和`content`), 使用 `RmqContext`对象的`getMessage()`方法:
```typescript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getMessage());
}
```
要获取`RabbitMQ`频道的引用,使用`RmqContext`对象的`getChannelRef`方法。
```typescript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getChannelRef());
}
```
### 消息确认
要确保消息没有丢失,RabbitMQ支持[消息确认](https://www.rabbitmq.com/confirms.html)。消息确认是指消费者发回给RabbitMQ确认消息已收到,RabbitMQ可以删除它了。如果消费者不工作(频道关闭,连接关闭或者TCP连接丢失)也没有发送确认,RabbitMQ会认为消息没有被处理,因此会重新将其加入队列。
要使能手动消息确认模式,将`noAck`设置为`false`:
```typescript
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
noAck: false,
queueOptions: {
durable: false
},
},
```
当手动消费者确认开启时,我们必须从工作者到到信号发送一个合适的确认信息,以表示我们已经完成了一件工作。
```typescript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
channel.ack(originalMsg);
}
```
### 记录建设者
要配置消息选项,您可以使用`RmqRecordBuilder`该类(注意:这对于基于事件的流也是可行的)。例如,要设置`headers`和`priority`属性,使用`setOptions`方法,如下:
~~~typescript
const message = ':cat:';
const record = new RmqRecordBuilder(message)
.setOptions({
headers: {
['x-version']: '1.0.0',
},
priority: 3,
})
.build();
this.client.send('replace-emoji', record).subscribe(...);
~~~
> **提示**`RmqRecordBuilder``@nestjs/microservices`类是从包中导出的。
您也可以通过访问 来在服务器端读取这些值`RmqContext`,如下所示:
~~~typescript
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
const { properties: { headers } } = context.getMessage();
return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
~~~
- 介绍
- 概述
- 第一步
- 控制器
- 提供者
- 模块
- 中间件
- 异常过滤器
- 管道
- 守卫
- 拦截器
- 自定义装饰器
- 基础知识
- 自定义提供者
- 异步提供者
- 动态模块
- 注入作用域
- 循环依赖
- 模块参考
- 懒加载模块
- 应用上下文
- 生命周期事件
- 跨平台
- 测试
- 技术
- 数据库
- Mongo
- 配置
- 验证
- 缓存
- 序列化
- 版本控制
- 定时任务
- 队列
- 日志
- Cookies
- 事件
- 压缩
- 文件上传
- 流式处理文件
- HTTP模块
- Session(会话)
- MVC
- 性能(Fastify)
- 服务器端事件发送
- 安全
- 认证(Authentication)
- 授权(Authorization)
- 加密和散列
- Helmet
- CORS(跨域请求)
- CSRF保护
- 限速
- GraphQL
- 快速开始
- 解析器(resolvers)
- 变更(Mutations)
- 订阅(Subscriptions)
- 标量(Scalars)
- 指令(directives)
- 接口(Interfaces)
- 联合类型
- 枚举(Enums)
- 字段中间件
- 映射类型
- 插件
- 复杂性
- 扩展
- CLI插件
- 生成SDL
- 其他功能
- 联合服务
- 迁移指南
- Websocket
- 网关
- 异常过滤器
- 管道
- 守卫
- 拦截器
- 适配器
- 微服务
- 概述
- Redis
- MQTT
- NATS
- RabbitMQ
- Kafka
- gRPC
- 自定义传输器
- 异常过滤器
- 管道
- 守卫
- 拦截器
- 独立应用
- Cli
- 概述
- 工作空间
- 库
- 用法
- 脚本
- Openapi
- 介绍
- 类型和参数
- 操作
- 安全
- 映射类型
- 装饰器
- CLI插件
- 其他特性
- 迁移指南
- 秘籍
- CRUD 生成器
- 热重载
- MikroORM
- TypeORM
- Mongoose
- 序列化
- 路由模块
- Swagger
- 健康检查
- CQRS
- 文档
- Prisma
- 静态服务
- Nest Commander
- 问答
- Serverless
- HTTP 适配器
- 全局路由前缀
- 混合应用
- HTTPS 和多服务器
- 请求生命周期
- 常见错误
- 实例
- 迁移指南
- 发现
- 谁在使用Nest?