企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# Server->task [TOC] 投递一个异步任务到`task_worker`池中。此函数是非阻塞的,执行完毕会立即返回。`Worker`进程可以继续处理新的请求。使用`Task`功能,必须先设置[task\_worker\_num](task\_worker\_num.md),并且必须设置`Server`的`onTask`和`onFinish`事件回调函数。 ~~~ int Server::task(mixed $data, int $dst_worker_id = -1) $task_id = $serv->task("some data"); //swoole-1.8.6或更高版本 $serv->task("taskcallback", -1, function (swoole_server $serv, $task_id, $data) { echo "Task Callback: "; var_dump($task_id, $data); }); ~~~ ## 参数 * `$data`要投递的任务数据,必须是可序列化的`PHP`变量 * `$dst_worker_id`可以制定要给投递给哪个`Task`进程,传入`ID`即可,范围是`0 - (serv->task_worker_num -1)` ## 返回值 * 调用成功,返回值为整数`$task_id`,表示此任务的`ID`。如果有`finish`回应,`onFinish`回调中会携带`$task_id`参数 * 调用失败,返回值为`false`,`$task_id`可能为`0`,因此必须使用`===`判断是否失败 ## 说明 * 未指定目标`Task`进程,调用`task`方法会判断`Task`进程的忙闲状态,底层只会向处于空闲状态的`Task`进程投递任务。如果所有`Task`进程均处于忙的状态,底层会轮询投递任务到各个进程。可以使用[server->stats](Server-stats.md)方法获取当前正在排队的任务数量。 * 1.8.6版本增加了第三个参数,可以直接设置`onFinish`函数,如果任务设置了回调函数,Task返回结果时会直接执行指定的回调函数,不再执行Server的`onFinish`回调 > `$dst_worker_id`在`1.6.11+`后可用,默认为随机投递 > `$task_id`是从`0-42亿`的整数,在当前进程内是唯一的 > `task`方法不能在`task`进程/用户自定义进程中调用 此功能用于将慢速的任务异步地去执行,比如一个聊天室服务器,可以用它来进行发送广播。当任务完成时,在task进程中调用`$serv->finish("finish")`告诉worker进程此任务已完成。当然`swoole_server->finish`是可选的。 task底层使用Unix Socket管道通信,是全内存的,没有IO消耗。单进程读写性能可达100万/s,不同的进程使用不同的管道通信,可以最大化利用多核。 > AsyncTask功能在1.6.4版本增加,默认不启动task功能,需要在手工设置task\_worker\_num来启动此功能 > task\_worker的数量在swoole\_server::set参数中调整,如task\_worker\_num => 64,表示启动64个进程来接收异步任务 ## 配置参数 swoole\_server->task/taskwait/finish 3个方法当传入的`$data`数据超过8K时会启用临时文件来保存。当临时文件内容超过`server->package_max_length`时底层会抛出一个警告。此警告不影响数据的投递,过大的Task可能会存在性能问题。 ~~~ WARN: task package is too big. ~~~ > server->package\_max\_length 默认为2M ## 注意事项 * 使用`task`必须为Server设置`onTask`和`onFinish`回调,否则`swoole_server->start`会失败 * `task`操作的次数必须小于`onTask`处理速度,如果投递容量超过处理能力,task会塞满缓存区,导致worker进程发生阻塞。worker进程将无法接收新的请求 * 使用`addProcess`添加的用户进程中无法使用`task`投递任务,请使用`sendMessage`接口与`Task`工作进程通信 ## 代码示例 ~~~ <?php $serv = new swoole_server("127.0.0.1", 9501, SWOOLE_BASE); $serv->set(array( 'worker_num' => 2, 'task_worker_num' => 4, )); $serv->on('Receive', function(swoole_server $serv, $fd, $from_id, $data) { echo "接收数据" . $data . "\n"; $data = trim($data); $task_id = $serv->task($data, 0); $serv->send($fd, "分发任务,任务id为$task_id\n"); }); $serv->on('Task', function (swoole_server $serv, $task_id, $from_id, $data) { echo "Tasker进程接收到数据"; echo "#{$serv->worker_id}\tonTask: [PID={$serv->worker_pid}]: task_id=$task_id, data_len=".strlen($data).".".PHP_EOL; $serv->finish($data); }); $serv->on('Finish', function (swoole_server $serv, $task_id, $data) { echo "Task#$task_id finished, data_len=".strlen($data).PHP_EOL; }); $serv->on('workerStart', function($serv, $worker_id) { global $argv; if($worker_id >= $serv->setting['worker_num']) { ¦ swoole_set_process_name("php {$argv[0]}: task_worker"); } else { ¦ swoole_set_process_name("php {$argv[0]}: worker"); } }); $serv->start(); ~~~