race与timeout

看到这里,你可能已经发现到我们封装的异步接口的问题了: 没有任何超时处理。

通常情况我们会为每个异步调用添加定时器,回调成功取消定时器,否则在定时器回调透传异常,例如:

  1. <?php
  2. // helper function
  3. function once(callable $fun)
  4. {
  5. $has = false;
  6. return function(...$args) use($fun, &$has) {
  7. if ($has === false) {
  8. $fun(...$args);
  9. $has = true;
  10. }
  11. };
  12. }
  13. // helper function
  14. function timeoutWrapper(callable $fun, $timeout)
  15. {
  16. return function($k) use($fun, $timeout) {
  17. $k = once($k);
  18. $fun($k);
  19. swoole_timer_after($timeout, function() use($k) {
  20. // 这里异常可以从外部传入
  21. $k(null, new \Exception("timeout"));
  22. });
  23. };
  24. }
  1. <?php
  2. // 为callcc添加超时处理
  3. function callcc(callable $fun, $timeout = 0)
  4. {
  5. if ($timeout > 0) {
  6. $fun = timeoutWrapper($fun, $timeout);
  7. }
  8. return new CallCC($fun);
  9. }
  10. // 我们的dns查询有了超时透传异常的能力了
  11. function async_dns_lookup($host, $timeout = 100)
  12. {
  13. return callcc(function($k) use($host) {
  14. swoole_async_dns_lookup($host, function($host, $ip) use($k) {
  15. $k($ip);
  16. });
  17. }, $timeout);
  18. }
  19. spawn(function() {
  20. try {
  21. yield async_dns_lookup("www.xxx.com", 1);
  22. } catch (\Exception $ex) {
  23. echo $ex; // ex!
  24. }
  25. });

但是,我们可以有更优雅通用的方式来超时处理:

  1. <?php
  2. class Any implements Async
  3. {
  4. public $parent;
  5. public $tasks;
  6. public $continuation;
  7. public $done;
  8. public function __construct(array $tasks, AsyncTask $parent = null)
  9. {
  10. assert(!empty($tasks));
  11. $this->tasks = $tasks;
  12. $this->parent = $parent;
  13. $this->done = false;
  14. }
  15. public function begin(callable $continuation)
  16. {
  17. $this->continuation = $continuation;
  18. foreach ($this->tasks as $id => $task) {
  19. (new AsyncTask($task, $this->parent))->begin($this->continuation($id));
  20. };
  21. }
  22. private function continuation($id)
  23. {
  24. return function($r, $ex = null) use($id) {
  25. if ($this->done) {
  26. return;
  27. }
  28. $this->done = true;
  29. if ($this->continuation) {
  30. $k = $this->continuation;
  31. $k($r, $ex);
  32. }
  33. };
  34. }
  35. }
  1. <?php
  2. // helper function
  3. function await($task, ...$args)
  4. {
  5. if ($task instanceof \Generator) {
  6. return $task;
  7. }
  8. if (is_callable($task)) {
  9. $gen = function() use($task, $args) { yield $task(...$args); };
  10. } else {
  11. $gen = function() use($task) { yield $task; };
  12. }
  13. return $gen();
  14. }
  15. function race(array $tasks)
  16. {
  17. $tasks = array_map(__NAMESPACE__ . "\\await", $tasks);
  18. return new Syscall(function(AsyncTask $parent) use($tasks) {
  19. if (empty($tasks)) {
  20. return null;
  21. } else {
  22. return new Any($tasks, $parent);
  23. }
  24. });
  25. }

我们构造了一个与Promise.race相同语义的接口,而我们之前构造Async接口则可以看成简陋版的Promise.then + Promise.catch。

  1. <?php
  2. // 我们重新来看这个简单dns查询函数
  3. function async_dns_lookup($host)
  4. {
  5. return callcc(function($k) use($host) {
  6. swoole_async_dns_lookup($host, function($host, $ip) use($k) {
  7. $k($ip);
  8. });
  9. });
  10. }
  11. // 我们有了一个纯粹的超时透传异常的函数
  12. function timeout($ms)
  13. {
  14. return callcc(function($k) use($ms) {
  15. swoole_timer_after($ms, function() use($k) {
  16. $k(null, new \Exception("timeout"));
  17. });
  18. });
  19. }
  20. // 当我们采取race语义并发执行dns查询与超时异常函数
  21. // 其实我们构造了一个更为灵活的超时处理方案
  22. spawn(function() {
  23. try {
  24. $ip = (yield race([
  25. async_dns_lookup("www.baidu.com"),
  26. timeout(100),
  27. ]));
  28. $res = (yield race([
  29. (new HttpClient($ip, 80))->awaitGet("/"),
  30. timeout(200),
  31. ]));
  32. var_dump($res->statusCode);
  33. } catch (\Exception $ex) {
  34. echo $ex;
  35. swoole_event_exit();
  36. }
  37. });

我们非常容易构造出更多支持超时的接口, 但我们代码看起来比之前更加清晰;

  1. <?php
  2. class HttpClient extends \swoole_http_client
  3. {
  4. public function awaitGet($uri, $timeout = 1000)
  5. {
  6. return race([
  7. callcc(function($k) use($uri) {
  8. $this->get($uri, $k);
  9. }),
  10. timeout($timeout),
  11. ]);
  12. }
  13. // ...
  14. }
  15. function async_dns_lookup($host, $timeout = 100)
  16. {
  17. return race([
  18. callcc(function($k) use($host) {
  19. swoole_async_dns_lookup($host, function($host, $ip) use($k) {
  20. $k($ip);
  21. });
  22. }),
  23. timeout($timeout),
  24. ]);
  25. }
  26. // test
  27. spawn(function() {
  28. try {
  29. $ip = (yield race([
  30. async_dns_lookup("www.baidu.com"),
  31. timeout(100),
  32. ]));
  33. $res = (yield (new HttpClient($ip, 80))->awaitGet("/"));
  34. var_dump($res->statusCode);
  35. } catch (\Exception $ex) {
  36. echo $ex;
  37. swoole_event_exit();
  38. }
  39. });