## 连接池的意义 对于基于php-fpm的传统php-web应用,包括且不限于Mysql,Redis,RabbitMq,每次请求到来都需要为其新建一套独享的的连接,这直接带来了一些典型问题: 1. **连接开销** : 连接随着http请求到来而新建,随着请求返回而销毁,大量连接新建销毁是对系统资源的浪费。 2. **连接数量过高** :每一个请求都需要一套自己的连接,系统连接数和并发数会成一个近线性的关系。如果系统并发量达到了1w,那么就需要建立1w个对应的连接,这对于Mysql之类的后端服务而言,是一个大的负荷。 3. **空闲连接**:假设我们有一个接口使用了一个Mysql连接。该接口在一开始进行一次sql查询后,后面的操作都是sql无关的,那么该请求占据的空闲连接完全就是一种资源的浪费。 对于异步系统而言,这个问题变得更加的严峻。一个请求处理进程要对同一个服务进行并发的操作,意味着这个请求要持有1个以上同类的连接,这对于系统压力而言,无疑是雪上加霜了,所以连接池对于基于Swoole的Web框架而言已经是一个必需实现的机制了。事实上基本所有的框架都内置了连接池管理机制,比如 [easyswoole Pool管理器](http://www.easyswoole.com/Manual/3.x/Cn/_book/Components/CoroutinePool/pool.html),[swoft 连接池](https://doc.swoft.org/master/zh-CN/core/connection-pool.html) 等等。 ## 实现Mysql连接池 一个基本连接池,大致要实现以下功能: 1. 创建连接:连接池启动后,初始化一定的空闲连接,指定为最少的连接min。当连接池为空,不够用时,创建新的连接放到池里,但不能超过指定的最大连接max数量。 2. 连接释放:每次使用完连接,一定要调用释放方法,把连接放回池中,给其他程序或请求使用。 3. 连接分配:连接池中用pop和push的方式对等入队和出队分配与回收。能实现阻塞分配,也就是在池空并且已创建数量大于max,阻塞一定时间等待其他请求的连接释放,超时则返回null。 4. 连接管理:对连接池中的连接,定时检活和释放空闲连接等。 **连接池管理器** ~~~ <?php /** * 连接池抽象类 */ use Swoole\Coroutine\Channel; abstract class AbstractPool { private $min;//最少连接数 private $max;//最大连接数 private $count;//当前连接数 private $connections;//连接池组 protected $spareTime;//用于空闲连接回收判断 private $inited = false; protected abstract function createDb(); public function __construct() { $this->min = 10; $this->max = 100; $this->spareTime = 10 * 3600; $this->connections = new Channel($this->max + 1); } protected function createObject() { $obj = null; $db = $this->createDb(); if ($db) { $obj = [ 'last_used_time' => time() , 'db' => $db , ]; } return $obj; } /** * 初始化最小数量连接池 * @return $this|null */ public function init() { if ($this->inited) { return null; } for ($i = 0;$i < $this->min;$i ++) { $obj = $this->createObject(); $this->count ++; $this->connections->push($obj); } return $this; } public function getConnection($timeOut = 3) { $obj = null; if(!$this->connections->isEmpty()){ return $this->connections->pop($timeOut); } // 大量并发请求过多,连接池connections为空 if ($this->count < $this->max) { // 连接数没达到最大,新建连接返回 $this->count ++; $obj = $this->createObject(); return $obj; } // timeout为出队的最大的等待时间 // 如果超过最大等待时间后会返回false,客户端要判断一下 // 此处还起到一个限流作用 return $this->connections->pop($timeOut); } /* * 链接使用完进行回收 */ public function free($obj) { if ($obj) { $this->connections->push($obj); } } /** * 处理空闲连接 */ public function gcSpareObject() { //大约2分钟检测一次连接 swoole_timer_tick( 120000 , function (){ $list = []; while (true) { if (!$this->connections->isEmpty()) { // 等待的时间要快,免得链接被用掉 $obj = $this->connections->pop(0.001); $last_used_time = $obj['last_used_time']; // 超过$this->spareTime的认为是空闲连接,pop掉 if (time() - $last_used_time > $this->spareTime) { $this->count --; } else { // 没超过就继续push回去 array_push($list , $obj); } } else { break; } } foreach ($list as $item) { $this->connections->push($item); } unset($list); // keepMin(); 处理完之后就要保证最低连接数 $this->keepMin(); } ); } private function keepMin() { if ($this->count >= $this->min) { return $this->count; } else { $num = $this->min - $this->count; } for ($i = 0;$i < $num;$i ++) { $obj = $this->createObject(); $this->count ++; $this->connections->push($obj); } return $this->count; } } ~~~ **pdo同步客户端链接** ~~~ <?php require "AbstractPool.php"; class MysqlPoolPdo extends AbstractPool { //数据库配置 protected $dbConfig = [ 'host' => 'mysql:host=127.0.0.1:3306;dbname=test' , 'port' => 3306 , 'user' => 'root' , 'password' => '123456' , 'database' => 'test' , 'charset' => 'utf8' , 'timeout' => 2 , ]; public static $instance; public static function getInstance() { if (is_null(self::$instance)) { self::$instance = new MysqlPoolPdo(); } return self::$instance; } protected function createDb() { return new PDO($this->dbConfig['host'] , $this->dbConfig['user'] , $this->dbConfig['password']); } } $httpServer = new swoole_http_server('0.0.0.0' , 9501); $httpServer->set( ['worker_num' => 1] ); $httpServer->on( "WorkerStart" , function (){ // 初始化最少数量(min指定)的连接对象,放进类型为Channel的connections对象中。 MysqlPoolPdo::getInstance()->init(); } ); $httpServer->on( "request" , function ($request , $response){ $db = null; // 从channle中pop数据库链接对象出来 $obj = MysqlPoolPdo::getInstance()->getConnection(); if (!empty($obj)) { $db = $obj['db']; } if ($db) { // 此时如果并发了10个请求,server因为配置了1个worker,所以再pop到一个对象返回时,遇到sleep()的查询, // 因为用的连接对象是pdo的查询,是同步阻塞的,所以此时的woker进程只能等待,完成后才能进入下一个请求。 // 因此,池中的其余连接其实是多余的,同步客户端的请求速度只能和woker的数量有关。 // ab -c 10 -n 10 http://127.0.0.1:9501/ $db->query("select sleep(2)"); $ret = $db->query("select * from tb_game limit 1"); // 使用完链接对象就回收 MysqlPoolPdo::getInstance()->free($obj); $response->end(json_encode($ret)); } } ); $httpServer->start(); ~~~ **协程异步客户端链接** ~~~ <?php require "AbstractPool.php"; class MysqlPoolCoroutine extends AbstractPool { protected $dbConfig = [ 'host' => '127.0.0.1' , 'port' => 3306 , 'user' => 'root' , 'password' => '123456' , 'database' => 'test' , 'charset' => 'utf8' , 'timeout' => 10 , ]; public static $instance; public static function getInstance() { if (is_null(self::$instance)) { self::$instance = new MysqlPoolCoroutine(); } return self::$instance; } protected function createDb() { $db = new Swoole\Coroutine\Mysql(); $db->connect( $this->dbConfig ); return $db; } } $httpServer = new swoole_http_server('0.0.0.0' , 9501); $httpServer->set( ['worker_num' => 1] ); $httpServer->on( "WorkerStart" , function (){ MysqlPoolCoroutine::getInstance()->init(); } ); $httpServer->on( "request" , function ($request , $response){ $db = null; $obj = MysqlPoolCoroutine::getInstance()->getConnection(); if (!empty($obj)) { $db = $obj['db']; } if ($db) { // 遇上sleep阻塞时,woker进程不是在等待select的完成,而是切换到另外的协程去处理下一个请求。 $db->query("select sleep(2)"); $ret = $db->query("select * from tb_game limit 1"); // 完成后同样释放对象到池中 MysqlPoolCoroutine::getInstance()->free($obj); $response->end(json_encode($ret)); } } ); $httpServer->start(); ~~~ 可以通过以下语句来观察两者之间的差异: ab -c 10 -n 100 http://127.0.0.1:9501/ # mysql console内执行 SHOW STATUS LIKE 'Threads%'; show processlist; 网站之所以慢绝大多数原因是发生了阻塞。而发生阻塞的原因多半是mysql阻塞。 >[info] 更完整的连接池代码请参考 easyswoole /vendor/easyswoole/component/src/Pool/AbstractPool.php