## 队列
队列是一种有用的设计模式,可以帮助你处理一般应用规模和性能的挑战。一些队列可以帮助你处理的问题示例包括:
- 平滑输出峰值。例如,如果用户可以在任何时间创建资源敏感型任务,你可以将其添加到一个消息队列中而不是同步执行。然后你可以通过工作者进程从队列中以一个可控的方式取出进程。在应用规模增大时,你可以轻松添加新的队列消费者来提高后端任务处理能力。
- 将可能阻塞`Node.js`事件循环的整体任务打碎。例如,如果一个用户请求是 CPU 敏感型工作,例如音频转码,你可以将其委托给其他进程,从而保证用户接口进程保持响应。
- 在不同的服务间提供一个可信的通讯通道。例如,你可以将任务(工作)加入一个进程或服务,并由另一个进程或服务来消费他们。你可以在由其他任何进程或服务执行的工作完成、错误或者其他状态变化时得到通知(通过监听状态事件)。当队列生产者或者消费者失败时,他们的状态会被保留,任务将在 node 重启后自动重启。
Nest 提供了`@nestjs/bull`包,这是[Bull](https://github.com/OptimalBits/bull)包的一个包装器,Bull 是一个流行的、支持良好的、高性能的基于 Nodejs 的消息队列系统应用。该包将 Bull 队列以 Nest 友好的方式添加到你的应用中。
Bull 使用[Redis](https://redis.io/)持久化工作数据,因此你需要在你的系统中安装 Redis。因为他是基于 Redis 的,你的队列结构可以是完全分布式的并且和平台无关。例如,你可以有一些队列[生产者](https://docs.nestjs.com/techniques/queues#producers)、[消费者](https://docs.nestjs.com/techniques/queues#consumers)和[监听者](https://docs.nestjs.com/techniques/queues#event-listeners),他们运行在 Nest 的一个或多个节点上,同时,其他生产者、消费者和监听者在其他 Node.js 平台或者其他网络节点上。
本章使用`@nestjs/bull`包,我们同时推荐阅读[BUll 文档](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md)来获取更多背景和应用细节。
### 安装
要开始使用,我们首先安装需要的依赖:
```typescript
$ npm install --save @nestjs/bull bull
$ npm install --save-dev @types/bull
```
一旦安装过程完成,我们可以在根`AppModule`中导入`BullModule`。
> app.module.ts
```typescript
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}
```
`registerQueue()`方法用于实例化并/或注册队列。队列在不同的模块和进程之间共享,在底层则通过同样的凭据连接到同样的 Redis 数据库。每个队列由其`name`属性区分(如下),当共享队列(跨模块/进程)时,第一个`registerQueue()`方法同时实例化该队列并向模块注册它。其他模块(在相同或者不同进程下)则简单地注册队列。队列注册创建一个`injection token`,它可以被用在给定 Nest 模块中获取队列。
针对每个队列,传递一个包含下列属性的配置对象:
-`name:string`- 一个队列名称,它可以被用作`injection token`(用于将队列注册到控制器/提供者),也可以作为装饰器参数来将消费者类和监听者与队列联系起来。是必须的。 -`limiter:RateLimiter`-该选项用于确定消息队列处理速率,查看[RateLimiter](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)获取更多信息。可选的。 -`redis:RedisOpts`-该选项用于配置 Redis 连接,查看[RedisOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)获取更多信息。可选的。 -`prefix: string`-队列所有键的前缀。可选的。 -`defaultJobOptions: JobOpts`-选项用以控制新任务的默认属性。查看[JobOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueadd)获取更多信息。可选的。 -`settings: AdvancedSettings`-高级队列配置设置。这些通常不需要改变。查看[AdvancedSettings](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)获取更多信息。可选的。
注意,`name`属性是必须的。其他选项是可选的,为队列行为提供更细节的控制。这些会直接传递给 Bull 的`Queue`构造器。在[这里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)阅读更多选项。当在第二个或者子模块中注册一个队列时,最佳时间是省略配置对象中除`name`属性之外的所有选项。这些选项仅应该在实例化队列的模块中确定。
> 在`registerQueue()`方法中传递多个逗号分隔的选项对象来创建多个队列。
由于任务在 Redis 中是持久化的,每次当一个特定名称的队列被实例化时(例如,当一个 app 启动/重启时),它尝试处理任何可能在前一个旧的任务遗留未完成的`session`。
每个队里可能有一个或很多生产者、消费者以及监听者。消费者从一个特定命令队列中获取任务:FIFO(默认,先进先出),LIFO(后进先出)或者依据优先级。
控制队列处理命令在[这里](https://docs.nestjs.com/techniques/queues#consumers)讨论。
### 生产者
任务生产者添加任务到队列中。生产者是典型的应用服务(Nest [提供者](https://docs.nestjs.com/providers))。要添加工作到一个队列,首先注册队列到服务中:
```typescript
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}
```
> `@InjectQueue()`装饰器由其名称指定队列,像它在`registerQueue()`方法中提供的那样(例如,`audio`)。
现在,通过调用队列的`add()`方法添加一个任务,传递一个用户定义的任务对象。任务表现为序列化的`JavaScript`对象(因为它们被存储在 Redis 数据库中)。你传递的任务形式是可选的;用它来在语义上表示你任务对象:
```typescript
const job = await this.audioQueue.add({
foo: 'bar',
});
```
### 命名的任务
任务需要独一无二的名字。这允许你创建专用的[消费者](https://docs.nestjs.com/techniques/queues#consumers),这将仅处理给定名称的处理任务。
```typescript
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
});
```
> 当使用命名任务时,你必须为每个添加到队列中的特有名称创建处理者,否则队列会反馈缺失了给定任务的处理器。查看[这里](https://docs.nestjs.com/techniques/queues#consumers)阅读更多关于消费命名任务的信息。
### 任务选项
任务可以包括附加选项。在`Quene.add()`方法的`job`参数之后传递选项对象。任务选项属性有:
- `priority: number`-选项优先级值。范围从 1(最高优先)到 MAX_INT(最低优先)。注意使用属性对性能有轻微影响,因此要小心使用。
- `delay: number`- 任务执行前等待的时间(毫秒)。注意,为了精确延时,服务端和客户端时钟应该同步。
- `attempts: number`-任务结束前总的尝试次数。
- `repeat: RepeatOpts`-按照定时设置重复任务记录,查看[RepeatOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueadd)。
- `backoff: number | BackoffOpts`- 如果任务失败,自动重试闪避设置,查看[BackoffOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueadd)。
- `lifo: boolean`-如果为`true`,从队列右端添加任务以替代从左边添加(默认为 false)。
- `timeout: number`-任务超时失败的毫秒数。
- `jobId: number | string`- 覆盖任务 ID-默认地,任务 ID 是唯一的整数,但你可以使用该参数覆盖它。如果你使用这个选项,你需要保证`jobId`是唯一的。如果你尝试添加一个包含已有 id 的任务,它不会被添加。
- `removeOnComplete: boolean | number`-如果为`true`,当任务完成时移除任务。一个数字用来指定要保存的任务数。默认行为是将完成的工作保存在已完成的设置中。
- `removeOnFail: boolean | number`-如果为`true`,当所有尝试失败时移除任务。一个数字用来指定要保存的任务数。默认行为是将失败的任务保存在已失败的设置中。
- `stackTraceLimit: number`-限制在`stacktrace`中保存的堆栈跟踪线。
这里是一些带有任务选项的自定义任务示例。
要延迟任务的开始,使用`delay`配置属性:
```typescript
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ delay: 3000 } // 3 seconds delayed
);
```
要从右端添加任务到队列(以 LIFO(后进先出)处理任务),设置配置对象的`lifo`属性为`true`。
```typescript
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ lifo: true }
);
```
要优先一个任务,使用`priority`属性。
```typescript
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ priority: 2 }
);
```
### 消费者
消费者是一个类,定义的方法要么处理添加到队列中的任务,要么监听队列的事件,或者两者皆有。使用`@Processor()`装饰器来定义消费者类,如下:
```typescript
import { Processor } from '@nestjs/bull';
@Processor('audio')
export class AudioConsumer {}
```
装饰器的字符串参数(例如,`audio`)是和类方法关联的队列名称。
在消费者类中,使用`@Process()`装饰器来装饰任务处理者。
```typescript
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@Process()
async transcode(job: Job<unknown>) {
let progress = 0;
for (i = 0; i < 100; i++) {
await doSomething(job.data);
progress += 10;
job.progress(progress);
}
return {};
}
}
```
装饰器方法(例如`transcode()`) 在工作空闲或者队列中有消息要处理的时候被调用。该处理器方法接受`job`对象作为其仅有的参数。处理器方法的返回值被保存在任务对象中,可以在之后被访问,例如,在用于完成事件的监听者中。
`Job`对象有多个方法,允许你和他们的状态交互。例如,上述代码使用`progress()`方法来更新工作进程。查看[这里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#job)以了解完整的`Job`对象 API 参照。
你可以指定一个任务处理方法,仅处理指定类型(包含特定`name`的任务)的任务,这可以通过如下所述的将`name`传递给`@Process()`装饰器完成。你在一个给定消费者类中可以有多个`@Process()`处理器,以反应每个任务类型(`name`),确保每个`name`有相应的处理者。
```typescript
@Process('transcode')
async transcode(job: Job<unknown>) { ... }
```
### 事件监听者
当队列和/或任务状态改变时,`Bull`生成一个有用的事件集合。Nest 提供了一个装饰器集合,允许订阅一系列标准核心事件集合。他们从`@nestjs/bull`包中导出。
事件监听者必须在一个[消费者](https://docs.nestjs.com/techniques/queues#consumers)类中声明(通过`@Processor()`装饰器)。要监听一个事件,使用如下表格之一的装饰器来声明一个事件处理器。例如,当一个任务进入`audio`队列活跃状态时,要监听其发射的事件,使用下列结构:
```typescript
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@OnQueueActive()
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
```
鉴于 BUll 运行于分布式(多 node)环境,它定义了本地事件概念。该概念可以辨识出一个由完整的单一进程触发的事件,或者由不同进程共享的队列。一个本地事件是指在本地进程中触发的一个队列行为或者状态变更。换句话说,当你的事件生产者和消费者是本地单进程时,队列中所有事件都是本地的。
当一个队列在多个进程中共享时,我们可能要遇到全局事件。对一个由其他进程触发的事件通知器进程的监听者来说,它必须注册为全局事件。
当相应事件发射时事件处理器被唤醒。该处理器被下表所示的签名调用,提供访问事件相关的信息。我们讨论下面签名中本地和全局事件处理器。
| 本地事件监听者 | 全局事件监听者 | 处理器方法签名/当触发时 |
| ------------------- | ------------------------- | ----------------------------------------------------------------------------------------------------------|
| @OnQueueError() | @OnGlobalQueueError() | handler(error: Error) - 当错误发生时,`error`包括触发错误 |
| @OnQueueWaiting() | @OnGlobalQueueWaiting() | handler(jobId: number \| string) - 一旦工作者空闲就等待执行的任务,`jobId`包括进入此状态的 id |
| @OnQueueActive() | @OnGlobalQueueActive() | handler(job: Job)-`job`任务已启动 |
| @OnQueueStalled() | @OnGlobalQueueStalled() | handler(job: Job)-`job`任务被标记为延迟。这在时间循环崩溃或暂停时进行调试工作时是很有效的 |
| @OnQueueProgress() | @OnGlobalQueueProgress() | handler(job: Job, progress: number)-`job`任务进程被更新为`progress`值 |
| @OnQueueCompleted() | @OnGlobalQueueCompleted() | handler(job: Job, result: any) `job`任务进程成功以`result`结束 |
| @OnQueueFailed() | @OnGlobalQueueFailed() | handler(job: Job, err: Error)`job`任务以`err`原因失败 |
| @OnQueuePaused() | @OnGlobalQueuePaused() | handler()队列被暂停 |
| @OnQueueResumed() | @OnGlobalQueueResumed() | handler(job: Job)队列被恢复 |
| @OnQueueCleaned() | @OnGlobalQueueCleaned() | handler(jobs: Job[], type: string) 旧任务从队列中被清理,`job`是一个清理任务数组,`type`是要清理的任务类型 |
| @OnQueueDrained() | @OnGlobalQueueDrained() | handler()在队列处理完所有等待的任务(除非有些尚未处理的任务被延迟)时发射出 |
| @OnQueueRemoved() | @OnGlobalQueueRemoved() | handler(job: Job)`job`任务被成功移除 |
当监听全局事件时,签名方法可能和本地有一点不同。特别地,本地版本的任何方法签名接受`job`对象的方法签名而不是全局版本的`jobId(number)`。要在这种情况下获取实际的`job`对象的引用,使用`Queue#getJob`方法。这种调用可能需要等待,因此处理者应该被声明为`async`,例如:
```typescript
@OnGlobalQueueCompleted()
async onGlobalCompleted(jobId: number, result: any) {
const job = await this.immediateQueue.getJob(jobId);
console.log('(Global) on completed: job ', job.id, ' -> result: ', result);
}
```
> 要获取一个`Queue`对象(使用`getJob()`调用),你当然必须注入它。同时,队列必须注册到你要注入的模块中。
在特定事件监听器装饰器之外,你可以使用通用的`@OnQueueEvent()`装饰器与`BullQueueEvents`或者`BullQueueGlobalEvents`枚举相结合。在[这里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#events)阅读更多有关事件的内容。
### 队列管理
队列有一个 API 来实现管理功能比如暂停、恢复、检索不同状态的任务数量等。你可以在[这里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)找到完整的队列 API。直接在`Queue`对象上调用这些方法,如下所示的暂停/恢复示例。
使用`pause()`方法调用来暂停队列。一个暂停的队列在恢复前将不会处理新的任务,但会继续处理完当前执行的任务。
```typescript
await audioQueue.pause();
```
要恢复一个暂停的队列,使用`resume()`方法,如下:
```typescript
await audioQueue.resume();
```
## 分离进程[#](#separate-processes)
作业处理程序也可以在单独的(分叉的)进程([source](https://github.com/OptimalBits/bull#separate-processes))中运行。这有几个优点:
* 该进程是沙盒化的,因此如果它崩溃,它不会影响工作人员。
* 您可以在不影响队列的情况下运行阻塞代码(作业不会停止)。
* 更好地利用多核 CPU。
* 与 redis 的连接更少。
>app.module.ts
~~~ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { join } from 'path';
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
processors: [join(__dirname, 'processor.js')],
}),
],
})
export class AppModule {}
~~~
请注意,由于您的函数是在分叉进程中执行的,因此依赖注入(和 IoC 容器)将不可用。这意味着您的处理器函数将需要包含(或创建)它所需的所有外部依赖项实例。
>processor.ts
~~~ts
import { Job, DoneCallback } from 'bull';
export default function (job: Job, cb: DoneCallback) {
console.log(`[${process.pid}] ${JSON.stringify(job.data)}`);
cb(null, 'It works');
}
~~~
### 异步配置
你可能需要异步而不是静态传递队列选项。在这种情况下,使用`registerQueueAsync()`方法,可以提供不同的异步配置方法。
一个方法是使用工厂函数:
```typescript
BullModule.registerQueueAsync({
name: 'audio',
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
});
```
我们的工厂函数方法和其他[异步提供者](https://docs.nestjs.com/fundamentals/async-providers)(它可以是`async`的并可以使用`inject`来注入)方法相同。
```typescript
BullModule.registerQueueAsync({
name: 'audio',
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
redis: {
host: configService.get('QUEUE_HOST'),
port: +configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
});
```
可选的,你可以使用`useClass`语法。
```typescript
BullModule.registerQueueAsync({
name: 'audio',
useClass: BullConfigService,
});
```
上述结构在`BullModule`中实例化`BullConfigService`,并通过调用`createBullOptions()`来用它提供一个选项对象。注意这意味着`BullConfigService`要实现`BullOptionsFactory`工厂接口,如下:
```typescript
@Injectable()
class BullConfigService implements BullOptionsFactory {
createBullOptions(): BullModuleOptions {
return {
redis: {
host: 'localhost',
port: 6379,
},
};
}
}
```
要阻止在`BullModule`中创建`BullConfigService`并使用一个从其他模块导入的提供者,可以使用`useExisting`语法。
```typescript
BullModule.registerQueueAsync({
name: 'audio',
imports: [ConfigModule],
useExisting: ConfigService,
});
```
这个结构和`useClass`有一个根本区别——`BullModule`将查找导入的模块来重用现有的`ConfigServie`而不是实例化一个新的。
### 示例
一个可用的示例见[这里](https://github.com/nestjs/nest/tree/master/sample/26-queues)。
- 介绍
- 概述
- 第一步
- 控制器
- 提供者
- 模块
- 中间件
- 异常过滤器
- 管道
- 守卫
- 拦截器
- 自定义装饰器
- 基础知识
- 自定义提供者
- 异步提供者
- 动态模块
- 注入作用域
- 循环依赖
- 模块参考
- 懒加载模块
- 应用上下文
- 生命周期事件
- 跨平台
- 测试
- 技术
- 数据库
- Mongo
- 配置
- 验证
- 缓存
- 序列化
- 版本控制
- 定时任务
- 队列
- 日志
- Cookies
- 事件
- 压缩
- 文件上传
- 流式处理文件
- HTTP模块
- Session(会话)
- MVC
- 性能(Fastify)
- 服务器端事件发送
- 安全
- 认证(Authentication)
- 授权(Authorization)
- 加密和散列
- Helmet
- CORS(跨域请求)
- CSRF保护
- 限速
- GraphQL
- 快速开始
- 解析器(resolvers)
- 变更(Mutations)
- 订阅(Subscriptions)
- 标量(Scalars)
- 指令(directives)
- 接口(Interfaces)
- 联合类型
- 枚举(Enums)
- 字段中间件
- 映射类型
- 插件
- 复杂性
- 扩展
- CLI插件
- 生成SDL
- 其他功能
- 联合服务
- 迁移指南
- Websocket
- 网关
- 异常过滤器
- 管道
- 守卫
- 拦截器
- 适配器
- 微服务
- 概述
- Redis
- MQTT
- NATS
- RabbitMQ
- Kafka
- gRPC
- 自定义传输器
- 异常过滤器
- 管道
- 守卫
- 拦截器
- 独立应用
- Cli
- 概述
- 工作空间
- 库
- 用法
- 脚本
- Openapi
- 介绍
- 类型和参数
- 操作
- 安全
- 映射类型
- 装饰器
- CLI插件
- 其他特性
- 迁移指南
- 秘籍
- CRUD 生成器
- 热重载
- MikroORM
- TypeORM
- Mongoose
- 序列化
- 路由模块
- Swagger
- 健康检查
- CQRS
- 文档
- Prisma
- 静态服务
- Nest Commander
- 问答
- Serverless
- HTTP 适配器
- 全局路由前缀
- 混合应用
- HTTPS 和多服务器
- 请求生命周期
- 常见错误
- 实例
- 迁移指南
- 发现
- 谁在使用Nest?