channel演示

这是我们最终得到的接口:

  1. <?php
  2. function go(...$args)
  3. {
  4. spawn(...$args);
  5. }
  6. function chan($n = 0)
  7. {
  8. if ($n === 0) {
  9. return new Channel();
  10. } else {
  11. return new BufferChannel($n);
  12. }
  13. }

第一个典型例子, PINGPONG。

与golang的channel类似,我们可以在两个semicoroutine之间做同步:

  1. <?php
  2. // 构建两个单向channel, 我们只单向收发数据
  3. $pingCh = chan();
  4. $pongCh = chan();
  5. go(function() use($pingCh, $pongCh) {
  6. while (true) {
  7. echo (yield $pingCh->recv());
  8. yield $pongCh->send("PONG\n");
  9. // 递归调度器实现,需要引入异步的方法退栈,否则Stack Overflow...
  10. // 或者考虑将send或者recv以defer方式实现
  11. yield async_sleep(1);
  12. }
  13. });
  14. go(function() use($pingCh, $pongCh) {
  15. while (true) {
  16. echo (yield $pongCh->recv());
  17. yield $pingCh->send("PING\n");
  18. yield async_sleep(1);
  19. }
  20. });
  21. // start up
  22. go(function() use($pingCh) {
  23. echo "start up\n";
  24. yield $pingCh->send("PING");
  25. });
  26. // output:
  27. /*
  28. start up
  29. PING
  30. PONG
  31. PING
  32. PONG
  33. PING
  34. PONG
  35. PING
  36. ...
  37. */

当然,我们可以很轻易构建一个生产者-消费者模型:

  1. <?php
  2. $ch = chan();
  3. // 生产者1
  4. go(function() use($ch) {
  5. while (true) {
  6. yield $ch->send("producer 1");
  7. yield async_sleep(1000);
  8. }
  9. });
  10. // 生产者2
  11. go(function() use($ch) {
  12. while (true) {
  13. yield $ch->send("producer 2");
  14. yield async_sleep(1000);
  15. }
  16. });
  17. // 消费者1
  18. go(function() use($ch) {
  19. while (true) {
  20. $recv = (yield $ch->recv());
  21. echo "consumer1: $recv\n";
  22. }
  23. });
  24. // 消费者2
  25. go(function() use($ch) {
  26. while (true) {
  27. $recv = (yield $ch->recv());
  28. echo "consumer2: $recv\n";
  29. }
  30. });
  31. // output:
  32. /*
  33. consumer1 recv from producer 1
  34. consumer1 recv from producer 2
  35. consumer1 recv from producer 1
  36. consumer2 recv from producer 2
  37. consumer1 recv from producer 2
  38. consumer2 recv from producer 1
  39. consumer1 recv from producer 1
  40. consumer2 recv from producer 2
  41. consumer1 recv from producer 2
  42. consumer2 recv from producer 1
  43. consumer1 recv from producer 1
  44. consumer2 recv from producer 2
  45. ......
  46. */

channel 自身是first-class value, 所以可传递:

  1. <?php
  2. // 我们通过一个chan来发送另一个chan
  3. // 然后等待接收到这个chan的semicoroutine回送数据
  4. $ch = chan();
  5. go(function() use ($ch) {
  6. $anotherCh = chan();
  7. yield $ch->send($anotherCh);
  8. echo "send another channel\n";
  9. yield $anotherCh->send("HELLO");
  10. echo "send hello through another channel\n";
  11. });
  12. go(function() use($ch) {
  13. /** @var Channel $anotherCh */
  14. $anotherCh = (yield $ch->recv());
  15. echo "recv another channel\n";
  16. $val = (yield $anotherCh->recv());
  17. echo $val, "\n";
  18. });
  19. // output:
  20. /*
  21. send another channel
  22. recv another channel
  23. send hello through another channel
  24. HELLO
  25. */

我们通过控制channel缓存大小 观察输出结果:

  1. <?php
  2. $ch = chan($n);
  3. go(function() use($ch) {
  4. $recv = (yield $ch->recv());
  5. echo "recv $recv\n";
  6. $recv = (yield $ch->recv());
  7. echo "recv $recv\n";
  8. $recv = (yield $ch->recv());
  9. echo "recv $recv\n";
  10. $recv = (yield $ch->recv());
  11. echo "recv $recv\n";
  12. });
  13. go(function() use($ch) {
  14. yield $ch->send(1);
  15. echo "send 1\n";
  16. yield $ch->send(2);
  17. echo "send 2\n";
  18. yield $ch->send(3);
  19. echo "send 3\n";
  20. yield $ch->send(4);
  21. echo "send 4\n";
  22. });
  23. // $n = 1;
  24. // output:
  25. /*
  26. send 1
  27. recv 1
  28. send 2
  29. recv 2
  30. send 3
  31. recv 3
  32. send 4
  33. recv 4
  34. */
  35. // $n = 2;
  36. // output:
  37. /*
  38. send 1
  39. send 2
  40. recv 1
  41. recv 2
  42. send 3
  43. send 4
  44. recv 3
  45. recv 4
  46. */
  47. // $n = 3;
  48. // output:
  49. /*
  50. send 1
  51. send 2
  52. send 3
  53. recv 1
  54. recv 2
  55. recv 3
  56. send 4
  57. recv 4
  58. */

一个更具体的生产者消费者的例子:

  1. <?php
  2. // 缓存两个结果
  3. $ch = chan(2);
  4. // 从channel接口请求写过写文件
  5. go(function() use($ch) {
  6. $file = new AsyncFile("path/to/save");
  7. while (true) {
  8. list($host, $status) = (yield $ch->recv());
  9. yield $file->write("$host: $status\n");
  10. }
  11. });
  12. // 请求并写入chan
  13. go(function() use($ch) {
  14. while (true) {
  15. $host = "www.baidu.com";
  16. $resp = (yield async_curl_get($host));
  17. yield $ch->send([$host, $resp->statusCode]);
  18. }
  19. });
  20. // 请求并写入chan
  21. go(function() use($ch) {
  22. while (true) {
  23. $host = "www.bing.com";
  24. $resp = (yield async_curl_get($host));
  25. yield $ch->send([$host, $resp->statusCode]);
  26. }
  27. });
  28. // output:

channel的发送与接受没有超时机制,Golang可以select多个chan实现超时处理,我们可以做一个select设施,或者在send于recv接受直接添加超时参数,扩展接口功能,留待读者自行实现。