ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
RPC通信 [编辑本页] Swoole框架提供的RPC服务器支持了单连接并发、PHP-FPM下长连接维持等特性。在车轮互联大规模应用,构建了4层架构的服务化架构。 服务器端代码:http://git.oschina.net/swoole/swoole_framework/blob/master/libs/Swoole/Protocol/RPCServer.php 客户端代码:http://git.oschina.net/swoole/swoole_framework/blob/master/libs/Swoole/Client/RPC.php 与Http协议对比 很多企业使用Http Rest实现RPC通信,实现简单可以利用到很多现成的工具和方案。但是Http通信协议存在2个严重的缺陷。 Http不支持单连接并发,如果要同时并发很多请求,必须创建大量TCP连接。如果php-fpm开启500个进程,每此需要128个并发,那么就需要创建64000个TCP连接。 Http对长连接支持不够好,很多Http程序都是设计为短连接的,在请求时创建TCP连接、请求结束时close,这会带来额外的网络通信消耗 Swoole框架的RPC客户端使用16字节固定包头+包体的通信方式,支持单连接并发、支持在php-fpm开启长连接。 php-fpm长连接 在php-fpm中维持TCP长连接主要借助swoole扩展提供的SWOOLE_KEEP选项,客户端设置此选项后,在请求结束时不会关闭连接,新的请求到来后可以复用TCP连接。另外底层内置了长连接检测的能力。 在执行$client->connect()自动检测连接是否可用,如果复用的连接已经失效,底层会重新创建一个新的TCP长连接。 在执行$client->connect()自动清理垃圾数据,避免上一次客户端超时残留的数据导致服务异常 $socket = new \swoole_client(SWOOLE_SOCK_TCP | SWOOLE_KEEP, WOOLE_SOCK_SYNC); $socket->set(array( 'open_length_check' => true, 'package_max_length' => $this->packet_maxlen, 'package_length_type' => 'N', 'package_body_offset' => RPCServer::HEADER_SIZE, 'package_length_offset' => 0, )); TCP包头 struct { uint32_t length; uint16_t reserved_1; //保留字段1 uint8_t reserved_2; //保留字段2 uint8_t type; uint32_t uid; uint32_t serid; char body[0]; } length:包体的长度 reserved_1 保留的16位字段 reserved_2 保留的8位字段 type:包体的打包格式,低4位用于表示包体打包的格式 =1使用PHP串化格式,=2使用JSON格式,其他格式暂未支持,高4位用于保存压缩格式,如gzip uid:用户自定义的ID,保留字段 serid:Request/Response 串号 包体格式 request {"call": "Service接口名称", "params": “接口参数列表”, "env": "相关环境信息"} call: 是指Service接口的名称,如车轮Service提供的接口 User\Info::get params:函数的参数列表,vector类型,params在底层会自动作为 User\Info::get 函数的参数传入调用,在PHP代码中相当于 call_user_func_array($call, $params) env:相关环境信息,map类型,客户端与服务器端可自由使用 response {"errno": "整数,服务器端返回码", "data": “接口返回值"} errno:错误码,正常调用为0 data:无固定格式,由 Service接口 返回值 决定 错误码列表 8001; //未就绪 8002; //连接服务器失败 8003; //服务器端超时 8004; //发送失败 8005; //server返回了错误码 8006; //解包失败了 8007; //错误的协议头 8008; //超过最大允许的长度 8009; //连接被关闭 9001; //错误的包头 9002; //请求包体长度超过允许的范围 9003; //服务器繁忙,超过处理能力 9204; //解包失败 9205; //参数错误 9206; //函数不存在 9207; //执行错误 9208 //不允许该服务器登录 9209 //认证不通过 9300 //被服务器踢掉了 Server端的实现中实现了打包格式的自适应,当发现调用端使用JSON格式时,Response包体也会打包为JSON。另外Swoole框架的RPC支持了gzip压缩,启用压缩后可以节约内网通信的流量。 单连接并发 客户端 请求串号就是单连接并发的秘诀了,客户端即使是同一个连接,也可以同时发出多个Request,这与Http协议是不同的,Http协议即使启用了Keep-Alive单个连接只能发出一次Request,必须等到服务器端发送Response才能发送下一个Request。RPC客户端收到Response会根据其中的串号,将不同的Response和Request对应起来。 有些Request可能会超时,RPC客户端通过对比请求ID可以判断出哪些Response可能是上次请求超时残留的数据,并进行丢弃处理。 服务器端 在车轮互联的RPC服务器中,大部分使用了同步阻塞模式,小部分使用了异步模式。 同步服务器的实现依赖swoole扩展提供的dispatch_mode=3选项,并设置worker_num为128。swoole底层实现了连接与请求分离,同一个连接不同的Request包会被分配到不同的Worker进程并发地进行处理。Response再由swoole底层逐个发送给客户端。服务器端也可以很好低支持单连接并发,即使只有一个TCP连接也可以利用到所有128个Worker进程的处理能力。 $server = new Swoole\Server('0.0.0.0', 8888); $server->set(array( 'worker_num' => 128, 'max_request' => 5000, 'dispatch_mode' => 3, 'open_length_check' => 1, 'package_max_length' => $AppSvr->packet_maxlen, 'package_length_type' => 'N', 'package_body_offset' => \Swoole\Protocol\RPCServer::HEADER_SIZE, 'package_length_offset' => 0, )); 串行调用 $res = $service->call('User\Info::unlock', '18958653669', 1); $result = $res->getResult(); //如果返回NULL,表示网络调用失败了,请检查$res->code $res = $service->call('User\Info::unlock', '18958653669', 1); $result = $res->getResult(); $res = $service->call('User\Info::unlock', '18958653669', 1); $result = $res->getResult(); 并行调用 $res1 = $service->call('User\Info::unlock', '18958653669', 1); $res2 = $service->call('User\Info::unlock', '18958653669', 1); $res3 = $service->call('User\Info::unlock', '18958653669', 1); //0.5表示500毫秒超时,$n表示成功返回的请求个数。如果少于发起的请求数,证明有个别请求超时了 $n = $service->wait(0.5); $result1 = $res1->getResult(); $result2 = $res2->getResult(); $result3 = $res3->getResult(); 实际上底层对于串行并行的处理方式是相同的,串行调用在执行getResult()时会自动wait一次,等待服务器端发送Response,RPC客户端的wait操作基于swoole_client_select实现。 function wait($timeout = 0.5) { $st = microtime(true); $success_num = 0; while (count($this->waitList) > 0) { $write = $error = $read = array(); foreach ($this->waitList as $obj) { /** * @var $obj RPC_Result */ if ($obj->socket !== null) { $read[] = $obj->socket; } } if (empty($read)) { break; } //去掉重复的socket Tool::arrayUnique($read); //等待可读事件 $n = $this->select($read, $write, $error, $timeout); if ($n > 0) { //可读 foreach($read as $connection) { $data = $this->recvPacket($connection); //socket被关闭了 if ($data === "") { foreach($this->waitList as $retObj) { if ($retObj->socket == $connection) { $retObj->code = RPC_Result::ERR_CLOSED; unset($this->waitList[$retObj->requestId]); $this->closeConnection($retObj->server_host, $retObj->server_port); } } continue; } elseif ($data === false) { continue; } $header = unpack(RPCServer::HEADER_STRUCT, substr($data, 0, RPCServer::HEADER_SIZE)); //不在请求列表中,错误的请求串号 if (!isset($this->waitList[$header['serid']])) { trigger_error(__CLASS__ . " invalid responseId[{$header['serid']}].", E_USER_WARNING); continue; } $retObj = $this->waitList[$header['serid']]; //成功处理 $this->finish(RPCServer::decode(substr($data, RPCServer::HEADER_SIZE), $header['type']), $retObj); $success_num++; } } //发生超时 if ((microtime(true) - $st) > $timeout) { foreach ($this->waitList as $obj) { $obj->code = ($obj->socket->isConnected()) ? RPC_Result::ERR_TIMEOUT : RPC_Result::ERR_CONNECT; //执行after钩子函数 $this->afterRequest($obj); } //清空当前列表 $this->waitList = array(); return $success_num; } } //未发生任何超时 $this->waitList = array(); $this->requestIndex = 0; return $success_num; } $waitList 是所有Request的集合 多个Request使用的TCP连接可能是相同的几个,这里使用了Tool::arrayUnique进行去重 使用swoole_client_select等待Socket可读事件 在可读事件中调用recvPacket收包,并解析包头,收到Response时读取请求ID自动从waitList中移除Request 循环的默认会进行时间检测,发生超时或全部成功时退出,返回Response的数量