🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
## **socket_select**(array **&$read**,array **&$write**,array **&$except**,int **$tv_sec** [,int **$tv_usec** = 0] ) :int 作用:获取read数组中活动的socket,并且把不活跃的从read数组中删除 接受套接字组成的数组 **$read**,并等待它们更改状态(就是**有新消息**到或者有客户端**连接**/**断开**时) 在套接字数组 $read 中最初应保有一个服务端监听套接字,每当该套接字可读时,就表示有一个用户发起了连接。此时你需要对该连接创建一个套接字,并加入到 $read 数组中 除了这个服务端监听套接字,客户端监听的套接字会变成可读的,用户套接字也会变成可读的,此时你就可以读取用户发来的数据了 select处于等待时,两个客户端中甲先发数据来,则socket_select会在readfds中保留甲的socket并往下运行,另一个客户端的socket就被丢弃了,所以再次循环时,变成只监听甲了,这个可以在新循环中把所有链接的客户端socket再次加进readfds中,则可以避免本程序的这个逻辑错误 **返回值**:<0表示有错误,其他值表示可操作的socket个数 第一个参数传入要检查的socket数组,函数返回后会变成包含可读取消息的socket的数组,第二百个和第三个是可写入(发送)数据的,第三个是出错的 第四个参数如果是null,表示知道出现可操作的socket,否则会持续阻塞度。0表示立即返回,其他值表示最多等待指定的秒数后返回 这是一个同步方法,必须得到响应之后才会继续下一步,常用在同步非阻塞IO 说明: * 1 新连接到来时,被监听的端口是活跃的,如果是新数据到来或者客户端关闭链接时,活跃的是对应的客户端socket而不是服务器上被监听的端口 * 2 如果客户端发来数据没有被读走,则socket_select将会始终显示客户端是活跃状态并将其保存在readfds数组中 * 3 如果客户端先关闭了,则必须手动关闭服务器上相对应的客户端socket,否则socket_select也始终显示该客户端活跃(这个道理跟"有新连接到来然后没有用socket_access把它读出来,导致监听的端口一直活跃"是一样的) 这个函数是同时接受多个连接的关键,我的理解它是为了阻塞程序继续往下执行和自动选择当前有活动的连接。 ``` $read = array($socket1, $socket2); $write = NULL; $except = NULL; $num_changed_sockets = socket_select($read, $write, $except, 0); if ($num_changed_sockets === false) { /* 错误处理 */ socket_last_error(); } else if ($num_changed_sockets > 0) { /*至少有一个套接字发生改变 */ } ``` ## **accept与select区别:** 一般经过创建套接字socket_socket()绑定socket_bind()以及socket_listen()之后,  就可以调用socket_select查看指定socket的状态和 socket_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)建立一个全新连接; 判断是否有客户端发起链接请求,   一般用select(),然后accept()。 应该先select,可以测一下超时。 如果先accept,select就没必要了。 还有就是select后,可以测一下资源,如果fd用完了,可以等归还。 直接的accept,发生这事这个连接就丢了。 ———————————————— ``` $readfds = array(); $writefds = array(); $sock = socket_create_listen(2000); //socket_set_nonblock($sock); // 非阻塞 //echo "sleep 10 second...\n"; //sleep(10); socket_getsockname($sock, $addr, $port); print "Server Listening on $addr:$port\n"; $readfds[(int)$sock]=$sock; $conn=socket_accept($sock); $readfds[]=$conn; $conn=socket_accept($sock); $readfds[]=$conn; $e = null; $t=100; $i=1; while(true){ echo "No.$i\n"; //当select处于等待时,两个客户端中甲先发数据来,则socket_select会在readfds中保留甲的socket并往下运行,另一个客户端的socket就被丢弃了,所以再次循环时,变成只监听甲了,这个可以在新循环中把所有链接的客户端socket再次加进readfds中,则可以避免本程序的这个逻辑错误 echo @socket_select($readfds, $writefds, $e, $t)."\n"; var_dump($readfds); if(in_array($sock, $readfds)){ echo "2000 port is activity"; $readfds[]=socket_accept($sock); } //将读取到的资源输出 foreach ($readfds as $s){ if($s!=$sock){ //新连接到来时,被监听的端口是活跃的,如果是新数据到来或者客户端关闭链接时,活跃的是对应的客户端socket而不是服务器上被监听的端口 //如果客户端发来数据没有被读走,则socket_select将会始终显示客户端是活跃状态并将其保存在readfds数组中 //如果客户端先关闭了,则必须手动关闭服务器上相对应的客户端socket,否则socket_select也始终显示该客户端活跃(这个道理跟"有新连接到来然后没有用socket_access把它读出来,导致监听的端口一直活跃"是一样的) $result=@socket_read($s, 1024,PHP_NORMAL_READ); if($result===false){ $err_code=socket_last_error(); $err_test=socket_strerror($err_code); echo "client ".(int)$s." has closed[$err_code:$err_test]\n"; //手动关闭客户端,最好清除一下$readfds数组中对应的元素 socket_shutdown($s); socket_close($s); }else{ echo $result; } } } usleep(3000000); $readfds[(int)$sock]=$sock; $i++; } //while无限循环,执行不到下面的代码 ``` 示例 ``` $port = 9050; // 创建类型为TCP / IP的流套接字 $sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); // 设置选项以重用端口 socket_set_option($sock, SOL_SOCKET, SO_REUSEADDR, 1); // 将套接字绑定到端口localhost地址的$port上 // 这意味着此端口上的所有连接现在都由我们负责发送/接收数据、断开连接等 socket_bind($sock, 0, $port); // 开始监听链接 socket_listen($sock); // 创建将与我们连接的所有客户端的列表 // 将侦听套接字添加到此列表 $clients = array($sock); while (true) { // 创建一个副本,这样$clients不会被socket_select修改 $read = $clients; // 获取所有要读取数据的客户端的列表 // 如果客户端没有数据,跳过此次循环转到下一个迭代 if (socket_select($read, $write = NULL, $except = NULL, 0) < 1) continue; // 检查是否有客户端尝试连接 if (in_array($sock, $read)) { // 接受客户端的链接,并将其套接字添加到$clients数组中 $newsock = socket_accept($sock); $clients[] = $newsock; // 向客户发送欢迎信息 socket_write($newsock, "目前有".(count($clients) - 1)." 个客户端链接到服务器\n"); //查询给定套接字的远程端,可能返回主机端口或者Unix文件系统路径(取决于其类型) socket_getpeername($newsock, $ip); echo "新的客户端连接: {$ip}\n"; // 从带数据的客户端数组中删除侦听套接字 $key = array_search($sock, $read); unset($read[$key]); } // 遍历所有要读取数据的客户端 foreach ($read as $read_sock) { // 一直读到换行符或1024字节 // 客户端断开连接时,socket_read会显示错误,因此请忽略错误消息 $data = @socket_read($read_sock, 1024, PHP_NORMAL_READ); // 检查客户端是否断开连接 if ($data === false) { // 删除$clients数组的客户端 $key = array_search($read_sock, $clients); unset($clients[$key]); echo "client disconnected.\n"; // 继续下一个要读取的客户端(如果有) continue; } // 删除尾部/开头的空白 $data = trim($data); // 检查删除空格后是否有任何数据 if (!empty($data)) { // 将此消息发送到$CLIENTS数组中的所有客户端(第一个客户端除外,它是一个侦听套接字) foreach ($clients as $send_sock) { // 如果我们从侦听套接字或客户端收到消息,请转到列表中的下一个 if ($send_sock == $sock || $send_sock == $read_sock) continue; // 将消息写入客户端-在消息末尾添加换行符 socket_write($send_sock, $data."\n"); } //foreach } } //foreach } // 关闭侦听套接字 socket_close($sock); ``` ———————————————— 公共聊天室的例子: ``` <?php class webSocket { /** * 服务端地址 * * @var [type] */ private $address; /** * 服务端绑定的端口号 * * @var [type] */ private $port; /** * 监听端口 * * @var [type] */ private $socket; public function __construct($address='127.0.0.1',$port=9501) { $socket=socket_create(AF_INET,SOCK_STREAM,SOL_TCP); socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1); socket_bind($socket,$address,$port); socket_listen($socket); $this->socket=$socket; $this->port=$port; $this->address=$address; echo mb_convert_encoding('已启动websockt服务器地址:'.$address.';端口:'.$port.PHP_EOL,'gbk'); } public function run() { //将原客户端加入监听连接池 $sockets[]=$this->socket; //定义写入监听连接池 $write=null; //定义权限接受连接池 $except=null; //定义超时时间 $time_out=null; //启动循环阻塞任务 while(true) { //复制连接池 $changes=$sockets; //设置同步阻塞监听函数 socket_select($changes,$write,$except,$time_out); //监听端口可读后操作 foreach ($changes as $sock) { //如果监听到的是原端口 if($sock==$this->socket) { //读取错误 if(($client=socket_accept($sock))===false) { die('failed to accept socket: '.socket_strerror($sock)."\n"); } //获取客户端发送内容 $content=trim(socket_read($client,1024)); //执行http协议升级websocket $this->handshaking($client,$content); $id=rand(1000,9999); $message='id:'.$id.'客户端已加入连接池'; //通知全体客户端 $this->sendAll($sockets,$sock,$message); //客户端加入连接池 $sockets[$id]=$client; //服务端提示 echo mb_convert_encoding('id:'.$id.'客户端已加入连接池'.PHP_EOL,'gbk'); } //已经握手完毕 else { //接收数据 socket_recv($sock,$buf,1024,0); //群发消息 $str=''; foreach ($sockets as $k => $v) { if($sock==$v) { $str='来自 '.$k.' 的消息:'; break; } } $this->sendAll($sockets,$sock,$str.$this->message($buf)); } } } } /** * 升级协议 * * @param [type] $client * @param [type] $content * @return void */ public function handshaking($client,$content) { //定义头部信息 $headers=array(); if(preg_match('/Sec-WebSocket-Key:.*\r\n/',$content,$matchs)) { $headers['Sec-WebSocket-Key'] =trim(chop(str_replace('Sec-WebSocket-Key:',"",$matchs[0]))); } //设置返回头 $secKey = $headers['Sec-WebSocket-Key']; $websocket_accept=base64_encode(pack('H*', sha1($secKey . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'))); $upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" . "Upgrade: websocket\r\n" . "Connection: Upgrade\r\n" . "WebSocket-Origin: $this->address\r\n" . "WebSocket-Location: ws://$this->address:$this->port/websocket/websocket\r\n". "Sec-WebSocket-Accept:$websocket_accept\r\n\r\n"; //写入缓冲 return socket_write($client,$upgrade,strlen($upgrade)); } /** * 解析接收数据 * @param $buffer * @return null|string */ public function message($buffer) { $len = $masks = $data = $decoded = null; $len = ord($buffer[1]) & 127; if ($len === 126) { $masks = substr($buffer, 4, 4); $data = substr($buffer, 8); } else if ($len === 127) { $masks = substr($buffer, 10, 4); $data = substr($buffer, 14); } else { $masks = substr($buffer, 2, 4); $data = substr($buffer, 6); } for ($index = 0; $index < strlen($data); $index++) { $decoded .= $data[$index] ^ $masks[$index % 4]; } return $decoded; } /** * 消息广播 * * @param [type] $socket * @param [type] $sock * @param [type] $message * @return void */ public function sendAll($socket,$sock,$message) { foreach ($socket as $so) { if($so!=$sock && $so!=$this->socket) { $this->send($so,$message); } } } /** * 发送数据 * @param $newClinet 新接入的socket * @param $msg 要发送的数据 * @return int|string */ public function send($clinet, $msg){ $msg = $this->frame($msg); socket_write($clinet, $msg, strlen($msg)); } /** * 处理数据帧 * * @param [type] $s * @return void */ public function frame($s) { $a = str_split($s, 125); if (count($a) == 1) { return "\x81" . chr(strlen($a[0])) . $a[0]; } $ns = ""; foreach ($a as $o) { $ns .= "\x81" . chr(strlen($o)) . $o; } return $ns; } } $socketobj=new webSocket(); $socketobj->run(); ``` 前端: ``` <!doctype html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width,initial-scale=1, maximum-scale=1, user-scalable=no"> <title>websocket</title> </head> <body> <input id="text" value=""> <input type="submit" value="send" onclick="start()"> <div id="msg"></div> <script> /** *0:未连接 *1:连接成功,可通讯 *2:正在关闭 *3:连接已关闭或无法打开 */ //创建一个webSocket 实例 var webSocket = new WebSocket("ws://127.0.0.1:9501"); webSocket.onerror = function (event){ onError(event); }; // 打开websocket webSocket.onopen = function (event){ onOpen(event); }; //监听消息 webSocket.onmessage = function (event){ onMessage(event); }; webSocket.onclose = function (event){ onClose(event); } //关闭监听websocket function onError(event){ document.getElementById("msg").innerHTML = "<p>close</p>"; console.log("error"+event.data); }; function onOpen(event){ console.log("open:"+sockState()); document.getElementById("msg").innerHTML = "<p>Connect to Service</p>"; }; function onMessage(event){ console.log("onMessage"); document.getElementById("msg").innerHTML += "<p>response:"+event.data+"</p>" }; function onClose(event){ document.getElementById("msg").innerHTML = "<p>close</p>"; console.log("close:"+sockState()); webSocket.close(); } function sockState(){ var status = ['未连接','连接成功,可通讯','正在关闭','连接已关闭或无法打开']; return status[webSocket.readyState]; } function start(event){ console.log(webSocket); var msg = document.getElementById('text').value; document.getElementById('text').value = ''; console.log("send:"+sockState()); console.log("msg="+msg); webSocket.send("msg="+msg); document.getElementById("msg").innerHTML += "<p>准备请求的数据:"+msg+"</p>" }; function close(event){ webSocket.close(); } </script> </body> </html> ```