# 队列
* * * * *
**简介**
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。
实现高性能,高可用,可伸缩和最终一致性架构。
是大型分布式系统不可缺少的中间件。
场景:异步处理、应用解耦、流量削锋、日志处理
* * *
以上就是关于为什么要使用队列的大致说明
下面介绍在OneBase使用TP官方开发的队列think-queue。
首先需要仔细理解下面这张图
![](https://img.kancloud.cn/be/3c/be3c0c9e5690c586ebb6e9c3a74c9005_1200x533.png)
执行流程
1. 命令行`Command`开始监听队列`queue:work`
2. 执行进程`Worker`获取新消息`Queue::pop()`
3. 消息队列`Queue`返回一个可用的`Job`实例`$job`
3.1 生产者推送`Queue::push()`新消息到消息队列`Queue`
3.2 消息队列`Queue`返回是否推送成功给生产者
4. 执行进程`Worker`调用`$job`的`fire()`方法
5. 消息`Job`解析`$job`的`payload`,实例化一个消费者,并调用消费者实例的`fire($job, $data)`方法。
6. 消费者读取消息内容`$data`,处理业务逻辑,删除或重发该消息 `$job->delete()` 或 `$job->release()`。
7. 消息`Job`从Database或Redis中删除消息或重发消息
8. 消息`Job`返回消息处理结果给执行进程`Worker`
9. 执行进程`Worker`在终端输出响应或结束运行
**一、队列配置**
队列配置在app目录下config.php配置文件中,可以配置驱动,建议Redis。
此处为演示方便使用的数据库驱动。
```
// +----------------------------------------------------------------------
// | 队列配置
// +----------------------------------------------------------------------
'queue' => [
// sync驱动表示取消消息队列还原为同步执行
// 'connector' => 'Sync',
// Redis驱动
// 'connector' => 'redis',
// "expire"=>60,//任务过期时间默认为秒,禁用为null
// "default"=>"default",//默认队列名称
// "host"=>Env::get("redis.host", "127.0.0.1"),//Redis主机IP地址
// "port"=>Env::get("redis.port", 6379),//Redis端口
// "password"=>Env::get("redis.password", "123456"),//Redis密码
// "select"=>5,//Redis数据库索引
// "timeout"=>0,//Redis连接超时时间
// "persistent"=>false,//是否长连接
// Database驱动
"type"=>"Database",//数据库驱动
"expire"=>60,//任务过期时间,单位为秒,禁用为null
"default"=>"default",//默认队列名称
"table"=>"jobs",//存储消息的表明,不带前缀
"dsn"=>[],
]
```
**二、消息创建与推送**
```
// 将任务加入队列
public function push()
{
$job_data = [];
$job_data["member_id"] = time();
$job_data["to_member_id"] = time();
$job_data["params"] = ['xx' => 'cc', 'vv' => 'bb'];
$is_pushed = Queue::push("app\queue\controller\Test", $job_data, 'test_job_queue');
if($is_pushed !== false ) {
echo date("Y-m-d H:i:s")." a new job is pushed to the message queue";
} else {
echo date("Y-m-d H:i:s")." a new job pushed fail";
}
}
```
$job_data 是队列中依赖的业务数据
app\queue\controller\Test 是处理队列数据的类路径
test_job_queue 是队列的名称
**三、启动消息队列**
![](https://img.kancloud.cn/c2/46/c2466278f15aa69d7d5e5af3fabeee5a_723x72.png)
此时执行多次消息推送代码。
![](https://img.kancloud.cn/2f/13/2f130a1e7b1f91e34ecc635373d1dbb9_642x134.png)
![](https://img.kancloud.cn/39/79/3979c08f19032be2ff6a6982108e02af_1287x400.png)
可以看到 消息已经入库,但是没有处理。
**四、消费与删除**
打开 queue 模块下的控制器与逻辑目录,参考Test与TestLogic 来编写相关数据处理业务逻辑代码。
![](https://img.kancloud.cn/b2/55/b255a4b88ef6cf2fe14bb312603a05cc_741x666.png)
**五、启动监听处理**
命令行执行:php think queue:listen --queue test_job_queue
![](https://img.kancloud.cn/03/0c/030c1df55cb583e410d383ad235d6e36_798x627.png)
就能看到 数据表中的任务迅速消失,此时再调用消息创建推送到表中,也会迅速处理。
到此队列基本上算是使用起来了,任务种类不同比如批量下载任务和批量消息发送任务 可以写多个不同的消费者类进行处理。^_^
- 序言
- 基础
- 安装环境
- 安装演示
- 规范
- 目录
- 介绍
- 后台介绍
- 后台首页
- 会员管理
- 系统管理
- 系统设置与配置管理
- 菜单管理
- 系统回收站
- 服务管理
- 插件管理
- 文章管理
- 接口管理
- 优化维护
- SEO管理
- 数据库
- 文件清理
- 行为日志
- 执行记录
- 统计分析
- 接口介绍
- 接口文档
- 错误码设计
- Token介绍
- 前台介绍
- 架构
- 架构总览
- 生命周期
- 入口文件
- 模块设计
- 依赖注入
- 控制器架构
- 逻辑架构
- 验证架构
- 服务架构
- 模型架构
- 行为架构
- 插件架构
- 配置
- 配置介绍
- 配置加载
- 配置扩展
- 请求
- 请求信息
- 日志
- 后台行为日志
- 系统执行日志
- 框架日志
- 数据
- 数据库设计
- 数据字典
- 数据库操作
- 事务控制
- 混合操作
- 实战
- 控制器
- 逻辑与验证
- 视图与模型
- 插件研发
- 服务研发
- 接口研发
- 杂项
- 数据导入导出
- 二维码条形码
- 邮件发送
- 云存储服务
- 支付服务
- 短信服务
- 微信分享
- 生成海报
- 聊天室
- PJAX
- Demo
- Widget
- 附录
- 常量参考
- 配置参考
- 函数参考
- 进阶
- Redis
- 自动缓存
- 全自动缓存
- 索引
- 数据签名
- 全自动事务
- 队列