无缓存channel

我们首先实现无缓存的Channel:

  1. <?php
  2. class Channel
  3. {
  4. // 因为同一个channel可能有多个接收者,使用队列实现,保证调度均衡
  5. // 队列内保存的是被阻塞的接收者协程的控制流,即call/cc的参数,我们模拟的continuation
  6. public $recvQ;
  7. // 发送者队列逻辑相同
  8. public $sendQ;
  9. public function __construct()
  10. {
  11. $this->recvQ = new \SplQueue();
  12. $this->sendQ = new \SplQueue();
  13. }
  14. public function send($val)
  15. {
  16. return callcc(function($cc) use($val) {
  17. if ($this->recvQ->isEmpty()) {
  18. // 当chan没有接收者,发送者协程挂起(将$cc入列,不调用$cc回送数据)
  19. $this->sendQ->enqueue([$cc, $val]);
  20. } else {
  21. // 当chan对端有接收者,将挂起接收者协程出列,
  22. // 调用接收者$recvCc发送数据,运行接收者协程后继代码
  23. // 执行完毕或者遇到Async挂起,$recvCc()调用返回,
  24. // 调用$cc(),控制流回到发送者协程
  25. $recvCc = $this->recvQ->dequeue();
  26. $recvCc($val, null);
  27. $cc(null, null);
  28. }
  29. });
  30. }
  31. public function recv()
  32. {
  33. return callcc(function($cc) {
  34. if ($this->sendQ->isEmpty()) {
  35. // 当chan没有发送者,接收者协程挂起(将$cc入列)
  36. $this->recvQ->enqueue($cc);
  37. } else {
  38. // 当chan对端有发送者,将挂起发送者协程与待发送数据出列
  39. // 调用发送者$sendCc发送数据,运行发送者协程后继代码
  40. // 执行完毕或者遇到Async挂起,$sendCc()调用返回,
  41. // 调用$cc(),控制流回到接收者协程
  42. list($sendCc, $val) = $this->sendQ->dequeue();
  43. $sendCc(null, null);
  44. $cc($val, null);
  45. }
  46. });
  47. }
  48. }