多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
## RabbitMQ [RabbitMQ](https://www.rabbitmq.com/) 是一个开源的轻量级消息代理,支持多种消息协议。它可以通过分布式部署、联合配置来满足高弹性、高可用性的需求。此外,它是部署最广泛的开源消息代理,在全球范围内从初创企业到大企业都在使用。 ### 安装 在开始之前,我们必须安装所需的包: ```bash $ npm i --save amqplib amqp-connection-manager ``` ### 概述 为了使用 **RabbitMQ** 传输器,传递以下选项对象到 `createMicroservice()` 方法。 > main.ts ```typescript const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, { transport: Transport.RMQ, options: { urls: ['amqp://localhost:5672'], queue: 'cats_queue', queueOptions: { durable: false }, }, }); ``` > `Transport` 需要从 `@nestjs/microservices` 包导入。 ### 选项 `options`对象和选择的传输器有关,`RabbitMQ`传输器暴露了一些属性: -| - ---|--- urls|连接urls queue|服务器要监听的队列名称 prefetchCount|频道预读取的数量 isGlobalPrefetchCount|使能预读取的频道 noAck|设置为`false`以启用手动确认模式 queueOptions|额外的队列选项([更多](https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue)) socketOptions|额外的socket选项([更多](https://www.squaremobius.net/amqp.node/channel_api.html#socket-options)) ### 客户端 像其他微服务传输器一样,你可以在创建`ClientProxy`实例时传输[一些选项](https://docs.nestjs.com/microservices/basics#client)。 一种来创建实例的方法是使用`ClientsModule`。要使用`ClientsModule`创建一个客户端实例,引入并使用`register()`方法并传递一个 `options` 对象,该对象具有与前面在 `createMicroservice()` 方法具有相同的属性。`name`属性被用于注入`token`,更多关于`ClientsModule`内容参见[这里](https://docs.nestjs.com/microservices/basics#client)。 ```typescript @Module({ imports: [ ClientsModule.register([ { name: 'MATH_SERVICE', transport: Transport.RMQ, options: { urls: ['amqp://localhost:5672'], queue: 'cats_queue', queueOptions: { durable: false }, }, }, ]), ] ... }) ``` 也可以使用其他创建客户端的实例( `ClientProxyFactory` 或 `@Client()` )。 ### 上下文 在更复杂的场景中,您可能希望访问关于传入请求的更多信息。在`RabbitMQ` 中,您可以访问 `RmqContext`对象。 ```typescript @MessagePattern('notifications') getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) { console.log(`Pattern: ${context.getPattern()}`); } ``` ?> `@Payload()`, `@Ctx()` 和 `RedisContext` 需要从 `@nestjs/microservices` 包导入. 要实用原生的`RabbitMQ`消息(包含`properties`, `fields`, 和`content`), 使用 `RmqContext`对象的`getMessage()`方法: ```typescript @MessagePattern('notifications') getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) { console.log(context.getMessage()); } ``` 要获取`RabbitMQ`频道的引用,使用`RmqContext`对象的`getChannelRef`方法。 ```typescript @MessagePattern('notifications') getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) { console.log(context.getChannelRef()); } ``` ### 消息确认 要确保消息没有丢失,RabbitMQ支持[消息确认](https://www.rabbitmq.com/confirms.html)。消息确认是指消费者发回给RabbitMQ确认消息已收到,RabbitMQ可以删除它了。如果消费者不工作(频道关闭,连接关闭或者TCP连接丢失)也没有发送确认,RabbitMQ会认为消息没有被处理,因此会重新将其加入队列。 要使能手动消息确认模式,将`noAck`设置为`false`: ```typescript options: { urls: ['amqp://localhost:5672'], queue: 'cats_queue', noAck: false, queueOptions: { durable: false }, }, ``` 当手动消费者确认开启时,我们必须从工作者到到信号发送一个合适的确认信息,以表示我们已经完成了一件工作。 ```typescript @MessagePattern('notifications') getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) { const channel = context.getChannelRef(); const originalMsg = context.getMessage(); channel.ack(originalMsg); } ``` ### 记录建设者 要配置消息选项,您可以使用`RmqRecordBuilder`该类(注意:这对于基于事件的流也是可行的)。例如,要设置`headers`和`priority`属性,使用`setOptions`方法,如下: ~~~typescript const message = ':cat:'; const record = new RmqRecordBuilder(message) .setOptions({ headers: { ['x-version']: '1.0.0', }, priority: 3, }) .build(); this.client.send('replace-emoji', record).subscribe(...); ~~~ > **提示**`RmqRecordBuilder``@nestjs/microservices`类是从包中导出的。 您也可以通过访问 来在服务器端读取这些值`RmqContext`,如下所示: ~~~typescript @MessagePattern('replace-emoji') replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string { const { properties: { headers } } = context.getMessage(); return headers['x-version'] === '1.0.0' ? '🐱' : '🐈'; } ~~~