企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
* mqtt是啥?我的博客有写这个东西:[传送门](https://www.fenxiangy.com/note/mqttprotocol.html)

![此处输入图片的描述](https://image-static.segmentfault.com/268/168/268168973-5abaf31b61add_articlex "此处输入图片的描述")

php想要实现mqtt需要使用到php中的[socket](https://so.csdn.net/so/search?q=socket&spm=1001.2101.3001.7020)函数;
[`socket函数是什么?`](https://www.cnblogs.com/aipiaoborensheng/p/6708963.html)

此次使用的是网上开源mqtt案例:其中使用的是 stream\_socket\_xxxx 系列函数
[`什么是stream_socket_xxxx系列函数`](https://segmentfault.com/q/1010000005685588)

大概意思是:

> 正如你所指出的,'stream'是PHP核心(内置的,始终可用),而'套接字'是很少包含的扩展的一部分。除此之外,它们几乎完全相同。您可以同时使用TCP和UDP两种流,也可以使用阻塞和非阻塞模式,这些模式涵盖了所有用例的99%。
>
> 我能想到的唯一常见的例外是ICMP。例如,'ping'。但是,看起来目前还没有一种安全的方式来从PHP执行ICMP。这种调用需要通过套接字扩展来实现SOCK\_RAW,这需要执行“root”权限。此外,并非所有路由器都会在TCP,UDP和ICMP之外路由其他数据包类型。这限制了套接字扩展的实用性。

### MQTT类代码:

~~~php
/**
 * phpMQTT - a minimal MQTT 3.1 ("MQIsdp", protocol level 3) client built on
 * PHP stream sockets.
 *
 * The socket is switched to non-blocking mode after connecting; read() polls
 * it until the requested number of bytes has arrived or the peer closes.
 */
class Mqtt
{
    /** @var resource|null|false Stream socket connected to the broker. */
    private $socket;

    /** @var int Next packet identifier to hand out (kept within 1..65535). */
    private $msgid = 1;

    /** @var int Keep-alive interval in seconds, sent in CONNECT. */
    public $keepalive = 10;

    /** @var int Unix time of the last packet received; used to detect a dead link. */
    public $timesinceping;

    /** @var array Subscribed topic filters: filter => array('qos' => int, 'function' => callable). */
    public $topics = array();

    /** @var bool When true, progress messages are echoed. */
    public $debug = false;

    /** @var string Broker address. */
    public $address;

    /** @var int Broker port. */
    public $port;

    /** @var string Client id sent to the broker. */
    public $clientid;

    /** @var array|null Last will: array('topic' =>, 'content' =>, 'qos' =>, 'retain' =>). */
    public $will;

    /** @var string|null User name sent in CONNECT. */
    private $username;

    /** @var string|null Password sent in CONNECT. */
    private $password;

    /** @var string|null CA file; when set the connection is made over TLS. */
    public $cafile;

    /**
     * @param string      $address  Broker address.
     * @param int         $port     Broker port.
     * @param string      $clientid Client id.
     * @param string|null $cafile   CA certificate file; enables TLS when given.
     */
    public function __construct($address, $port, $clientid, $cafile = NULL)
    {
        $this->broker($address, $port, $clientid, $cafile);
    }

    /**
     * Stores the broker details used by the next connect().
     *
     * @param string      $address
     * @param int         $port
     * @param string      $clientid
     * @param string|null $cafile
     */
    public function broker($address, $port, $clientid, $cafile = NULL)
    {
        $this->address = $address;
        $this->port = $port;
        $this->clientid = $clientid;
        $this->cafile = $cafile;
    }

    /**
     * Calls connect() until it succeeds, sleeping 10 seconds between attempts.
     *
     * @return bool Always true (the method only returns once connected).
     */
    public function connect_auto($clean = true, $will = NULL, $username = NULL, $password = NULL)
    {
        while (!$this->connect($clean, $will, $username, $password)) {
            sleep(10);
        }
        return true;
    }

    /**
     * Opens the socket and performs the CONNECT / CONNACK handshake.
     *
     * @param bool        $clean    Whether to set the clean-session flag.
     * @param array|null  $will     Last-will definition (see $will property).
     * @param string|null $username Stored and sent only when non-empty.
     * @param string|null $password Stored and sent only when non-empty.
     *
     * @return bool True when the broker accepted the connection.
     */
    public function connect($clean = true, $will = NULL, $username = NULL, $password = NULL)
    {
        if ($will) {
            $this->will = $will;
        }
        if ($username) {
            $this->username = $username;
        }
        if ($password) {
            $this->password = $password;
        }

        $errno = 0;
        $errstr = '';
        if ($this->cafile) {
            $socketContext = stream_context_create(["ssl" => [
                "verify_peer_name" => true,
                "cafile" => $this->cafile
            ]]);
            $this->socket = stream_socket_client(
                "tls://" . $this->address . ":" . $this->port,
                $errno,
                $errstr,
                60,
                STREAM_CLIENT_CONNECT,
                $socketContext
            );
        } else {
            $this->socket = stream_socket_client(
                "tcp://" . $this->address . ":" . $this->port,
                $errno,
                $errstr,
                60,
                STREAM_CLIENT_CONNECT
            );
        }

        if (!$this->socket) {
            if ($this->debug) {
                error_log("stream_socket_create() $errno, $errstr \n");
            }
            return false;
        }

        stream_set_timeout($this->socket, 5);
        stream_set_blocking($this->socket, 0);

        $hasWill = !empty($this->will);
        $hasUser = !empty($this->username);
        $hasPass = !empty($this->password);

        // Variable header: protocol name "MQIsdp" (length-prefixed) + level 3.
        $buffer = chr(0x00) . chr(0x06) . "MQIsdp" . chr(0x03);

        // Connect flags.
        $var = 0;
        if ($clean) {
            $var += 2;
        }
        if ($hasWill) {
            $var += 4;                                                        // will flag
            $var += ((isset($this->will['qos']) ? (int)$this->will['qos'] : 0) << 3); // will QoS
            if (!empty($this->will['retain'])) {
                $var += 32;                                                   // will retain
            }
        }
        if ($hasUser) {
            $var += 128;
        }
        if ($hasPass) {
            $var += 64;
        }
        $buffer .= chr($var);

        // Keep alive, big endian.
        $buffer .= chr(($this->keepalive >> 8) & 0xff);
        $buffer .= chr($this->keepalive & 0xff);

        // Payload. strwritestring() wants a running counter; the real length
        // is taken from the finished buffer below.
        $n = 0;
        $buffer .= $this->strwritestring($this->clientid, $n);
        if ($hasWill) {
            $buffer .= $this->strwritestring($this->will['topic'], $n);
            $buffer .= $this->strwritestring($this->will['content'], $n);
        }
        if ($hasUser) {
            $buffer .= $this->strwritestring($this->username, $n);
        }
        if ($hasPass) {
            $buffer .= $this->strwritestring($this->password, $n);
        }

        // The remaining length must use the variable-length encoding: a single
        // chr() breaks as soon as the packet reaches 128 bytes.
        $head = chr(0x10) . $this->setmsglength(strlen($buffer));
        fwrite($this->socket, $head . $buffer);

        $string = $this->read(4);
        $type = strlen($string) > 0 ? ord($string[0]) : 0;
        $code = strlen($string) > 3 ? ord($string[3]) : 0;

        if (strlen($string) === 4 && ($type >> 4) === 2 && $code === 0) {
            if ($this->debug) {
                echo "Connected to Broker\n";
            }
        } else {
            error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x)\n", $type, $code));
            // Release the socket so repeated connect_auto() attempts do not leak one per try.
            fclose($this->socket);
            $this->socket = null;
            return false;
        }

        $this->timesinceping = time();
        return true;
    }

    /**
     * Reads from the socket.
     *
     * @param int  $int Number of bytes wanted.
     * @param bool $nb  When true, do a single non-blocking fread() and return
     *                  whatever is available (possibly an empty string).
     *
     * @return string Up to $int bytes; shorter only if the peer closed or a
     *                read error occurred.
     */
    public function read($int = 8192, $nb = false)
    {
        if ($nb) {
            $chunk = fread($this->socket, $int);
            return $chunk === false ? "" : $chunk;
        }

        $string = "";
        $togo = $int;
        while ($togo > 0 && !feof($this->socket)) {
            $chunk = fread($this->socket, $togo);
            if ($chunk === false) {
                break;
            }
            $string .= $chunk;
            $togo = $int - strlen($string);
        }
        return $string;
    }

    /**
     * Subscribes to topics and remembers them in $this->topics.
     *
     * @param array $topics filter => array('qos' => int, 'function' => callable)
     * @param int   $qos    Kept for backward compatibility; the SUBSCRIBE fixed
     *                      header flags are fixed by the protocol and do not
     *                      depend on it.
     */
    public function subscribe($topics, $qos = 0)
    {
        $i = 0;
        $id = $this->nextMsgId();
        $buffer = chr($id >> 8) . chr($id & 0xff);
        $i += 2;

        foreach ($topics as $key => $topic) {
            $buffer .= $this->strwritestring((string)$key, $i);
            $buffer .= chr($topic["qos"]);
            $i++;
            $this->topics[$key] = $topic;
        }

        // SUBSCRIBE must carry flags 0010, i.e. first byte 0x82.
        $head = chr(0x82) . $this->setmsglength($i);
        fwrite($this->socket, $head . $buffer);

        // SUBACK: fixed header (2 bytes), then the announced number of bytes.
        $string = $this->read(2);
        $bytes = strlen($string) > 1 ? ord($string[1]) : 0;
        $this->read($bytes);
    }

    /**
     * Sends a PINGREQ keep-alive packet.
     */
    public function ping()
    {
        fwrite($this->socket, chr(0xc0) . chr(0x00), 2);
        if ($this->debug) {
            echo "ping sent\n";
        }
    }

    /**
     * Sends a DISCONNECT packet.
     */
    public function disconnect()
    {
        fwrite($this->socket, chr(0xe0) . chr(0x00), 2);
    }

    /**
     * Sends DISCONNECT, then shuts down and closes the socket.
     */
    public function close()
    {
        if (!is_resource($this->socket)) {
            return;
        }
        $this->disconnect();
        stream_socket_shutdown($this->socket, STREAM_SHUT_WR);
        fclose($this->socket);
        $this->socket = null;
    }

    /**
     * Publishes $content on $topic.
     *
     * Note: for $qos > 0 the packet id is sent but the acknowledgement is not
     * awaited here.
     *
     * @param string $topic
     * @param string $content
     * @param int    $qos
     * @param int    $retain Non-zero sets the retain flag.
     */
    public function publish($topic, $content, $qos = 0, $retain = 0)
    {
        $i = 0;
        $buffer = $this->strwritestring($topic, $i);

        if ($qos) {
            $id = $this->nextMsgId();
            $buffer .= chr($id >> 8) . chr($id & 0xff);
            $i += 2;
        }

        $buffer .= $content;
        $i += strlen($content);

        $cmd = 0x30;
        if ($qos) {
            $cmd += $qos << 1;
        }
        if ($retain) {
            $cmd += 1;
        }

        $head = chr($cmd) . $this->setmsglength($i);
        fwrite($this->socket, $head . $buffer);
    }

    /**
     * Dispatches a received PUBLISH body to every matching subscription callback.
     *
     * @param string $msg PUBLISH variable header + payload (length-prefixed
     *                    topic followed by the payload).
     */
    public function message($msg)
    {
        $msg = (string)$msg;
        if (strlen($msg) < 2) {
            return; // too short to even hold the topic length
        }

        $tlen = (ord($msg[0]) << 8) + ord($msg[1]);
        $topic = (string)substr($msg, 2, $tlen);
        $msg = (string)substr($msg, $tlen + 2);

        $found = 0;
        foreach ($this->topics as $key => $top) {
            if (!preg_match($this->topicFilterToRegex((string)$key), $topic)) {
                continue;
            }
            if (isset($top['function']) && is_callable($top['function'])) {
                call_user_func($top['function'], $topic, $msg);
                $found = 1;
            }
        }

        if ($this->debug && !$found) {
            echo "msg recieved but no match in subscriptions\n";
        }
    }

    /**
     * One iteration of the processing loop for an always-on client: reconnects
     * on EOF, reads at most one packet, dispatches PUBLISH packets and handles
     * keep-alive pings / timeouts.
     *
     * @param bool $loop When true, sleep 100 ms if nothing was received. Pass
     *                   false when the caller does other work in its own loop.
     *
     * @return int Always 1.
     */
    public function proc($loop = true)
    {
        if (feof($this->socket)) {
            if ($this->debug) {
                echo "eof receive going to reconnect for good measure\n";
            }
            $this->reconnect();
        }

        $byte = $this->read(1, true);

        if (!strlen($byte)) {
            if ($loop) {
                usleep(100000);
            }
        } else {
            $cmd = (int)(ord($byte) / 16);
            if ($this->debug) {
                echo "Recevid: $cmd\n";
            }

            // Decode the variable-length "remaining length" field.
            $multiplier = 1;
            $value = 0;
            do {
                $raw = $this->read(1);
                if ($raw === "") {
                    break; // peer closed in the middle of the header
                }
                $digit = ord($raw);
                $value += ($digit & 127) * $multiplier;
                $multiplier *= 128;
            } while (($digit & 128) != 0);

            if ($this->debug) {
                echo "Fetching: $value\n";
            }

            // Zero-length packets (e.g. PINGRESP) have no body.
            $string = $value ? $this->read($value) : "";

            if ($cmd) {
                if ($cmd === 3) {
                    $this->message($string);
                }
                $this->timesinceping = time();
            }
        }

        if ($this->timesinceping < (time() - $this->keepalive)) {
            if ($this->debug) {
                echo "not found something so ping\n";
            }
            $this->ping();
        }

        if ($this->timesinceping < (time() - ($this->keepalive * 2))) {
            if ($this->debug) {
                echo "not seen a package in a while, disconnecting\n";
            }
            $this->reconnect();
        }

        return 1;
    }

    /**
     * Decodes a variable-length "remaining length" value from $msg.
     *
     * @param string $msg Buffer to read from.
     * @param int    $i   Offset of the first length byte; advanced past the field.
     *
     * @return int Decoded length.
     */
    public function getmsglength(&$msg, &$i)
    {
        $multiplier = 1;
        $value = 0;
        do {
            $digit = ord($msg[$i]);
            $value += ($digit & 127) * $multiplier;
            $multiplier *= 128;
            $i++;
        } while (($digit & 128) != 0);
        return $value;
    }

    /**
     * Encodes $len as an MQTT variable-length "remaining length" field.
     *
     * @param int $len
     *
     * @return string One or more bytes, 7 bits of payload each.
     */
    public function setmsglength($len)
    {
        $string = "";
        do {
            $digit = $len % 128;
            $len = $len >> 7;
            // If there are more digits to encode, set the top bit of this one.
            if ($len > 0) {
                $digit = ($digit | 0x80);
            }
            $string .= chr($digit);
        } while ($len > 0);
        return $string;
    }

    /**
     * Returns $str prefixed with its 16-bit big-endian length.
     *
     * @param string $str
     * @param int    $i   Running byte counter; increased by strlen($str) + 2.
     *
     * @return string
     */
    public function strwritestring($str, &$i)
    {
        $len = strlen($str);
        $i += ($len + 2);
        return chr($len >> 8) . chr($len % 256) . $str;
    }

    /**
     * Debug helper: prints every byte of $string as index, binary, hex and character.
     *
     * @param string $string
     */
    public function printstr($string)
    {
        $strlen = strlen($string);
        for ($j = 0; $j < $strlen; $j++) {
            $num = ord($string[$j]);
            $chr = ($num > 31) ? $string[$j] : " ";
            printf("%4d: %08b : 0x%02x : %s \n", $j, $num, $num, $chr);
        }
    }

    /**
     * Returns the current packet id and advances the counter, wrapping so the
     * id always stays within the valid 1..65535 range.
     *
     * @return int
     */
    private function nextMsgId()
    {
        $id = $this->msgid;
        $this->msgid = ($id >= 0xffff) ? 1 : $id + 1;
        return $id;
    }

    /**
     * Converts a topic filter into an anchored regular expression: "+" matches
     * within one level, "#" matches anything; every other character is taken
     * literally.
     *
     * @param string $filter
     *
     * @return string
     */
    private function topicFilterToRegex($filter)
    {
        $regex = '';
        foreach (preg_split('/([+#])/', $filter, -1, PREG_SPLIT_DELIM_CAPTURE) as $part) {
            if ($part === '+') {
                $regex .= '[^\/]*';
            } elseif ($part === '#') {
                $regex .= '.*';
            } else {
                $regex .= preg_quote($part, '/');
            }
        }
        return '/^' . $regex . '$/';
    }

    /**
     * Drops the current socket, reconnects (retrying until it works) and
     * re-subscribes to the remembered topics.
     */
    private function reconnect()
    {
        if (is_resource($this->socket)) {
            fclose($this->socket);
        }
        $this->connect_auto(false);
        if (count($this->topics)) {
            $this->subscribe($this->topics);
        }
    }
}
~~~

### 实现部分

#### 发送到主题

~~~php
// Publish one message to a topic, then disconnect.
$server = "127.0.0.1";                // MQTT broker address
$port = 1883;                         // broker port
$username = "";                       // user name (if required)
$password = "";                       // password (if required)
$client_id = "clientx9293670xxctr";   // client id for this connection

$mqtt = new Mqtt($server, $port, $client_id);

if ($mqtt->connect(true, NULL, $username, $password)) {
    // Publish "setr=3xxxxxxxxx" on topic "xxx3809293670ctr" with QoS 0.
    $mqtt->publish("xxx3809293670ctr", "setr=3xxxxxxxxx", 0);
    $mqtt->close(); // close the connection once the message is sent
} else {
    echo "Time out!\n";
}
~~~

#### 订阅主题

~~~php
// Subscribe to a topic; the callback exits after the first message.
$server = "127.0.0.1";                // MQTT broker address
$port = 1883;                         // broker port
$username = "";                       // user name (if required)
$password = "";                       // password (if required)
$client_id = "clientx9293670xxctr";   // client id for this connection

$mqtt = new Mqtt($server, $port, $client_id);

if (!$mqtt->connect(true, NULL, $username, $password)) {
    exit(1); // could not connect
}

// Subscribe to "SN69143809293670state" with QoS 0, handled by procmsg().
$topics = array();
$topics['SN69143809293670state'] = array("qos" => 0, "function" => "procmsg");
$mqtt->subscribe($topics, 0);

while ($mqtt->proc()) {
    // listen forever
}

$mqtt->close();

// Message callback: prints the message, dumps the decoded "aa" field, exits.
function procmsg($topic, $msg)
{
    echo "Msg Recieved: " . date("r") . "\n";
    echo "Topic: {$topic}\n\n";
    echo "\t$msg\n\n";

    $xxx = json_decode($msg);
    var_dump(isset($xxx->aa) ? $xxx->aa : null);
    die;
}
~~~

这是php实现方法,如果用php做发送端还是不错的.但是