企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] * * * * * ## 1 成员列表 ### 1 成员属性 ~~~ ;版本号 const VERSION = '3.3.1'; ;启动状态,运行状态,停止状态,重载状态 const STATUS_STARTING = 1; const STATUS_RUNNING = 2; const STATUS_SHUTDOWN = 4; const STATUS_RELOADING = 8; ;子进程强制关闭时间,默认baclog长度,udp数据包最大 const KILL_WORKER_TIMER_TIME = 2; const DEFAUL_BACKLOG = 1024; const MAX_UDP_PACKAGE_SIZE = 65535; ;进程id编号,进程名称,worker进程数量 public $id = 0; public $name = 'none'; public $count = 1; ;进程用户,进程组 public $user = ''; public $group = ''; ;是否可以重载,是否复用端口 public $reloadable = true; public $reusePort = false; ;worker 3个回调接口 public $onWorkerStart = null; public $onWorkerStop = null; public $onWorkerReload = null; ;connect 6个回调接口 public $onConnect = null; public $onMessage = null; public $onClose = null; public $onError = null; public $onBufferFull = null; public $onBufferDrain = null; ;传输层协议,应用层协议 public $transport = 'tcp'; public $protocol = ''; ;所有连接 public $connections = array(); ;自动加载根目录 protected $_autoloadRootPath = ''; ;是否守护进程模式 public static $daemonize = false; ;输出文件,pid文件,日志文件 public static $stdoutFile = '/dev/null'; public static $pidFile = ''; public static $logFile = ''; ; 全局事件循环 public static $globalEvent = null; ; 主进程id protected static $_masterPid = 0; ; 监听socket protected $_mainSocket = null; ; socket名称,socket上下文选项 protected $_socketName = ''; protected $_context = null; ;workers实例数组,worker进程id数组 protected static $_workers = array(); protected static $_pidMap = array(); ;等待重启worker进程数组,worker的pid与进程编号映射 protected static $_pidsToRestart = array(); protected static $_idMap = array(); ;当前状态 protected static $_status = self::STATUS_STARTING; ;workername最大长度,socketname最大长度,username最大长度 protected static $_maxWorkerNameLength = 12; protected static $_maxSocketNameLength = 12; protected static $_maxUserNameLength = 12; ;状态文件,启动文件 protected static $_statisticsFile = ''; protected static $_startFile = ''; ;worker进程状态信息格式, protected static $_globalStatistics ;可选事件循环,当前事件循环名称 protected static $_availableEventLoops protected static $_eventLoopName ;内置协议 protected static $_builtinTransports ~~~ ### 2 成员函数 ~~~ 1 setProcessTitle() 设置进程名称 2 initId() 初始化$_idMap 3 log() 日志记录 4 getSocketName() 获取socket名称 5 getCurrentUser() 获取当前用户 6 listen() 启动监听端口 7 signalHandler() 信号处理函数 8 stopAll() 停止所有 9 reload() 重载 10 writeStatisticsToStatusFile() 状态信息 11 getAllWorkerPids() 获取worker的pid 12 forkOneWorker() 创建一个worker进程 13 getId() 获取workerid 14 setUserAndGroup() 设置用户信息 15 run() worker启动 16 getEventLoopName() 获取事件循环名称 17 reinstallSignal() worker信号处理注册 18 acceptConnection() 创建一个连接 19 acceptUdpConnection() upd数据包 20 stop() worker子进程关闭 ~~~ ## 2 函数分析 ### 1 setProcessTitle() `protected static function setProcessTitle($title) 设置进程名称` > $title:进程名称 调用cli_set_process_title()或者setproctitle() ### 2 initId() `protected static function initId() 初始化$_idMap` 将workers的pid与workers的进程编号关联到$_idMap ### 3 log() `protected static function log($msg) 日志信息` > $msg:待记录信息 debug模式 直接输出 daemoniz模式 输出到日志文件 ### 4 getSocketName() `public function getSocketName() 获取socketname` 将第一个字母小写返回,或者返回none ### 5 getCurrentUser() ~~~ protected static function getCurrentUser() 当前进程的用户 ~~~ 调用posix_getpwuid() 获取当前进程的用户信息 ### 6 listen()[重点] `public function listen() 启动端口监听` ~~~ if (!$this->_socketName || $this->_mainSocket) { return; } ~~~ 检查_socketName与_mainSocket参数。 ~~~ Autoloader::setRootPath($this->_autoloadRootPath); ~~~ 注册自动加载根目录 ~~~ $local_socket = $this->_socketName; list($scheme, $address) = explode(':', $this->_socketName, 2); ~~~ 解析$_socketName为协议$scheme,和监听地址$address ~~~ if (!isset(self::$_builtinTransports[$scheme])) { $scheme = ucfirst($scheme); $this->protocol = '\\Protocols\\' . $scheme; if (!class_exists($this->protocol)) { $this->protocol = "\\Workerman\\Protocols\\$scheme"; if (!class_exists($this->protocol)) { throw new Exception("class \\Protocols\\$scheme not exist"); } } $local_socket = $this->transport . ":" . $address; } else { $this->transport = self::$_builtinTransports[$scheme]; } ~~~ 应用层协议检测与初始化 协议有关 见 另 :Protocols协议 `$flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;` 协议标识字段获取 ~~~ if ($this->reusePort) { stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1); } ~~~ 端口复用选项设置 ~~~ if ($this->transport === 'unix') { umask(0); list(, $address) = explode(':', $this->_socketName, 2); if (!is_file($address)) { register_shutdown_function(function () use ($address) { @unlink($address); }); } } $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context); if (!$this->_mainSocket) { throw new Exception($errmsg); } ~~~ >[info] Uinx套接字协议 首先检查$address是否是文件 然后创建unix套接字服务socket ~~~ if (function_exists('socket_import_stream') && $this->transport === 'tcp') { $socket = socket_import_stream($this->_mainSocket); @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1); @socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1); } stream_set_blocking($this->_mainSocket, 0); ~~~ >[info] tcp协议处理 ~~~ if (self::$globalEvent) { if ($this->transport !== 'udp') { self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection')); } else { self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection')); } } ~~~ 注册事件监听器 事件相关见 另:Event事件 ### 7 signalHandler()[重点] `public static function signalHandler($signal) 主进程信号处理转发` > $signal:待处理信号 根据$signal,分别调用stopAll(),reload()或者 writeStatisticsToStatusFile() ### 8 stopAll()[重点] `public static function stopAll() 主进程stop信号处理` ~~~ self::$_status = self::STATUS_SHUTDOWN; ~~~ 设置当前状态为关闭状态 ~~~ if (self::$_masterPid === posix_getpid()) { self::log("Workerman[" . basename(self::$_startFile) . "] Stopping ..."); $worker_pid_array = self::getAllWorkerPids(); // Send stop signal to all child processes. foreach ($worker_pid_array as $worker_pid) { posix_kill($worker_pid, SIGINT); Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($worker_pid, SIGKILL), false); } } ~~~ 主进程关闭处理 记录关闭信息 获取所有worker子进程id 发送关闭信号到所有worker子进程 ~~~ else { foreach (self::$_workers as $worker) { $worker->stop(); } exit(0); } ~~~ 子进程关闭处理 直接关闭 ### 9 reload() `protected static function reload() 进程重载` `if (self::$_masterPid === posix_getpid()) {}` >[info] 主进程重载 ~~~ if (self::$_status !== self::STATUS_RELOADING && self::$_status !== self::STATUS_SHUTDOWN) { self::log("Workerman[" . basename(self::$_startFile) . "] reloading"); self::$_status = self::STATUS_RELOADING; } ~~~ 设置当前状态为重载状态 ~~~ $reloadable_pid_array = array(); foreach (self::$_pidMap as $worker_id => $worker_pid_array) { $worker = self::$_workers[$worker_id]; if ($worker->reloadable) { foreach ($worker_pid_array as $pid) { $reloadable_pid_array[$pid] = $pid; } } else { foreach ($worker_pid_array as $pid) { // Send reload signal to a worker process which reloadable is false. posix_kill($pid, SIGUSR1); } } } ~~~ 发送reload信号到子进程 `self::$_pidsToRestart = array_intersect(self::$_pidsToRestart, $reloadable_pid_array); ` 获取所有等待重载进程id ~~~ if (empty(self::$_pidsToRestart)) { if (self::$_status !== self::STATUS_SHUTDOWN) { self::$_status = self::STATUS_RUNNING; } return; } ~~~ 检测是否完全重载 ~~~ $one_worker_pid = current(self::$_pidsToRestart); posix_kill($one_worker_pid, SIGUSR1); ~~~ 发送reload信号到没有完成重载的子进程 ~~~ $worker = current(self::$_workers); if ($worker->onWorkerReload) { try { call_user_func($worker->onWorkerReload, $worker); } catch (\Exception $e) { echo $e; exit(250); } } if ($worker->reloadable) { self::stopAll(); } ~~~ >[info] 子进程重载 回调onWorkerReload函数 不可重载则关闭所有 ### 10 protected static function writeStatisticsToStatusFile() `if (self::$_masterPid === posix_getpid()) {}` 主进程状态信息保存 子进程状态新保存 ### 20 stop() `public function stop() 关闭worker子进程` 回调onWorkerStop 移除监听器 关闭socket描述符 ## 3 函数关系