企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 在 Laravel 中使用 Workerman GatewayWorker 进行 socket 长连接通讯和TCP文件传输 初次接触[GatewayWorker框架](http://doc2.workerman.net/),先阅读示例:Linux系统快速开始(从一个精简的聊天demo开始)。查看文件目录结构 ~~~ GatewayWorker ├── Applications // 这里是所有开发者应用项目 │ └── YourApp // 其中一个项目目录,目录名可以自定义 │ ├── Events.php // 开发者只需要关注这个文件 │ ├── start_gateway.php // gateway进程启动脚本,包括端口号等设置 │ ├── start_businessworker.php // businessWorker进程启动脚本 │ └── start_register.php // 注册服务启动脚本 │ ├── start.php // 全局启动脚本,此脚本会依次加载Applications/项目/start_*.php启动脚本 │ └── vendor // GatewayWorker框架和Workerman框架源码目录,此目录开发者不用关心 ~~~ 在 Laravel 框架中使用,有两种方式: * 方式一:根据示例,复制`Applications`目录下的文件,到 Laravel 框架到`app`目录下,可以自定义新目录,并且修改*start.php*文件中的相应的加载位置。 * 方式二:使用 Laravel 框架提供的`php artisan`命令工具来做启动。把`start.php`、`start_gateway.php`、`start_register.php`、`start_businessworker.php`四个文件内容,合并在一起。 ### 我选的是第二种, 第一种情况下 workerman 里面我用不了laravel 代码,不知道是什么原因,所以我只直接使用第二种。 ## 1、安装 GatewayWorker ~~~ composer require workerman/gateway-worker composer require workerman/gatewayclient ~~~ ## 2、创建 Workerman 启动文件 创建一个 artisan 命令行工具来启动 Socket 服务端,在`app/Console/Commands`目录下建立命令行文件。 ~~~ php artisan make:command GatewayworkerCommand ~~~ app/Console/Commands/GatewayworkerCommand 内容如下: ~~~ <?php namespace App\Console\Commands; use Illuminate\Console\Command; use Illuminate\Support\Facades\Log; use Workerman\Worker; use GatewayWorker\Gateway; use GatewayWorker\Register; use GatewayWorker\BusinessWorker; use App\Gatewayworker\Events; class GatewayworkerCommand extends Command { /** 命令 php artisan workerman start * @var string */ protected $signature = 'Workerman {action} {--d}'; protected $description = 'Start a Workerman server'; public function __construct() { parent::__construct(); } public function handle() { global $argv; if (!in_array($action = $this->argument('action'), ['start', 'stop', 'restart', 'status'])) { $this->error('Error Arguments'); exit; } $argv[0] = 'Workerman'; $argv[1] = $action; $argv[2] = $this->option('d') ? '-d' : '';//该参数是以daemon(守护进程)方式启动 $this->startRun(); } public function startRun(){ $this->startGateWay(); $this->startBusinessWorker(); $this->startRegister(); Worker::$pidFile = '/www/wwwroot/admin/vendor/workerman/'.$this->worker_name.'.pid'; Worker::runAll(); } private function startBusinessWorker() { $worker = new BusinessWorker(); $worker->name = 'BusinessWorker'; $worker->count = 2; $worker->registerAddress = '127.0.0.1:1236'; $worker->eventHandler = Events::class; } private function startGateWay() { $context = array( 'ssl' => array( // 使用绝对路径 'local_cert' => '/www/wwwroot/admin/app/ssl/server.pem' , 'local_pk' => '/www/wwwroot/admin/app/ssl/server.key' , 'verify_peer' => false, 'allow_self_signed' => true ) ); //$gateway = new Gateway("tcp://0.0.0.0:5901"); $gateway = new Gateway("websocket://0.0.0.0:5901", $context ); $gateway->transport = 'ssl'; $gateway->name = 'Gateway'; $gateway->count = 2; $gateway->lanIp = '127.0.0.1'; $gateway->registerAddress = '127.0.0.1:1236'; // 心跳间隔 $gateway->pingInterval = 10; // 发给客户端你的心跳数据 $gateway->pingData = '{"type":"ping"}'; } private function startRegister() { new Register('text://0.0.0.0:1236'); } } ~~~ ## 3、创建事件监听文件 创建一个*app/Gatewayworker/Events.php*文件来监听处理 workman 的各种事件。 ~~~ <?php namespace App\Gatewayworker; use App\Models\User; use GatewayWorker\Lib\Gateway; use Illuminate\Support\Facades\Log; use Illuminate\Support\Facades\Storage; use Workerman\Timer; class Events { static $fp = ""; /*$fp = fopne()*/ static $i = 0; /*循环次数*/ public static function onWorkerStart($businessWorker) { // echo "onWorkerStart\r\n"; } public static function onConnect($client_id) { // 连接到来后,定时30秒关闭这个链接,需要30秒内发认证并删除定时器阻止关闭连接的执行 $auth_timer_id = Timer::add(10, function($client_id){ Gateway::closeClient($client_id); }, array($client_id), false); Gateway::updateSession($client_id, array('auth_timer_id' => $auth_timer_id)); /*客户端连接到时候触发*/ Gateway::sendToClient($client_id, json_encode([ 'type' =>'init', 'client_id' => $client_id, ])); } public static function onMessage($client_id, $data) { $_SESSION = Gateway::getSession($client_id); /*验证用户*/ /*第一次进入的用户*/ if (!isset($_SESSION['token'])) { $data = json_decode($data, true); /*验证token,其他属性是否合法*/ if (!isset($data['token'])) { self::gateSend($client_id, 'socket已断开', 'error'); Gateway::closeClient($client_id); } else { // 认证成功,删除 30关闭连接定 的时器 $_SESSION['token'] = $data['token']; $_SESSION['file_name'] = $data['file_name']; $_SESSION['file_size'] = $data['file_size']; $_SESSION['service_type'] = $data['service_type']; Timer::del($_SESSION['auth_timer_id']); self::gateSend($client_id, '连接成功'); } }else{ /*第二次直接发送文件*/ if ($data == 'pong'){ return ; } $file_patch = 'temp/file_trans'; /*public下面的文件存储目录*/ $fileSize = self::saveFile($file_patch, $data); /*file_put end*/ if ($fileSize >= $_SESSION['file_size']){ Log::info('客户端发送的:'. $_SESSION['file_size']); Log::info('我计算的:'. $fileSize); $file = asset($file_patch.'/'.$_SESSION['file_name']); usleep(100000); self::closeFile(); self::gateSend($client_id,'文件上传成功','success',['progress'=>100,'file'=>$file]); Gateway::closeClient($client_id); }else{ $progress = (int) floor(number_format($fileSize / $_SESSION['file_size'] * 100)); self::gateSend($client_id,'文件上传中','success',['progress'=>$progress]); } } } public static function onClose($client_id) { self::gateSend($client_id,'tcp连接断开','error'); } /** * @param $client_id * @param $msg * @param $status * @param $options * @return void */ public static function gateSend($client_id, $msg = '', $status = 'success', $options = []){; Gateway::sendToClient($client_id, json_encode( array_merge([ 'message' => $msg, 'status' => $status ],$options) )); } /** * @param $file_patch * @param $content * @return false|int */ private static function saveFile($file_patch = '', $content){ $file_patch = public_path($file_patch); is_dir($file_patch) or mkdir($file_patch); $save_patch = $file_patch . '/' . $_SESSION['file_name']; /*一次打开多次写入*/ if (self::$i == 0){ if (is_file($save_patch)) @unlink($save_patch); self::$fp = fopen($save_patch,'a'); /*写入方式打开,将文件指针指向文件末尾进行写入,如果文件不存在则尝试创建之*/ } $file_data = $content; //$length = file_put_contents($save_patch, $file_data, FILE_APPEND); fwrite(self::$fp, $file_data); self::$i++; /*文件大小计算*/ $file = $file_patch . '/' . $_SESSION['file_name']; return filesize($file); } } ~~~ ## 4、开启服务 在命令行里面执行,支持的命令有`start|stop|restart|status`,其中`--d`的意思是 daemon 模式,后台守护进程。 ~~~ php artisan workerman start ~~~ ~~~ Workerman[artisan] start in DEBUG mode -------------------------------------------- WORKERMAN --------------------------------------------- Workerman version:4.0.20 PHP version:8.0.12 --------------------------------------------- WORKERS ---------------------------------------------- proto user worker listen processes status tcp root Gateway websocket://0.0.0.0:5901 2 [OK] tcp root BusinessWorker none 4 [OK] tcp root Register text://0.0.0.0:1236 1 [OK] ---------------------------------------------------------------------------------------------------- Press Ctrl+C to stop. Start success. onWorkerStart onWorkerStart onWorkerStart onWorkerStart ~~~ ## 5、在浏览器中测试 为了websocket 测试第三步Events.php重新配置 ~~~ <?php namespace App\Gatewayworker; use GatewayWorker\Lib\Gateway; class Events { public static function onWorkerStart($businessWorker) { echo "onWorkerStart\r\n"; } public static function onConnect($client_id) { Gateway::sendToClient($client_id, json_encode(['type' => 'onConnect', 'client_id' => $client_id])); echo "onConnect\r\n"; } public static function onWebSocketConnect($client_id, $data) { echo "onWebSocketConnect\r\n"; } public static function onMessage($client_id, $message) { Gateway::sendToClient($client_id, json_encode(['type' => 'onMessage', 'client_id' => $client_id, 'name' => json_decode($message)->name])); echo "onMessage\r\n"; } public static function onClose($client_id) { echo "onClose\r\n"; } } ~~~ 注意:这里是websocket情况下浏览器才输出,websocket 测试,注意第二步的,我的域名是https,所以我设置了https证书 ~~~ $gateway = new Gateway("websocket://0.0.0.0:5901", $context ); $gateway->transport = 'ssl'; ~~~ 用 chrome 浏览器打开此 Laravel 框架建设的网站,打开控制台(快捷键按F12)。在*Console*里输入: ~~~ var ws = new WebSocket("wss://127.0.0.1:5901"); ws.onopen = function(e) { ws.send('{"name":"one","user_id":"111"}'); ws.send('{"name":"two","user_id":"222"}'); }; ws.onmessage = function(e) { console.log("收到服务端的消息:" + e.data); }; ws.onclose = function(e) { console.log("服务已断开" ); } ~~~ ![](https://img.kancloud.cn/6a/a4/6aa49f3e7231904851a3564f0225cc7c_544x436.png) 成功! 以下微信小程序tcp传输测试 pages/index/index.js ~~~ const tcp = wx.createTCPSocket(); const fs = wx.getFileSystemManager() let file = 'tts.mp3' const filePath = '/'+file; Page({ onLoad() { this.handle(); }, handle(){ tcp.connect({address: 'translate.api.ulunix.cn', port: 5901}) tcp.onMessage((res) => { let unit8Arr = new Uint8Array(res.message) let encodedString = String.fromCharCode.apply(null, unit8Arr) let decodedString = decodeURIComponent(escape((encodedString))) let res_data = JSON.parse(decodedString) console.log(res_data) if(res_data.type =='init'){ let uid = 45; fs.getFileInfo({ filePath: filePath, // 获取的临时或本地文件路径 success(res) { console.log(res.size); let data = {'token':uid,'file_name':file,'service_type':'file_trans','file_size':res.size}; tcp.write(JSON.stringify(data)) }, fail(err){ console.log(err); } }) } if(res_data.type =='ping'){ tcp.write('pong') } }) tcp.onError((res) => { console.log(res); }) tcp.onClose((res) => { console.log(res); }) }, //点击以后开始传输 tcp(){ fs.readFile({ filePath: filePath, // 获取的临时或本地文件路径 success(resd) { tcp.write(resd.data) }, }) } }) ~~~