[TOC]
### rabbitMQ 工作队列 轮询分发
![](https://box.kancloud.cn/e70f0a7540d79e117e46f4f3d0a3fa57_411x229.png)
>[danger] 简单队列是一对一的关系,一个生成者对应一个消费者,实际开发中,一般消费者是以业务相结合的,需要时间去处理业务,如果只有一个消费者,那么生产者就会积压很多消息,消费不出去
*****
代码演示:
```
'use strict';
const Controller = require('egg').Controller;
/**
* 队列一对多演示
* 生产者 ----> 队列 ----> 消费者
* ----> 消费者
----> 消费者
*/
// 频道名称
const queueName = 'hasMany'
class UserController extends Controller {
// 生成者
async send() {
const { msg } = this.ctx.query;
//1. 创建频道
const ch = await this.app.amqplib.createChannel();
// 2. 创建队列 开启持久化存储
await ch.assertQueue(queueName, { durable: true });
// 3. 发送消息
let ok = null;
for(let i=0; i<50; i++) {
// 此时我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久性 - 通过使用持久性选项Channel.sendToQueue。
ok = await ch.sendToQueue(queueName, Buffer.from(msg+i), { persistent: true });
}
//4. 关闭连接
await ch.close();
this.ctx.body = ok;
this.ctx.status = 200;
}
// 消费者
async work1() {
// 1. 创建频道
const ch = await this.app.amqplib.createChannel();
//2. 选择队列
await ch.assertQueue(queueName, { durable: true });
// 3. 接收消息 noAck 关闭消息自动确认模式
,需要手动 ack
const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
setTimeout(() => {
resolve(msg)
}, 500)
}, { noAck: false }) );
if (resultMsg !== null) {
const { content } = resultMsg;
//消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它
ch.ack(resultMsg);
await ch.close();
this.ctx.body = { work1: content.toString() };
this.ctx.status = 200;
} else {
this.ctx.body = '消费者1号失败'
this.ctx.status = 500
}
}
async work2() {
// 1. 创建频道
const ch = await this.app.amqplib.createChannel();
//2. 选择队列 RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的
await ch.assertQueue(queueName, { durable: true });
// 3. 接收消息 noAck 开启自动确认模式
const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
setTimeout(() => {
resolve(msg)
}, 1000)
}, { noAck: false }) );
if (resultMsg !== null) {
const { content } = resultMsg;
ch.ack(resultMsg);
await ch.close();
this.ctx.body = { work2: content.toString() };
this.ctx.status = 200;
} else {
this.ctx.body = '消费者2号失败'
this.ctx.status = 500
}
}
async work3() {
// 1. 创建频道
const ch = await this.app.amqplib.createChannel();
//2. 选择队列
await ch.assertQueue(queueName, { durable: true });
// 3. 接收消息 noAck 开启自动确认模式
const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
setTimeout(() => {
resolve(msg)
}, 1500)
}, { noAck: false }) );
if (resultMsg !== null) {
const { content } = resultMsg;
//消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它
ch.ack(resultMsg);
await ch.close();
this.ctx.body = { work3: content.toString() };
this.ctx.status = 200;
} else {
this.ctx.body = '消费者3号失败'
this.ctx.status = 500
}
}
}
module.exports = UserController;
```
- 概述
- 起步
- 跨域配置
- 路径别名
- 路由
- api版本控制
- 错误和异常
- 全局异常处理
- 数据库
- 创建迁移文件
- sequelize数据类型
- 配置
- 新增
- 查询
- 条件查询
- 模糊查询
- 排序查询
- 聚合查询
- 分组查询
- 分页查询
- 修改
- 删除
- 获取器
- 修改器
- 静态属性
- 字段验证
- 外键约束
- 关联模型
- 一对一
- 一对多
- 左外连接
- 多对多
- 字段显示隐藏
- 事务
- 字段自增
- 验证层
- egg-validate
- indicative验证器
- egg-validate-plus
- betterValidate
- 校验规则
- 中间件
- 安全
- 数据加密
- 单向加密
- 示例代码
- 封装egg加密
- 上传
- path模块
- 单文件上传
- 多文件上传
- 按照日期存储
- 工具函数
- egg常用工具函数
- 缓存
- 配置缓存插件
- 设置缓存
- 获取缓存
- 删除缓存
- 消息队列
- rabbitMQ
- 安装
- 简单队列
- 工作队列
- 工作队列(dispach分发)
- 消息应答和持久化
- redis
- 数据类型
- 字符串类型(String)
- 哈希类型(Hash)
- 列表(List)
- 无序集合(Set)
- 可排序集合(Zset)
- 邮件系统
- nodeMailer
- 第三方模块
- 生成随机数
- JWT
- JWT鉴权
- 生成Token
- 短信服务
- 阿里大鱼短信验证码
- 发送短信逻辑
- 阿里短信Node类