多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
[TOC] ### rabbitMQ 工作队列 轮询分发 ![](https://box.kancloud.cn/e70f0a7540d79e117e46f4f3d0a3fa57_411x229.png) >[danger] 简单队列是一对一的关系,一个生成者对应一个消费者,实际开发中,一般消费者是以业务相结合的,需要时间去处理业务,如果只有一个消费者,那么生产者就会积压很多消息,消费不出去 ***** 代码演示: ``` 'use strict'; const Controller = require('egg').Controller; /** * 队列一对多演示 * 生产者 ----> 队列 ----> 消费者 * ----> 消费者 ----> 消费者 */ // 频道名称 const queueName = 'hasMany' class UserController extends Controller { // 生成者 async send() { const { msg } = this.ctx.query; //1. 创建频道 const ch = await this.app.amqplib.createChannel(); // 2. 创建队列 开启持久化存储 await ch.assertQueue(queueName, { durable: true }); // 3. 发送消息 let ok = null; for(let i=0; i<50; i++) { // 此时我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久性 - 通过使用持久性选项Channel.sendToQueue。 ok = await ch.sendToQueue(queueName, Buffer.from(msg+i), { persistent: true }); } //4. 关闭连接 await ch.close(); this.ctx.body = ok; this.ctx.status = 200; } // 消费者 async work1() { // 1. 创建频道 const ch = await this.app.amqplib.createChannel(); //2. 选择队列 await ch.assertQueue(queueName, { durable: true }); // 3. 接收消息 noAck 关闭消息自动确认模式 ,需要手动 ack const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => { setTimeout(() => { resolve(msg) }, 500) }, { noAck: false }) ); if (resultMsg !== null) { const { content } = resultMsg; //消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它 ch.ack(resultMsg); await ch.close(); this.ctx.body = { work1: content.toString() }; this.ctx.status = 200; } else { this.ctx.body = '消费者1号失败' this.ctx.status = 500 } } async work2() { // 1. 创建频道 const ch = await this.app.amqplib.createChannel(); //2. 选择队列 RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的 await ch.assertQueue(queueName, { durable: true }); // 3. 接收消息 noAck 开启自动确认模式 const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => { setTimeout(() => { resolve(msg) }, 1000) }, { noAck: false }) ); if (resultMsg !== null) { const { content } = resultMsg; ch.ack(resultMsg); await ch.close(); this.ctx.body = { work2: content.toString() }; this.ctx.status = 200; } else { this.ctx.body = '消费者2号失败' this.ctx.status = 500 } } async work3() { // 1. 创建频道 const ch = await this.app.amqplib.createChannel(); //2. 选择队列 await ch.assertQueue(queueName, { durable: true }); // 3. 接收消息 noAck 开启自动确认模式 const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => { setTimeout(() => { resolve(msg) }, 1500) }, { noAck: false }) ); if (resultMsg !== null) { const { content } = resultMsg; //消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它 ch.ack(resultMsg); await ch.close(); this.ctx.body = { work3: content.toString() }; this.ctx.status = 200; } else { this.ctx.body = '消费者3号失败' this.ctx.status = 500 } } } module.exports = UserController; ```