-
博文分类专栏
- Jquery基础教程
-
- 文章:(15)篇
- 阅读:48320
- shell命令
-
- 文章:(42)篇
- 阅读:159874
- Git教程
-
- 文章:(36)篇
- 阅读:241661
- leetCode刷题
-
- 文章:(76)篇
- 阅读:144192
-
PHP中zeromq使用2017-09-09 21:42 阅读(10761) 评论(1)
在前一篇(PHP的ZMQ扩展zeromq源码安装)文章中,我们配置好了zeromq,现在到了使用的时候。ZMQ 提供了三个基本的通信模型,即Request-Reply Publisher-Subscriber Parallel Pipeline,下面将针对每一种模式,从两个方面去探究,怎么向消息队列中写消息,怎么从消息队列中读取消息。
一、zemq的使用步骤
1、使用ZMQContext创建一个上下文
2、使用上下文初始化ZMQSocket,这里需要指明socket类型(ZMQ::SOCKET_开头),组合模式包括
PUB,SUB REQ,REP REQ,ROUTER (take care, REQ inserts an extra null frame) DEALER,REP (take care, REP assumes a null frame) DEALER,ROUTER DEALER,DEALER ROUTER,ROUTER PUSH,PULL PAIR,PAIR 分类包括 轮询,REQ,PUSH,DEALER 多播,PUB 公平排队,REP,SUB,PULL,DEALER 明确寻址,ROUTER 单播,PAIR
3、指定IP以及端口,如果是服务端就bind,如果是客户端就conncet
进程内部通信,inproc:// 进程间通信,ipc:// 网络间通信,tcp:// 多播,pgm://
4、使用send/recv发送/接收消息
二、ZMQ常用的通信模式案例
1、Request-Reply (请求应答模式)
该模式的简介:server监听tcp的某个端口(下面案例中,绑定了8082端口),等待client 发起请求,并做相应的处理。如下图
server.php 文件
$ip = '127.0.0.1'; $context = new \ZMQContext(1); $server = new \ZMQSocket($context, \ZMQ::SOCKET_REP);//第二个参数指定通信模型 $server->bind("tcp://" . $ip . ':8082'); while(true){ $res = $server->recv();//监听用户的请求 echo 'a request comes ,and the content is '.$res; $server->send("Request-Reply 通信 by dq_test"); }
client.php文件
$ip = '127.0.0.1'; $context = new \ZMQContext(1); $client = new \ZMQSocket($context, \ZMQ::SOCKET_REQ); $client->connect("tcp://" . $ip . ':8082'); $msg = array('name' => 'dq', 'lang' =>'php'); echo "准备发送请求 "; $rs = $client->send(json_encode($msg));//发送请求 echo "已经发送,等待反馈\n"; $res = $client->recv(); //获取请求返回值 echo "当前请求的返回值:".$res."\n";
首先,得启动sever.php脚本,如下图
启动前后,通过lsof查看8082端口占用情况。
现在开始测试client.php
注意
1.服务端和客户端无论谁先启动,效果是相同的,这点不同于 Socke。
2、Publisher-Subscriber(发布订阅模式)
该模式的简介:有一个节点发布消息,其他所有节点可以接受消息,有点类似天气预报或是广播哦。如下图
消息发布者 publish.php代码如下
$ip = '127.0.0.1'; $context = new \ZMQContext(1); $publish = new \ZMQSocket($context, \ZMQ::SOCKET_PUB);//第二个参数指定通信模型 $publish->bind("tcp://" . $ip . ':8083'); while(true) { $publish->send("dq Request-Reply 通信 by dq_test"); sleep(1); }
消息订阅者 subscribe.php代码如下
$ip = '127.0.0.1'; $context = new \ZMQContext(1); $subscribe = new \ZMQSocket($context, \ZMQ::SOCKET_SUB);//第二个参数指定通信模型 $subscribe->connect("tcp://" . $ip . ':8083'); //设置频道 ,发布者可能有多个频道 $filter = 'dq'; //接受消息的频道,必须设置,否则获取不到消息 $subscribe->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter); while(true) { $rs = $subscribe->recv(); echo $rs."\n"; }
执行上面程序
添加订阅者
/usr/local/php/bin/php subscribe.php &
启动发布程序
/usr/local/php/bin/php publish.php
执行效果如下图:
效率分析
其发布任务的效率还是不错的,发布10w个任务,每秒发布的个数分别如下:
个订阅者的个数 每秒发布任务的个数 0 167万 1 76.9万 2 76.9万 3 52.6万 具体如下图:
注意
1、发布端(publish.php)需要一直处在运行状态,所有在上面程序中使用了while(true)死循环。若发布端(publish.php)中断,订阅者(subscribe.php)将会堵塞。
2、如果中途有订阅者(subscribe.php)退出,并不影响他继续的广播,当订阅者(subscribe.php)再次连接上来的时候,仍然可以收到消息。
3、Parallel Pipeline(并行管道模式)
该模式的简介:该模式给人的感觉就是总-分-总,一个地方进行分发任务到各个节点,等节点完成后,汇总到一个地方进行统计。懒得画图,借用一下官方的图,如下
任务的发布者 serverPusher.php
<?php $context = new ZMQContext(); // Socket to send messages on $sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH); $sender->bind("tcp://*:8034"); echo "Press Enter when the workers are ready: "; $fp = fopen('php://stdin', 'r'); $line = fgets($fp, 512); fclose($fp); echo "Sending tasks to workers…", PHP_EOL; // The first message is "0" and signals start of batch $sender->send(0); // Send 100 tasks $total_msec = 0; // Total expected cost in msecs for ($task_nbr = 0; $task_nbr < 100000; $task_nbr++) { // Random workload from 1 to 100msecs $sender->send($task_nbr); } printf ("Total expected cost: %d msec\n", $total_msec); sleep (1); // Give 0MQ time to deliver
任务的接受者 worker.php
<?php $context = new ZMQContext(); $index = 0; // Socket to receive messages on $receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL); $receiver->connect("tcp://localhost:8034"); // Socket to send messages to $sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH); $sender->connect("tcp://localhost:8035"); // Process tasks forever while (true) { $string = $receiver->recv(); $index +=1; // Simple progress indicator for the viewer echo '任务id:'.$string; echo ';已经处理了'.$index.'个任务', PHP_EOL; // Do the work usleep($string * 1000); // Send results to sink $sender->send("good"); }
任务的汇总者 serverSummary.php
<?php $context = new ZMQContext(); $receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL); $receiver->bind("tcp://*:8035"); // Wait for start of batch $string = $receiver->recv(); // Start our clock now $tstart = microtime(true); // Process 100 confirmations $total_msec = 0; // Total calculated cost in msecs for ($task_nbr = 0; $task_nbr < 100; $task_nbr++) { $string = $receiver->recv(); if ($task_nbr % 10 == 0) { echo ":"; } else { echo "."; } } $tend = microtime(true); $total_msec = ($tend - $tstart) * 1000; echo PHP_EOL; printf ("Total elapsed time: %d msec", $total_msec); echo PHP_EOL;
这种模式声称是负载均衡,其实是平均分配而已。如下,101个任务,分发给3个woker
可以看出任务id对3进行求余,余1分配给第一个人woker,余0分配给第二个woker,余2分配给第三个woker。如果仅仅是这样也就算了,啃爹的还不仅仅是这些,比如我们现在要分发1000个任务,如下图1、2、3在任务分发前启动,4在任务分发后启动,中途中断3后,隔几秒再启动,然后中断2,最后中断1。
由上图,可以看出,当4启动以后,并不会被分发任务,也就是说当woker没有在任务分发前启动,是不会被分发任务的。其次,3中断后,原本该分发给3的任务,并不会分配给1和2,从而导致部分任务丢失。也就是说,在任务分发的时候,若有woker中断,会导致任务丢失。当有两个woker的时候,这种丢失看得更明显如下图:
三、补充
1、错误排查
2、zemq的不足之处
我感觉主要是其不支持持久化,当然这也成就了它的快速。
3、参考
https://github.com/anjuke/zguide-cn
http://zguide.zeromq.org/page:all