企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
ZMQ三种基本模型 ~~~ 1、Request-Reply 模型(请求与应答模式) 2、Publisher-Subscriber模型(发布与订阅模式) 3、Parallel Pipeline模型(并行管道模式) ~~~ ## **Request-Reply 模型(请求与应答模式)** 客户端和服务端都可以是 1:N 的模型。通常把 1 认为是服务端,N 认为是客户端 。ZMQ可以很好的支持路由功能(实现路由功能的组件叫作 Device),把 1:N 扩展为N:M(只需要加入若干路由节点)。 server.php代码如下: ~~~ <?php //创建一个新的套接字上下文 $context = new ZMQContext(); //创建一个ZMQ响应套接字 $rep = new ZMQSocket($context, ZMQ::SOCKET_REP); //绑定端口 $rep->bind("tcp://127.0.0.1:6666"); while(true) { //循环处理消息 //获取消息 $req = $rep->recv(); echo "Received Message: {$req} \r\n"; sleep(1); //向客户端发送消息 $rep->send('World'); } ~~~ client.php代码如下: ~~~ <?php //创建一个新的套接字上下文 $context = new ZMQContext(); //创建一个ZMQ请求套接字 $req = new ZMQSocket($context, ZMQ::SOCKET_REQ); //连接到端口 $req->connect("tcp://127.0.0.1:6666"); for($ix = 0; $ix < 5; ++$ix) { //发送请求 $req->send('Hello'); $reply = $req->recv(); echo "Received Reply: {$reply} \r\n"; } ~~~ >[danger]需要注意如下几点: 1、服务端和客户端无论谁先启动,效果是相同的,这点不同于 Socket。 2、在服务端收到信息以前,程序是阻塞的,会一直等待客户端连接上来。 3、服务端收到信息以后,会send一个"World"给客户端。 值得注意的是一定是client连接上来以后, send 消息给 Server,然后 Server 再 recv 然后响应 client,这种一问一答式的。 如果 Server 先 send,client 先 recv 是会报错的。 4、ZMQ通信通信单元是消息,他除了知道 Bytes 的大小,他并不关心的消息格式。 因此,你可以使用任何你觉得好用的数据格式。Xml、Protocol Buffers、Thrift、json等。 5、虽然可以使用 ZMQ 实现 HTTP 协议,但是,这绝不是它所擅长的。 ## **Publisher-Subscriber模型(发布与订阅模式)** server.php代码如下: ``` <?php //创建一个新的套接字上下文 $context = new ZMQContext(); //创建一个ZMQ发布套接字 $pub = new ZMQSocket($context, ZMQ::SOCKET_PUB); //绑定端口 $pub->bind("tcp://127.0.0.1:7777"); while(true) { $typeArr = array('A', 'B', 'C', 'D'); $tmp = mt_rand(0, 3); $type = $typeArr[$tmp]; $num = mt_rand(1, 9999); $update = sprintf('%s %04d', $type, $num); echo "Send Message: {$update} \r\n"; sleep(1); //向客户端发送消息 $pub->send($update); } ``` client.php代码如下: ``` <?php //创建一个新的套接字上下文 $context = new ZMQContext(); //创建一个ZMQ订阅套接字 $sub = new ZMQSocket($context, ZMQ::SOCKET_SUB); //连接到端口 $sub->connect("tcp://127.0.0.1:7777"); $filter = 'B'; //设置消息过滤器,只接收类型为'B'的消息 $sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter); for($ix = 0; $ix < 50; ++$ix) { $update = $sub->recv(); sscanf($update, '%s %d', $type, $num); echo "type: {$type} num: {$num} \r\n"; } ``` 上述代码表示,Pub端不断的发送类型在(A,B,C,D)中的随机数$num,而Sub端通过设置消息过滤,只接收类型为B的消息,接收50次后就退出。 >[danger]注意点如下: 1、与Hello World不同的是,Socket的类型变成SOCKET_PUB和SOCKET_SUB类型。 2、客户端需要$sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter); 设置一个过滤值,相当于设定一个订阅频道,否则什么信息也收不到。 3、服务器端一直不断的广播中,如果中途有 Sub 端退出,并不影响他继续的广播, 当 Sub 再连接上来的时候,收到的就是后来发送的新的信息了。 这对比较晚加入的,或者是中途离开的订阅者,必然会丢失掉一部分信息,这是这个模式的一个问题。 4、但是,如果 Pub 中途离开,所有的 Sub 会阻塞住,等待 Pub 再上线的时候,会继续接受信息。 ## **Parallel Pipeline模型(并行管道模式)** send.php代码如下: ``` <?php //创建一个新的套接字上下文 $context = new ZMQContext(); //创建一个ZMQ分发套接字 $sed = new ZMQSocket($context, ZMQ::SOCKET_PUSH); //绑定端口 $sed->bind("tcp://127.0.0.1:8881"); //统计从0到1000000累加数 $num = 1000000; //分50步 $step = 50; //每步大小 $size = $num / $step; //开始大小 $start = 0; //结束大小 $end = 0; //将任务分发给worker节点 for($ix = 1; $ix <= $step; ++$ix) { $end = $start + $size; $data = sprintf('%d %d', $start, $end); echo "start: {$start} end: {$end} \r\n"; $sed->send($data); $start = $end; } ``` worker.php代码如下: ``` <?php //创建一个新的套接字上下文 $context = new ZMQContext(); $rev = new ZMQSocket($context, ZMQ::SOCKET_PULL); $rev->connect("tcp://127.0.0.1:8881"); $sed = new ZMQSocket($context, ZMQ::SOCKET_PUSH); $sed->connect("tcp://127.0.0.1:8882"); while(true) { //获取分发过来的数据 $data = $rev->recv(); sscanf($data, '%d %d', $start, $end); //计算累加和 $total = 0; for($ix = $start; $ix < $end; ++$ix) { $total += $ix; } echo "worker start: {$start} end: {$end} total: {$total} \r\n"; //把计算的结果发送给result $sed->send($total); } ``` result.php代码如下: ``` <?php //创建一个新的套接字上下文 $context = new ZMQContext(); $rev = new ZMQSocket($context, ZMQ::SOCKET_PULL); $rev->bind("tcp://127.0.0.1:8882"); $step = 50; $total = 0; for($ix = 1; $ix <= $step; ++$ix) { //接收worker计算后的结果 $result = $rev->recv();   //累加结果 $total += $result; } //输出最后的计算结果 echo "result: {$total} \r\n"; ``` send.php通过把累加任务分发给50个worker节点计算,然后worker节点计算完成后,把结果发送给result.php进行统一的汇总。