简介

Hprose 2.0 最大的亮点就是增加了推送功能的支持,而且这个功能的增加是在不修改现有通讯协议的方式下实现的,因此,这里的推送服务,即使不是 Hprose 2.0 的客户端或者服务器也可以使用。

当然,在旧版本的客户端调用推送服务,或者在旧版本的服务器上自己实现推送,需要多写一些代码。所以,如果你所使用的语言支持 Hprose 2.0,那么推荐直接使用 Hprose 2.0 的推送 API 来做推送,这样会极大的减少你的工作量。

Hprose 2.0 的客户端都支持推送功能,服务器端需要 Hprose 的独立服务器(即 fpm 方式的 HTTP 服务器不支持推送功能),例如:\Hprose\Socket\Serverhprose-swoole 版本的所有服务器都支持推送功能。

下面我们来分别介绍一下客户端和服务器端增加的关于推送的 API。

客户端

客户端关于推送的方法只有两个,它们分别是:

subscribe 方法

  1. function subscribe($name, $callback)
  2. function subscribe($name, $id, $callback)
  3. function subscribe($name, $callback, $timeout)
  4. function subscribe($name, $id, $callback, $timeout)
  5. function subscribe($name, $callback, $timeout, $failswitch)
  6. function subscribe($name, $id, $callback, $timeout, $failswitch)

subscribe 方法的用处是订阅服务器端的推送服务。该方法有两种方式,一种是自动获取设置客户端 $id,另一种是手动设置客户端 $id

参数 $name 是订阅的主题名,它实际上也是一个服务器端的方法,该方法与普通方法的区别是,它只有一个参数 $id,该参数表示客户端的唯一编号,该方法的返回值即推送信息,当返回值为 NULL 或者抛出异常时,客户端会忽略并再次调用该 $name 对应的方法。当该方法返回推送消息时,$callback 回调函数会执行,并同时再次调用该 $name 对应的方法。因此当没有推送消息时,该方法不应该立即返回值,而应该挂起等待,直到超时或者有推送消息时再返回结果。

当然,对于开发者来说,自己实现一个完善的推送方法还是有一定难度的。因此,Hprose 2.0 的服务器端已经提供了一整套的专门用于推送的 API,通过这些 API,可以方便的自动实现用于推送的服务方法。在后面介绍服务器端时,我们再介绍这部分内容。

参数 $id 是客户端的唯一编号,如果省略的话,客户端会使用自动编号机制,如果该自动编号未初始化,会自动调用一个名字为 # 的服务器端远程方法,之所以使用这个特殊的名字是为了防止跟用户发布的普通方法发生冲突。Hprose 2.0 服务器已经自动实现了该方法,但是用户也可以用自己的实现来替换它,它的默认实现是一个唯一字符串。当用户指定了 $id 参数时,客户端会将它作为该 $name 对应方法的参数值传给服务器端,但不会修改客户端自动获取的 $id 属性值。

参数 $callback 是用来处理推送消息的回调函数,该参数不能省略。

参数 $timeout 是等待推送消息的超时时间,单位是毫秒(ms),可以省略,默认值与 timeout 属性值相同。超时之后并不会产生异常,而是重新请求推送。因此,如果用户要在服务器端自己实现推送方法,应当注意处理好同一个客户端对同一个推送方法可能会进行重复调用的问题。如果使用 Hprose 2.0 提供的推送 API,则不需要关心这一点。

参数 $failswitch 表示当客户端与服务器端通讯中发生网络故障,是否自动切换服务器。默认值是 false,表示不切换。

对于同一个推送主题,subscribe 方法允许被多次调用,这样可以对同一个推送主题指定多个不同的回调方法。但通常没有必要也不推荐这样做。

unsubscribe 方法

  1. function unsubscribe($name)
  2. function unsubscribe($name, $callback)
  3. function unsubscribe($name, $id)
  4. function unsubscribe($name, $id, $callback)

该方法用于取消订阅推送主题。当调用该方法时,带有 $callback 参数,将只取消对该 $callback 方法的回调,除非这是该主题上最后一个 $callback,否则对该主题远程方法的调用并不会中断。当所有的 $callback 都被取消之后,或者当调用该方法时,没有指定 $callback 参数时,将会中断对该主题远程方法的循环调用。

如果 $id 参数未指定,那么当客户端自动获取的 $id 属性有值时,将只取消对该 $id 属性值对应的推送主题的订阅。当客户端自动获取的 $id 属性未初始化时,将会取消该主题上所有的订阅。

通常来说,当你调用 subscribe 方法时如果指定了 $id 参数,那么当调用 unsubscribe 方法时你也应该指定相同的 $id 参数。当你调用 subscribe 方法时没有指定 $id 参数,那么当调用 unsubscribe 方法时你也不需要指定 $id 参数。

isSubscribed 方法

  1. function isSubscribed($name)

当名称为 $name 的主题已被订阅时,返回 true,否则返回 false

subscribedList 方法

  1. function subscribedList()

返回已被订阅的主题的列表,返回值是一个字符串数组。数组元素为已订阅的主题名称。

服务器端

服务器端提供了比较多的关于推送的 API,包括广播,多播和单播方式的推送,还有超时,心跳,推送事件等设置。

timeout 属性

该属性设置推送空闲超时的。该属性默认值为 120000,单位是毫秒(ms),即 2 分钟。

当服务器发布了推送主题后(后面会专门介绍推送),客户端会跟服务器端保持一个长连接,如果达到超时时间,仍然没有任何消息推送给客户端,则返回 NULL,此时,如果客户端仍然在线的话,则会立即再次发送获取推送主题的请求。服务器端通过这个方式可以获知客户端是否还在线。

getTimeout 方法

获取 timeout 属性值。

setTimeout 方法

设置 timeout 属性值。

heartbeat 属性

该属性用来设置推送的心跳检测间隔时间。该属性默认值为 3000,单位是毫秒(ms),即 3 秒钟。

当服务器端推送数据给客户端后,如果客户端在 heartbeat 时间内没有取走推送数据,则服务器端认为客户端以掉线。对于以掉线的客户端,服务器端会清除为该客户端分配的内存空间,并将该客户端从推送列表中移除。

timeoutheartbeat 属性在检测客户端是否离线时是相互配合的,当服务器端没有向客户端推送任何消息时,服务器端需要至少 timeout + heartbeat 的时间才能检测到客户端以离线。当服务器端有向客户端推送消息时,则在推送消息之后经过 heartbeat 时间可以检测到客户端以掉线。

timeoutheartbeat 设置的时间越短,检测到客户端离线的时间就越短。但是需要注意以下几个问题:

timeout 时间越短,服务器端和客户端之间的用于检测是否掉线的通讯就越频繁,所以不应该将 timeout 设置的过短,否则会严重增加服务器的负担。

因此,timeout 的设置一般不应少于 30 秒。对于负载比较高的服务器,保持默认值就是一个不错的选项。

对于推送频繁的服务器来说,heartbeat 时间越长,对于已经离线的客户端,在服务器端存储的离线消息就越多,这会严重的占用服务器端的内存,因此,不宜将 heartbeat 的时间设置的过长。

如果 heartbeat 的时间设置的过短,客户端可能会因为网络原因导致不能及时取走推送消息,这就会导致错误的离线判断,当错误离线判断发生后,会丢失一些推送消息。

因此,heartbeat 的选择则应根据客户端的网络情况来决定,如果客户端都是来自局域网,并且客户端数量较少,设置为 1 秒甚至更短的时间也是可以的。而对于比较慢速且不太稳定的移动网络,设置为 5 秒或者 10 秒可能是一个比较合适的取值。对于普通的互联网客户端来说,保持默认值就可以了。

getHeartbeat 方法

返回 heartbeat 属性值。

setHeartbeat 方法

设置 heartbeat 属性值。

onSubscribe 属性

该属性用来设置客户端订阅事件。

该事件方法格式为:

  1. void onSubscribe(string $topic, string $id, \Hprose\Service service);

当编号为 $id 的客户端订阅主题 $topic 时,触发 onSubscribe 事件。

onUnsubscribe 属性

该属性用来设置客户端取消订阅事件。

该事件方法格式为:

  1. void onUnsubscribe(string $topic, string $id, \Hprose\Service service);

当编号为 $id 的客户端退订主题 $topic 时,触发 onUnsubscribe 事件。

publish 方法

  1. void publish(string $topic);
  2. void publish(string $topic, array $options);
  3. void publish(array $topics);
  4. void publish(array $topics, array $options);

该方法用于发布一个或一组推送主题。这个推送的主题实际上是一个自动生成的远程服务方法。它的功能就是实现推送。

$topic 为主题名,$topics 为一组主题名。

$options 可以包含两个设置,分别是 timeoutheartbeat

这里 timeoutheartbeat 参数在前面的属性介绍里已经说明过了,这里不再重复。

publish 方法仅仅是告诉客户端,现在有一个叫做 $topic 的推送主题可以订阅。

而要真正推送数据给客户端,则需要使用以下几个方法。

广播

  1. void broadcast(string $topic, mixed $result);
  2. void broadcast(string $topic, mixed $result, callable $callback);
  3.  
  4. void push(string $topic, mixed $result);

这里有两个推送方法:broadcastpush

这两个方法功能相同,但是 broadcast 方法支持回调,该回调方法有两个参数,这个参数都是数组类型,第一个数组中是所有推送成功的客户端 $id,第二个数组中是所有推送失败的客户端 $id。而 push 方法不支持回调。

一旦服务器启动,你可以在任何地方进行数据推送。比如在其它的服务方法中,在服务器事件中,甚至在服务器外的并行运行的函数中。例如:

时间推送服务器

  1. use Hprose\Swoole\Server;
  2.  
  3. $server = new Server("tcp://0.0.0.0:2016");
  4. $server->publish('time');
  5. $server->on('workerStart', function($serv) use ($server) {
  6. $serv->tick(1000, function() use ($server) {
  7. $server->push('time', microtime(true));
  8. });
  9. });
  10. $server->start();

时间显示客户端

  1. use Hprose\Swoole\Client;
  2.  
  3. $client = new Client("tcp://127.0.0.1:2016");
  4. $count = 0;
  5. $client->subscribe('time', function($date) use ($client, &$count) {
  6. if (++$count > 10) {
  7. $client->unsubscribe('time');
  8. swoole_event_exit();
  9. }
  10. else {
  11. var_dump($date);
  12. }
  13. });

先运行服务器,后运行客户端,该然后客户端会每隔一秒钟输出一行,最后输出结果为:


  1. float(1469610600.1522)
  2. float(1469610601.1521)
  3. float(1469610602.1523)
  4. float(1469610603.1521)
  5. float(1469610604.1522)
  6. float(1469610605.1523)
  7. float(1469610606.1521)
  8. float(1469610607.1522)
  9. float(1469610608.1521)
  10. float(1469610609.1521)

有时候,你可能想在某个服务方法中推送数据给客户端,但是该服务方法可能在其它文件中定义。因此,你得不到 $server 对象。那这时还能进行推送吗?

答案是可以,没问题。我们前面说过,在服务方法中我们可以得到一个 $context 参数,这个 $context 参数中就包含有一个 clients 对象,这个对象上就包含了所有跟推送有关的方法,这些方法跟 $server 对象上的推送方法是完全一样的,例如:

  1. $context->clients->broadcast($topic, $result[, $callback]);
  2. $context->clients->push($topic, $result);

我们再来看一个例子:

服务器

  1. use Hprose\Swoole\Server;
  2.  
  3. function hello($name, $context) {
  4. $context->clients->push("news", "this is a pushed message: $name");
  5. $context->clients->broadcast("news", array('x' => 1, 'y' => 2));
  6. $fdinfo = $context->server->connection_info($context->socket);
  7. return "Hello $name! -- {$fdinfo['remote_ip']}";
  8. }
  9.  
  10. $server = new Server("tcp://0.0.0.0:1980");
  11. $server->publish('news');
  12. $server->addFunction('hello', array('passContext' => true));
  13. $server->start();

客户端

  1. use Hprose\Swoole\Client;
  2. use Hprose\Future;
  3.  
  4. Future\co(function() {
  5. $client = new Client("tcp://127.0.0.1:1980");
  6. $id = (yield $client->getId());
  7. $client->subscribe('news', $id, function($news) {
  8. var_dump($news);
  9. });
  10. var_dump((yield $client->hello('hprose')));
  11. });

假设我们运行两个客户端,则第一个客户端显示:


  1. string(32) "this is a pushed message: hprose"
  2. string(26) "Hello hprose! -- 127.0.0.1"
  3. array(2) {
  4. ["x"]=>
  5. int(1)
  6. ["y"]=>
  7. int(2)
  8. }
  9. string(32) "this is a pushed message: hprose"
  10. array(2) {
  11. ["x"]=>
  12. int(1)
  13. ["y"]=>
  14. int(2)
  15. }

第二个客户端显示:


  1. string(32) "this is a pushed message: hprose"
  2. string(26) "Hello hprose! -- 127.0.0.1"
  3. array(2) {
  4. ["x"]=>
  5. int(1)
  6. ["y"]=>
  7. int(2)
  8. }

这两个客户端显示结果之后并不会退出,如果有其它客户端再次运行时,这两个客户端还会继续显示推送信息。也就是说,对于已经执行了 subscribe 的客户端,在未执行对应的 unsubscribe 方法之前,该客户端会一直运行,接收推送数据,即使服务器已经关闭,客户端也不会退出。

多播

  1. void multicast(string $topic, array $ids, mixed $result);
  2. void multicast(string $topic, array $ids, mixed $result, callable $callback);
  3.  
  4. void push(string $topic, array $ids, mixed $result);

跟广播类似,多播也有这样几种形式。跟广播相比,多播多了一个 $ids 参数,它是一个客户端 $id 的数组。也就是说,你可以向指定的一组客户端推送消息。在 $context->clients 上同样包含这些方法。

单播

  1. void unicast(string $topic, string $id, mixed $result);
  2. void unicast(string $topic, string $id, mixed $result, callable $callback);
  3.  
  4. void push(string $topic, string $id, mixed $result);

单播是跟多播的形式也类似,只不过客户端 $ids 数组参数变成了一个客户端 $id 参数。

但是还有一点要注意,unicast 的回调方法跟 broadcastmulticast 不同,unicast 的回调方法只有一个参数,而且是一个布尔值,该值为 true 是表示推送成功,为 false 表示推送失败。

$context->clients 上同样包含这些方法。

idlist 方法

  1. array idlist(string $topic);

该方法用于获取当前在线的所有客户端的 $id 列表。在 $context->clients 上也包含该方法。

exist 方法

  1. bool exist(string $topicstring $id);

该方法用于快速判断 $id 是否在当前在线的客户端列表中。在 $context->clients 上也包含该方法。

注意,客户端在线状态是针对主题的,同一个客户端可能针对一个主题处于在线状态,但是针对另一个主题却处于离线状态,这种情况是正常的。

原文: https://github.com/hprose/hprose-php/wiki/10-%E6%8E%A8%E9%80%81%E6%9C%8D%E5%8A%A1