## 连接池的意义
对于基于php-fpm的传统php-web应用,包括且不限于Mysql,Redis,RabbitMq,每次请求到来都需要为其新建一套独享的的连接,这直接带来了一些典型问题:
1. **连接开销** : 连接随着http请求到来而新建,随着请求返回而销毁,大量连接新建销毁是对系统资源的浪费。
2. **连接数量过高** :每一个请求都需要一套自己的连接,系统连接数和并发数会成一个近线性的关系。如果系统并发量达到了1w,那么就需要建立1w个对应的连接,这对于Mysql之类的后端服务而言,是一个大的负荷。
3. **空闲连接**:假设我们有一个接口使用了一个Mysql连接。该接口在一开始进行一次sql查询后,后面的操作都是sql无关的,那么该请求占据的空闲连接完全就是一种资源的浪费。
对于异步系统而言,这个问题变得更加的严峻。一个请求处理进程要对同一个服务进行并发的操作,意味着这个请求要持有1个以上同类的连接,这对于系统压力而言,无疑是雪上加霜了,所以连接池对于基于Swoole的Web框架而言已经是一个必需实现的机制了。事实上基本所有的框架都内置了连接池管理机制,比如 [easyswoole Pool管理器](http://www.easyswoole.com/Manual/3.x/Cn/_book/Components/CoroutinePool/pool.html),[swoft 连接池](https://doc.swoft.org/master/zh-CN/core/connection-pool.html) 等等。
## 实现Mysql连接池
一个基本连接池,大致要实现以下功能:
1. 创建连接:连接池启动后,初始化一定的空闲连接,指定为最少的连接min。当连接池为空,不够用时,创建新的连接放到池里,但不能超过指定的最大连接max数量。
2. 连接释放:每次使用完连接,一定要调用释放方法,把连接放回池中,给其他程序或请求使用。
3. 连接分配:连接池中用pop和push的方式对等入队和出队分配与回收。能实现阻塞分配,也就是在池空并且已创建数量大于max,阻塞一定时间等待其他请求的连接释放,超时则返回null。
4. 连接管理:对连接池中的连接,定时检活和释放空闲连接等。
**连接池管理器**
~~~
<?php
/**
* 连接池抽象类
*/
use Swoole\Coroutine\Channel;
abstract class AbstractPool
{
private $min;//最少连接数
private $max;//最大连接数
private $count;//当前连接数
private $connections;//连接池组
protected $spareTime;//用于空闲连接回收判断
private $inited = false;
protected abstract function createDb();
public function __construct()
{
$this->min = 10;
$this->max = 100;
$this->spareTime = 10 * 3600;
$this->connections = new Channel($this->max + 1);
}
protected function createObject()
{
$obj = null;
$db = $this->createDb();
if ($db) {
$obj = [
'last_used_time' => time() ,
'db' => $db ,
];
}
return $obj;
}
/**
* 初始化最小数量连接池
* @return $this|null
*/
public function init()
{
if ($this->inited) {
return null;
}
for ($i = 0;$i < $this->min;$i ++) {
$obj = $this->createObject();
$this->count ++;
$this->connections->push($obj);
}
return $this;
}
public function getConnection($timeOut = 3)
{
$obj = null;
if(!$this->connections->isEmpty()){
return $this->connections->pop($timeOut);
}
// 大量并发请求过多,连接池connections为空
if ($this->count < $this->max) {
// 连接数没达到最大,新建连接返回
$this->count ++;
$obj = $this->createObject();
return $obj;
}
// timeout为出队的最大的等待时间
// 如果超过最大等待时间后会返回false,客户端要判断一下
// 此处还起到一个限流作用
return $this->connections->pop($timeOut);
}
/*
* 链接使用完进行回收
*/
public function free($obj)
{
if ($obj) {
$this->connections->push($obj);
}
}
/**
* 处理空闲连接
*/
public function gcSpareObject()
{
//大约2分钟检测一次连接
swoole_timer_tick(
120000 , function (){
$list = [];
while (true) {
if (!$this->connections->isEmpty()) {
// 等待的时间要快,免得链接被用掉
$obj = $this->connections->pop(0.001);
$last_used_time = $obj['last_used_time'];
// 超过$this->spareTime的认为是空闲连接,pop掉
if (time() - $last_used_time > $this->spareTime) {
$this->count --;
} else {
// 没超过就继续push回去
array_push($list , $obj);
}
} else {
break;
}
}
foreach ($list as $item) {
$this->connections->push($item);
}
unset($list);
// keepMin(); 处理完之后就要保证最低连接数
$this->keepMin();
}
);
}
private function keepMin()
{
if ($this->count >= $this->min) {
return $this->count;
} else {
$num = $this->min - $this->count;
}
for ($i = 0;$i < $num;$i ++) {
$obj = $this->createObject();
$this->count ++;
$this->connections->push($obj);
}
return $this->count;
}
}
~~~
**pdo同步客户端链接**
~~~
<?php
require "AbstractPool.php";
class MysqlPoolPdo extends AbstractPool
{
//数据库配置
protected $dbConfig = [
'host' => 'mysql:host=127.0.0.1:3306;dbname=test' ,
'port' => 3306 ,
'user' => 'root' ,
'password' => '123456' ,
'database' => 'test' ,
'charset' => 'utf8' ,
'timeout' => 2 ,
];
public static $instance;
public static function getInstance()
{
if (is_null(self::$instance)) {
self::$instance = new MysqlPoolPdo();
}
return self::$instance;
}
protected function createDb()
{
return new PDO($this->dbConfig['host'] , $this->dbConfig['user'] , $this->dbConfig['password']);
}
}
$httpServer = new swoole_http_server('0.0.0.0' , 9501);
$httpServer->set(
['worker_num' => 1]
);
$httpServer->on(
"WorkerStart" , function (){
// 初始化最少数量(min指定)的连接对象,放进类型为Channel的connections对象中。
MysqlPoolPdo::getInstance()->init();
}
);
$httpServer->on(
"request" , function ($request , $response){
$db = null;
// 从channle中pop数据库链接对象出来
$obj = MysqlPoolPdo::getInstance()->getConnection();
if (!empty($obj)) {
$db = $obj['db'];
}
if ($db) {
// 此时如果并发了10个请求,server因为配置了1个worker,所以再pop到一个对象返回时,遇到sleep()的查询,
// 因为用的连接对象是pdo的查询,是同步阻塞的,所以此时的woker进程只能等待,完成后才能进入下一个请求。
// 因此,池中的其余连接其实是多余的,同步客户端的请求速度只能和woker的数量有关。
// ab -c 10 -n 10 http://127.0.0.1:9501/
$db->query("select sleep(2)");
$ret = $db->query("select * from tb_game limit 1");
// 使用完链接对象就回收
MysqlPoolPdo::getInstance()->free($obj);
$response->end(json_encode($ret));
}
}
);
$httpServer->start();
~~~
**协程异步客户端链接**
~~~
<?php
require "AbstractPool.php";
class MysqlPoolCoroutine extends AbstractPool
{
protected $dbConfig = [
'host' => '127.0.0.1' ,
'port' => 3306 ,
'user' => 'root' ,
'password' => '123456' ,
'database' => 'test' ,
'charset' => 'utf8' ,
'timeout' => 10 ,
];
public static $instance;
public static function getInstance()
{
if (is_null(self::$instance)) {
self::$instance = new MysqlPoolCoroutine();
}
return self::$instance;
}
protected function createDb()
{
$db = new Swoole\Coroutine\Mysql();
$db->connect(
$this->dbConfig
);
return $db;
}
}
$httpServer = new swoole_http_server('0.0.0.0' , 9501);
$httpServer->set(
['worker_num' => 1]
);
$httpServer->on(
"WorkerStart" , function (){
MysqlPoolCoroutine::getInstance()->init();
}
);
$httpServer->on(
"request" , function ($request , $response){
$db = null;
$obj = MysqlPoolCoroutine::getInstance()->getConnection();
if (!empty($obj)) {
$db = $obj['db'];
}
if ($db) {
// 遇上sleep阻塞时,woker进程不是在等待select的完成,而是切换到另外的协程去处理下一个请求。
$db->query("select sleep(2)");
$ret = $db->query("select * from tb_game limit 1");
// 完成后同样释放对象到池中
MysqlPoolCoroutine::getInstance()->free($obj);
$response->end(json_encode($ret));
}
}
);
$httpServer->start();
~~~
可以通过以下语句来观察两者之间的差异:
ab -c 10 -n 100 http://127.0.0.1:9501/
# mysql console内执行
SHOW STATUS LIKE 'Threads%';
show processlist;
网站之所以慢绝大多数原因是发生了阻塞。而发生阻塞的原因多半是mysql阻塞。
>[info] 更完整的连接池代码请参考 easyswoole /vendor/easyswoole/component/src/Pool/AbstractPool.php