# 自定义进程
自定义进程的使用非常灵活,只需要继承 GoProcess 类,并在配置文件中注册即可。
下面用一个 redis 队列的 demo 进行说明。
~~~
<?php
namespace app\Process;
use ESD\Core\Message\Message;
use ESD\Core\Server\Process\Process;
use ESD\Coroutine\Co;
use ESD\Go\GoProcess;
use ESD\Plugins\Redis\RedisConfig;
use ESD\Plugins\Redis\RedisOneConfig;
/**
* Created by PhpStorm.
* User: anythink
* Date: 2019/6/11
* Time: 2:08 PM
*/
class QueueTask extends GoProcess {
use GetLogger;
/**
* @var RedisConfig
*/
private $_configClass;
/**
* @var RedisOneConfig
*/
private $config;
/**
* @var \Redis
*/
protected $redis;
public function loadConfig($default = 'default')
{
$this->_configClass = DIGet(RedisConfig::class);
$this->config = $this->_configClass->getRedisConfigs()[$default];
}
public function onProcessStart()
{
$this->loadConfig();
$this->redis = new \Redis();
while(true){
$this->redis->connect($this->config->getHost(), $this->config->getPort());
if($this->config->getPassword() != ''){
$this->redis->auth($this->config->getPassword());
}
try{
while($val = $this->redis->brPop(['test'],0)){
goWithContext(function () use($val){
$this->process($val);
});
}
}catch (\RedisException $e){
$this->info('RedisException ' . $e->getMessage() .'#'. $e->getCode());
}
//连接超时每隔一秒进行一次重试
Co::sleep(1);
}
}
public function process($val){
Co::sleep(2);
$this->info('process val' , $val);
}
public function onPipeMessage(Message $message, Process $fromProcess)
{
$res = $message->getData();
$this->debug('QueueTask onPipeMessage' . $res);
}
}
~~~
>[danger] 此类实例化了一个新的 redis 连接访问阻塞的 redis 队列,请勿直接使用 redis 连接池,否则会长时间占用连接,导致 worker 进程的 redis 可用连接变少。
在 process 方法中实现自身的业务逻辑。
由于process 方法使用了协程调度,所以在 process 方法内执行外部API请求注意以下2点:
1. api请求的 client 是否支持协程,比如使用 saber-plugin,或guzzehttp-saber。不支持协程的 client 将会转为同步模式,大幅度降低队列处理的QPS,此时需要多开几个自定义 process。
2. 使用协程需要注意 api 服务端的流控规则,请参考对应服务端的限流规则合理调整并发数。
# 注册自定义进程
在配置文件中添加如下配置:
~~~
esd:
process:
queue-0:
name: queue-0
class_name: app\Process\QueueTask
group_name: TaskGroup
queue-1:
name: queue-1
class_name: app\Process\QueueTask
group_name: TaskGroup
queue-2:
name: queue-2
class_name: app\Process\QueueTask
group_name: TaskGroup
~~~
- 前言
- 捐赠ESD项目
- 使用篇-通用
- 环境
- 安装
- 规范
- 压力测试
- 配置
- 如何设置YML配置
- server配置
- 端口配置
- 项目结构
- 事件派发
- 日志
- 注解
- DI容器
- 自定义进程
- 并发及协程池
- Console插件
- Scheduled插件
- Redis插件
- AOP插件
- Saber插件
- Mysql插件
- mysql事务
- Actuator插件
- Whoops插件
- Cache插件
- PHPUnit插件
- Security插件
- Session插件
- EasyRoute插件
- http路由
- ProcessRpc插件
- AutoReload插件
- AnnotationsScan插件
- Tracing-plugin插件
- MQTT插件
- Pack插件
- AMQP插件
- Validate插件
- Uid插件
- Topic插件
- Blade插件
- CsvReader插件
- hashed-wheel-timer-plugin插件
- 使用篇-HTTP
- 路由
- 静态文件
- 路由定义
- 修饰方法
- 路由分组
- 资源路由
- 端口作用域
- 异常处理
- 跨域请求
- 路由缓存
- 控制器
- 控制器初始化
- 前置操作
- 跳转和重定向
- 异常处理
- 请求
- 请求对象
- 请求信息
- request消息
- response消息
- stream消息
- url接口
- 验证器
- 内置验证器
- 内置过滤器
- 使用篇-WS
- 如何使用
- 路由
- 使用篇-TCP
- 插件篇-PluginSystem
- 微服务篇-ESDCloud
- CircuitBreaker插件
- SaberCloud插件
- 分布式链路追踪系统
- Consul插件