# 并发及协程池
说这个章节之前,先看一下自定义进程章节的示例。
## 自定义进程
自定义进程的使用非常灵活,只需要继承 GoProcess 类,并在配置文件中注册即可。
下面用一个 redis 队列的 demo 进行说明。
~~~
<?php
namespace app\Process;
use ESD\Core\Message\Message;
use ESD\Core\Server\Process\Process;
use ESD\Coroutine\Co;
use ESD\Go\GoProcess;
use ESD\Plugins\Redis\RedisConfig;
use ESD\Plugins\Redis\RedisOneConfig;
/**
* Created by PhpStorm.
* User: anythink
* Date: 2019/6/11
* Time: 2:08 PM
*/
class QueueTask extends GoProcess {
use GetLogger;
/**
* @var RedisConfig
*/
private $_configClass;
/**
* @var RedisOneConfig
*/
private $config;
/**
* @var \Redis
*/
protected $redis;
public function loadConfig($default = 'default')
{
$this->_configClass = DIGet(RedisConfig::class);
$this->config = $this->_configClass->getRedisConfigs()[$default];
}
public function onProcessStart()
{
$this->loadConfig();
$this->redis = new \Redis();
while(true){
$this->redis->connect($this->config->getHost(), $this->config->getPort());
if($this->config->getPassword() != ''){
$this->redis->auth($this->config->getPassword());
}
try{
while($val = $this->redis->brPop(['test'],0)){
goWithContext(function () use($val){
$this->process($val);
});
}
}catch (\RedisException $e){
$this->info('RedisException ' . $e->getMessage() .'#'. $e->getCode());
}
//连接超时每隔一秒进行一次重试
Co::sleep(1);
}
}
public function process($val){
Co::sleep(2);
$this->info('process val' , $val);
}
public function onPipeMessage(Message $message, Process $fromProcess)
{
$res = $message->getData();
$this->debug('QueueTask onPipeMessage' . $res);
}
}
~~~
通过观察代码,我们发现消费redis队列的方法 QueueTask::process 是在 goWithContext 的协程内执行的。
>[danger] ❓那么问题来了:消费方法模拟了需要耗时2秒才能处理完毕。那么如果有队列有10条消息,请问10条消息需要多久才能消费完?
# 并发
带着上面的问题,接着来说说并发。答案是,20秒吗❓其实仅需2秒。 当我们的执行代码包裹在 goWithContext 里,那么该方法就会变成并发执行。
再简单看一个例子
~~~
goWithContext(){
Co::sleep(2)
});
goWithContext(){
Co::sleep(2)
});
~~~
以上代码,如果不支持协程的话需要4秒执行完毕,如果使用ESD框架,那么2段代码会并行执行,只需2秒。如果让10条消息在2秒内处理完,那么在传统的框架下,需要开10个worker并行处理。试想下,如果有100条,甚至1000条消息呢,你的服务器开得起这么多worker吗🙂
# 💡上限
上面所提到的并发,实际上是应用了协程的自动切换机制,此处不做过多的扩展,使用这种特性会受到 esd.server.max_coroutine 配置的限制,默认为3000,也就是说最多可以创建3000个协程。
# 💡 协程池
由于框架的worker 也同样受到 esd.server.max_coroutine 配置的限制,所以如果像上面的例子无脑使用,在高并发下就会出现如下问题
~~~
Warning: go(): exceed max number of coroutine 3000 in /data/vendor/esd/esd-core/src/Core/Common.php on line 109
Warning: go(): exceed max number of coroutine 3000 in /data/vendor/esd/esd-core/src/Core/Common.php on line 109
~~~
你的 request 可能无法继续处理请求了。当然可以提高max_coroutine的配置,但是终归不是保险的方案。
那么使用协程池来限制最大并发数,就是一个更优秀的选择。
看一下例子,还是上面的代码,这里只摘出修改的部分
~~~
public function onProcessStart()
{
$this->loadConfig();
$this->redis = new \Redis();
$pool = CoPoolFactory::createCoPool('queue_co-' . $this->getProcessId(), 2, 10, 5);
$pool->preStartAllCoreThreads();
while (true) {
$this->redis->connect($this->config->getHost(), $this->config->getPort());
if ($this->config->getPassword() != '') {
$this->redis->auth($this->config->getPassword());
}
try {
while ($val = $this->redis->brPop(['test'], 0)) {
$pool->execute(function() use ($val){
$this->process($val);
});
}
} catch (\RedisException $e) {
$this->info('RedisException ' . $e->getMessage() . '#' . $e->getCode());
}
Co::sleep(1);
}
}
~~~
通过 CoPoolFactory::createCoPool 创建一个协程池,以下代码创建了最低2个协程,最多10个协程的连接池来控制并发。注意协程池的名称不要重复。
~~~
$pool = CoPoolFactory::createCoPool('queue_co-' . $this->getProcessId(), 2, 10, 5);
$pool->preStartAllCoreThreads();
~~~
然后使用 $pool->execute 将需要执行的代码通过匿名函数进行传递。
~~~
$pool->execute(function() use ($val){
$this->process($val);
});
~~~
此时超过协程池最大数量的请求就会被阻塞,直到协程空闲。通过此手段,我们就实现了一个可以控制并发数的消费队列。
## 实际场景
如果队列消费是请求三方接口,就可以根据其限流规则合理规划协程池的数量,保证不会因为并发过大被警告。
# 带有返回的并发执行
在需要执行并发的控制器中使用 new Runnable 创建类,在构造函数中传递一个需要并发执行代码的匿名函数。
使用 CoroutineExecutor::getInstance()->execute() 执行。
使用 $ret_1->getResult() 获取返回的数据。
~~~
/**
* @GetMapping()
* @return string
*/
public function gorun(){
$ret_1 = new Runnable(function (){
Co::sleep(2);
return Co::getCid();
},true);
$ret_2 = new Runnable(function (){
Co::sleep(2);
return Co::getCid();
},true);
CoroutineExecutor::getInstance()->execute($ret_1);
CoroutineExecutor::getInstance()->execute($ret_2);
$data = [
'ret1' => $ret_1->getResult(),
'ret2' => $ret_2->getResult(),
];
return $this->successResponse($data);
}
~~~
- 前言
- 捐赠ESD项目
- 使用篇-通用
- 环境
- 安装
- 规范
- 压力测试
- 配置
- 如何设置YML配置
- server配置
- 端口配置
- 项目结构
- 事件派发
- 日志
- 注解
- DI容器
- 自定义进程
- 并发及协程池
- Console插件
- Scheduled插件
- Redis插件
- AOP插件
- Saber插件
- Mysql插件
- mysql事务
- Actuator插件
- Whoops插件
- Cache插件
- PHPUnit插件
- Security插件
- Session插件
- EasyRoute插件
- http路由
- ProcessRpc插件
- AutoReload插件
- AnnotationsScan插件
- Tracing-plugin插件
- MQTT插件
- Pack插件
- AMQP插件
- Validate插件
- Uid插件
- Topic插件
- Blade插件
- CsvReader插件
- hashed-wheel-timer-plugin插件
- 使用篇-HTTP
- 路由
- 静态文件
- 路由定义
- 修饰方法
- 路由分组
- 资源路由
- 端口作用域
- 异常处理
- 跨域请求
- 路由缓存
- 控制器
- 控制器初始化
- 前置操作
- 跳转和重定向
- 异常处理
- 请求
- 请求对象
- 请求信息
- request消息
- response消息
- stream消息
- url接口
- 验证器
- 内置验证器
- 内置过滤器
- 使用篇-WS
- 如何使用
- 路由
- 使用篇-TCP
- 插件篇-PluginSystem
- 微服务篇-ESDCloud
- CircuitBreaker插件
- SaberCloud插件
- 分布式链路追踪系统
- Consul插件