ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
## 订阅(Subscriptions) 除了使用查询获取数据和使用突变修改数据之外,`GraphQL` 规范还支持第三种操作类型,称为订阅。 `GraphQL` 订阅是一种将数据从服务器推送到选择监听来自服务器的实时消息的客户端的方式。 订阅类似于查询,因为它们指定一组要传递给客户端的字段,但不是立即返回单个答案,而是打开一个通道并将结果发送给客户端,每次服务器上发生特定事件时 . 订阅的一个常见用例是向客户端通知特定事件,例如创建新对象、更新字段等(在[此处](https://www.apollographql.com/docs/react/data/subscriptions)阅读更多内容)。 ```typescript Subscription: { commentAdded: { subscribe: () => pubSub.asyncIterator('commentAdded'); } } ``` > `pubsub` 是一个 `PubSub` 类的实例。在[这里](https://www.apollographql.com/docs/graphql-subscriptions/setup.html)阅读更多。 ## 使用 Apollo 驱动程序启用订阅[#](enable-subscriptions-with-apollo-driver) 要启用订阅,请将 `installSubscriptionHandlers` 属性设置为 `true`。 ~~~typescript GraphQLModule.forRoot<ApolloDriverConfig>({ driver: ApolloDriver, installSubscriptionHandlers: true, }), ~~~ > <a style="color:red;">**提醒**:</a>`installSubscriptionHandlers` 配置选项已从最新版本的 Apollo 服务器中删除,并且很快也会在此包中弃用。 默认情况下,`installSubscriptionHandlers` 将回退到使用 `subscriptions-transport-ws`([阅读更多](https://github.com/apollographql/subscriptions-transport-ws)),但我们强烈建议使用 `graphql-ws`([阅读更多](https://github.com/enisdenjo/graphql-ws))库。 要改用 `graphql-ws` 包,请使用以下配置: ~~~typescript GraphQLModule.forRoot<ApolloDriverConfig>({ driver: ApolloDriver, subscriptions: { 'graphql-ws': true }, }), ~~~ > 您还可以同时使用这两个包(`subscriptions-transport-ws` 和 `graphql-ws`),例如,为了向后兼容。 ### 代码优先[#](#code-first) 要使用代码优先方法创建订阅,我们使用 `@Subscription() `装饰器(从 `@nestjs/graphql `包中导出)和来自 `graphql-subscriptions `包的 `PubSub`类,它提供了一个简单的发布/订阅 API。 以下订阅处理程序通过调用 `PubSub#asyncIterator` 来处理订阅事件。 此方法采用单个参数 `triggerName`,它对应于事件主题名称。 ~~~typescript const pubSub = new PubSub(); @Resolver((of) => Author) export class AuthorResolver { // ... @Subscription((returns) => Comment) commentAdded() { return pubSub.asyncIterator('commentAdded'); } } ~~~ >所有装饰器都是从 `@nestjs/graphql` 包中导出的,而 `PubSub` 类是从 `graphql-subscriptions` 包中导出的。 > <a style="color:orange;">**笔记:**</a> `PubSub` 是一个公开简单的发布和订阅 API 的类。 在此处阅读更多相关信息。 请注意,`Apollo` 文档警告说默认实现不适合生产(在[此处](https://www.apollographql.com/docs/apollo-server/data/subscriptions/)阅读更多信息)。 生产应用程序应使用由外部商店支持的 `PubSub` 实现(在[此处](https://github.com/apollographql/graphql-subscriptions#pubsub-implementations)阅读更多信息)。 这将导致在 SDL 中生成 GraphQL 架构的以下部分: ~~~graphql type Subscription { commentAdded(): Comment! } ~~~ 请注意,根据定义,订阅返回一个具有单个顶级属性的对象,其键是订阅的名称。 此名称要么继承自订阅处理程序方法的名称(即上面的 `commentAdded`),要么通过将带有键名称的选项作为第二个参数传递给` @Subscription() `装饰器来显式提供,如下所示。 ~~~typescript @Subscription(returns => Comment, { name: 'commentAdded', }) subscribeToCommentAdded() { return pubSub.asyncIterator('commentAdded'); } ~~~ 此构造生成与前面的代码示例相同的 SDL,但允许我们将方法名称与订阅分离。 ### 发布[#](#publishing) 现在,要发布事件,我们使用 `PubSub#publish` 方法。 这通常在突变中使用,以在对象图的一部分发生更改时触发客户端更新。 例如: >posts/posts.resolver.ts ~~~typescript @Mutation(returns => Post) async addComment( @Args('postId', { type: () => Int }) postId: number, @Args('comment', { type: () => Comment }) comment: CommentInput, ) { const newComment = this.commentsService.addComment({ id: postId, comment }); pubSub.publish('commentAdded', { commentAdded: newComment }); return newComment; } ~~~ `PubSub#publish` 方法将 `triggerName`(同样,将其视为事件主题名称)作为第一个参数,将事件有效负载作为第二个参数。 如前所述,根据定义,订阅返回一个值并且该值具有形状。 再次查看为我们的 `commentAdded` 订阅生成的 SDL: ~~~graphql type Subscription { commentAdded(): Comment! } ~~~ 这告诉我们订阅必须返回一个顶级属性名称为 `commentAdded` 的对象,该对象的值是一个 `Comment `对象。 需要注意的重要一点是,`PubSub#publish` 方法发出的事件负载的形状必须与预期从订阅返回的值的形状相对应。 因此,在我们上面的示例中,`pubSub.publish('commentAdded', { commentAdded: newComment })` 语句发布了一个带有适当形状有效负载的 `commentAdded `事件。 如果这些形状不匹配,您的订阅将在 `GraphQL` 验证阶段失败。 ### 过滤订阅[#](#filtering-subscriptions) 要过滤掉特定事件,请将过滤器属性设置为过滤器函数。 此函数的作用类似于传递给数组过滤器的函数。 它有两个参数:包含事件有效负载(由事件发布者发送)的有效负载,以及接受订阅请求期间传入的任何参数的变量。 它返回一个布尔值,确定是否应将此事件发布给客户端侦听器。 ~~~typescript @Subscription(returns => Comment, { filter: (payload, variables) => payload.commentAdded.title === variables.title, }) commentAdded(@Args('title') title: string) { return pubSub.asyncIterator('commentAdded'); } ~~~ ### 改变订阅负载[#](#mutating-subscription-payloads) 要改变已发布的事件负载,请将 `resolve` 属性设置为函数。 该函数接收事件有效负载(由事件发布者发送)并返回适当的值。 ~~~typescript @Subscription(returns => Comment, { resolve: value => value, }) commentAdded() { return pubSub.asyncIterator('commentAdded'); } ~~~ > <a style="color:orange;">**笔记:**</a>如果你使用 `resolve` 选项,你应该返回解包后的负载(例如,在我们的例子中,直接返回一个 `newComment`对象,而不是一个 `{ commentAdded: newComment }` 对象)。 如果您需要访问注入的提供者(例如,使用外部服务来验证数据),请使用以下构造。 ~~~typescript @Subscription(returns => Comment, { resolve(this: AuthorResolver, value) { // "this" refers to an instance of "AuthorResolver" return value; } }) commentAdded() { return pubSub.asyncIterator('commentAdded'); } ~~~ 相同的结构适用于过滤器: ~~~typescript @Subscription(returns => Comment, { filter(this: AuthorResolver, payload, variables) { // "this" refers to an instance of "AuthorResolver" return payload.commentAdded.title === variables.title; } }) commentAdded() { return pubSub.asyncIterator('commentAdded'); } ~~~ ### 架构优先[#](#schema-first) 要在 Nest 中创建等效订阅,我们将使用 `@Subscription()`装饰器。 ~~~typescript const pubSub = new PubSub(); @Resolver('Author') export class AuthorResolver { // ... @Subscription() commentAdded() { return pubSub.asyncIterator('commentAdded'); } } ~~~ 要根据上下文和参数过滤掉特定事件,请设置过滤器(`filter`)属性。 ~~~typescript @Subscription('commentAdded', { filter: (payload, variables) => payload.commentAdded.title === variables.title, }) commentAdded() { return pubSub.asyncIterator('commentAdded'); } ~~~ 要改变已发布的有效负载,我们可以使用解析(`resolve`)函数。 ~~~typescript @Subscription('commentAdded', { resolve: value => value, }) commentAdded() { return pubSub.asyncIterator('commentAdded'); } ~~~ 如果您需要访问注入的提供程序(例如,使用外部服务来验证数据),请使用以下构造: ~~~typescript @Subscription('commentAdded', { resolve(this: AuthorResolver, value) { // "this" refers to an instance of "AuthorResolver" return value; } }) commentAdded() { return pubSub.asyncIterator('commentAdded'); } ~~~ 相同的结构适用于过滤器: ~~~typescript @Subscription('commentAdded', { filter(this: AuthorResolver, payload, variables) { // "this" refers to an instance of "AuthorResolver" return payload.commentAdded.title === variables.title; } }) commentAdded() { return pubSub.asyncIterator('commentAdded'); } ~~~ 最后一步是更新类型定义文件。 ~~~graphql type Author { id: Int! firstName: String lastName: String posts: [Post] } type Post { id: Int! title: String votes: Int } type Query { author(id: Int!): Author } type Comment { id: String content: String } type Subscription { commentAdded(title: String!): Comment } ~~~ 有了这个,我们创建了一个 commentAdded(title: String!): Comment 订阅。 您可以在[此处](https://github.com/nestjs/nest/blob/master/sample/12-graphql-schema-first)找到完整的示例实现。 ### 发布订阅[#](#pubsub) 我们在上面实例化了一个本地 `PubSub` 实例。 首选方法是将 `PubSub` 定义为提供者并通过构造函数注入它(使用 `@Inject()` 装饰器)。 这允许我们在整个应用程序中重用该实例。 例如,如下定义一个提供者,然后在需要的地方注入`“PUB_SUB”`。 ~~~typescript { provide: 'PUB_SUB', useValue: new PubSub(), } ~~~ ### 自定义订阅服务器[#](#customize-subscriptions-server) 要自定义订阅服务器(例如,更改路径),请使用订阅(`subscriptions`)选项属性。 ~~~typescript GraphQLModule.forRoot<ApolloDriverConfig>({ driver: ApolloDriver, subscriptions: { 'subscriptions-transport-ws': { path: '/graphql' }, } }), ~~~ 如果您使用 `graphql-ws` 包进行订阅,请将 `subscriptions-transport-ws` 键替换为 `graphql-ws`,如下所示: ~~~typescript GraphQLModule.forRoot<ApolloDriverConfig>({ driver: ApolloDriver, subscriptions: { 'graphql-ws': { path: '/graphql' }, } }), ~~~ ### 通过 WebSocket 进行身份验证[#](#authentication-over-websockets) 检查用户是否通过身份验证可以在 `onConnect` 回调函数中完成,您可以在订阅选项中指定该回调函数。 `onConnect` 将接收传递给 `SubscriptionClient` 的 `connectionParams` 作为第一个参数([阅读更多](https://www.apollographql.com/docs/react/data/subscriptions/#5-authenticate-over-websocket-optional))。 ~~~typescript GraphQLModule.forRoot<ApolloDriverConfig>({ driver: ApolloDriver, subscriptions: { 'subscriptions-transport-ws': { onConnect: (connectionParams) => { const authToken = connectionParams.authToken; if (!isValid(authToken)) { throw new Error('Token is not valid'); } // extract user information from token const user = parseToken(authToken); // return user info to add them to the context later return { user }; }, } }, context: ({ connection }) => { // connection.context will be equal to what was returned by the "onConnect" callback }, }), ~~~ 此示例中的 `authToken` 仅在首次建立连接时由客户端发送一次。 使用此连接进行的所有订阅都将具有相同的 `authToken`,因此具有相同的用户信息。 > <a style="color:orange;">**笔记:**</a>`subscriptions-transport-ws` 中有一个错误,允许连接跳过 `onConnect` 阶段([阅读更多](https://github.com/apollographql/subscriptions-transport-ws/issues/349))。 您不应假设在用户开始订阅时调用了 `onConnect`,并且始终检查上下文是否已填充。 如果您使用的是 `graphql-ws` 包,`onConnect` 回调的签名会略有不同: ~~~typescript subscriptions: { 'graphql-ws': { onConnect: (context: Context<any>) => { const { connectionParams, extra } = context; // user validation will remain the same as in the example above // when using with graphql-ws, additional context value should be stored in the extra field extra.user = { user: {} }; }, }, context: ({ extra }) => { // you can now access your additional context value through the extra field } }, ~~~ ## 使用 Mercurius 驱动程序启用订阅[#](#enable-subscriptions-with-mercurius-driver) 要启用订阅,请将订阅(`subscription`)属性设置为 `true`。 ~~~typescript GraphQLModule.forRoot<MercuriusDriverConfig>({ driver: MercuriusDriver, subscription: true, }), ~~~ > **提示**:您还可以传递选项对象来设置自定义发射器、验证传入连接等。在[此处](https://github.com/mercurius-js/mercurius/blob/master/docs/api/options.md#plugin-options)阅读更多信息(请参阅订阅)。 ### 代码优先[#](#code-first-1) 要使用代码优先的方法创建订阅,我们使用 `@Subscription()` 装饰器(从 `@nestjs/graphql `包中导出)和 `mercurius` 包中的 `PubSub` 类,它提供了一个简单的发布/订阅 API。 以下订阅处理程序通过调用 `PubSub#asyncIterator` 来处理订阅事件。 此方法采用单个参数 `triggerName`,它对应于事件主题名称。 ~~~typescript @Resolver((of) => Author) export class AuthorResolver { // ... @Subscription((returns) => Comment) commentAdded(@Context('pubsub') pubSub: PubSub) { return pubSub.subscribe('commentAdded'); } } ~~~ 上面示例中使用的所有装饰器都是从 `@nestjs/graphql`包中导出的,而 `PubSub` 类是从 `mercurius` 包中导出的。 `PubSub` 是一个公开简单的发布和订阅 API 的类。 查看此部分,了解如何注册自定义 `PubSub`类。 这将导致在 SDL 中生成 GraphQL 架构的以下部分: ~~~graphql type Subscription { commentAdded(): Comment! } ~~~ 请注意,根据定义,订阅返回一个具有单个顶级属性的对象,其键是订阅的名称。 此名称要么继承自订阅处理程序方法的名称(即上面的 `commentAdded`),要么通过将带有键名称的选项作为第二个参数传递给 `@Subscription()` 装饰器来显式提供,如下所示。 ~~~typescript @Subscription(returns => Comment, { name: 'commentAdded', }) subscribeToCommentAdded(@Context('pubsub') pubSub: PubSub) { return pubSub.subscribe('commentAdded'); } ~~~ 此构造生成与前面的代码示例相同的 SDL,但允许我们将方法名称与订阅分离。 ### 发布[#](#publishing-1) 现在,要发布事件,我们使用 `PubSub#publish` 方法。 这通常在突变中使用,以在对象图的一部分发生更改时触发客户端更新。 例如: >posts/posts.resolver.ts ~~~typescript @Mutation(returns => Post) async addComment( @Args('postId', { type: () => Int }) postId: number, @Args('comment', { type: () => Comment }) comment: CommentInput, @Context('pubsub') pubSub: PubSub, ) { const newComment = this.commentsService.addComment({ id: postId, comment }); await pubSub.publish({ topic: 'commentAdded', payload: { commentAdded: newComment } }); return newComment; } ~~~ 如前所述,根据定义,订阅返回一个值并且该值具有形状。 再次查看为我们的 `commentAdded`订阅生成的 SDL: ~~~graphql type Subscription { commentAdded(): Comment! } ~~~ 这告诉我们订阅必须返回一个顶级属性名称为 `commentAdded`的对象,该对象的值是一个 `Comment` 对象。 需要注意的重要一点是,`PubSub#publish` 方法发出的事件负载的形状必须与预期从订阅返回的值的形状相对应。 因此,在我们上面的示例中, `pubSub.publish({ topic: 'commentAdded', payload: { commentAdded: newComment } })`语句发布了一个带有适当形状的有效负载的 `commentAdded` 事件。 如果这些类型不匹配,您的订阅将在 GraphQL 验证阶段失败。 ### 过滤订阅[#](#filtering-subscriptions-1) 要过滤掉特定事件,请将过滤器属性设置为过滤器函数。 此函数的作用类似于传递给数组过滤器的函数。 它有两个参数:包含事件有效负载(由事件发布者发送)的有效负载,以及接受订阅请求期间传入的任何参数的变量。 它返回一个布尔值,确定是否应将此事件发布给客户端侦听器。 ~~~typescript @Subscription(returns => Comment, { filter: (payload, variables) => payload.commentAdded.title === variables.title, }) commentAdded(@Args('title') title: string, @Context('pubsub') pubSub: PubSub) { return pubSub.subscribe('commentAdded'); } ~~~ 如果您需要访问注入的提供者(例如,使用外部服务来验证数据),请使用以下构造。 ~~~typescript @Subscription(returns => Comment, { filter(this: AuthorResolver, payload, variables) { // "this" refers to an instance of "AuthorResolver" return payload.commentAdded.title === variables.title; } }) commentAdded(@Args('title') title: string, @Context('pubsub') pubSub: PubSub) { return pubSub.subscribe('commentAdded'); } ~~~ ### 架构优先#](#schema-first-1) 要在 Nest 中创建等效订阅,我们将使用 `@Subscription()` 装饰器。 ~~~typescript const pubSub = new PubSub(); @Resolver('Author') export class AuthorResolver { // ... @Subscription() commentAdded(@Context('pubsub') pubSub: PubSub) { return pubSub.subscribe('commentAdded'); } } ~~~ 要根据上下文和参数过滤掉特定事件,请设置过滤器属性。 ~~~typescript @Subscription('commentAdded', { filter: (payload, variables) => payload.commentAdded.title === variables.title, }) commentAdded(@Context('pubsub') pubSub: PubSub) { return pubSub.subscribe('commentAdded'); } ~~~ 如果您需要访问注入的提供程序(例如,使用外部服务来验证数据),请使用以下构造: ~~~typescript @Subscription('commentAdded', { filter(this: AuthorResolver, payload, variables) { // "this" refers to an instance of "AuthorResolver" return payload.commentAdded.title === variables.title; } }) commentAdded(@Context('pubsub') pubSub: PubSub) { return pubSub.subscribe('commentAdded'); } ~~~ 最后一步是更新类型定义文件。 ~~~graphql type Author { id: Int! firstName: String lastName: String posts: [Post] } type Post { id: Int! title: String votes: Int } type Query { author(id: Int!): Author } type Comment { id: String content: String } type Subscription { commentAdded(title: String!): Comment } ~~~ 有了这个,我们创建了一个 `commentAdded(title: String!): Comment` 订阅。 #### 发布订阅[#](#pubsub-1) 在上面的示例中,我们使用了默认的 `PubSub` 发射器 (`mqemitter`) 首选方法(用于生产)是使用 `mqemitter-redis`。 或者,可以提供自定义 `PubSub` 实现(在[此处](https://github.com/mercurius-js/mercurius/blob/master/docs/subscriptions.md)阅读更多信息) ~~~typescript GraphQLModule.forRoot<MercuriusDriverConfig>({ driver: MercuriusDriver, subscription: { emitter: require('mqemitter-redis')({ port: 6579, host: '127.0.0.1', }), }, }); ~~~ ### 通过 WebSocket 进行身份验证[#](#authentication-over-websockets-1) 检查用户是否通过身份验证可以在您可以在订阅选项中指定的 `verifyClient`回调函数中完成。 `verifyClient` 将接收` info` 对象作为第一个参数,您可以使用它来检索请求的标头。 ~~~typescript GraphQLModule.forRoot<MercuriusDriverConfig>({ driver: MercuriusDriver, subscription: { verifyClient: (info, next) => { const authorization = info.req.headers?.authorization as string; if (!authorization?.startsWith('Bearer ')) { return next(false); } next(true); }, } }), ~~~ <!-- tabs:start --> #### ** 模式优先 ** 为了以 Nest 方式创建等效订阅,我们将使用 `@Subscription()` 装饰器。 ```typescript const pubSub = new PubSub(); @Resolver('Author') export class AuthorResolver { constructor( private readonly authorsService: AuthorsService, private readonly postsService: PostsService, ) {} @Query('author') async getAuthor(@Args('id') id: number) { return await this.authorsService.findOneById(id); } @ResolveProperty('posts') async getPosts(@Parent() author) { const { id } = author; return await this.postsService.findAll({ authorId: id }); } @Subscription() commentAdded() { return pubSub.asyncIterator('commentAdded'); } } ``` 为了根据上下文和参数过滤掉特定事件,我们可以设置一个 filter 属性。 ```typescript @Subscription('commentAdded', { filter: (payload, variables) => payload.commentAdded.repositoryName === variables.repoFullName, }) commentAdded() { return pubSub.asyncIterator('commentAdded'); } ``` 为了改变已发布的有效负载,我们可以使用 resolve 函数。 ```typescript @Subscription('commentAdded', { resolve: value => value, }) commentAdded() { return pubSub.asyncIterator('commentAdded'); } ``` ### 类型定义 最后一步是更新类型定义文件。 ```typescript type Author { id: Int! firstName: String lastName: String posts: [Post] } type Post { id: Int! title: String votes: Int } type Query { author(id: Int!): Author } type Comment { id: String content: String } type Subscription { commentAdded(repoFullName: String!): Comment } ``` 做得好。我们创建了一个 commentAdded(repoFullName: String!): Comment 订阅。您可以在[此处](https://github.com/nestjs/nest/blob/master/sample/12-graphql-apollo)找到完整的示例实现。 #### ** 使用 Typescript ** 要使用 class-first 方法创建订阅,我们将使用 @Subscription() 装饰器。 ```typescript const pubSub = new PubSub(); @Resolver('Author') export class AuthorResolver { constructor( private readonly authorsService: AuthorsService, private readonly postsService: PostsService, ) {} @Query(returns => Author, { name: 'author' }) async getAuthor(@Args({ name: 'id', type: () => Int }) id: number) { return await this.authorsService.findOneById(id); } @ResolveProperty('posts') async getPosts(@Parent() author) { const { id } = author; return await this.postsService.findAll({ authorId: id }); } @Subscription(returns => Comment) commentAdded() { return pubSub.asyncIterator('commentAdded'); } } ``` 为了根据上下文和参数过滤掉特定事件,我们可以设置 filter 属性。 ```typescript @Subscription(returns => Comment, { filter: (payload, variables) => payload.commentAdded.repositoryName === variables.repoFullName, }) commentAdded() { return pubSub.asyncIterator('commentAdded'); } ``` 为了改变已发布的有效负载,我们可以使用 resolve 函数。 ```typescript @Subscription(returns => Comment, { resolve: value => value, }) commentAdded() { return pubSub.asyncIterator('commentAdded'); } ``` <!-- tabs:end --> ### Pubsub 我们在这里使用了一个本地 `PubSub` 实例。相反, 我们应该将 `PubSub` 定义为一个组件, 通过构造函数 (使用 `@Inject ()` 装饰器) 注入它, 并在整个应用程序中重用它。[您可以在此了解有关嵌套自定义组件的更多信息](/8/fundamentals?id=自定义providercustomer-provider)。 ```typescript { provide: 'PUB_SUB', useValue: new PubSub(), } ``` ### Module 为了启用订阅,我们必须将 `installSubscriptionHandlers` 属性设置为 `true` 。 ```typescript GraphQLModule.forRoot({ typePaths: ['./**/*.graphql'], installSubscriptionHandlers: true, }), ``` 要自定义订阅服务器(例如,更改端口),您可以使用 `subscriptions` 属性(阅读[更多](https://www.apollographql.com/docs/apollo-server/v2/api/apollo-server.html#constructor-options-lt-ApolloServer-gt))。