企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 5.5 连接池 连接池的重要性,这时就不赘述了,下面具体介绍框架中实现的哪些连接池。 ## Redis连接池 ### 主要特性 - 支持异步+协程 - 支持断线重连 - 支持自动提取和归还连接 - 统一同步和异步调用方式 ### 配置 ```php <?php /** * 本地环境 */ $config['redis']['p1']['ip'] = '127.0.0.1'; $config['redis']['p1']['port'] = 6379; //$config['redis']['p1']['password'] = 'xxxx'; //$config['redis']['p1']['select'] = 1; // Redis序列化选项等同于phpredis序列化的各个选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY //$config['redis']['p1']['redisSerialize'] = \Redis::SERIALIZER_PHP; // PHP序列化选项,为了兼容yii迁移项目的set,get,mset,mget,选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY //$config['redis']['p1']['phpSerialize'] = \Redis::SERIALIZER_NONE; // 是否将key md5后储存,默认为0,开启为1 //$config['redis']['p1']['hashKey'] = 1; // 设置key的前缀 //$config['redis']['p1']['keyPrefix'] = 'demo_'; return $config; ``` 示例配置代码: [./php-msf-demo/app/config/docker/redis.php](https://github.com/pinguo/php-msf/pinguo/config/docker/redis.php) - $config['redis'] 代表Redis连接池相关配置 - p1,p2,p3,p4,p5,p6 这里的p仅代表一台或者一组Redis服务器,在使用连接池时会用到,如果Redis服务器端分片(比如twemproxy)就填写为集群导出的地址与端口等信息。 - ip Redis服务器地址 - port Redis服务器端口 - password Redis认证密钥 - select Redis DB - redisSerialize Redis序列化选项等同于phpredis序列化的各个选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY - phpSerialize PHP序列化选项,为了兼容yii迁移项目的set,get,mset,mget,选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY - hashKey 是否将key md5后储存,默认为0,开启为1 - keyPrefix 设置key的前缀 ### Redis连接池的使用 ```php /** * Redis示例控制器 * * @author camera360_server@camera360.com * @copyright Chengdu pinguo Technology Co.,Ltd. */ namespace App\Controllers; use PG\MSF\Controllers\Controller; use App\Models\Demo as DemoModel; class Redis extends Controller { // Redis连接池读写示例 public function actionPoolSetGet() { yield $this->getRedisPool('p1')->set('key1', 'val1'); $val = yield $this->getRedisPool('p1')->get('key1'); $this->outputJson($val); } } ``` 1. $this->getRedisPool($name) 获取连接池对象,并选择名为$name的连接池,$name由配置文件中声明,比如上述配置中的tw 2. 连接池对象的所有方法映射为标准的Redis操作指令 如:`SETEX key seconds value`映射为`$this->getRedisPool($name)->setex($key, $seconds, $value)` 3. string类型的简化操作 `$this->getRedisPool($name)->cache($key, $value = '', $expire = 0)`,`$key`为redis key,`$value`为缓存的值,`$expire`为过期时间,默认不会过期。 4. 执行lua脚本 `$this->getRedisPool($name)->evalMock($script, $args = array(), $numKeys = 0)` 如: ```php <?php function luaExample() { $num = 100; $lua = " local allWorks = {} local recWorks = {} local random = nil for k, v in pairs(KEYS) do local works = redis.call('sRandMember', v, '" . $num . "') if works ~= nil then for key, val in pairs(works) do table.insert(allWorks, val) end end end while #recWorks < " . $num . " and #allWorks > 0 do random = math.random(#allWorks) table.insert(recWorks, allWorks[random]) table.remove(allWorks, random) end return cjson.encode(recWorks) "; $keys = ['feedId1', 'feedId2', 'feedId3']; $this->getRedisPool('tw')->evalMock($lua, $keys, count($keys)); } ``` ## Redis代理 在Redis连接池的基本上,MSF框架还实现了Redis代理的基本功能,主要特性有: - 支持分布式自动分片 - 支持master-slave读写分离 - 支持故障自动failover ### 配置 ```php <?php /** * 本地环境 */ $config['redis']['p1']['ip'] = '127.0.0.1'; $config['redis']['p1']['port'] = 6379; //$config['redis']['p1']['password'] = 'xxxx'; //$config['redis']['p1']['select'] = 1; // Redis序列化选项等同于phpredis序列化的各个选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY //$config['redis']['p1']['redisSerialize'] = \Redis::SERIALIZER_PHP; // PHP序列化选项,为了兼容yii迁移项目的set,get,mset,mget,选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY //$config['redis']['p1']['phpSerialize'] = \Redis::SERIALIZER_NONE; // 是否将key md5后储存,默认为0,开启为1 //$config['redis']['p1']['hashKey'] = 1; // 设置key的前缀 //$config['redis']['p1']['keyPrefix'] = 'demo_'; $config['redis']['p2']['ip'] = '127.0.0.1'; $config['redis']['p2']['port'] = 6380; $config['redis']['p3']['ip'] = '127.0.0.1'; $config['redis']['p3']['port'] = 6381; $config['redis']['p4']['ip'] = '127.0.0.1'; $config['redis']['p4']['port'] = 7379; $config['redis']['p5']['ip'] = '127.0.0.1'; $config['redis']['p5']['port'] = 7380; $config['redis']['p6']['ip'] = '127.0.0.1'; $config['redis']['p6']['port'] = 7381; $config['redis_proxy']['master_slave'] = [ 'pools' => ['p1', 'p2', 'p3'], 'mode' => \PG\MSF\Marco::MASTER_SLAVE, ]; $config['redis_proxy']['cluster'] = [ 'pools' => [ 'p4' => 1, 'p5' => 1, 'p6' => 1 ], 'mode' => \PG\MSF\Marco::CLUSTER, ]; return $config; ``` 示例配置代码: [https://github.com/pinguo/php-msf-demo/app/config/docker/redis.php](https://github.com/pinguo/php-msf-demo/blob/master/config/docker/redis.php) - $config['redis_proxy'] 代表Redis代理相关配置 - cluster 这里的cluster仅代表一组Redis服务器集群,是一个标识 - mode Redis集群类型,\PG\MSF\Marco::CLUSTER代表分布式的Redis集群;\PG\MSF\Marco::MASTER_SLAVE代表主从结构的Redis集群 - pools 当mode设置为\PG\MSF\Marco::CLUSTER时,pools为array,他的key表示Redis连接池名称,value表示Redis连接池权重;当mode设置为\PG\MSF\Marco::MASTER_SLAVE,pools为英文逗号分隔的Redis连接池名称列表。 ### Redis代理的使用 ```php <?php /** * Redis示例控制器 * * @author camera360_server@camera360.com * @copyright Chengdu pinguo Technology Co.,Ltd. */ namespace App\Controllers; use PG\MSF\Controllers\Controller; use App\Models\Demo as DemoModel; class Redis extends Controller { // Redis代理使用示例(分布式) public function actionProxySetGet() { for ($i = 0; $i <= 100; $i++) { yield $this->getRedisProxy('cluster')->set('proxy' . $i, $i); } $val = yield $this->getRedisProxy('cluster')->get('proxy22'); $this->outputJson($val); } // Redis代理使用示例(主从) public function actionMaserSlaveSetGet() { for ($i = 0; $i <= 100; $i++) { yield $this->getRedisProxy('master_slave')->set('M' . $i, $i); } $val = yield $this->getRedisProxy('master_slave')->get('M66'); $this->outputJson($val); } } ``` ## Redis连接池与代理的关系 ![Redis连接池与代表的关系](../images/redis连接池和代理.png "Redis连接池与代表的关系") ## MySQL连接池 ### 配置 ```php <?php /** * Docker环境 */ $config['mysql']['master']['host'] = '127.0.0.1'; $config['mysql']['master']['port'] = 3306; $config['mysql']['master']['user'] = 'root'; $config['mysql']['master']['password'] = '123456'; $config['mysql']['master']['charset'] = 'utf8'; $config['mysql']['master']['database'] = 'demo'; $config['mysql']['slave1']['host'] = '127.0.0.1'; $config['mysql']['slave1']['port'] = 3306; $config['mysql']['slave1']['user'] = 'root'; $config['mysql']['slave1']['password'] = '123456'; $config['mysql']['slave1']['charset'] = 'utf8'; $config['mysql']['slave1']['database'] = 'demo'; $config['mysql']['slave2']['host'] = '127.0.0.1'; $config['mysql']['slave2']['port'] = 3306; $config['mysql']['slave2']['user'] = 'root'; $config['mysql']['slave2']['password'] = '123456'; $config['mysql']['slave2']['charset'] = 'utf8'; $config['mysql']['slave2']['database'] = 'demo'; $config['mysql_proxy']['master_slave'] = [ 'pools' => [ 'master' => 'master', 'slaves' => ['slave1', 'slave2'], ], 'mode' => \PG\MSF\Marco::MASTER_SLAVE, ]; return $config; ``` 示例配置代码: [https://github.com/pinguo/php-msf-demo/app/config/docker/mysql.php](https://github.com/pinguo/php-msf-demo/blob/master/config/docker/mysql.php) ### 执行SQL ```php <?php /** * MySQL示例控制器 * * app/data/demo.sql可以导入到mysql再运行示例方法 * * @author camera360_server@camera360.com * @copyright Chengdu pinguo Technology Co.,Ltd. */ namespace App\Controllers; use PG\MSF\Controllers\Controller; class MySQL extends Controller { // MySQL连接池示例 public function actionBizLists() { // SQL DBBuilder更多参考 https://github.com/jstayton/Miner $bizLists = yield $this->getMysqlPool('master')->select("*")->from('biz')->go(); $this->outputJson($bizLists); } // 直接执行sql public function actionShowDB() { /** * @var \PG\MSF\Pools\Miner $DBBuilder */ $dbs = yield $this->getMysqlPool('master')->go(null, 'show databases'); $this->outputJson($dbs); } // 事务示例 public function actionTransaction() { /** * @var \PG\MSF\Pools\Miner|\PG\MSF\Pools\MysqlAsynPool $mysqlPool */ $mysqlPool = $this->getMysqlPool('master'); // 开启一个事务,并返回事务ID $id = yield $mysqlPool->goBegin(); $up = yield $mysqlPool->update('user')->set('name', '徐典阳-1')->where('id', 3)->go($id); $ex = yield $mysqlPool->select('*')->from('user')->where('id', 3)->go($id); if ($ex['result']) { yield $mysqlPool->goCommit($id); $this->outputJson('commit'); } else { yield $mysqlPool->goRollback($id); $this->outputJson('rollback'); } } } ``` 示例代码: [https://github.com/pinguo/php-msf-demo/app/Controllers/MySQL.php](https://github.com/pinguo/php-msf-demo/blob/master/app/Controllers/MySQL.php) ### DBQueryBuilder 目前php-msf整合的是DB Query Builder是[Miner](https://github.com/jstayton/Miner),更多SQL的拼装请参考它。 另外,$this->getMysqlPool('连接池配置名'),获取的连接池对象,可以在上面直接调用Miner的相关方法,进行sql拼装。 ### 关于 go($id = null, $sql = null) `go($id = null, $sql = null)`是以协程方法执行SQL,它会创建一个MySQL协程,其中`$id`为事务ID,如果未启用事务,默认为null。`$sql`为手工书写待执行的SQL。 ### 事务 事务操作的一般流程为: 1. 开启一个事务,并返回事务ID 2. 执行一个SQL,设置事务ID,执行一个SQL,设置事务ID,... 3. 提交(回滚)事务 用代码实现即: ``` try { $id = yield $mysqlPool->goBegin(); $res1 = yield $mysqlPool->update($table)->set($filed, $value)->go($id) $res1 = yield $mysqlPool->update($table)->set($filed, $value)->go($id) } catch (\Exception $e) { yield $mysqlPool->goRollback($id); throw $e; } yield $mysqlPool->goCommit($id); ``` ## MySQL代理 在MySQL连接池的基本上,MSF框架还实现了MySQL代理的基本功能,主要特性有: * 支持master-slave读写分离 * 支持事务 ### 配置代理 如上述配置代码 ```php $config['mysql_proxy']['master_slave'] = [ 'pools' => [ 'master' => 'master', 'slaves' => ['slave1', 'slave2'], ], 'mode' => \PG\MSF\Marco::MASTER_SLAVE, ]; ``` - $config['mysql_proxy'] 代表MySQL代理相关配置 - master_slave 这里的master_slave仅代表一组MySQL服务器集群,是一个标识 - mode MySQL集群类型\PG\MSF\Marco::MASTER_SLAVE代表主从结构的MySQL集群 - pools 当mode设置为\PG\MSF\Marco::MASTER_SLAVE, `pools.master`表示MySQL主节点对应的连接池标识;`pools.slaves`为数字索引MySQL从节点对应的连接池标识列表 ### MySQL代理的使用 ```php <?php /** * MySQL示例控制器 * * app/data/demo.sql可以导入到mysql再运行示例方法 * * @author camera360_server@camera360.com * @copyright Chengdu pinguo Technology Co.,Ltd. */ namespace App\Controllers; use PG\MSF\Controllers\Controller; class MySQL extends Controller { // MySQL代理使用示例 public function actionProxy() { /** * @var \PG\MSF\Pools\Miner|\PG\MSF\Pools\MysqlAsynPool $mysqlProxy */ $mysqlProxy = $this->getMysqlProxy('master_slave'); $bizLists = yield $mysqlProxy->select("*")->from('user')->go(); $up = yield $mysqlProxy->update('user')->set('name', '徐典阳-6')->where('id', 3)->go(); $this->outputJson($bizLists); } // MySQL代理事务,事务只会在主节点上执行 public function actionProxyTransaction() { /** * @var \PG\MSF\Pools\Miner|\PG\MSF\Pools\MysqlAsynPool $mysqlProxy */ $mysqlProxy = $this->getMysqlProxy('master_slave'); // 开启一个事务,并返回事务ID $id = yield $mysqlProxy->goBegin(); $up = yield $mysqlProxy->update('user')->set('name', '徐典阳-2')->where('id', 3)->go($id); $ex = yield $mysqlProxy->select('*')->from('user')->where('id', 3)->go($id); if ($ex['result']) { yield $mysqlProxy->goCommit($id); $this->outputJson('commit'); } else { yield $mysqlProxy->goRollback($id); $this->outputJson('rollback'); } } } ``` MySQL代理基于连接池,它和连接池的使用唯一区别在于从`$this->getMysqlPool`切换为`$this->getMysqlProxy`,所有的调用方式和连接池保持一致,就是这么简单。 ## MySQL同步模式 有一些场景,需要用到MySQL同步查询数据,比如Task在Tasker进程中执行,由于Tasker是同步阻塞的进程模型,在处理数据过程中又需要查询数据库中的数据,然后再计算相关数据,这个时候就需要使用MySQL同步模式。 php-msf框架内部已经将异步和同步查询MySQL数据的差异屏蔽,同步模式下采用长连接,如果连接断开,驱动会自动重连,唯一的区别在于同步模式不需要添加yield关键字,如: ### MySQL同步Task ```php <?php /** * Demo Task * * 注意理论上本文件代码应该在Tasker进程中执行 */ namespace App\Tasks; use \PG\MSF\Tasks\Task; /** * Class Demo * @package App\Tasks */ class Demo extends Task { /** * 连接池执行同步查询 * * @return array */ public function syncMySQLPool() { $user = $this->getMysqlPool('master')->select("*")->from("user")->go(); return $user; } /** * 代理执行同步查询 * * @return array */ public function syncMySQLProxy() { $user = $this->getMysqlProxy('master_slave')->select("*")->from("user")->go(); return $user; } /** * 连接池执行同步事务 * * @return boolean */ public function syncMySQLPoolTransaction() { $mysqlPool = $this->getMysqlPool('master'); $id = $mysqlPool->begin(); // 开启一个事务,并返回事务ID $up = $mysqlPool->update('user')->set('name', '徐典阳-1')->where('id', 3)->go($id); $ex = $mysqlPool->select('*')->from('user')->where('id', 3)->go($id); if ($ex['result']) { $mysqlPool->commit(); return true; } else { $mysqlPool->rollback(); return false; } } /** * 代理执行同步事务 * * @return boolean */ public function syncMySQLProxyTransaction() { $mysqlPool = $this->getMysqlProxy('master_slave'); $id = $mysqlPool->begin(); // 开启一个事务,并返回事务ID $up = $mysqlPool->update('user')->set('name', '徐典阳-1')->where('id', 3)->go($id); $ex = $mysqlPool->select('*')->from('user')->where('id', 3)->go($id); if ($ex['result']) { $mysqlPool->commit(); return true; } else { $mysqlPool->rollback(); return false; } } } ``` 示例代码: [https://github.com/pinguo/php-msf-demo/app/Tasks/Demo.php](https://github.com/pinguo/php-msf-demo/blob/master/app/Tasks/Demo.php) ### 调用MySQL同步查询数据 ```php <?php /** * MySQL示例控制器 * * app/data/demo.sql可以导入到mysql再运行示例方法 * * @author camera360_server@camera360.com * @copyright Chengdu pinguo Technology Co.,Ltd. */ namespace App\Controllers; use PG\MSF\Controllers\Controller; use App\Tasks\Demo as DemoTask; class MySQL extends Controller { // 通过Task,同步执行MySQL查询(连接池) public function actionSyncMySQLPoolTask() { /** * @var DemoTask $demoTask */ $demoTask = $this->getObject(DemoTask::class); $user = yield $demoTask->syncMySQLPool(); $this->outputJson($user); } // 通过Task,同步执行MySQL查询(代理) public function actionSyncMySQLProxyTask() { /** * @var DemoTask $demoTask */ $demoTask = $this->getObject(DemoTask::class); $user = yield $demoTask->syncMySQLProxy(); $this->outputJson($user); } // 通过Task,同步执行MySQL事务查询(连接池) public function actionSyncMySQLPoolTaskTransaction() { /** * @var DemoTask $demoTask */ $demoTask = $this->getObject(DemoTask::class); $user = yield $demoTask->syncMySQLPoolTransaction(); $this->outputJson($user); } // 通过Task,同步执行MySQL事务查询(代理) public function actionSyncMySQLProxyTaskTransaction() { /** * @var DemoTask $demoTask */ $demoTask = $this->getObject(DemoTask::class); $user = yield $demoTask->syncMySQLProxyTransaction(); $this->outputJson($user); } } ```