企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## kafka [Kafka](https://kafka.apache.org/) 是一个由Apache软件基金会开源的一个高吞吐量的分布式流处理平台, 它具有三个关键特性: - 可以允许你发布和订阅消息流。 - 可以以容错的方式记录消息流。 - 可以在消息流记录产生时就进行处理。 Kafka 致力于提供一个处理实时数据的统一 、高吞吐量、 低延时的平台。 它在处理实时数据分析时可以与Apache Storm 和 Spark完美集成。 **Kafka 传输器是实验性的.** ### 安装 要开始构建基于Kafka的微服务首先需要安装所需的依赖: ```bash $ npm i --save kafkajs ``` ### 概述 类似其他微服务传输器层的实现,要使用kafka传输器机制,你需要像下面的示例一样给`createMicroservice()`方法传递指定传输器`transport`属性和可选的`options`属性。 > main.ts ```typescript const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, { transport: Transport.KAFKA, options: { client: { brokers: ['localhost:9092'], } } }); ``` > `Transport`枚举 需要从 `@nestjs/microservices` 包导入。 ### 选项 `options`对象和选择的传输器有关,`Kafka`传输器暴露了一些属性: -| - ---|--- client|客户端配置选项([参见这里](https://kafka.js.org/docs/configuration)) consumer|消费者配置选项([参见这里](https://kafka.js.org/docs/consuming#a-name-options-a-options)) run|运行配置选项([参见这里](https://kafka.js.org/docs/consuming)) subscribe|订阅配置选项([参见这里](https://kafka.js.org/docs/consuming#frombeginning)) producer|生产者配置选项([参见这里](https://kafka.js.org/docs/producing#options)) send|发送配置选项([参见这里](https://kafka.js.org/docs/producing#options)) ### 客户端 `Kafka`和其他微服务传送器有一点不同的是,我们需要用`ClientKafka`类替换`ClientProxy` 类。 像其他微服务一样,创建`ClientKafka`实例也有几个可[选项](https://docs.nestjs.com/microservices/basics#client)。 一种方式创建客户端实例我们需要用到`ClientsModule`方法。 为了通过`ClientsModule`创建客户端实例,导入`register()` 方法并且传递一个和上面`createMicroservice()`方法一样的对象以及一个`name`属性,它将被注入为token。了解更多关于[ClientsModule](https://docs.nestjs.com/microservices/basics#client)。 ```typescript @Module({ imports: [ ClientsModule.register([ { name: 'HERO_SERVICE', transport: Transport.KAFKA, options: { client: { clientId: 'hero', brokers: ['localhost:9092'], }, consumer: { groupId: 'hero-consumer' } } }, ]), ] ... }) ``` 另一种方式建立客户端 ( `ClientProxyFactory`或者`@Client()`) 也可以正常使用。 为了创建客户端实例,我们需要使用 `@Client()` 装饰器。 ```typescript @Client({ transport: Transport.KAFKA, options: { client: { clientId: 'hero', brokers: ['localhost:9092'], }, consumer: { groupId: 'hero-consumer' } } }) client: ClientKafka; ``` ### 消息模式 `Kafka`消息模式利用两个主题来请求和答复通道。`ClientKafka#send()`方法通过关联[相关ID](https://www.enterpriseintegrationpatterns.com/patterns/messaging/CorrelationIdentifier.html)发送带有[返回地址](https://www.enterpriseintegrationpatterns.com/patterns/messaging/ReturnAddress.html)的消息,答复主题,带有请求信息的答复分区。 这要求在发送消息之前,`ClientKafka`实例需要订阅答复主题并至少分配一个分区。 随后,您需要为每个运行的Nest应用程序至少有一个答复主题分区。例如,如果您正在运行4个Nest应用程序,但是答复主题只有3个分区,则尝试发送消息时,其中1个Nest应用程序将会报错。 当启动新的`ClientKafka`实例时,它们将加入消费者组并订阅各自的主题。此过程触发一个主题分区的再平衡并分配给消费者组中的消费者。 通常,主题分区是使用循环分区程序分配的,该程序将主题分区分配给按消费者名称排序的消费者集合,消费者名称是在应用程序启动时随机设置的。但是,当新消费者加入该消费者组时,该新消费者可以位于消费者集合中的任何位置。这就创造了这样一种条件,即当现有消费者位于新消费者之后时,可以为现有消费者分配不同的分区。结果,分配了不同分区的消费者将丢失重新平衡之前发送的请求的响应消息。 为了防止`ClientKafka`使用者丢失响应消息,使用了Nest特定的内置自定义分区程序。这个自定义分区程序将分区分配给一组消费者,这些消费者按照在应用程序启动时设置的高精度的时间戳(`process.hrtime()`)进行排序。 ### 消息订阅响应 `ClientKafka`类提供了一个`subscribeToResponseOf()`方法,该方法会将获取请求的主题名称作为参数并将派生的答复主题加入到答复主题的集合中。这个函数在执行消息模式时是必须的。 >heroes.controller.ts ```typescript onModuleInit() { this.client.subscribeToResponseOf('hero.kill.dragon'); } ``` 如果`ClientKafka` 实例是异步创建的, `subscribeToResponseOf()`函数必须在`connect()`函数之前被调用。 >heros.controller.ts ```typescript async onModuleInit() { this.client.subscribeToResponseOf('hero.kill.dragon'); await this.client.connect(); } ``` ### 传入(Incoming) Nest将会接收传入的`Kafka`消息作为具有键,值和头属性(其值为Buffer类型)的对象。然后,Nest通过`Buffer`转换为字符串来解析这些值。如果字符串是可被序列化的,Nest会把字符串解析为`JSON`并将该值传递到其关联的处理程序。 ### 传出(Outgoing) 在发布事件或发送消息时,Nest将在序列化过程之后发送传出的`Kafka`消息。这发生在传递给`ClientKafka`的`emit()`和`send()`方法的参数上,或从`@MessagePattern`方法的返回值上。该序列化通过使用`JSON.stringify()`或`toString()`原型方法来“字符串化”不是字符串或缓冲区的对象。 >heroes.controller.ts ```typescript @Controller() export class HeroesController { @MessagePattern('hero.kill.dragon') killDragon(@Payload() message: KillDragonMessage): any { const dragonId = message.dragonId; const items = [ { id: 1, name: 'Mythical Sword' }, { id: 2, name: 'Key to Dungeon' }, ]; return items; } } ``` > `@Payload()` 需要从 `@nestjs/microservices` 中导入. 传出的消息也可以通过传递带有`key`和`value`属性的对象来键入。密钥消息对于满足[共同分区要求](https://docs.confluent.io/current/ksql/docs/developer-guide/partition-data.html#co-partitioning-requirements)很重要。 >heroes.controller.ts ```typescript @Controller() export class HeroesController { @MessagePattern('hero.kill.dragon') killDragon(@Payload() message: KillDragonMessage): any { const realm = 'Nest'; const heroId = message.heroId; const dragonId = message.dragonId; const items = [ { id: 1, name: 'Mythical Sword' }, { id: 2, name: 'Key to Dungeon' }, ]; return { headers: { realm }, key: heroId, value: items } } } ``` 此外,以这种格式传递的消息还可以包含在自定义头中设置`headers`哈希属性值。 `headers`哈希属性值必须为`string`类型或`buffer`类型。 >heroes.controller.ts ```typescript @Controller() export class HeroesController { @MessagePattern('hero.kill.dragon') killDragon(@Payload() message: KillDragonMessage): any { const realm = 'Nest'; const heroId = message.heroId; const dragonId = message.dragonId; const items = [ { id: 1, name: 'Mythical Sword' }, { id: 2, name: 'Key to Dungeon' }, ]; return { headers: { kafka_nestRealm: realm }, key: heroId, value: items } } } ``` ### 基于事件 虽然 `request-response` 方法非常适合在服务之间交换消息,但当您的消息样式是基于事件的(这又是 Kafka 的理想选择)时,它不太适合 - 当您只想发布事件而不等待响应时。 在这种情况下,您不希望请求-响应所需的开销来维护两个主题。 查看这两个部分以了解更多信息:[概述:基于事件](https://docs.nestjs.com/microservices/basics#event-based)和[概述:发布事件](https://docs.nestjs.com/microservices/basics#publishing-events)。 ### 上下文 在更复杂的方案中,您可能需要访问有关传入请求的更多信息。 使用Kafka传输器时,您可以访问`KafkaContext`对象。 ```typescript @MessagePattern('hero.kill.dragon') killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) { console.log(`Topic: ${context.getTopic()}`); } ``` >`@Payload()`, `@Ctx()` 和 `KafkaContext` 需要从 `@nestjs/microservices` 包导入. 为了访问`Kafka`原生的 `IncomingMessage`对象,需要像下面的示例一样使用`KafkaContext`的`getMessage()`方法。 ```typescript @MessagePattern('hero.kill.dragon') killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) { const originalMessage = context.getMessage(); const { headers, partition, timestamp } = originalMessage; } ``` `IncomingMessage`实现了以下的接口: ```typescript interface IncomingMessage { topic: string; partition: number; timestamp: string; size: number; attributes: number; offset: string; key: any; value: any; headers: Record<string, any>; } ``` ### 命名约定 `Kafka`微服务组件将其各自角色的描述附加到`client.clientId`和`consumer.groupId`选项上,以防止Nest微服务客户端和服务器组件之间发生冲突。默认情况下,`ClientKafka`组件和`ServerKafka`组件将各自分别附加`-client`和`-server`到各自的选项中。请注意下面提供的值如何以这种方式转换(如注释中所示)。 >main.ts ```typescript const app = await NestFactory.createMicroservice(AppModule, { transport: Transport.KAFKA, options: { client: { clientId: 'hero', // hero-server brokers: ['localhost:9092'], }, consumer: { groupId: 'hero-consumer' // hero-consumer-server }, } }); ``` 对于客户端: >heroes.controller.ts ```typescript @Client({ transport: Transport.KAFKA, options: { client: { clientId: 'hero', // hero-client brokers: ['localhost:9092'], }, consumer: { groupId: 'hero-consumer' // hero-consumer-client } } }) client: ClientKafka; ``` > 可以通过在您自己的自定义的提供者中扩展`ClientKafka`和`KafkaServer`并通过覆盖构造函数来自定义`Kafka`客户端口和使用者命名约定。 由于`Kafka`微服务的消息模式将两个主题用于请求和回复通道,因此应从请求主题中获得一个回复模式。默认情况下,回复主题的名称是请求主题名称和`.reply`的组合。 >heroes.controller.ts ```typescript onModuleInit() { this.client.subscribeToResponseOf('hero.get'); // hero.get.reply } ``` > 可以通过在您自己的自定义的提供者中扩展`ClientKafka`并通过覆盖`getResponsePatternName`方法来自定义`Kafka`答复主题的命名约定。