FutureTask与fork

在多线程代码中我们经常会遇到这种模型,将一个耗时任务, new一个新的Thread或者通常放到线程池后台执行,当前线程执行另外任务,之后通过某个api接口阻塞获取后台任务结果。

Java童鞋应该对这个概念非常熟悉——JDK给予直接支持的Future。

同样的模型我们可以利用channel对多个协程进行同步来实现,代码很简单:

  1. <?php
  2. go(function() {
  3. $start = microtime(true);
  4. $ch = chan();
  5. // 开启一个新的协程,异步执行耗时任务
  6. spawn(function() use($ch) {
  7. yield delay(1000);
  8. yield $ch->send(42); // 通知+传送结果
  9. });
  10. yield delay(500);
  11. $r = (yield $ch->recv()); // 阻塞等待结果
  12. echo $r; // 42
  13. // 我们这里两个耗时任务并发执行,总耗时约1000ms
  14. echo "cost ", microtime(true) - $start, "\n";
  15. });

事实上我们也很容易把Future模型移植过来:

  1. <?php
  2. final class FutureTask
  3. {
  4. const PENDING = 1;
  5. const DONE = 2;
  6. private $cc;
  7. public $state;
  8. public $result;
  9. public $ex;
  10. public function __construct(\Generator $gen)
  11. {
  12. $this->state = self::PENDING;
  13. $asyncTask = new AsyncTask($gen);
  14. $asyncTask->begin(function($r, $ex = null) {
  15. $this->state = self::DONE;
  16. if ($cc = $this->cc) {
  17. // 有cc,说明有call get方法挂起协程,在此处唤醒
  18. $cc($r, $ex);
  19. } else {
  20. // 无挂起,暂存执行结果
  21. $this->result = $r;
  22. $this->ex = $ex;
  23. }
  24. });
  25. }
  26. public function get()
  27. {
  28. return callcc(function($cc) use($timeout) {
  29. if ($this->state === self::DONE) {
  30. // 获取结果时,任务已经完成,同步返回结果
  31. // 这里也可以考虑用defer实现,异步返回结果,首先先释放php栈,降低内存使用
  32. $cc($this->result, $this->ex);
  33. } else {
  34. // 获取结果时未完成,保存$cc,挂起等待
  35. $this->cc = $cc;
  36. }
  37. });
  38. }
  39. }
  40. // helper
  41. function fork($task, ...$args)
  42. {
  43. $task = await($task); // 将task转换为生成器
  44. yield new FutureTask($task);
  45. }

还是刚才那个例子, 我们改写为FutureTask版本:

  1. <?php
  2. go(function() {
  3. $start = microtime(true);
  4. // fork 子协程执行耗时任务
  5. /** @var $future FutureTask */
  6. $future = (yield fork(function() {
  7. yield delay(1000);
  8. yield 42;
  9. }));
  10. yield delay(500);
  11. // 阻塞等待结果
  12. $r = (yield $future->get());
  13. echo $r; // 42
  14. // 总耗时仍旧只有1000ms
  15. echo "cost ", microtime(true) - $start, "\n";
  16. });

再进一步,我们扩充FutureTask的状态,阻塞获取结果加入超时选项:

  1. <?php
  2. final class FutureTask
  3. {
  4. const PENDING = 1;
  5. const DONE = 2;
  6. const TIMEOUT = 3;
  7. private $timerId;
  8. private $cc;
  9. public $state;
  10. public $result;
  11. public $ex;
  12. // 我们这里加入新参数,用来链接futureTask与caller父任务
  13. // 这样的好处比如可以共享父子任务上下文
  14. public function __construct(\Generator $gen, AsyncTask $parent = null)
  15. {
  16. $this->state = self::PENDING;
  17. if ($parent) {
  18. $asyncTask = new AsyncTask($gen, $parent);
  19. } else {
  20. $asyncTask = new AsyncTask($gen);
  21. }
  22. $asyncTask->begin(function($r, $ex = null) {
  23. // PENDING or TIMEOUT
  24. if ($this->state === self::TIMEOUT) {
  25. return;
  26. }
  27. // PENDING -> DONE
  28. $this->state = self::DONE;
  29. if ($cc = $this->cc) {
  30. if ($this->timerId) {
  31. swoole_timer_clear($this->timerId);
  32. }
  33. $cc($r, $ex);
  34. } else {
  35. $this->result = $r;
  36. $this->ex = $ex;
  37. }
  38. });
  39. }
  40. // 这里超时时间0为永远阻塞,
  41. // 否则超时未获取到结果,将向父任务传递超时异常
  42. public function get($timeout = 0)
  43. {
  44. return callcc(function($cc) use($timeout) {
  45. // PENDING or DONE
  46. if ($this->state === self::DONE) {
  47. $cc($this->result, $this->ex);
  48. } else {
  49. // 获取结果时未完成,保存$cc,开启定时器(如果需要),挂起等待
  50. $this->cc = $cc;
  51. $this->getResultTimeout($timeout);
  52. }
  53. });
  54. }
  55. private function getResultTimeout($timeout)
  56. {
  57. if (!$timeout) {
  58. return;
  59. }
  60. $this->timerId = swoole_timer_after($timeout, function() {
  61. assert($this->state === self::PENDING);
  62. $this->state = self::TIMEOUT;
  63. $cc = $this->cc;
  64. $cc(null, new AsyncTimeoutException());
  65. });
  66. }
  67. }

因为引入parentTask参数,需要将父任务隐式传参,而我们执行通过Syscall与执行当前生成器的父任务交互,所以我们重写fork辅助函数,改用Syscall实现:

  1. <?php
  2. /**
  3. * @param $task
  4. * @return Syscall
  5. */
  6. function fork($task)
  7. {
  8. $task = await($task);
  9. return new Syscall(function(AsyncTask $parent) use($task) {
  10. return new FutureTask($task, $parent);
  11. });
  12. }

下面看一些关于超时的示例:

  1. <?php
  2. go(function() {
  3. $start = microtime(true);
  4. /** @var $future FutureTask */
  5. $future = (yield fork(function() {
  6. yield delay(500);
  7. yield 42;
  8. }));
  9. // 阻塞等待超时,捕获到超时异常
  10. try {
  11. $r = (yield $future->get(100));
  12. var_dump($r);
  13. } catch (\Exception $ex) {
  14. echo "get result timeout\n";
  15. }
  16. yield delay(1000);
  17. // 因为我们只等待子任务100ms,我们的总耗时只有 1100ms
  18. echo "cost ", microtime(true) - $start, "\n";
  19. });
  20. go(function() {
  21. $start = microtime(true);
  22. /** @var $future FutureTask */
  23. $future = (yield fork(function() {
  24. yield delay(500);
  25. yield 42;
  26. throw new \Exception();
  27. }));
  28. yield delay(1000);
  29. // 子任务500ms前发生异常,已经处于完成状态
  30. // 我们调用get会当即引发异常
  31. try {
  32. $r = (yield $future->get());
  33. var_dump($r);
  34. } catch (\Exception $ex) {
  35. echo "something wrong in child task\n";
  36. }
  37. // 因为耗时任务并发执行,这里总耗时仅1000ms
  38. echo "cost ", microtime(true) - $start, "\n";
  39. });