简介

Hprose 过滤器的功能虽然比较强大,可以将 Hprose 的功能进行扩展。但是有些功能使用它仍然难以实现,比如缓存。

为此,Hprose 2.0 引入了更加强大的中间件功能。Hprose 中间件不仅可以对输入输出的数据进行操作,它还可以对调用本身的参数和结果进行操作,甚至你可以跳过中间的执行步骤,或者完全由你来接管中间数据的处理。

Hprose 中间件跟普通的 HTTP 服务器中间件有些类似,但又有所不同。

Hprose 中间件在客户端服务器端都支持。

Hprose 中间件分为两种:

  • 调用中间件
  • 输入输出中间件
    另外,输入输出中间件又可以细分为 beforeFilterafterFilter 两种,但它们本质上没有什么区别,只是在执行顺序上有所区别。

执行顺序

Hprose 中间件的顺序执行是按照添加的前后顺序执行的,假设添加的中间件处理器分别为:handler1, handler2handlerN,那么执行顺序就是 handler1, handler2handlerN

不同类型的 Hprose 中间件和 Hprose 其它过程的执行流程如下图所示:

  1. +------------------------------------------------------------------+
  2. | +-----------------batch invoke----------------+ |
  3. | +------+ | +-----+ +------+ +------+ +-----+ | |
  4. | |invoke| | |begin| |invoke| ... |invoke| | end | | |
  5. | +------+ | +-----+ +------+ +------+ +-----+ | |
  6. | ^ +---------------------------------------------+ |
  7. | | ^ |
  8. | | | |
  9. | v v |
  10. | +-------------------+ +------------------+ |
  11. | | invoke middleware | | batch middleware | |
  12. | +-------------------+ +------------------+ |
  13. | ^ ^ |
  14. | | +---------------+ | |
  15. | +---->| encode/decode |<-----+ |
  16. | +---------------+ |
  17. | ^ |
  18. | | |
  19. | v |
  20. | +--------------------------+ |
  21. | | before filter middleware | |
  22. | +--------------------------+ |
  23. | ^ |
  24. | | _ _ ___ ____ ____ ____ ____ |
  25. | v |__| |__] |__/ | | [__ |___ |
  26. | +--------+ | | | | \ |__| ___] |___ |
  27. | | filter | |
  28. | +--------+ ____ _ _ ____ _ _ ___ |
  29. | ^ | | | |___ |\ | | |
  30. | | |___ |___ | |___ | \| | |
  31. | v |
  32. | +-------------------------+ |
  33. | | after filter middleware | |
  34. | +-------------------------+ |
  35. +------------------------------------------------------------------+
  36. ^
  37. |
  38. |
  39. v
  40. +------------------------------------------------------------------+
  41. | +--------------------------+ |
  42. | | before filter middleware | |
  43. | +--------------------------+ |
  44. | ^ |
  45. | | _ _ ___ ____ ____ ____ ____ |
  46. | v |__| |__] |__/ | | [__ |___ |
  47. | +--------+ | | | | \ |__| ___] |___ |
  48. | | filter | |
  49. | +--------+ ____ ____ ____ _ _ ____ ____ |
  50. | ^ [__ |___ |__/ | | |___ |__/ |
  51. | | ___] |___ | \ \/ |___ | \ |
  52. | v |
  53. | +-------------------------+ |
  54. | | after filter middleware | |
  55. | +-------------------------+ |
  56. | ^ |
  57. | | |
  58. | v |
  59. | +---------------+ |
  60. | +----------->| encode/decode |<---------------------+ |
  61. | | +---------------+ | |
  62. | | | | |
  63. | | | | |
  64. | | v | |
  65. | | +---------------+ | |
  66. | | | before invoke |-------------+ | |
  67. | | +---------------+ | | |
  68. | | | | | |
  69. | | | | | |
  70. | | v v | |
  71. | | +-------------------+ +------------+ | |
  72. | | | invoke middleware |--->| send error |--+ |
  73. | | +-------------------+ +------------+ |
  74. | | | ^ |
  75. | | | | |
  76. | | v | |
  77. | | +--------------+ | |
  78. | | | after invoke |--------------+ |
  79. | | +--------------+ |
  80. | | | |
  81. | | | |
  82. | +--------------------+ |
  83. +------------------------------------------------------------------+

调用中间件

调用中间件的形式为:

  1. function(string $name, array &$args, stdClass $context, Closure $next) {
  2. ...
  3. $result = $next($name, $args, $context);
  4. ...
  5. return $result;
  6. }

$name 是调用的远程函数/方法名。

$args 是调用参数,引用传递的数组。

$context 是调用上下文对象。

$next 表示下一个中间件。通过调用 $next 将各个中间件串联起来。

在调用 $next 之前的操作在调用发生前执行,在调用 $next 之后的操作在调用发生后执行,如果你不想修改返回结果,你应该将 $next 的返回值作为该中间件的返回值返回。

另外,对于服务器端来说,$next 的返回值 $result 总是 promise 对象。对于客户端来说,如果客户端是异步客户端,那么 $next 的返回值 $resultpromise 对象,如果客户端是同步客户端,$next 的返回值 $result 是实际结果。

跟踪调试

我们来看一个例子:

LogHandler.php

  1. use Hprose\Future;
  2.  
  3. $logHandler = function($name, array &$args, stdClass $context, Closure $next) {
  4. error_log("before invoke:");
  5. error_log($name);
  6. error_log(var_export($args, true));
  7. $result = $next($name, $args, $context);
  8. error_log("after invoke:");
  9. if (Future\isFuture($result)) {
  10. $result->then(function($result) {
  11. error_log(var_export($result, true));
  12. });
  13. }
  14. else {
  15. error_log(var_export($result, true));
  16. }
  17. return $result;
  18. };

Server.php

  1. use Hprose\Socket\Server;
  2.  
  3. function hello($name) {
  4. return "Hello $name!";
  5. }
  6.  
  7. $server = new Server('tcp://0.0.0.0:1143/');
  8. $server->addFunction('hello');
  9. $server->debug = true;
  10. $server->addInvokeHandler($logHandler);
  11. $server->start();

Client.php

  1. use Hprose\Client;
  2.  
  3. $client = Client::create('tcp://127.0.0.1:1143/', false);
  4. $client->addInvokeHandler($logHandler);
  5. var_dump($client->hello("world"));

然后分别启动服务器和客户端,就会看到如下输出:

服务器输出


  1. before invoke:
  2. hello
  3. array (
  4. 0 => 'world',
  5. )
  6. after invoke:
  7. 'Hello world!'

客户端输出


  1. before invoke:
  2. hello
  3. array (
  4. 0 => 'world',
  5. )
  6. after invoke:
  7. 'Hello world!'
  8. string(12) "Hello world!"

这个例子中是使用的同步客户端,所以我们在 $logHandler 中判断了返回结果是同步结果还是异步结果,并根据结果的不同做了不同的处理。

如果我们的中间是单独为异步客户端或者服务器编写的,则不需要做这个判断。甚至我们还可以使用协程的方式来简化代码(需要 PHP 5.5+ 才可以)。

下面,我们来把上面的例子翻译成一个只针对异步客户端和服务器的使用协程方式编写的日志中间件:

coLogHandler.php

  1. $coLogHandler = function($name, array &$args, stdClass $context, Closure $next) {
  2. error_log("before invoke:");
  3. error_log($name);
  4. error_log(var_export($args, true));
  5. $result = (yield $next($name, $args, $context));
  6. error_log("after invoke:");
  7. error_log(var_export($result, true));
  8. };

客户端和服务器的代码以及运行结果这里就省略了,如果你写的正确,运行结果跟上面的是一致的。

这个例子看上去要简单清爽的多,在这个例子中,我们使用 yield 关键字调用了 $next 方法,因为后面没有再次调用 yield,所以这个返回值也是该协程的返回值。

注意,不要用 Future\wrap 来包装这个协程,包装之后,不支持引用参数传递。

缓存调用

我们再来看一个实现缓存调用的例子,在这个例子中我们也使用了上面的日志中间件,用来观察我们的缓存是否真的有效。

CacheHandler.php

  1. class CacheHandler {
  2. private $cache = array();
  3. function handle($name, array &$args, stdClass $context, Closure $next) {
  4. if (isset($context->userdata->cache)) {
  5. $key = hprose_serialize($args);
  6. if (isset($this->cache[$name])) {
  7. if (isset($this->cache[$name][$key])) {
  8. return $this->cache[$name][$key];
  9. }
  10. }
  11. else {
  12. $this->cache[$name] = array();
  13. }
  14. $result = $next($name, $args, $context);
  15. $this->cache[$name][$key] = $result;
  16. return $result;
  17. }
  18. return $next($name, $args, $context);
  19. }
  20. }

Client.php

  1. use Hprose\Client;
  2. use Hprose\InvokeSettings;
  3.  
  4. $cacheSettings = new InvokeSettings(array("userdata" => array("cache" => true)));
  5. $client = Client::create('tcp://127.0.0.1:1143/', false);
  6. $client->addInvokeHandler(array(new CacheHandler(), 'handle'));
  7. $client->addInvokeHandler($logHandler);
  8. var_dump($client->hello("cache world", $cacheSettings));
  9. var_dump($client->hello("cache world", $cacheSettings));
  10. var_dump($client->hello("no cache world"));
  11. var_dump($client->hello("no cache world"));

我们的服务器仍然使用上面例子中的服务器。在确保服务器已启动的情况下,我们运行客户端,可以看到它们分别输出以下结果:

服务器输出


  1. before invoke:
  2. hello
  3. array (
  4. 0 => 'cache world',
  5. )
  6. after invoke:
  7. 'Hello cache world!'
  8. before invoke:
  9. hello
  10. array (
  11. 0 => 'no cache world',
  12. )
  13. after invoke:
  14. 'Hello no cache world!'
  15. before invoke:
  16. hello
  17. array (
  18. 0 => 'no cache world',
  19. )
  20. after invoke:
  21. 'Hello no cache world!'

客户端输出


  1. before invoke:
  2. hello
  3. array (
  4. 0 => 'cache world',
  5. )
  6. after invoke:
  7. 'Hello cache world!'
  8. string(18) "Hello cache world!"
  9. string(18) "Hello cache world!"
  10. before invoke:
  11. hello
  12. array (
  13. 0 => 'no cache world',
  14. )
  15. after invoke:
  16. 'Hello no cache world!'
  17. string(21) "Hello no cache world!"
  18. before invoke:
  19. hello
  20. array (
  21. 0 => 'no cache world',
  22. )
  23. after invoke:
  24. 'Hello no cache world!'
  25. string(21) "Hello no cache world!"

我们看到输出结果中 'cache world' 的日志只被打印了一次,而 'no cache world' 的日志被打印了两次。这说明 'cache world' 确实被缓存了。

在这个例子中,我们用到了 userdata 设置项和 $context->userdata,通过 userdata 配合 Hprose 中间件,我们就可以实现自定义选项功能了。

输入输出中间件

输入输出中间件可以完全代替 Hprose 过滤器。使用输入输出中间件还是使用 Hprose 过滤器完全看开发者喜好。

输入输出中间件的形式为:

  1. function(string $request, stdClass $context, Closure $next) {
  2. ...
  3. $result = $next($request, $context);
  4. ...
  5. return $result;
  6. }

$request 是原始请求数据,对于客户端来说它是输出数据,对于服务器端来说,它是输入数据。该数据的类型为 string 类型对象。

$context 是调用上下文对象。

$next 表示下一个中间件。通过调用 $next 将各个中间件串联起来。

$next 的返回值 $response 是返回的响应数据。对于客户端来说,它是输入数据。对于服务器端来说,它是输出数据。跟调用中间一样,服务器和异步客户端返回的 $response 是一个 promise 对象,而同步客户端返回的是是一个 string 数据。

跟踪调试

下面我们来看一下 Hprose 过滤器中的跟踪调试的例子在这里如何实现。

logHandler2.php

  1. use Hprose\Future;
  2.  
  3. $logHandler2 = function($request, stdClass $context, Closure $next) {
  4. error_log($request);
  5. $response = $next($request, $context);
  6. Future\run('error_log', $response);
  7. return $response;
  8. };

Server.php

  1. use Hprose\Socket\Server;
  2.  
  3. function hello($name) {
  4. return "Hello $name!";
  5. }
  6.  
  7. $server = new Server('tcp://0.0.0.0:1143/');
  8. $server->addFunction('hello');
  9. $server->debug = true;
  10. $server->addBeforeFilterHandler($logHandler2);
  11. $server->start();

Client.php

  1. use Hprose\Client;
  2.  
  3. $client = Client::create('tcp://127.0.0.1:1143/', false);
  4. $client->addBeforeFilterHandler($logHandler2);
  5. var_dump($client->hello("world"));

然后分别启动服务器和客户端,就会看到如下输出:

服务器输出


  1. Cs5"hello"a1{s5"world"}z
  2. Rs12"Hello world!"z

客户端输出


  1. Cs5"hello"a1{s5"world"}z
  2. Rs12"Hello world!"z
  3. string(12) "Hello world!"

这个结果跟使用 Hprose 过滤器的例子的结果一模一样。

但是我们发现,这里使用 Hprose 中间件要写的代码比起 Hprose 过滤器来要多一些。主要原因是在 Hprose 中间件中,$next 的返回值为 promise 对象,需要异步处理,而 Hprose 过滤器只需要同步处理就可以了。在这个例子中,我们是直接使用 Future\run 来处理异步结果的。

另外,因为这个例子中,我们没有使用过滤器功能,因此使用 addBeforeFilterHander 方法或者 addAfterFilterHandler 方法添加中间件处理器效果都是一样的。

但如果我们使用了过滤器的话,那么 addBeforeFilterHander 添加的中间件处理器的 $request 数据是未经过过滤器处理的。过滤器的处理操作在 $next 的最后一环中执行。$next 返回的响应 $response 是经过过滤器处理的。

如果某个通过 addBeforeFilterHander 添加的中间件处理器跳过了 $next 而直接返回了结果的话,则返回的 $response 也是未经过过滤器处理的。而且如果某个 addBeforeFilterHander 添加的中间件处理器跳过了 $next,不但过滤器不会执行,而且在它之后使用 addBeforeFilterHander 所添加的中间件处理器也不会执行,addAfterFilterHandler 方法所添加的所有中间件处理器也都不会执行。

addAfterFilterHandler 添加的处理器所收到的 $request 都是经过过滤器处理以后的,但它当中使用 $next 方法返回的 $response 是未经过过滤器处理的。

下面,我们在来看一个结合了压缩过滤器和输入输出缓存中间件的例子。

压缩、缓存、计时

CompressFilter.php

  1. use Hprose\Filter;
  2.  
  3. class CompressFilter implements Filter {
  4. public function inputFilter($data, stdClass $context) {
  5. return gzdecode($data);
  6. }
  7. public function outputFilter($data, stdClass $context) {
  8. return gzencode($data);
  9. }
  10. }

上面的代码跟 Hprose 过滤器一章的压缩代码完全相同。

SizeHandler.php

  1. class SizeHandler {
  2. private $message;
  3. public function __construct($message) {
  4. $this->message = $message;
  5. }
  6. public function asynchandle($request, stdClass $context, Closure $next) {
  7. error_log($this->message . ' request size: ' . strlen($request));
  8. $response = (yield $next($request, $context));
  9. error_log($this->message . ' response size: ' . strlen($response));
  10. }
  11. public function synchandle($request, stdClass $context, Closure $next) {
  12. error_log($this->message . ' request size: ' . strlen($request));
  13. $response = $next($request, $context);
  14. error_log($this->message . ' response size: ' . strlen($response));
  15. return $response;
  16. }
  17. }

StatHandler.php

  1. class StatHandler {
  2. private $message;
  3. public function __construct($message) {
  4. $this->message = $message;
  5. }
  6. public function asynchandle($name, array &$args, stdClass $context, Closure $next) {
  7. $start = microtime(true);
  8. yield $next($name, $args, $context);
  9. $end = microtime(true);
  10. error_log($this->message . ': It takes ' . ($end - $start) . ' s.');
  11. }
  12. public function synchandle($name, array &$args, stdClass $context, Closure $next) {
  13. $start = microtime(true);
  14. $response = $next($name, $args, $context);
  15. $end = microtime(true);
  16. error_log($this->message . ': It takes ' . ($end - $start) . ' s.');
  17. return $response;
  18. }
  19. }

StatHandler2.php

  1. class StatHandler2 {
  2. private $message;
  3. public function __construct($message) {
  4. $this->message = $message;
  5. }
  6. public function asynchandle($request, stdClass $context, Closure $next) {
  7. $start = microtime(true);
  8. yield $next($request, $context);
  9. $end = microtime(true);
  10. error_log($this->message . ': It takes ' . ($end - $start) . ' s.');
  11. }
  12. public function synchandle($request, stdClass $context, Closure $next) {
  13. $start = microtime(true);
  14. $response = $next($request, $context);
  15. $end = microtime(true);
  16. error_log($this->message . ': It takes ' . ($end - $start) . ' s.');
  17. return $response;
  18. }
  19. }

这三个中间件,我们给它们分别编写了同步和异步处理程序。异步我们用的是协程方式,看上去跟同步一样简单。

CacheHandler2.php

  1. class CacheHandler2 {
  2. private $cache = array();
  3. function handle($request, stdClass $context, Closure $next) {
  4. if (isset($context->userdata->cache)) {
  5. if (isset($this->cache[$request])) {
  6. return $this->cache[$request];
  7. }
  8. $response = $next($request, $context);
  9. $this->cache[$request] = $response;
  10. return $response;
  11. }
  12. return $next($request, $context);
  13. }
  14. }

缓存中间件因为不涉及到对结果的判断,所以同步和异步写法是一样的。

Server.php

  1. use Hprose\Socket\Server;
  2.  
  3. $server = new Server('tcp://0.0.0.0:1143/');
  4. $server->addFunction(function($value) { return $value; }, 'echo')
  5. ->addBeforeFilterHandler(array(new StatHandler2("BeforeFilter"), 'asynchandle'))
  6. ->addBeforeFilterHandler(array(new SizeHandler("compressedr"), 'asynchandle'))
  7. ->addFilter(new CompressFilter())
  8. ->addAfterFilterHandler(array(new StatHandler2("AfterFilter"), 'asynchandle'))
  9. ->addAfterFilterHandler(array(new SizeHandler("Non compressed"), 'asynchandle'))
  10. ->addInvokeHandler(array(new StatHandler("Invoke"), 'asynchandle'))
  11. ->start();

在这里我们看到了发布方法,添加中间件,添加过滤器都支持链式调用。

Client.php

  1. use Hprose\Client;
  2. use Hprose\InvokeSettings;
  3.  
  4. $cacheSettings = new InvokeSettings(array("userdata" => array("cache" => true)));
  5.  
  6. $client = Client::create('tcp://127.0.0.1:1143/', false);
  7. $client->addBeforeFilterHandler(array(new CacheHandler2(), 'handle'))
  8. ->addBeforeFilterHandler(array(new StatHandler2('BeforeFilter'), 'synchandle'))
  9. ->addBeforeFilterHandler(array(new SizeHandler('Non compressed'), 'synchandle'))
  10. ->addFilter(new CompressFilter())
  11. ->addAfterFilterHandler(array(new StatHandler2('AfterFilter'), 'synchandle'))
  12. ->addAfterFilterHandler(array(new SizeHandler('compressed'), 'synchandle'))
  13. ->addInvokeHandler(array(new StatHandler("Invoke"), 'synchandle'));
  14.  
  15. $value = range(0, 99999);
  16. var_dump(count($client->echo($value, $cacheSettings)));
  17. var_dump(count($client->echo($value, $cacheSettings)));

客户端添加过滤器和中间件也支持链式调用。

分别启动服务器和客户端,就会看到如下输出:

服务器输出


  1. compressedr request size: 216266
  2. Non compressed request size: 688893
  3. Invoke: It takes 0.0062549114227295 s.
  4. Non compressed response size: 688881
  5. AfterFilter: It takes 0.039386034011841 s.
  6. compressedr response size: 216245
  7. BeforeFilter: It takes 0.066082954406738 s.

客户端输出


  1. Non compressed request size: 688893
  2. compressed request size: 216266
  3. compressed response size: 216245
  4. AfterFilter: It takes 0.068840026855469 s.
  5. Non compressed response size: 688881
  6. BeforeFilter: It takes 0.093698024749756 s.
  7. Invoke: It takes 0.10785388946533 s.
  8. int(100000)
  9. Invoke: It takes 0.012754201889038 s.
  10. int(100000)
  11. >

我们可以看到两次的执行结果都出来了,但是中间件的输出内容只有一次。原因就是第二次执行时,缓存中间件将缓存的结果直接返回了。因此后面所有的步骤就都略过了。

通过这个例子,我们可以看出,将 Hprose 中间件和 Hprose 过滤器结合,可以实现非常强大的扩展功能。如果你有什么特殊的需求,直接使用 Hprose 无法实现的话,就考虑一下是否可以添加几个 Hprose 中间件和 Hprose 过滤器吧。

原文: https://github.com/hprose/hprose-php/wiki/12-Hprose-%E4%B8%AD%E9%97%B4%E4%BB%B6