## 自定义传输器
Nest提供了一系列开箱即用的传输器,也提供了允许用户自定义传输策略的API接口。传输器允许你使用可插拔的通讯层和非常简单的应用层消息协议通过网络连接组件。(阅读[全文](https://dev.to/nestjs/integrate-nestjs-with-external-services-using-microservice-transporters-part-1-p3))
> 不一定非要使用`@nestjs/microservices`包才能创建微服务,例如,如果需要和外部服务通讯 (假设为其他语言编写的其他微服务),你可能不需要 `@nestjs/microservice `提供的全部功能。实际上,如果你不需要装饰器(`@EventPattern`或者`@MessagePattern`)来定义订阅者,运行一个独立的应用并且手动维护连接/订阅频道可能会提供更高的灵活性。
使用自定义传输器,你可以集成任何消息系统/协议(包括`Google Cloud Pub/Sub`, `Amazon Kinesis`等等)或者已有的外部系统,在顶部添加额外的特性(例如用于MQTT的QoS)。
> 要更好地理解Nest微服务的工作模式以及如何扩展现有传输器,推荐阅读 [NestJS Microservices in Action](https://dev.to/johnbiundo/series/4724) 和 [Advanced NestJS Microservices](https://dev.to/nestjs/part-1-introduction-and-setup-1a2l) 系列文章。
### 创建策略
首先定义一个代表自定义传输器的类。
```typescript
import { CustomTransportStrategy, Server } from '@nestjs/microservices';
class GoogleCloudPubSubServer
extends Server
implements CustomTransportStrategy {
/**
* This method is triggered when you run "app.listen()".
*/
listen(callback: () => void) {
callback();
}
/**
* This method is triggered on application shutdown.
*/
close() {}
}
```
> 在这里不会实现一个完整的谷歌云订阅服务器,因为这需要更多更深入的传输器细节。
在这个例子中,声明了`GoogleCloudPubSubServer`类,提供`listen()`和`close()` 方法,并由`CustomTransportStrategy`接口进行限制。此外,我们的类扩展了从`@nestjs/microservices`包导入的`Server`类,来提供一些有用的方法。例如,提供Nest运行时注册消息处理程序的方法。可选的,如果要扩展传输器策略,也可以扩展相应的服务器。例如,`ServerRedis`。一般来说,我们在类前面添加`Server`前缀来表示该类用于处理订阅消息事件(并在必要时进行响应)。
这样就可以自定义一个传输器而不是使用内置的。
```typescript
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
strategy: new GoogleCloudPubSubServer(),
},
);
```
和常规的传输器选项对象的`transport`和`options`属性不同,我们传递一个属性 `strategy`, 其值是我们自定义传输器类的一个实例。
回到我们的`GoogleCloudPubSubServer`类,在真实的应用中,我们可以在`listen()`方法(以及之后移除订阅和监听的`close()`方法)中指定订阅/监听特定频道来和代理/外部服务建立连接。但由于这需要对Nest微服务通讯原理有深入理解,我们建议阅读[这一系列文章](https://dev.to/nestjs/part-1-introduction-and-setup-1a2l)。在本章这里,我们重点介绍服务器类的能力以及如何暴露它们来建立自定义策略。
例如,在我们程序的某个部分,定义了以下处理程序:
```typescript
@MessagePattern('echo')
echo(@Payload() data: object) {
return data;
}
```
该消息处理程序可以自动在Nest运行时注册。通过服务器类,可以看到被注册的消息类型是哪种,也可以接收和执行分配给它们的实际方法。要测试这个,我们在`listen()`中添加一个简单的`console.log`方法,并在回调函数前执行:
```typescript
listen(callback: () => void) {
console.log(this.messageHandlers);
callback();
}
```
程序重启后,可以看到终端中以下记录:
```typescript
Map { 'echo' => [AsyncFunction] { isEventHandler: false } }
```
> 如果我们使用`@EventPattern`装饰器,你可以看到同样的输出,但是`isEventHandler`属性被设置为`true`。
如你所见,`messageHandlers`属性是一个所有消息(和事件)处理程序的`Map`集合。在这里,模式被用作键名,你可以使用一个键(例如`echo`)来接收一个消息处理程序的引用:
```typescript
async listen(callback: () => void) {
const echoHandler = this.messageHandlers.get('echo');
console.log(await echoHandler('Hello world!'));
callback();
}
```
一旦我们执行了`echoHandler`,并传递任意字符串作为参数(在这里是"Hello world!"), 可以看到以下输出:
```bash
Hello world!
```
这意味着消息处理程序正确执行了。
### 客户代理
如第一部分介绍的,你不一定需要`@nestjs/microservices`包来创建微服务,但是如果你这样做,那么需要集成一个自定义策略,你还需要提供一个`client`类。
?> 类似地,要实现完全兼容所有的`@nestjs/microservices`特性(例如`streaming`)需要对框架的通讯机制有深入理解。阅读[本文](https://dev.to/nestjs/part-4-basic-client-component-16f9)了解更多。
要和外部服务/发射与发布消息(或者事件)通讯,你可以使用一个特定库的SDK包,或者一个扩展了`ClientProxy`的自定义的客户端类,如下:
```typescript
import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices';
class GoogleCloudPubSubClient extends ClientProxy {
async connect(): Promise<any> {}
async close() {}
async dispatchEvent(packet: ReadPacket<any>): Promise<any> {}
publish(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void,
): Function {}
}
```
> 注意,在这里我们不会实现一个完整的google云发布/订阅客户端,因为这需要对传输者技术深入理解。
如你所见,`ClientProxy`需要我们提供几个方法来建立和关闭连接,以及发布消息(`publish`)和事件(`dispatchEvent`)。注意,如果你不需要支持请求-响应的通讯风格,可以保持`publish()`方法空白。类似地,如果你不需要支持基于事件的通讯,跳过`dispatchEvent()`方法。
要观察何时何地执行了哪些方法,如下添加多个`console.log`方法:
```typescript
class GoogleCloudPubSubClient extends ClientProxy {
async connect(): Promise<any> {
console.log('connect');
}
async close() {
console.log('close');
}
async dispatchEvent(packet: ReadPacket<any>): Promise<any> {
return console.log('event to dispatch: ', packet);
}
publish(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void,
): Function {
console.log('message:', packet);
// In a real-world application, the "callback" function should be executed
// with payload sent back from the responder. Here, we'll simply simulate (5 seconds delay)
// that response came through by passing the same "data" as we've originally passed in.
setTimeout(() => callback({ response: packet.data }), 5000);
return () => console.log('teardown');
}
}
```
创建一个 `GoogleCloudPubSubClient `类并运行`send()`方法(参见前节),注册和返回一个可观察流。
```typescript
const googlePubSubClient = new GoogleCloudPubSubClient();
googlePubSubClient
.send('pattern', 'Hello world!')
.subscribe((response) => console.log(response));
```
在终端可以看到如下输出:
```bash
connect
message: { pattern: 'pattern', data: 'Hello world!' }
Hello world! // <-- after 5 seconds
```
要测试"teardown"方法(由`publish()`方法返回)正确执行,我们在流中添加一个超时操作,设置超时时间为2秒以保证其早于`setTimeout`调用回调函数。
```typescript
const googlePubSubClient = new GoogleCloudPubSubClient();
googlePubSubClient
.send('pattern', 'Hello world!')
.pipe(timeout(2000))
.subscribe(
(response) => console.log(response),
(error) => console.error(error.message),
);
```
> `timeout`操作符从`rxjs/operators`包中导入。
应用`timeout`操作符,终端看上去类似如下:
```bash
connect
message: { pattern: 'pattern', data: 'Hello world!' }
teardown // <-- teardown
Timeout has occurred
```
要分派一个事件(代替消息),使用`emit()`方法:
```typescript
googlePubSubClient.emit('event', 'Hello world!');
```
终端看上去如下:
```bash
connect
event to dispatch: { pattern: 'event', data: 'Hello world!' }
```
### 消息序列化
如果您需要围绕客户端的响应序列化添加一些自定义逻辑,您可以使用扩展 `ClientProxy` 类或其子类之一的自定义类。 要修改成功的请求,您可以覆盖 `serializeResponse` 方法,并且要修改通过此客户端的任何错误,您可以覆盖 `serializeError` 方法。 要使用此自定义类,您可以使用 `customClass `属性将类本身传递给 `ClientsModule.register()` 方法。 下面是一个将每个错误序列化为 `RpcException` 的自定义 `ClientProxy` 示例。
>error-handling.proxy.ts
~~~typescript
import { ClientTcp, RpcException } from '@nestjs/microservices';
class ErrorHandlingProxy extends ClientTCP {
serializeError(err: Error) {
return new RpcException(err);
}
}
~~~
然后像这样在 `ClientsModule` 中使用它:
>app.module.ts
~~~typescript
@Module({
imports: [
ClientsModule.register({
name: 'CustomProxy',
customClass: ErrorHandlingProxy,
}),
]
})
export class AppModule
~~~
> **暗示** :这是传递给 `customClass` 的类本身,而不是类的实例。 Nest 将在后台为您创建实例,并将给选项属性的任何选项传递给新的 `ClientProxy`。
- 介绍
- 概述
- 第一步
- 控制器
- 提供者
- 模块
- 中间件
- 异常过滤器
- 管道
- 守卫
- 拦截器
- 自定义装饰器
- 基础知识
- 自定义提供者
- 异步提供者
- 动态模块
- 注入作用域
- 循环依赖
- 模块参考
- 懒加载模块
- 应用上下文
- 生命周期事件
- 跨平台
- 测试
- 技术
- 数据库
- Mongo
- 配置
- 验证
- 缓存
- 序列化
- 版本控制
- 定时任务
- 队列
- 日志
- Cookies
- 事件
- 压缩
- 文件上传
- 流式处理文件
- HTTP模块
- Session(会话)
- MVC
- 性能(Fastify)
- 服务器端事件发送
- 安全
- 认证(Authentication)
- 授权(Authorization)
- 加密和散列
- Helmet
- CORS(跨域请求)
- CSRF保护
- 限速
- GraphQL
- 快速开始
- 解析器(resolvers)
- 变更(Mutations)
- 订阅(Subscriptions)
- 标量(Scalars)
- 指令(directives)
- 接口(Interfaces)
- 联合类型
- 枚举(Enums)
- 字段中间件
- 映射类型
- 插件
- 复杂性
- 扩展
- CLI插件
- 生成SDL
- 其他功能
- 联合服务
- 迁移指南
- Websocket
- 网关
- 异常过滤器
- 管道
- 守卫
- 拦截器
- 适配器
- 微服务
- 概述
- Redis
- MQTT
- NATS
- RabbitMQ
- Kafka
- gRPC
- 自定义传输器
- 异常过滤器
- 管道
- 守卫
- 拦截器
- 独立应用
- Cli
- 概述
- 工作空间
- 库
- 用法
- 脚本
- Openapi
- 介绍
- 类型和参数
- 操作
- 安全
- 映射类型
- 装饰器
- CLI插件
- 其他特性
- 迁移指南
- 秘籍
- CRUD 生成器
- 热重载
- MikroORM
- TypeORM
- Mongoose
- 序列化
- 路由模块
- Swagger
- 健康检查
- CQRS
- 文档
- Prisma
- 静态服务
- Nest Commander
- 问答
- Serverless
- HTTP 适配器
- 全局路由前缀
- 混合应用
- HTTPS 和多服务器
- 请求生命周期
- 常见错误
- 实例
- 迁移指南
- 发现
- 谁在使用Nest?