企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 队列 队列是一种有用的设计模式,可以帮助你处理一般应用规模和性能的挑战。一些队列可以帮助你处理的问题示例包括: - 平滑输出峰值。例如,如果用户可以在任何时间创建资源敏感型任务,你可以将其添加到一个消息队列中而不是同步执行。然后你可以通过工作者进程从队列中以一个可控的方式取出进程。在应用规模增大时,你可以轻松添加新的队列消费者来提高后端任务处理能力。 - 将可能阻塞`Node.js`事件循环的整体任务打碎。例如,如果一个用户请求是 CPU 敏感型工作,例如音频转码,你可以将其委托给其他进程,从而保证用户接口进程保持响应。 - 在不同的服务间提供一个可信的通讯通道。例如,你可以将任务(工作)加入一个进程或服务,并由另一个进程或服务来消费他们。你可以在由其他任何进程或服务执行的工作完成、错误或者其他状态变化时得到通知(通过监听状态事件)。当队列生产者或者消费者失败时,他们的状态会被保留,任务将在 node 重启后自动重启。 Nest 提供了`@nestjs/bull`包,这是[Bull](https://github.com/OptimalBits/bull)包的一个包装器,Bull 是一个流行的、支持良好的、高性能的基于 Nodejs 的消息队列系统应用。该包将 Bull 队列以 Nest 友好的方式添加到你的应用中。 Bull 使用[Redis](https://redis.io/)持久化工作数据,因此你需要在你的系统中安装 Redis。因为他是基于 Redis 的,你的队列结构可以是完全分布式的并且和平台无关。例如,你可以有一些队列[生产者](https://docs.nestjs.com/techniques/queues#producers)、[消费者](https://docs.nestjs.com/techniques/queues#consumers)和[监听者](https://docs.nestjs.com/techniques/queues#event-listeners),他们运行在 Nest 的一个或多个节点上,同时,其他生产者、消费者和监听者在其他 Node.js 平台或者其他网络节点上。 本章使用`@nestjs/bull`包,我们同时推荐阅读[BUll 文档](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md)来获取更多背景和应用细节。 ### 安装 要开始使用,我们首先安装需要的依赖: ```typescript $ npm install --save @nestjs/bull bull $ npm install --save-dev @types/bull ``` 一旦安装过程完成,我们可以在根`AppModule`中导入`BullModule`。 > app.module.ts ```typescript import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bull'; @Module({ imports: [ BullModule.registerQueue({ name: 'audio', redis: { host: 'localhost', port: 6379, }, }), ], }) export class AppModule {} ``` `registerQueue()`方法用于实例化并/或注册队列。队列在不同的模块和进程之间共享,在底层则通过同样的凭据连接到同样的 Redis 数据库。每个队列由其`name`属性区分(如下),当共享队列(跨模块/进程)时,第一个`registerQueue()`方法同时实例化该队列并向模块注册它。其他模块(在相同或者不同进程下)则简单地注册队列。队列注册创建一个`injection token`,它可以被用在给定 Nest 模块中获取队列。 针对每个队列,传递一个包含下列属性的配置对象: -`name:string`- 一个队列名称,它可以被用作`injection token`(用于将队列注册到控制器/提供者),也可以作为装饰器参数来将消费者类和监听者与队列联系起来。是必须的。 -`limiter:RateLimiter`-该选项用于确定消息队列处理速率,查看[RateLimiter](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)获取更多信息。可选的。 -`redis:RedisOpts`-该选项用于配置 Redis 连接,查看[RedisOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)获取更多信息。可选的。 -`prefix: string`-队列所有键的前缀。可选的。 -`defaultJobOptions: JobOpts`-选项用以控制新任务的默认属性。查看[JobOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueadd)获取更多信息。可选的。 -`settings: AdvancedSettings`-高级队列配置设置。这些通常不需要改变。查看[AdvancedSettings](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)获取更多信息。可选的。 注意,`name`属性是必须的。其他选项是可选的,为队列行为提供更细节的控制。这些会直接传递给 Bull 的`Queue`构造器。在[这里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)阅读更多选项。当在第二个或者子模块中注册一个队列时,最佳时间是省略配置对象中除`name`属性之外的所有选项。这些选项仅应该在实例化队列的模块中确定。 > 在`registerQueue()`方法中传递多个逗号分隔的选项对象来创建多个队列。 由于任务在 Redis 中是持久化的,每次当一个特定名称的队列被实例化时(例如,当一个 app 启动/重启时),它尝试处理任何可能在前一个旧的任务遗留未完成的`session`。 每个队里可能有一个或很多生产者、消费者以及监听者。消费者从一个特定命令队列中获取任务:FIFO(默认,先进先出),LIFO(后进先出)或者依据优先级。 控制队列处理命令在[这里](https://docs.nestjs.com/techniques/queues#consumers)讨论。 ### 生产者 任务生产者添加任务到队列中。生产者是典型的应用服务(Nest [提供者](https://docs.nestjs.com/providers))。要添加工作到一个队列,首先注册队列到服务中: ```typescript import { Injectable } from '@nestjs/common'; import { Queue } from 'bull'; import { InjectQueue } from '@nestjs/bull'; @Injectable() export class AudioService { constructor(@InjectQueue('audio') private audioQueue: Queue) {} } ``` > `@InjectQueue()`装饰器由其名称指定队列,像它在`registerQueue()`方法中提供的那样(例如,`audio`)。 现在,通过调用队列的`add()`方法添加一个任务,传递一个用户定义的任务对象。任务表现为序列化的`JavaScript`对象(因为它们被存储在 Redis 数据库中)。你传递的任务形式是可选的;用它来在语义上表示你任务对象: ```typescript const job = await this.audioQueue.add({ foo: 'bar', }); ``` ### 命名的任务 任务需要独一无二的名字。这允许你创建专用的[消费者](https://docs.nestjs.com/techniques/queues#consumers),这将仅处理给定名称的处理任务。 ```typescript const job = await this.audioQueue.add('transcode', { foo: 'bar', }); ``` > 当使用命名任务时,你必须为每个添加到队列中的特有名称创建处理者,否则队列会反馈缺失了给定任务的处理器。查看[这里](https://docs.nestjs.com/techniques/queues#consumers)阅读更多关于消费命名任务的信息。 ### 任务选项 任务可以包括附加选项。在`Quene.add()`方法的`job`参数之后传递选项对象。任务选项属性有: - `priority: number`-选项优先级值。范围从 1(最高优先)到 MAX_INT(最低优先)。注意使用属性对性能有轻微影响,因此要小心使用。 - `delay: number`- 任务执行前等待的时间(毫秒)。注意,为了精确延时,服务端和客户端时钟应该同步。 - `attempts: number`-任务结束前总的尝试次数。 - `repeat: RepeatOpts`-按照定时设置重复任务记录,查看[RepeatOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueadd)。 - `backoff: number | BackoffOpts`- 如果任务失败,自动重试闪避设置,查看[BackoffOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueadd)。 - `lifo: boolean`-如果为`true`,从队列右端添加任务以替代从左边添加(默认为 false)。 - `timeout: number`-任务超时失败的毫秒数。 - `jobId: number | string`- 覆盖任务 ID-默认地,任务 ID 是唯一的整数,但你可以使用该参数覆盖它。如果你使用这个选项,你需要保证`jobId`是唯一的。如果你尝试添加一个包含已有 id 的任务,它不会被添加。 - `removeOnComplete: boolean | number`-如果为`true`,当任务完成时移除任务。一个数字用来指定要保存的任务数。默认行为是将完成的工作保存在已完成的设置中。 - `removeOnFail: boolean | number`-如果为`true`,当所有尝试失败时移除任务。一个数字用来指定要保存的任务数。默认行为是将失败的任务保存在已失败的设置中。 - `stackTraceLimit: number`-限制在`stacktrace`中保存的堆栈跟踪线。 这里是一些带有任务选项的自定义任务示例。 要延迟任务的开始,使用`delay`配置属性: ```typescript const job = await this.audioQueue.add( { foo: 'bar', }, { delay: 3000 } // 3 seconds delayed ); ``` 要从右端添加任务到队列(以 LIFO(后进先出)处理任务),设置配置对象的`lifo`属性为`true`。 ```typescript const job = await this.audioQueue.add( { foo: 'bar', }, { lifo: true } ); ``` 要优先一个任务,使用`priority`属性。 ```typescript const job = await this.audioQueue.add( { foo: 'bar', }, { priority: 2 } ); ``` ### 消费者 消费者是一个类,定义的方法要么处理添加到队列中的任务,要么监听队列的事件,或者两者皆有。使用`@Processor()`装饰器来定义消费者类,如下: ```typescript import { Processor } from '@nestjs/bull'; @Processor('audio') export class AudioConsumer {} ``` 装饰器的字符串参数(例如,`audio`)是和类方法关联的队列名称。 在消费者类中,使用`@Process()`装饰器来装饰任务处理者。 ```typescript import { Processor, Process } from '@nestjs/bull'; import { Job } from 'bull'; @Processor('audio') export class AudioConsumer { @Process() async transcode(job: Job<unknown>) { let progress = 0; for (i = 0; i < 100; i++) { await doSomething(job.data); progress += 10; job.progress(progress); } return {}; } } ``` 装饰器方法(例如`transcode()`) 在工作空闲或者队列中有消息要处理的时候被调用。该处理器方法接受`job`对象作为其仅有的参数。处理器方法的返回值被保存在任务对象中,可以在之后被访问,例如,在用于完成事件的监听者中。 `Job`对象有多个方法,允许你和他们的状态交互。例如,上述代码使用`progress()`方法来更新工作进程。查看[这里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#job)以了解完整的`Job`对象 API 参照。 你可以指定一个任务处理方法,仅处理指定类型(包含特定`name`的任务)的任务,这可以通过如下所述的将`name`传递给`@Process()`装饰器完成。你在一个给定消费者类中可以有多个`@Process()`处理器,以反应每个任务类型(`name`),确保每个`name`有相应的处理者。 ```typescript @Process('transcode') async transcode(job: Job<unknown>) { ... } ``` ### 事件监听者 当队列和/或任务状态改变时,`Bull`生成一个有用的事件集合。Nest 提供了一个装饰器集合,允许订阅一系列标准核心事件集合。他们从`@nestjs/bull`包中导出。 事件监听者必须在一个[消费者](https://docs.nestjs.com/techniques/queues#consumers)类中声明(通过`@Processor()`装饰器)。要监听一个事件,使用如下表格之一的装饰器来声明一个事件处理器。例如,当一个任务进入`audio`队列活跃状态时,要监听其发射的事件,使用下列结构: ```typescript import { Processor, Process } from '@nestjs/bull'; import { Job } from 'bull'; @Processor('audio') export class AudioConsumer { @OnQueueActive() onActive(job: Job) { console.log( `Processing job ${job.id} of type ${job.name} with data ${job.data}...`, ); } ``` 鉴于 BUll 运行于分布式(多 node)环境,它定义了本地事件概念。该概念可以辨识出一个由完整的单一进程触发的事件,或者由不同进程共享的队列。一个本地事件是指在本地进程中触发的一个队列行为或者状态变更。换句话说,当你的事件生产者和消费者是本地单进程时,队列中所有事件都是本地的。 当一个队列在多个进程中共享时,我们可能要遇到全局事件。对一个由其他进程触发的事件通知器进程的监听者来说,它必须注册为全局事件。 当相应事件发射时事件处理器被唤醒。该处理器被下表所示的签名调用,提供访问事件相关的信息。我们讨论下面签名中本地和全局事件处理器。 | 本地事件监听者 | 全局事件监听者 | 处理器方法签名/当触发时 | | ------------------- | ------------------------- | ----------------------------------------------------------------------------------------------------------| | @OnQueueError() | @OnGlobalQueueError() | handler(error: Error) - 当错误发生时,`error`包括触发错误 | | @OnQueueWaiting() | @OnGlobalQueueWaiting() | handler(jobId: number \| string) - 一旦工作者空闲就等待执行的任务,`jobId`包括进入此状态的 id | | @OnQueueActive() | @OnGlobalQueueActive() | handler(job: Job)-`job`任务已启动 | | @OnQueueStalled() | @OnGlobalQueueStalled() | handler(job: Job)-`job`任务被标记为延迟。这在时间循环崩溃或暂停时进行调试工作时是很有效的 | | @OnQueueProgress() | @OnGlobalQueueProgress() | handler(job: Job, progress: number)-`job`任务进程被更新为`progress`值 | | @OnQueueCompleted() | @OnGlobalQueueCompleted() | handler(job: Job, result: any) `job`任务进程成功以`result`结束 | | @OnQueueFailed() | @OnGlobalQueueFailed() | handler(job: Job, err: Error)`job`任务以`err`原因失败 | | @OnQueuePaused() | @OnGlobalQueuePaused() | handler()队列被暂停 | | @OnQueueResumed() | @OnGlobalQueueResumed() | handler(job: Job)队列被恢复 | | @OnQueueCleaned() | @OnGlobalQueueCleaned() | handler(jobs: Job[], type: string) 旧任务从队列中被清理,`job`是一个清理任务数组,`type`是要清理的任务类型 | | @OnQueueDrained() | @OnGlobalQueueDrained() | handler()在队列处理完所有等待的任务(除非有些尚未处理的任务被延迟)时发射出 | | @OnQueueRemoved() | @OnGlobalQueueRemoved() | handler(job: Job)`job`任务被成功移除 | 当监听全局事件时,签名方法可能和本地有一点不同。特别地,本地版本的任何方法签名接受`job`对象的方法签名而不是全局版本的`jobId(number)`。要在这种情况下获取实际的`job`对象的引用,使用`Queue#getJob`方法。这种调用可能需要等待,因此处理者应该被声明为`async`,例如: ```typescript @OnGlobalQueueCompleted() async onGlobalCompleted(jobId: number, result: any) { const job = await this.immediateQueue.getJob(jobId); console.log('(Global) on completed: job ', job.id, ' -> result: ', result); } ``` > 要获取一个`Queue`对象(使用`getJob()`调用),你当然必须注入它。同时,队列必须注册到你要注入的模块中。 在特定事件监听器装饰器之外,你可以使用通用的`@OnQueueEvent()`装饰器与`BullQueueEvents`或者`BullQueueGlobalEvents`枚举相结合。在[这里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#events)阅读更多有关事件的内容。 ### 队列管理 队列有一个 API 来实现管理功能比如暂停、恢复、检索不同状态的任务数量等。你可以在[这里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)找到完整的队列 API。直接在`Queue`对象上调用这些方法,如下所示的暂停/恢复示例。 使用`pause()`方法调用来暂停队列。一个暂停的队列在恢复前将不会处理新的任务,但会继续处理完当前执行的任务。 ```typescript await audioQueue.pause(); ``` 要恢复一个暂停的队列,使用`resume()`方法,如下: ```typescript await audioQueue.resume(); ``` ## 分离进程[#](#separate-processes) 作业处理程序也可以在单独的(分叉的)进程([source](https://github.com/OptimalBits/bull#separate-processes))中运行。这有几个优点: * 该进程是沙盒化的,因此如果它崩溃,它不会影响工作人员。 * 您可以在不影响队列的情况下运行阻塞代码(作业不会停止)。 * 更好地利用多核 CPU。 * 与 redis 的连接更少。 >app.module.ts ~~~ts import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bull'; import { join } from 'path'; @Module({ imports: [ BullModule.registerQueue({ name: 'audio', processors: [join(__dirname, 'processor.js')], }), ], }) export class AppModule {} ~~~ 请注意,由于您的函数是在分叉进程中执行的,因此依赖注入(和 IoC 容器)将不可用。这意味着您的处理器函数将需要包含(或创建)它所需的所有外部依赖项实例。 >processor.ts ~~~ts import { Job, DoneCallback } from 'bull'; export default function (job: Job, cb: DoneCallback) { console.log(`[${process.pid}] ${JSON.stringify(job.data)}`); cb(null, 'It works'); } ~~~ ### 异步配置 你可能需要异步而不是静态传递队列选项。在这种情况下,使用`registerQueueAsync()`方法,可以提供不同的异步配置方法。 一个方法是使用工厂函数: ```typescript BullModule.registerQueueAsync({ name: 'audio', useFactory: () => ({ redis: { host: 'localhost', port: 6379, }, }), }); ``` 我们的工厂函数方法和其他[异步提供者](https://docs.nestjs.com/fundamentals/async-providers)(它可以是`async`的并可以使用`inject`来注入)方法相同。 ```typescript BullModule.registerQueueAsync({ name: 'audio', imports: [ConfigModule], useFactory: async (configService: ConfigService) => ({ redis: { host: configService.get('QUEUE_HOST'), port: +configService.get('QUEUE_PORT'), }, }), inject: [ConfigService], }); ``` 可选的,你可以使用`useClass`语法。 ```typescript BullModule.registerQueueAsync({ name: 'audio', useClass: BullConfigService, }); ``` 上述结构在`BullModule`中实例化`BullConfigService`,并通过调用`createBullOptions()`来用它提供一个选项对象。注意这意味着`BullConfigService`要实现`BullOptionsFactory`工厂接口,如下: ```typescript @Injectable() class BullConfigService implements BullOptionsFactory { createBullOptions(): BullModuleOptions { return { redis: { host: 'localhost', port: 6379, }, }; } } ``` 要阻止在`BullModule`中创建`BullConfigService`并使用一个从其他模块导入的提供者,可以使用`useExisting`语法。 ```typescript BullModule.registerQueueAsync({ name: 'audio', imports: [ConfigModule], useExisting: ConfigService, }); ``` 这个结构和`useClass`有一个根本区别——`BullModule`将查找导入的模块来重用现有的`ConfigServie`而不是实例化一个新的。 ### 示例 一个可用的示例见[这里](https://github.com/nestjs/nest/tree/master/sample/26-queues)。