企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## CQRS 可以用下列步骤来描述一个简单的 **[CRUD](https://en.wikipedia.org/wiki/Create,_read,_update_and_delete)** 应用程序流程: 1. **控制器层**处理 HTTP 请求并将任务委派给服务层。 2. **服务层**是处理大部分业务逻辑。 3. 服务使用**存储库或 DAOs** 来更改/保存实体。 4. 实体充当值的容器,具有 `setter` 和 `getter` 。 在大部分情况下,这种模式对中小型应用来说是足够的。然而,当我们的需求变得更加复杂时,**CQRS** 模型可能更合适并且易于扩展。 为了简化这个模型,`Nest` 提供了一个轻量级的 [CQRS](https://github.com/nestjs/cqrs) 模块,本章描述如何使用它。 ### 安装 首先安装需要的包 ```bash $ npm install --save @nestjs/cqrs ``` ### 指令 在本模块中,每个行为都被称为一个 **Command** 。当任何命令被分派时,应用程序必须对其作出反应。命令可以从服务中分派(或直接来自控制器/网关)并在相应的 **Command 处理程序** 中使用。 > heroes-game.service.ts ```typescript @Injectable() export class HeroesGameService { constructor(private commandBus: CommandBus) {} async killDragon(heroId: string, killDragonDto: KillDragonDto) { return this.commandBus.execute( new KillDragonCommand(heroId, killDragonDto.dragonId) ); } } ``` 这是一个示例服务, 它调度 `KillDragonCommand` 。让我们来看看这个命令: > kill-dragon.command.ts ```typescript export class KillDragonCommand { constructor( public readonly heroId: string, public readonly dragonId: string, ) {} } ``` 这个 `CommandBus` 是一个命令 **流** 。它将命令委托给等效的处理程序。每个命令必须有相应的命令处理程序: > kill-dragon.handler.ts ```typescript @CommandHandler(KillDragonCommand) export class KillDragonHandler implements ICommandHandler<KillDragonCommand> { constructor(private repository: HeroRepository) {} async execute(command: KillDragonCommand) { const { heroId, dragonId } = command; const hero = this.repository.findOneById(+heroId); hero.killEnemy(dragonId); await this.repository.persist(hero); } } ``` 现在,每个应用程序状态更改都是**Command**发生的结果。 逻辑封装在处理程序中。 如果需要,我们可以简单地在此处添加日志,甚至更多,我们可以将命令保留在数据库中(例如用于诊断目的)。 ### 事件(Events) 由于我们在处理程序中封装了命令,所以我们阻止了它们之间的交互-应用程序结构仍然不灵活,不具有**响应性**。解决方案是使用**事件**。 > hero-killed-dragon.event.ts ```typescript export class HeroKilledDragonEvent { constructor( public readonly heroId: string, public readonly dragonId: string, ) {} } ``` 事件是异步的。它们可以通过**模型**或直接使用 `EventBus` 发送。为了发送事件,模型必须扩展 `AggregateRoot` 类。。 > hero.model.ts ```typescript export class Hero extends AggregateRoot { constructor(private readonly id: string) { super(); } killEnemy(enemyId: string) { // logic this.apply(new HeroKilledDragonEvent(this.id, enemyId)); } } ``` `apply()` 方法尚未发送事件,因为模型和 `EventPublisher` 类之间没有关系。如何关联模型和发布者? 我们需要在我们的命令处理程序中使用一个发布者 `mergeObjectContext()` 方法。 > kill-dragon.handler.ts ```typescript @CommandHandler(KillDragonCommand) export class KillDragonHandler implements ICommandHandler<KillDragonCommand> { constructor( private repository: HeroRepository, private publisher: EventPublisher, ) {} async execute(command: KillDragonCommand) { const { heroId, dragonId } = command; const hero = this.publisher.mergeObjectContext( await this.repository.findOneById(+heroId), ); hero.killEnemy(dragonId); hero.commit(); } } ``` 现在,一切都按我们预期的方式工作。注意,我们需要 `commit()` 事件,因为他们不会立即被发布。显然,对象不必预先存在。我们也可以轻松地合并类型上下文: ```typescript const HeroModel = this.publisher.mergeContext(Hero); new HeroModel('id'); ``` 就是这样。模型现在能够发布事件。我们得处理他们。此外,我们可以使用 `EventBus` 手动发出事件。 ```typescript this.eventBus.publish(new HeroKilledDragonEvent()); ``` > `EventBus` 是一个可注入的类。 每个事件都可以有许多事件处理程序。 > hero-killed-dragon.handler.ts ```typescript @EventsHandler(HeroKilledDragonEvent) export class HeroKilledDragonHandler implements IEventHandler<HeroKilledDragonEvent> { constructor(private readonly repository: HeroRepository) {} handle(event: HeroKilledDragonEvent) { // logic } } ``` 现在,我们可以将写入逻辑移动到事件处理程序中。 ### Sagas 这种类型的 **事件驱动架构** 可以提高应用程序的 **反应性** 和 **可伸缩性** 。现在, 当我们有了事件, 我们可以简单地以各种方式对他们作出反应。**Sagas**是建筑学观点的最后一个组成部分。 `sagas` 是一个非常强大的功能。单 `saga` 可以监听 1..* 事件。它可以组合,合并,过滤事件流。[RxJS](https://github.com/ReactiveX/rxjs) 库是`sagas`的来源地。简单地说, 每个 `sagas` 都必须返回一个包含命令的Observable。此命令是 **异步** 调用的。 > heroes-game.saga.ts ```typescript @Injectable() export class HeroesGameSagas { @Saga() dragonKilled = (events$: Observable<any>): Observable<ICommand> => { return events$.pipe( ofType(HeroKilledDragonEvent), map((event) => new DropAncientItemCommand(event.heroId, fakeItemID)), ); } } ``` ?> `ofType` 运算符从 `@nestjs/cqrs` 包导出。 我们宣布一个规则 - 当任何英雄杀死龙时,古代物品就会掉落。 之后,`DropAncientItemCommand` 将由适当的处理程序调度和处理。 ### 查询 `CqrsModule` 对于查询处理可能也很方便。 `QueryBus` 与 `CommandsBus` 的工作方式相同。 此外,查询处理程序应实现 `IQueryHandler` 接口并使用 `@QueryHandler()` 装饰器进行标记。 ### 建立 我们要处理的最后一件事是建立整个机制。 > heroes-game.module.ts ```typescript export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler]; export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler]; @Module({ imports: [CqrsModule], controllers: [HeroesGameController], providers: [ HeroesGameService, HeroesGameSagas, ...CommandHandlers, ...EventHandlers, HeroRepository, ] }) export class HeroesGameModule {} ``` ### 概要 `CommandBus` ,`QueryBus` 和 `EventBus` 都是**Observables**。这意味着您可以轻松地订阅整个流, 并通过 **Event Sourcing** 丰富您的应用程序。 完整的源代码在[这里](https://github.com/kamilmysliwiec/nest-cqrs-example) 。