缓存channel

接下来我们来实现带缓存的Channel:

Sends to a buffered channel block only when the buffer is full.

Receives block when the buffer is empty.

  1. <?php
  2. class BufferChannel
  3. {
  4. // 缓存容量
  5. public $cap;
  6. // 缓存
  7. public $queue;
  8. // 同无缓存Channel
  9. public $recvCc;
  10. // 同无缓存Channel
  11. public $sendCc;
  12. public function __construct($cap)
  13. {
  14. assert($cap > 0);
  15. $this->cap = $cap;
  16. $this->queue = new \SplQueue();
  17. $this->sendCc = new \SplQueue();
  18. $this->recvCc = new \SplQueue();
  19. }
  20. public function recv()
  21. {
  22. return callcc(function($cc) {
  23. if ($this->queue->isEmpty()) {
  24. // 当无数据可接收时,$cc入列,让出控制流,挂起接收者协程
  25. $this->recvCc->enqueue($cc);
  26. } else {
  27. // 当有数据可接收时,先接收数据,然后恢复控制流
  28. $val = $this->queue->dequeue();
  29. $this->cap++;
  30. $cc($val, null);
  31. }
  32. // 递归唤醒其他被阻塞的发送者与接收者收发数据,注意顺序
  33. $this->recvPingPong();
  34. });
  35. }
  36. public function send($val)
  37. {
  38. return callcc(function($cc) use($val) {
  39. if ($this->cap > 0) {
  40. // 当缓存未满,发送数据直接加入缓存,然后恢复控制流
  41. $this->queue->enqueue($val);
  42. $this->cap--;
  43. $cc(null, null);
  44. } else {
  45. // 当缓存满,发送者控制流与发送数据入列,让出控制流,挂起发送者协程
  46. $this->sendCc->enqueue([$cc, $val]);
  47. }
  48. // 递归唤醒其他被阻塞的接收者与发送者收发数据,注意顺序
  49. $this->sendPingPong();
  50. // 如果全部代码都为同步,防止多个发送者时,数据全部来自某个发送者
  51. // 应该把sendPingPong 修改为异步执行 defer([$this, "sendPingPong"]);
  52. // 但是swoole本身的defer实现有bug,除非把defer 实现为swoole_timer_after(1, ...)
  53. // recvPingPong 同理
  54. });
  55. }
  56. public function recvPingPong()
  57. {
  58. // 当有阻塞的发送者,唤醒其发送数据
  59. if (!$this->sendCc->isEmpty() && $this->cap > 0) {
  60. list($sendCc, $val) = $this->sendCc->dequeue();
  61. $this->queue->enqueue($val);
  62. $this->cap--;
  63. $sendCc(null, null);
  64. // 当有阻塞的接收者,唤醒其接收数据
  65. if (!$this->recvCc->isEmpty() && !$this->queue->isEmpty()) {
  66. $recvCc = $this->recvCc->dequeue();
  67. $val = $this->queue->dequeue();
  68. $this->cap++;
  69. $recvCc($val);
  70. $this->recvPingPong();
  71. }
  72. }
  73. }
  74. public function sendPingPong()
  75. {
  76. // 当有阻塞的接收者,唤醒其接收数据
  77. if (!$this->recvCc->isEmpty() && !$this->queue->isEmpty()) {
  78. $recvCc = $this->recvCc->dequeue();
  79. $val = $this->queue->dequeue();
  80. $this->cap++;
  81. $recvCc($val);
  82. // 当有阻塞的发送者,唤醒其发送数据
  83. if (!$this->sendCc->isEmpty() && $this->cap > 0) {
  84. list($sendCc, $val) = $this->sendCc->dequeue();
  85. $this->queue->enqueue($val);
  86. $this->cap--;
  87. $sendCc(null, null);
  88. $this->sendPingPong();
  89. }
  90. }
  91. }
  92. }