all与parallel

Any表示多个异步回调,任一回调完成则任务完成,All表示等待所有回调均执行完成才算任务完成,二者相同点是IO部分并发执行;

  1. <?php
  2. class All implements Async
  3. {
  4. public $parent;
  5. public $tasks;
  6. public $continuation;
  7. public $n;
  8. public $results;
  9. public $done;
  10. public function __construct(array $tasks, AsyncTask $parent = null)
  11. {
  12. $this->tasks = $tasks;
  13. $this->parent = $parent;
  14. $this->n = count($tasks);
  15. assert($this->n > 0);
  16. $this->results = [];
  17. }
  18. public function begin(callable $continuation = null)
  19. {
  20. $this->continuation = $continuation;
  21. foreach ($this->tasks as $id => $task) {
  22. (new AsyncTask($task, $this->parent))->begin($this->continuation($id));
  23. };
  24. }
  25. private function continuation($id)
  26. {
  27. return function($r, $ex = null) use($id) {
  28. if ($this->done) {
  29. return;
  30. }
  31. // 任一回调发生异常,终止任务
  32. if ($ex) {
  33. $this->done = true;
  34. $k = $this->continuation;
  35. $k(null, $ex);
  36. return;
  37. }
  38. $this->results[$id] = $r;
  39. if (--$this->n === 0) {
  40. // 所有回调完成,终止任务
  41. $this->done = true;
  42. if ($this->continuation) {
  43. $k = $this->continuation;
  44. $k($this->results);
  45. }
  46. }
  47. };
  48. }
  49. }
  50. function all(array $tasks)
  51. {
  52. $tasks = array_map(__NAMESPACE__ . "\\await", $tasks);
  53. return new Syscall(function(AsyncTask $parent) use($tasks) {
  54. if (empty($tasks)) {
  55. return null;
  56. } else {
  57. return new All($tasks, $parent);
  58. }
  59. });
  60. }
  1. <?php
  2. spawn(function() {
  3. $ex = null;
  4. try {
  5. $r = (yield all([
  6. async_dns_lookup("www.bing.com", 100),
  7. async_dns_lookup("www.so.com", 100),
  8. async_dns_lookup("www.baidu.com", 100),
  9. ]));
  10. var_dump($r);
  11. /*
  12. array(3) {
  13. [0]=>
  14. string(14) "202.89.233.103"
  15. [1]=>
  16. string(14) "125.88.193.243"
  17. [2]=>
  18. string(15) "115.239.211.112"
  19. }
  20. */
  21. } catch (\Exception $ex) {
  22. echo $ex;
  23. }
  24. });

我们这里实现了与Promise.all相同语义的接口,或者更复杂一些,我们也可以实现批量任务以chunk方式进行作业的接口,留待读者自己完成;

至此, 我们已经拥有了 spawn callcc race all timeout.