# 5.5 连接池
连接池的重要性,这时就不赘述了,下面具体介绍框架中实现的哪些连接池。
## Redis连接池
### 主要特性
- 支持异步+协程
- 支持断线重连
- 支持自动提取和归还连接
- 统一同步和异步调用方式
### 配置
```php
<?php
/**
* 本地环境
*/
$config['redis']['p1']['ip'] = '127.0.0.1';
$config['redis']['p1']['port'] = 6379;
//$config['redis']['p1']['password'] = 'xxxx';
//$config['redis']['p1']['select'] = 1;
// Redis序列化选项等同于phpredis序列化的各个选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
//$config['redis']['p1']['redisSerialize'] = \Redis::SERIALIZER_PHP;
// PHP序列化选项,为了兼容yii迁移项目的set,get,mset,mget,选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
//$config['redis']['p1']['phpSerialize'] = \Redis::SERIALIZER_NONE;
// 是否将key md5后储存,默认为0,开启为1
//$config['redis']['p1']['hashKey'] = 1;
// 设置key的前缀
//$config['redis']['p1']['keyPrefix'] = 'demo_';
return $config;
```
示例配置代码:
[./php-msf-demo/app/config/docker/redis.php](https://github.com/pinguo/php-msf/pinguo/config/docker/redis.php)
- $config['redis']
代表Redis连接池相关配置
- p1,p2,p3,p4,p5,p6
这里的p仅代表一台或者一组Redis服务器,在使用连接池时会用到,如果Redis服务器端分片(比如twemproxy)就填写为集群导出的地址与端口等信息。
- ip
Redis服务器地址
- port
Redis服务器端口
- password
Redis认证密钥
- select
Redis DB
- redisSerialize
Redis序列化选项等同于phpredis序列化的各个选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
- phpSerialize
PHP序列化选项,为了兼容yii迁移项目的set,get,mset,mget,选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
- hashKey
是否将key md5后储存,默认为0,开启为1
- keyPrefix
设置key的前缀
### Redis连接池的使用
```php
/**
* Redis示例控制器
*
* @author camera360_server@camera360.com
* @copyright Chengdu pinguo Technology Co.,Ltd.
*/
namespace App\Controllers;
use PG\MSF\Controllers\Controller;
use App\Models\Demo as DemoModel;
class Redis extends Controller
{
// Redis连接池读写示例
public function actionPoolSetGet()
{
yield $this->getRedisPool('p1')->set('key1', 'val1');
$val = yield $this->getRedisPool('p1')->get('key1');
$this->outputJson($val);
}
}
```
1. $this->getRedisPool($name)
获取连接池对象,并选择名为$name的连接池,$name由配置文件中声明,比如上述配置中的tw
2. 连接池对象的所有方法映射为标准的Redis操作指令
如:`SETEX key seconds value`映射为`$this->getRedisPool($name)->setex($key, $seconds, $value)`
3. string类型的简化操作
`$this->getRedisPool($name)->cache($key, $value = '', $expire = 0)`,`$key`为redis key,`$value`为缓存的值,`$expire`为过期时间,默认不会过期。
4. 执行lua脚本
`$this->getRedisPool($name)->evalMock($script, $args = array(), $numKeys = 0)`
如:
```php
<?php
function luaExample()
{
$num = 100;
$lua = "
local allWorks = {}
local recWorks = {}
local random = nil
for k, v in pairs(KEYS) do
local works = redis.call('sRandMember', v, '" . $num . "')
if works ~= nil then
for key, val in pairs(works) do
table.insert(allWorks, val)
end
end
end
while #recWorks < " . $num . " and #allWorks > 0 do
random = math.random(#allWorks)
table.insert(recWorks, allWorks[random])
table.remove(allWorks, random)
end
return cjson.encode(recWorks)
";
$keys = ['feedId1', 'feedId2', 'feedId3'];
$this->getRedisPool('tw')->evalMock($lua, $keys, count($keys));
}
```
## Redis代理
在Redis连接池的基本上,MSF框架还实现了Redis代理的基本功能,主要特性有:
- 支持分布式自动分片
- 支持master-slave读写分离
- 支持故障自动failover
### 配置
```php
<?php
/**
* 本地环境
*/
$config['redis']['p1']['ip'] = '127.0.0.1';
$config['redis']['p1']['port'] = 6379;
//$config['redis']['p1']['password'] = 'xxxx';
//$config['redis']['p1']['select'] = 1;
// Redis序列化选项等同于phpredis序列化的各个选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
//$config['redis']['p1']['redisSerialize'] = \Redis::SERIALIZER_PHP;
// PHP序列化选项,为了兼容yii迁移项目的set,get,mset,mget,选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
//$config['redis']['p1']['phpSerialize'] = \Redis::SERIALIZER_NONE;
// 是否将key md5后储存,默认为0,开启为1
//$config['redis']['p1']['hashKey'] = 1;
// 设置key的前缀
//$config['redis']['p1']['keyPrefix'] = 'demo_';
$config['redis']['p2']['ip'] = '127.0.0.1';
$config['redis']['p2']['port'] = 6380;
$config['redis']['p3']['ip'] = '127.0.0.1';
$config['redis']['p3']['port'] = 6381;
$config['redis']['p4']['ip'] = '127.0.0.1';
$config['redis']['p4']['port'] = 7379;
$config['redis']['p5']['ip'] = '127.0.0.1';
$config['redis']['p5']['port'] = 7380;
$config['redis']['p6']['ip'] = '127.0.0.1';
$config['redis']['p6']['port'] = 7381;
$config['redis_proxy']['master_slave'] = [
'pools' => ['p1', 'p2', 'p3'],
'mode' => \PG\MSF\Marco::MASTER_SLAVE,
];
$config['redis_proxy']['cluster'] = [
'pools' => [
'p4' => 1,
'p5' => 1,
'p6' => 1
],
'mode' => \PG\MSF\Marco::CLUSTER,
];
return $config;
```
示例配置代码:
[https://github.com/pinguo/php-msf-demo/app/config/docker/redis.php](https://github.com/pinguo/php-msf-demo/blob/master/config/docker/redis.php)
- $config['redis_proxy']
代表Redis代理相关配置
- cluster
这里的cluster仅代表一组Redis服务器集群,是一个标识
- mode
Redis集群类型,\PG\MSF\Marco::CLUSTER代表分布式的Redis集群;\PG\MSF\Marco::MASTER_SLAVE代表主从结构的Redis集群
- pools
当mode设置为\PG\MSF\Marco::CLUSTER时,pools为array,他的key表示Redis连接池名称,value表示Redis连接池权重;当mode设置为\PG\MSF\Marco::MASTER_SLAVE,pools为英文逗号分隔的Redis连接池名称列表。
### Redis代理的使用
```php
<?php
/**
* Redis示例控制器
*
* @author camera360_server@camera360.com
* @copyright Chengdu pinguo Technology Co.,Ltd.
*/
namespace App\Controllers;
use PG\MSF\Controllers\Controller;
use App\Models\Demo as DemoModel;
class Redis extends Controller
{
// Redis代理使用示例(分布式)
public function actionProxySetGet()
{
for ($i = 0; $i <= 100; $i++) {
yield $this->getRedisProxy('cluster')->set('proxy' . $i, $i);
}
$val = yield $this->getRedisProxy('cluster')->get('proxy22');
$this->outputJson($val);
}
// Redis代理使用示例(主从)
public function actionMaserSlaveSetGet()
{
for ($i = 0; $i <= 100; $i++) {
yield $this->getRedisProxy('master_slave')->set('M' . $i, $i);
}
$val = yield $this->getRedisProxy('master_slave')->get('M66');
$this->outputJson($val);
}
}
```
## Redis连接池与代理的关系
![Redis连接池与代表的关系](../images/redis连接池和代理.png "Redis连接池与代表的关系")
## MySQL连接池
### 配置
```php
<?php
/**
* Docker环境
*/
$config['mysql']['master']['host'] = '127.0.0.1';
$config['mysql']['master']['port'] = 3306;
$config['mysql']['master']['user'] = 'root';
$config['mysql']['master']['password'] = '123456';
$config['mysql']['master']['charset'] = 'utf8';
$config['mysql']['master']['database'] = 'demo';
$config['mysql']['slave1']['host'] = '127.0.0.1';
$config['mysql']['slave1']['port'] = 3306;
$config['mysql']['slave1']['user'] = 'root';
$config['mysql']['slave1']['password'] = '123456';
$config['mysql']['slave1']['charset'] = 'utf8';
$config['mysql']['slave1']['database'] = 'demo';
$config['mysql']['slave2']['host'] = '127.0.0.1';
$config['mysql']['slave2']['port'] = 3306;
$config['mysql']['slave2']['user'] = 'root';
$config['mysql']['slave2']['password'] = '123456';
$config['mysql']['slave2']['charset'] = 'utf8';
$config['mysql']['slave2']['database'] = 'demo';
$config['mysql_proxy']['master_slave'] = [
'pools' => [
'master' => 'master',
'slaves' => ['slave1', 'slave2'],
],
'mode' => \PG\MSF\Marco::MASTER_SLAVE,
];
return $config;
```
示例配置代码:
[https://github.com/pinguo/php-msf-demo/app/config/docker/mysql.php](https://github.com/pinguo/php-msf-demo/blob/master/config/docker/mysql.php)
### 执行SQL
```php
<?php
/**
* MySQL示例控制器
*
* app/data/demo.sql可以导入到mysql再运行示例方法
*
* @author camera360_server@camera360.com
* @copyright Chengdu pinguo Technology Co.,Ltd.
*/
namespace App\Controllers;
use PG\MSF\Controllers\Controller;
class MySQL extends Controller
{
// MySQL连接池示例
public function actionBizLists()
{
// SQL DBBuilder更多参考 https://github.com/jstayton/Miner
$bizLists = yield $this->getMysqlPool('master')->select("*")->from('biz')->go();
$this->outputJson($bizLists);
}
// 直接执行sql
public function actionShowDB()
{
/**
* @var \PG\MSF\Pools\Miner $DBBuilder
*/
$dbs = yield $this->getMysqlPool('master')->go(null, 'show databases');
$this->outputJson($dbs);
}
// 事务示例
public function actionTransaction()
{
/**
* @var \PG\MSF\Pools\Miner|\PG\MSF\Pools\MysqlAsynPool $mysqlPool
*/
$mysqlPool = $this->getMysqlPool('master');
// 开启一个事务,并返回事务ID
$id = yield $mysqlPool->goBegin();
$up = yield $mysqlPool->update('user')->set('name', '徐典阳-1')->where('id', 3)->go($id);
$ex = yield $mysqlPool->select('*')->from('user')->where('id', 3)->go($id);
if ($ex['result']) {
yield $mysqlPool->goCommit($id);
$this->outputJson('commit');
} else {
yield $mysqlPool->goRollback($id);
$this->outputJson('rollback');
}
}
}
```
示例代码:
[https://github.com/pinguo/php-msf-demo/app/Controllers/MySQL.php](https://github.com/pinguo/php-msf-demo/blob/master/app/Controllers/MySQL.php)
### DBQueryBuilder
目前php-msf整合的是DB Query Builder是[Miner](https://github.com/jstayton/Miner),更多SQL的拼装请参考它。
另外,$this->getMysqlPool('连接池配置名'),获取的连接池对象,可以在上面直接调用Miner的相关方法,进行sql拼装。
### 关于 go($id = null, $sql = null)
`go($id = null, $sql = null)`是以协程方法执行SQL,它会创建一个MySQL协程,其中`$id`为事务ID,如果未启用事务,默认为null。`$sql`为手工书写待执行的SQL。
### 事务
事务操作的一般流程为:
1. 开启一个事务,并返回事务ID
2. 执行一个SQL,设置事务ID,执行一个SQL,设置事务ID,...
3. 提交(回滚)事务
用代码实现即:
```
try {
$id = yield $mysqlPool->goBegin();
$res1 = yield $mysqlPool->update($table)->set($filed, $value)->go($id)
$res1 = yield $mysqlPool->update($table)->set($filed, $value)->go($id)
} catch (\Exception $e) {
yield $mysqlPool->goRollback($id);
throw $e;
}
yield $mysqlPool->goCommit($id);
```
## MySQL代理
在MySQL连接池的基本上,MSF框架还实现了MySQL代理的基本功能,主要特性有:
* 支持master-slave读写分离
* 支持事务
### 配置代理
如上述配置代码
```php
$config['mysql_proxy']['master_slave'] = [
'pools' => [
'master' => 'master',
'slaves' => ['slave1', 'slave2'],
],
'mode' => \PG\MSF\Marco::MASTER_SLAVE,
];
```
- $config['mysql_proxy']
代表MySQL代理相关配置
- master_slave
这里的master_slave仅代表一组MySQL服务器集群,是一个标识
- mode
MySQL集群类型\PG\MSF\Marco::MASTER_SLAVE代表主从结构的MySQL集群
- pools
当mode设置为\PG\MSF\Marco::MASTER_SLAVE, `pools.master`表示MySQL主节点对应的连接池标识;`pools.slaves`为数字索引MySQL从节点对应的连接池标识列表
### MySQL代理的使用
```php
<?php
/**
* MySQL示例控制器
*
* app/data/demo.sql可以导入到mysql再运行示例方法
*
* @author camera360_server@camera360.com
* @copyright Chengdu pinguo Technology Co.,Ltd.
*/
namespace App\Controllers;
use PG\MSF\Controllers\Controller;
class MySQL extends Controller
{
// MySQL代理使用示例
public function actionProxy()
{
/**
* @var \PG\MSF\Pools\Miner|\PG\MSF\Pools\MysqlAsynPool $mysqlProxy
*/
$mysqlProxy = $this->getMysqlProxy('master_slave');
$bizLists = yield $mysqlProxy->select("*")->from('user')->go();
$up = yield $mysqlProxy->update('user')->set('name', '徐典阳-6')->where('id', 3)->go();
$this->outputJson($bizLists);
}
// MySQL代理事务,事务只会在主节点上执行
public function actionProxyTransaction()
{
/**
* @var \PG\MSF\Pools\Miner|\PG\MSF\Pools\MysqlAsynPool $mysqlProxy
*/
$mysqlProxy = $this->getMysqlProxy('master_slave');
// 开启一个事务,并返回事务ID
$id = yield $mysqlProxy->goBegin();
$up = yield $mysqlProxy->update('user')->set('name', '徐典阳-2')->where('id', 3)->go($id);
$ex = yield $mysqlProxy->select('*')->from('user')->where('id', 3)->go($id);
if ($ex['result']) {
yield $mysqlProxy->goCommit($id);
$this->outputJson('commit');
} else {
yield $mysqlProxy->goRollback($id);
$this->outputJson('rollback');
}
}
}
```
MySQL代理基于连接池,它和连接池的使用唯一区别在于从`$this->getMysqlPool`切换为`$this->getMysqlProxy`,所有的调用方式和连接池保持一致,就是这么简单。
## MySQL同步模式
有一些场景,需要用到MySQL同步查询数据,比如Task在Tasker进程中执行,由于Tasker是同步阻塞的进程模型,在处理数据过程中又需要查询数据库中的数据,然后再计算相关数据,这个时候就需要使用MySQL同步模式。
php-msf框架内部已经将异步和同步查询MySQL数据的差异屏蔽,同步模式下采用长连接,如果连接断开,驱动会自动重连,唯一的区别在于同步模式不需要添加yield关键字,如:
### MySQL同步Task
```php
<?php
/**
* Demo Task
*
* 注意理论上本文件代码应该在Tasker进程中执行
*/
namespace App\Tasks;
use \PG\MSF\Tasks\Task;
/**
* Class Demo
* @package App\Tasks
*/
class Demo extends Task
{
/**
* 连接池执行同步查询
*
* @return array
*/
public function syncMySQLPool()
{
$user = $this->getMysqlPool('master')->select("*")->from("user")->go();
return $user;
}
/**
* 代理执行同步查询
*
* @return array
*/
public function syncMySQLProxy()
{
$user = $this->getMysqlProxy('master_slave')->select("*")->from("user")->go();
return $user;
}
/**
* 连接池执行同步事务
*
* @return boolean
*/
public function syncMySQLPoolTransaction()
{
$mysqlPool = $this->getMysqlPool('master');
$id = $mysqlPool->begin();
// 开启一个事务,并返回事务ID
$up = $mysqlPool->update('user')->set('name', '徐典阳-1')->where('id', 3)->go($id);
$ex = $mysqlPool->select('*')->from('user')->where('id', 3)->go($id);
if ($ex['result']) {
$mysqlPool->commit();
return true;
} else {
$mysqlPool->rollback();
return false;
}
}
/**
* 代理执行同步事务
*
* @return boolean
*/
public function syncMySQLProxyTransaction()
{
$mysqlPool = $this->getMysqlProxy('master_slave');
$id = $mysqlPool->begin();
// 开启一个事务,并返回事务ID
$up = $mysqlPool->update('user')->set('name', '徐典阳-1')->where('id', 3)->go($id);
$ex = $mysqlPool->select('*')->from('user')->where('id', 3)->go($id);
if ($ex['result']) {
$mysqlPool->commit();
return true;
} else {
$mysqlPool->rollback();
return false;
}
}
}
```
示例代码:
[https://github.com/pinguo/php-msf-demo/app/Tasks/Demo.php](https://github.com/pinguo/php-msf-demo/blob/master/app/Tasks/Demo.php)
### 调用MySQL同步查询数据
```php
<?php
/**
* MySQL示例控制器
*
* app/data/demo.sql可以导入到mysql再运行示例方法
*
* @author camera360_server@camera360.com
* @copyright Chengdu pinguo Technology Co.,Ltd.
*/
namespace App\Controllers;
use PG\MSF\Controllers\Controller;
use App\Tasks\Demo as DemoTask;
class MySQL extends Controller
{
// 通过Task,同步执行MySQL查询(连接池)
public function actionSyncMySQLPoolTask()
{
/**
* @var DemoTask $demoTask
*/
$demoTask = $this->getObject(DemoTask::class);
$user = yield $demoTask->syncMySQLPool();
$this->outputJson($user);
}
// 通过Task,同步执行MySQL查询(代理)
public function actionSyncMySQLProxyTask()
{
/**
* @var DemoTask $demoTask
*/
$demoTask = $this->getObject(DemoTask::class);
$user = yield $demoTask->syncMySQLProxy();
$this->outputJson($user);
}
// 通过Task,同步执行MySQL事务查询(连接池)
public function actionSyncMySQLPoolTaskTransaction()
{
/**
* @var DemoTask $demoTask
*/
$demoTask = $this->getObject(DemoTask::class);
$user = yield $demoTask->syncMySQLPoolTransaction();
$this->outputJson($user);
}
// 通过Task,同步执行MySQL事务查询(代理)
public function actionSyncMySQLProxyTaskTransaction()
{
/**
* @var DemoTask $demoTask
*/
$demoTask = $this->getObject(DemoTask::class);
$user = yield $demoTask->syncMySQLProxyTransaction();
$this->outputJson($user);
}
}
```
- 0 文档说明
- 1 为什么研发新框架
- 1.1 传统php-fpm工作模式的问题
- 1.2 压测数据对比
- 1.3 小结
- 2 微服务框架研发概览
- 2.1 通信框架技术选型
- 2.2 swoole
- 2.3 协程原理
- 2.4 异步、并发
- 2.5 小结
- 3 框架运行环境
- 3.1 环境变量
- 3.2 运行代码
- 3.3 docker
- 3.4 小结
- 4 框架结构
- 4.1 结构概述
- 4.2 控制器
- 4.3 模型
- 4.4 视图
- 4.5 同步任务
- 4.6 配置
- 4.7 路由
- 4.8 小结
- 5 框架组件
- 5.1 协程
- 5.2 类的加载
- 5.3 异步Http Client
- 5.4 请求上下文
- 5.5 连接池
- 5.6 对象池
- 5.7 RPC
- 5.8 公共库
- 5.9 RESTful
- 5.10 多语言
- 5.11 杂项
- 5.12 小结
- 6 常见问题
- 7 附录