# 微服务
## 概述
除了传统的(有时称为单片)应用程序架构之外,`Nest` 还支持微服务架构风格的开发。本文档中其他地方讨论的大多数概念,如依赖项注入、装饰器、异常过滤器、管道、保护和拦截器,都同样适用于微服务。`Nest` 会尽可能地抽象化实现细节,以便相同的组件可以跨基于 `HTTP` 的平台,`WebSocket` 和微服务运行。本节特别讨论 `Nest` 的微服务方面。
在 `Nest` 中,微服务基本上是一个使用与 `HTTP` 不同的传输层的应用程序。
![](https://docs.nestjs.com/assets/Microservices_1.png)
`Nest` 支持几种内置的传输层实现,称为传输器,负责在不同的微服务实例之间传输消息。大多数传输器本机都支持请求 - 响应和基于事件的消息样式。`Nest` 在规范接口的后面抽象了每个传输器的实现细节,用于请求 - 响应和基于事件的消息传递。这样可以轻松地从一个传输层切换到另一层,例如,利用特定传输层的特定可靠性或性能功能,而不会影响您的应用程序代码。
### 安装
首先,我们需要安装所需的软件包:
```bash
$ npm i --save @nestjs/microservices
```
### 开始
为了创建微服务,我们使用 `NestFactory` 类的 `createMicroservice()` 方法。
> main.ts
```typescript
import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.TCP,
},
);
app.listen(() => console.log('Microservice is listening'));
}
bootstrap();
```
> 默认情况下,微服务通过 **TCP协议** 监听消息。
`createMicroservice ()` 方法的第二个参数是 `options` 对象。此对象可能有两个成员:
|||
|---|---|
| `transport` | 指定传输器,例如`Transport.NATS` |
| `options` | 确定传输器行为的传输器特定选项对象 |
`options` 对象根据所选的传送器而不同。`TCP` 传输器暴露了下面描述的几个属性。其他传输器(如Redis,MQTT等)参见相关章节。
|||
|---|---|
| `host` | 连接主机名 |
| `port` | 连接端口|
| `retryAttempts` | 连接尝试的总数 |
| `retryDelay` | 连接重试延迟(ms) |
### 模式(patterns)
微服务通过 **模式** 识别消息。模式是一个普通值,例如对象、字符串。模式将自动序列化,并与消息的数据部分一起通过网络发送。因此,接收器可以容易地将传入消息与相应的处理器相关联。
### 请求-响应
当您需要在各种外部服务之间交换消息时,请求-响应消息样式非常有用。使用此范例,您可以确定服务确实收到了消息(不需要手动实现消息 `ACK` 协议)。然而,请求-响应范式并不总是最佳选择。例如,使用基于日志的持久性的流传输器(如 `Kafka` 或 `NATS` 流)针对解决不同范围的问题进行了优化,更符合事件消息传递范例(有关更多细节,请参阅下面的基于事件的消息传递)。
为了使服务能够通过网络交换数据,`Nest` 创建了两个通道,其中一个负责传输数据,而另一个负责监听传入的响应。对于某些底层传输,比如 `NATS`,这种双通道是支持开箱即用的。对于其他人,`Nest` 通过手动创建单独的渠道进行补偿。 这样做可能会产生开销,因此,如果您不需要请求-响应消息样式,则应考虑使用基于事件的方法。
基本上,要创建一个消息处理程序(基于请求 - 响应范例),我们使用 `@MessagePattern()` ,需要从 `@nestjs/microservices` 包导入。
> math.controller.ts
```typescript
import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
@Controller()
export class MathController {
@MessagePattern({ cmd: 'sum' })
accumulate(data: number[]): number {
return (data || []).reduce((a, b) => a + b);
}
}
```
在上面的代码中,`accumulate()` 处理程序正在监听符合 `{cmd :'sum'}` 模式的消息。模式处理程序采用单个参数,即从客户端传递的 `data` 。在这种情况下,数据是必须累加的数字数组。
### 异步响应
每个模式处理程序都能够同步或异步响应。因此,支持 `async` (异步)方法。
> math.controller.ts
```typescript
@MessagePattern({ cmd: 'sum' })
async accumulate(data: number[]): Promise<number> {
return (data || []).reduce((a, b) => a + b);
}
```
此外,我们能够返回 [Rx](https://github.com/reactivex/rxjs) `Observable`,因此这些值将被发出,直到流完成。
```typescript
@MessagePattern({ cmd: 'sum' })
accumulate(data: number[]): Observable<number> {
return from([1, 2, 3]);
}
```
以上消息处理程序将响应3次(对数组中的每个项)。
### 基于事件
虽然 `request-response` 方法是在服务之间交换消息的理想方法,但是当您的消息样式是基于事件的时(即您只想发布事件而不等待响应时),它不太适合。它会带来太多不必要的开销,而这些开销是完全无用的。例如,您希望简单地通知另一个服务系统的这一部分发生了某种情况。这种情况就适合使用基于事件的消息风格。
为了创建事件处理程序,我们使用 `@EventPattern()`装饰器, 需要 ` @nestjs/microservices` 包导入。
```typescript
@EventPattern('user_created')
async handleUserCreated(data: Record<string, unknown>) {
// business logic
}
```
> 你可以为单独的事件模式注册多个事件处理程序,所有的事件处理程序都会并行执行
该 `handleUserCreated()` 方法正在侦听 `user_created` 事件。事件处理程序接受一个参数,`data` 从客户端传递(在本例中,是一个通过网络发送的事件有效负载)。
### 装饰器
在更复杂的场景中,您可能希望访问关于传入请求的更多信息。例如,对于通配符订阅的 `NATS`,您可能希望获得生产者发送消息的原始主题。同样,在 `Kafka` 中,您可能希望访问消息头。为了做到这一点,你可以使用内置的装饰如下:
```typescript
@MessagePattern('time.us.*')
getDate(@Payload() data: number[], @Ctx() context: NatsContext) {
console.log(`Subject: ${context.getSubject()}`); // e.g. "time.us.east"
return new Date().toLocaleTimeString(...);
}
```
> `@Payload()`、`@Ctx()` 和 `NatsContext` 需要从 `@nestjs/microservices` 包导入。
> 你也可以为 `@Payload()` 装饰器传入一个属性key值,来获取通过此装饰器拿到的对象的value值,例如 `@Payload('id')`
### 客户端
为了交换消息或将事件发布到 `Nest` 微服务,我们使用 `ClientProxy` 类, 它可以通过几种方式创建实例。此类定义了几个方法,例如`send()`(用于请求-响应消息传递)和`emit()`(用于事件驱动消息传递),这些方法允许您与远程微服务通信。使用下列方法之一获取此类的实例。
首先,我们可以使用 `ClientsModule` 暴露的静态`register()` 方法。此方法将数组作为参数,其中每个元素都具有 `name`属性,以及一个可选的`transport`属性(默认是`Transport.TCP`),以及特定于微服务的`options`属性。
`name`属性充当一个 `injection token`,可以在需要时将其用于注入 `ClientProxy` 实例。`name` 属性的值作为注入标记,可以是任意字符串或`JavaScript`符号,[参考这里](https://docs.nestjs.com/fundamentals/custom-providers#non-class-based-provider-tokens)。
`options` 属性是一个与我们之前在`createMicroservice()`方法中看到的相同的对象。
```typescript
@Module({
imports: [
ClientsModule.register([
{ name: 'MATH_SERVICE', transport: Transport.TCP },
]),
]
...
})
```
导入模块之后,我们可以使用 `@Inject()` 装饰器将`'MATH_SERVICE'`注入`ClientProxy`的一个实例。
```typescript
constructor(
@Inject('MATH_SERVICE') private client: ClientProxy,
) {}
```
> `ClientsModule`和 `ClientProxy`类需要从 `@nestjs/microservices` 包导入。
有时候,我们可能需要从另一个服务(比如 `ConfigService` )获取微服务配置而不是硬编码在客户端程序中,为此,我们可以使用 `ClientProxyFactory` 类来注册一个[自定义提供程序](https://docs.nestjs.com/fundamentals/custom-providers),这个类有一个静态的`create()`方法,接收传输者选项对象,并返回一个自定义的 `ClientProxy` 实例:
```typescript
@Module({
providers: [
{
provide: 'MATH_SERVICE',
useFactory: (configService: ConfigService) => {
const mathSvcOptions = configService.getMathSvcOptions();
return ClientProxyFactory.create(mathSvcOptions);
},
inject: [ConfigService],
}
]
...
})
```
> `ClientProxyFactory` 需要从 `@nestjs/microservices` 包导入 。
另一种选择是使用 `@client()`属性装饰器。
```typescript
@Client({ transport: Transport.TCP })
client: ClientProxy;
```
> `@Client()` 需要从 `@nestjs/microservices` 包导入 。
但是,使用 `@Client()` 装饰器不是推荐的方法(难以测试,难以共享客户端实例)。
`ClientProxy` 是惰性的。 它不会立即启动连接。 相反,它将在第一个微服务调用之前建立,然后在每个后续调用中重用。 但是,如果您希望将应用程序引导过程延迟到建立连接为止,则可以使用 `OnApplicationBootstrap` 生命周期挂钩内的 `ClientProxy` 对象的 `connect()` 方法手动启动连接。
```typescript
async onApplicationBootstrap() {
await this.client.connect();
}
```
如果无法创建连接,则该 `connect()` 方法将拒绝相应的错误对象。
### 发送消息
该 `ClientProxy` 公开 `send()` 方法。 此方法旨在调用微服务,并返回带有其响应的 `Observable`。 因此,我们可以轻松地订阅发射的值。
```typescript
accumulate(): Observable<number> {
const pattern = { cmd: 'sum' };
const payload = [1, 2, 3];
return this.client.send<number>(pattern, payload);
}
```
`send()` 函数接受两个参数,`pattern` 和 `payload`。`pattern` 具有 `@MessagePattern()` 修饰符中定义的这个模式,而 `payload` 是我们想要传输到另一个微服务的消息。该方法返回一个`cold Observable`对象,这意味着您必须在消息发送之前显式地订阅它。
### 发布事件
另一种是使用 `ClientProxy` 对象的 `emit()`方法。此方法的职责是将事件发布到消息代理。
```typescript
async publish() {
this.client.emit<number>('user_created', new UserCreatedEvent());
}
```
该 `emit()`方法有两个参数,`pattern` 和 `payload`。`pattern` 具有 `@EventPattern()` 修饰符中定义的这个模式,而`payload` 是我们想要传输到另一个微服务的消息。此方法返回一个 `hot Observable`(不同于`send()`方法返回一个 `cold Observable`),这意味着无论您是否显式订阅该 `Observable`,代理都将立即尝试传递事件。
### 作用域
对于不同编程语言背景的人来说,可能会意外地发现,在 `Nest` 中,几乎所有内容都在传入的请求之间共享。例如,我们有一个到数据库的连接池,带有全局状态的单例服务,等等。请记住,`Node.js` 并不遵循`request-response`的多线程无状态模型,在这种模型中,每个请求都由单独的线程处理。因此,对于应用程序来说,使用单例实例是完全安全的。
但是,在某些情况下,当应用程序是基于生命周期的行为时,也存在边界情况,例如 `GraphQL` 应用程序中的每个请求缓存、请求跟踪或多租户。在[这里](/8/fundamentals)学习如何控制范围。
请求作用域的处理程序和提供程序可以使用 `@Inject()` 装饰器结合`CONTEXT` (上下文)令牌注入`RequestContext`:
```typescript
import { Injectable, Scope, Inject } from '@nestjs/common';
import { CONTEXT, RequestContext } from '@nestjs/microservices';
@Injectable({ scope: Scope.REQUEST })
export class CatsService {
constructor(@Inject(CONTEXT) private readonly ctx: RequestContext) {}
}
```
还提供了对 `RequestContext` 对象的访问,该对象有两个属性:
```typescript
export interface RequestContext<T = any> {
pattern: string | Record<string, any>;
data: T;
}
```
`data` 属性是消息生产者发送的消息有效负载。 `pattern` 属性是用于标识适当的处理程序以处理传入消息的模式。
#### 处理超时
在分布式系统中,有时微服务可能宕机或者无法访问。要避免无限等待,可以使用超时,超时是一个和其他服务通讯的可信赖的方法。要在微服务中应用超时,你可以使用`RxJS`超时操作符。如果微服务没有在指定时间内返回响应,会抛出异常以便正确捕获与处理。
要处理该问题,可以使用`[rxjs](https://github.com/ReactiveX/rxjs)`包,并在管道中使用`timeout`操作符。
```TypeScript
this.client
.send<TResult, TInput>(pattern, data)
.pipe(timeout(5000))
.toPromise();
```
> `timeout`操作符从`rxjs/operators`中引入
5秒后,如果微服务没有响应,将抛出错误。
- 介绍
- 概述
- 第一步
- 控制器
- 提供者
- 模块
- 中间件
- 异常过滤器
- 管道
- 守卫
- 拦截器
- 自定义装饰器
- 基础知识
- 自定义提供者
- 异步提供者
- 动态模块
- 注入作用域
- 循环依赖
- 模块参考
- 懒加载模块
- 应用上下文
- 生命周期事件
- 跨平台
- 测试
- 技术
- 数据库
- Mongo
- 配置
- 验证
- 缓存
- 序列化
- 版本控制
- 定时任务
- 队列
- 日志
- Cookies
- 事件
- 压缩
- 文件上传
- 流式处理文件
- HTTP模块
- Session(会话)
- MVC
- 性能(Fastify)
- 服务器端事件发送
- 安全
- 认证(Authentication)
- 授权(Authorization)
- 加密和散列
- Helmet
- CORS(跨域请求)
- CSRF保护
- 限速
- GraphQL
- 快速开始
- 解析器(resolvers)
- 变更(Mutations)
- 订阅(Subscriptions)
- 标量(Scalars)
- 指令(directives)
- 接口(Interfaces)
- 联合类型
- 枚举(Enums)
- 字段中间件
- 映射类型
- 插件
- 复杂性
- 扩展
- CLI插件
- 生成SDL
- 其他功能
- 联合服务
- 迁移指南
- Websocket
- 网关
- 异常过滤器
- 管道
- 守卫
- 拦截器
- 适配器
- 微服务
- 概述
- Redis
- MQTT
- NATS
- RabbitMQ
- Kafka
- gRPC
- 自定义传输器
- 异常过滤器
- 管道
- 守卫
- 拦截器
- 独立应用
- Cli
- 概述
- 工作空间
- 库
- 用法
- 脚本
- Openapi
- 介绍
- 类型和参数
- 操作
- 安全
- 映射类型
- 装饰器
- CLI插件
- 其他特性
- 迁移指南
- 秘籍
- CRUD 生成器
- 热重载
- MikroORM
- TypeORM
- Mongoose
- 序列化
- 路由模块
- Swagger
- 健康检查
- CQRS
- 文档
- Prisma
- 静态服务
- Nest Commander
- 问答
- Serverless
- HTTP 适配器
- 全局路由前缀
- 混合应用
- HTTPS 和多服务器
- 请求生命周期
- 常见错误
- 实例
- 迁移指南
- 发现
- 谁在使用Nest?