# AMQP异步任务系统
需要了解的知识。
* [Process](Process.md)
* [RabbitMQ](http://www.rabbitmq.com/documentation.html)
本系统由SD框架和RabbitMQ搭建。
## 创建异步作业进程
通过继承AMQPTaskProcess,我们来创建一个异步任务作业的进程类。
```
class MyAMQPTaskProcess extends AMQPTaskProcess
{
public function start($process)
{
parent::start($process);
$this->createDirectConsume('msgs');
}
/**
* 路由消息返回class名称
* @param $body
* @return string
*/
protected function route($body)
{
return TestAMQPTask::class;
}
}
```
通过createDirectConsume函数可以快速创建一个消费队列。
* createDirectConsume
```
function createDirectConsume($queue, $prefetch_count = 2, $global = false, $exchange = null, $consumerTag = null)
```
一般情况我们只需要设置queue和prefetch_count这俩个参数。
queue为消费队列的名称,prefetch_count=2代表这个队列只能被这个进程同时消费2次,直到消费成功或者失败,简单的来说并发为2。
global参数代表这个并发是针对队列还是进程的。false是针对队列,true代表是进程。
我们可以多次调用createDirectConsume来消费不同的队列。
* route
route路由的作用,$body是消费得到的值,这个函数需要返回一个class名。
## 创建作业任务
创建类继承AMQPTask。
```
class TestAMQPTask extends AMQPTask
{
/**
* @var TestModel
*/
public $TestModel;
public function initialization(AMQPMessage $message)
{
parent::initialization($message);
$this->TestModel = $this->loader->model(TestModel::class, $this);
}
/**
* handle
* @param $body
*/
public function handle($body)
{
var_dump($body);
$this->ack();
}
}
```
* initialization
和Model,Controller一样用于初始化,或者进行loader
* handle
处理任务,处理任务一定需要调用ack或者是reject。
* ack
任务处理完毕
* reject
```
function reject($requeue = true)
```
任务被拒绝,requeue=true代表这个任务回到队列,false代表任务被抛弃。
## 创建用户进程
在AppServer中创建进程
```
/**
* 用户进程
*/
public function startProcess()
{
parent::startProcess();
for ($i=0;$i<5;$i++)
{
ProcessManager::getInstance()->addProcess(MyAMQPTaskProcess::class,true,$i);
}
}
```
这样我们创建了5个异步任务进程。
# 注意
1. 消费队列必须存在,不然会报错
2. 一定在handle处理结束后调用ack或者reject
3. AMQPTask的initialization和handle均支持协程
- Introduction
- SD 3.X文档连接
- 导言
- 用户案例
- 基于Swoole扩展分布式全栈开发框架
- 选择SD框架助力企业开发
- 捐赠SwooleDistributed项目
- 框架性能报告
- 更新日志
- VIP服务福利
- 安装与配置
- 【推荐】全自动安装部署
- 环境要求
- 使用Composer安装/更新SD框架
- 通过Docker安装
- 代码结构
- 启动命令
- 服务器配置
- 服务器基础配置server.php
- 客户端协议配置client.php
- business.php
- log.php
- 微服务及集群配置consul.php
- fileHeader.php
- mysql.php
- redis.php
- 定时任务配置timerTask.php
- 服务器端口配置ports.php
- catCache.php
- 验证服务启动成功
- 微服务-Consul
- 日志工具-GrayLog
- 集群-Cluster
- 内核优化
- 入门教学
- 开发流程
- 开发前必读
- 开发规范
- 基本流程
- 框架入口
- Model数据模型
- Controller控制器
- 协程
- 协程基础
- 迭代器
- 调度器
- 使用协程的优势
- 通过协程的方法屏蔽异步同步的区别
- Select多路选择器
- 协程Sleep
- 通用协程方法
- 设置超时
- 设置无异常
- 设置降级函数
- initAsynPools
- dump
- 封装器与路由器
- 封装器
- sendToUid
- 路由器
- sendToUids
- 对象池
- 扩展组件
- 中间件
- Redis使用介绍
- RedisAsynPool
- Redis具体使用
- sendToAll
- RedisRoute
- Redis+Lua
- Mysql使用介绍
- MysqlAsynPool
- Mysql返回值
- 如何获取构建的mysql语句
- 如何执行一个SQL
- 如何执行事务
- stopTask
- Mysql具体使用
- 异步客户端
- Loader
- MqttClient
- model
- SdTcpRpcPool
- task
- HttpClientPool
- view
- TcpClientPool
- AMQP
- initialization
- Memory
- destory
- Cache
- Lock
- Pool
- EventDispatcher
- Process
- Cluster
- TimerTask
- Reload
- Consul
- Context
- 自定义进程
- 进程间RPC
- $http_input
- CatCache
- $http_output
- TimerCallBack
- 专题
- HTTP专栏
- TCP专栏
- 基础知识
- WebSocket专栏
- 微服务
- Consul配置
- RPC
- REST
- AMQP异步任务系统
- MQTT简易服务器
- Docker化以及资源编排
- 快速搭建公司内部统一的开发环境
- 使用HTTPS/WSS
- 订阅/发布
- 游戏专题
- 类介绍
- AppServer
- clearState
- onOpenServiceInitialization
- SwooleDistributedServer
- get_instance
- kickUid
- bindUid
- unBindUid
- coroutineUidIsOnline
- coroutineCountOnline
- setTemplateEngine
- isWebSocket
- isTaskWorker
- getSocketName
- initAsynPools
- addAsynPool
- getAsynPool
- getServerAllTaskMessage
- Controller
- onExceptionHandle
- send
- sendToUid
- sendToUids
- sendToAll
- sendToGroup
- close
- getContext
- defaultMethod
- $redis_pool
- $mysql_pool
- $request_type
- $fd
- $uid
- $client_data
- $request
- $response
- $loader
- $logger
- $server
- $config
- Model
- initialization
- destory
- View
- Task
- stopTask
- HttpInput
- postGet
- post
- get
- getPost
- getAllPostGet
- getAllHeader
- getRawContent
- cookie
- getRequestHeader
- server信息
- getRequestMethod
- getRequestUri
- getPathInfo
- HttpOutput
- setStatusHeader
- setContentType
- setHeader
- end
- setCookie
- endFile
- 单元测试