ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
## gRPC [gRPC](https://github.com/grpc/grpc-node) 是一个现代的、高性能RPC框架,可以运行在任何环境下。它可以有效在数据中心之间连接服务,并通过插件支持负载均衡、跟踪、健康诊断和授权。 和很多RPC系统一样,gRPC基于可以定义远程调用的函数(方法)的概念。针对每个方法,定义一个参数并返回类型。服务、参数和返回类型在`.proto`文件中定义,使用谷歌的开源语言——中性[协议缓存(protocol buffers)](https://developers.google.com/protocol-buffers)机制。 使用gRPC传输器,Nest使用`.proto`文件来动态绑定客户端和服务以简化远程调用并自动序列化和反序列化结构数据。 ### 安装 在开始之前,我们必须安装所需的软件包: ``` $ npm i --save @grpc/grpc-js @grpc/proto-loader ``` ### 概述 类似其他微服务传输器层的实现,要使用gRPC传输器机制,你需要像下面的示例一样给`createMicroservice()`方法传递指定传输器`transport`属性和可选的`options`属性。 > main.ts ```typescript const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, { transport: Transport.GRPC, options: { package: 'hero', protoPath: join(__dirname, 'hero/hero.proto'), }, }); ``` > `join()`函数需要从`path`包导入,`Transport`枚举 需要从 `@nestjs/microservices` 包导入。 在 `nest-cli.json` 文件中,我们添加 `assets` 属性以便于部署非ts文件,添加 `watchAssets` 来对assets文件们进行监听,就grpc而言,我们希望 `.proto` 文件自动复制到 `dist` 文件夹下 ```typescript { "compilerOptions": { "assets": ["**/*.proto"], "watchAssets": true } } ``` ### 选项 **gRPC**传输器选项暴露了以下属性。 ||| |---|---| package|`Protobuf`包名称(与.proto文件定义的相匹配)。必须的 protoPath|`.proto`文件的绝对或相对路径,必须的。 url|连接url,字符串,格式为`ip address/dns name:port` (例如, 'localhost:50051') 定义传输器连接的地址/端口,可选的。默认为'localhost:5000' protoLoader|用来调用`.proto`文件的NPM包名,可选的,默认为'@grpc/proto-loader' loader| `@grpc/proto-loader`选项。可以控制`.proto`文件更多行为细节,可选的。[参见这里](https://github.com/grpc/grpc-node/blob/master/packages/proto-loader/README.md)。 credentials|服务器凭证,可选的。([参见更多](https://grpc.io/grpc/node/grpc.ServerCredentials.html)) ### 示例`gRPC`服务 我们定义`HeroesService`示例gRPC服务。在上述的`options`对象中, `protoPath` 是设置`.proto`定义文件`hero.proto`的路径。`hero.proto` 文件是使用协议缓冲区语言构建的。 > hero.proto ```proto syntax = "proto3"; package hero; service HeroesService { rpc FindOne (HeroById) returns (Hero) {} } message HeroById { int32 id = 1; } message Hero { int32 id = 1; string name = 2; } ``` 在上面的示例中,我们定义了一个 `HeroService`,它暴露了一个 `FindOne()` gRPC处理程序,该处理程序期望 `HeroById` 作为输入并返回一个 `Hero` 消息(协议缓冲语言使用`message`元素定义参数类型和返回类型)。 接下来,需要实现这个服务。如下在控制器中使用`@GrpcMethod()`装饰器来定义一个满足要求的处理程序。这个装饰器提供了要声明gRPC服务方法的元数据。 > 之前章节中介绍的`@MessagePattern()`装饰器([阅读更多](https://docs.nestjs.com/microservices/basics#request-response))在基于gRPC的微服务中不适用。基于gPRC的微服务使用`@GrpcMethod()`装饰器。 > hero.controller.ts ```typescript @Controller() export class HeroesController { @GrpcMethod('HeroesService', 'FindOne') findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any>): Hero { const items = [ { id: 1, name: 'John' }, { id: 2, name: 'Doe' }, ]; return items.find(({ id }) => id === data.id); } } ``` > `@GrpcMethod()` 需要从 `@nestjs/microservices` 包导入 。`Metadata`和`ServerUnaryCall`从`grpc`导入。 上述装饰器有两个参数。第一个是服务名称(例如`HeroesService`),对应在`hero.proto`文件中定义的`HeroesService`,第二个(字符串`FindOne`)对应`hero.proto`文件中`HeroesService`内定义的`FindOne()`方法。 `findone()`处理程序方法有三个参数,`data`从调用者中传递,`metadata`保存了gRPC需要的元数据,`call`用于获取`GrpcCall`对象属性,例如`sendMetadata`以像客户端发送元数据。 `@GrpcMethod()`装饰器两个参数都是可选的,如果不指定第二个参数(例如`FindOne`),Nest会自动将`.proto`文件中的rpc方法与处理程序相关联,并将rpc处理程序名称转换为大写骆驼格式(例如,`findOne`处理器与`FindOne`rpc调用定义关联),如下: > hero.controller.ts ```typescript @Controller() export class HeroesController { @GrpcMethod('HeroesService') findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any>): Hero { const items = [ { id: 1, name: 'John' }, { id: 2, name: 'Doe' }, ]; return items.find(({ id }) => id === data.id); } } ``` 也可以忽略`@GrpcMethod()`的第一个参数。在这种情况下,Nest将基于定义了处理程序的类的`proto`文件自动关联处理程序和服务定义。例如,在以下代码中,类`HeroesService`和它在`hero.proto`文件中定义的`HeroesService`服务的处理器方法相关联,以`HeroesService`名称相匹配。 > hero.controller.ts ```typescript @Controller() export class HeroesService { @GrpcMethod() findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any>): Hero { const items = [ { id: 1, name: 'John' }, { id: 2, name: 'Doe' }, ]; return items.find(({ id }) => id === data.id); } } ``` ### 客户端 Nest应用可以作为gRPC客户端,消费`.proto`文件定义的服务。你可以使用`ClientGrpc`对象调用远程服务。可以通过几种方式调用`ClientGrpc`对象。 推荐的技术是导入`ClientModule`,使用`register()` 方法绑定一个在`.proto`文件中定义的服务包以注入token并配置服务。`name`属性是注入的token。在gRPC服务中,使用`transport:Transport.GRPC`,`options`属性和前节相同。 ```typescript imports: [ ClientsModule.register([ { name: 'HERO_PACKAGE', transport: Transport.GRPC, options: { package: 'hero', protoPath: join(__dirname, 'hero/hero.proto'), }, }, ]), ]; ``` > `register()`方法包含一个对象数组。通过逗号分隔注册对象以注册多个对象。 注册后,可以使用`@Inject()`注入配置的`ClientGrpc`对象。然后使用`ClientGrpc`对象的`getService()`方法来获取服务实例,如下: ```typescript @Injectable() export class AppService implements OnModuleInit { private heroesService: HeroesService; constructor(@Inject('HERO_PACKAGE') private client: ClientGrpc) {} onModuleInit() { this.heroesService = this.client.getService<HeroesService>('HeroesService'); } getHero(): Observable<string> { return this.heroesService.findOne({ id: 1 }); } } ``` > gRPC客户端不会发送名称包含下划线`_`的字段,除非`keepCase`选项在`proto`装载配置中(`options.loader.keepcase`在微服务传输器配置中)被配置为`true`。 注意,和其他微服务传输器方法相比,这里的技术有一点细微的区别。使用`ClientGrpc`代替`ClientProxy`类,提供`getService()`方法,使用一个服务名称作为参数并返回它的实例(如果存在)。 也可以使用 `@Client()` 装饰器来初始化`ClientGrpc`对象,如下: ```typescript @Injectable() export class AppService implements OnModuleInit { @Client({ transport: Transport.GRPC, options: { package: 'hero', protoPath: join(__dirname, 'hero/hero.proto'), }, }) client: ClientGrpc; private heroesService: HeroesService; onModuleInit() { this.heroesService = this.client.getService<HeroesService>('HeroesService'); } getHero(): Observable<string> { return this.heroesService.findOne({ id: 1 }); } } ``` 最后,在更复杂的场景下,我们可以使用`ClientProxyFactory`注入一个动态配置的客户端。 在任一种情况下,最终要需要`HeroesService`代理对象,它暴露了 `.proto` 文件中定义的同一组方法。现在可以访问这些代理对象(例如,heroesService),gRPC系统自动序列化请求并发送到远程系统中,返回应答,并且反序列化应答。由于gRPC屏蔽了网络通讯的细节,`herosService`看上去和本地服务一样。 注意,所有这些都是 **小写** (为了遵循自然惯例)。基本上,我们的`.proto`文件 `HeroService` 定义包含 `FindOne()` 函数。这意味着 `heroService` 实例将提供 `findOne()` 方法。 ```typescript interface HeroService { findOne(data: { id: number }): Observable<any>; } ``` 消息处理程序也可以返回一个`Observable`,在流完成之后其结果值会被发出。 > hero.controller.ts ```typescript @Get() call(): Observable<any> { return this.heroService.findOne({ id: 1 }); } ``` 要发送gRPC元数据(随请求),可以像如下这样传递第二个参数: ```typescript call(): Observable<any> { const metadata = new Metadata(); metadata.add('Set-Cookie', 'yummy_cookie=choco'); return this.heroesService.findOne({ id: 1 }, metadata); } ``` > `Metadata`类从`grpc`包中导入。 注意,这可能需要更新我们在之前步骤中定义的`HeroesService`接口。 #### 示例 [这里](https://github.com/nestjs/nest/tree/master/sample/04-grpc) 提供了一个完整的示例。 ### gRPC流 `GRPC` 本身支持长期的实时连接(称为流)。 对于诸如聊天,热评或数据块传输之类的服务案例,流可以是非常有用的工具。 您可以在官方文档([此处](https://grpc.io/docs/guides/concepts/))中找到更多详细信息。 `Nest` 通过两种可能的方式支持 `GRPC`流处理程序: - `RxJS Subject + Observable` 处理程序:可用于在`Controller` 内部编写响应或将其传递给 `Subject / Observable`使用者。 - `Pure GRPC` 调用流处理程序:将其传递给某个执行程序非常有用,后者将处理节点标准双工流处理程序的其余分派。 ### 流示例 定义一个示例的gRPC服务,名为`HelloService`。`hello.proto`文件使用协议缓冲语言组织,如下: ```typescript // hello/hello.proto syntax = "proto3"; package hello; service HelloService { rpc BidiHello(stream HelloRequest) returns (stream HelloResponse); rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse); } message HelloRequest { string greeting = 1; } message HelloResponse { string reply = 1; } ``` > `LotsOfGreetings`方法可以简单使用`@GrpcMethod`装饰器(参考以上示例),以返回流并发射出多个值。 基于`.proto`文件,定义`HelloService`接口。 ```typescript interface HelloService { bidiHello(upstream: Observable<HelloRequest>): Observable<HelloResponse>; lotsOfGreetings( upstream: Observable<HelloRequest>, ): Observable<HelloResponse>; } interface HelloRequest { greeting: string; } interface HelloResponse { reply: string; } ``` **提示**: proto 接口可以由[ts-proto包自动生成,在](https://github.com/stephenh/ts-proto)[这里](https://github.com/stephenh/ts-proto/blob/main/NESTJS.markdown)了解更多。 ### 主题策略 `@GrpcStreamMethod()` 装饰器提供`RxJS Observable`的函数参数,也就是说,我们可以接收和处理多个消息。 ```typescript @GrpcStreamMethod() bidiHello(messages: Observable<any>, metadata: Metadata, call: ServerDuplexStream<any, any>): Observable<any> { const subject = new Subject(); const onNext = message => { console.log(message); subject.next({ reply: 'Hello, world!' }); }; const onComplete = () => subject.complete(); messages.subscribe({ next: onNext, complete: onComplete, }); return subject.asObservable(); } ``` > 为了支持与 `@GrpcStreamMethod()` 装饰器的全双工交互,需要从`Controller` 方法中返回 `RxJS Observable`。 > `Metadata`和`ServerUnaryCall`类/接口从`grpc`包中导入。 依据服务定义(在`.proto`文件中),`BidiHello`方法需要向服务发送流请求。要从客户端发送多个异步消息到流,需要暴露一个RxJS的`ReplySubject`类。 ```typescript const helloService = this.client.getService<HelloService>('HelloService'); const helloRequest$ = new ReplaySubject<HelloRequest>(); helloRequest$.next({ greeting: 'Hello (1)!' }); helloRequest$.next({ greeting: 'Hello (2)!' }); helloRequest$.complete(); return helloService.bidiHello(helloRequest$); ``` 在上述示例中,将两个消息写入流(`next()`调用)并且通知服务我们完成两个数据发送(`complete()`调用)。 ### 调用流处理程序 当返回值被定义为`stream`时,`@GrpcStreamCall()`装饰器提供了一个`grpc.ServerDuplexStream`作为函数参数,支持标准的 `.on('data', callback)`、`.write(message)`或 `.cancel()`方法,有关可用方法的完整文档可在[此处](https://grpc.github.io/grpc/node/grpc-ClientDuplexStream.html)找到。 可选的,当方法返回值不是`stream`时,`@GrpcStreamCall()`装饰器提供两个函数参数,分别为`grpc.ServerReadableStream` ([参见这里](https://grpc.github.io/grpc/node/grpc-ServerReadableStream.html)) 和`callback`。 接下来开始应用`BidiHello`,它应该支持全双工交互。 ```typescript @GrpcStreamCall() bidiHello(requestStream: any) { requestStream.on('data', message => { console.log(message); requestStream.write({ reply: 'Hello, world!' }); }); } ``` > 此装饰器不需要提供任何特定的返回参数。 可以像对待任何其他标准流类型一样处理流。 在上述示例中,使用`write()`方法将对象写入响应流。将回调信息作为第二个参数传递给`.on()`方法,当服务每次收到收据块时会进行调用。 应用`LotsOfGreetings`方法: ```typescript @GrpcStreamCall() lotsOfGreetings(requestStream: any, callback: (err: unknown, value: HelloResponse) => void) { requestStream.on('data', message => { console.log(message); }); requestStream.on('end', () => callback(null, { reply: 'Hello, world!' })); } ``` 这里使用`callback`函数在`requestStream`完成时来发送响应。 ### gRPC 元数据 元数据是一系列反应特定RPC调用信息的键值对,键是字符串格式,值通常是字符串,但也可以是二进制数据。元数据对gRPC自身而言是不透明的,客户端向服务器发送信息时携带元数据信息,反之亦然。元数据包含认证token,请求指示器和监控用途的标签,以及数据信息例如数据集中的记录数量。 要在`@GrpcMethod()`处理程序中读取元数据,使用第二个参数(元数据),类型为`Metadata`(从`grpc`包中导入)。 要从处理程序中发回元数据,使用`ServerUnaryCall#sendMetadata()`方法(第三个处理程序参数)。 >heroes.controller.ts ```typescript @Controller() export class HeroesService { @GrpcMethod() findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any>): Hero { const serverMetadata = new Metadata(); const items = [ { id: 1, name: 'John' }, { id: 2, name: 'Doe' }, ]; serverMetadata.add('Set-Cookie', 'yummy_cookie=choco'); call.sendMetadata(serverMetadata); return items.find(({ id }) => id === data.id); } } ``` 类似地,要使用`@GrpcStreamMethod()`处理程序(主题策略)在处理程序注释中读取元数据,使用第二个参数(元数据),类型为`Metadata`(从`grpc`包中导入)。 要从处理程序中发回元数据,使用`ServerDuplexStream#sendMetadata()`方法(第三个处理程序参数)。 要从[call stream handlers](https://docs.nestjs.com/microservices/grpc#call-stream-handler)(使用`@GrpcStreamCall()`装饰器注释的处理程序)中读取元数据,监听`requestStream`引用中的`metadata`事件。 ```typescript requestStream.on('metadata', (metadata: Metadata) => { const meta = metadata.get('X-Meta'); }); ```