Task

现阶段 Swoole 暂时没有办法 hook 所有的阻塞函数,也就意味着有些函数仍然会导致 进程阻塞,从而影响协程的调度,此时我们可以通过使用 Task 组件来模拟协程处理,从而达到不阻塞进程调用阻塞函数的目的,本质上是仍是是多进程运行阻塞函数,所以性能上会明显地不如原生协程,具体取决于 Task Worker 的数量。

安装

  1. composer require hyperf/task

配置

因为 Task 并不是默认组件,所以在使用的时候需要在 server.php 增加 Task 相关的配置。

  1. <?php
  2. declare(strict_types=1);
  3. use Hyperf\Server\SwooleEvent;
  4. return [
  5. // 这里省略了其它不相关的配置项
  6. 'settings' => [
  7. // Task Worker 数量,根据您的服务器配置而配置适当的数量
  8. 'task_worker_num' => 8,
  9. // 因为 `Task` 主要处理无法协程化的方法,所以这里推荐设为 `false`,避免协程下出现数据混淆的情况
  10. 'task_enable_coroutine' => false,
  11. ],
  12. 'callbacks' => [
  13. // Task callbacks
  14. SwooleEvent::ON_TASK => [Hyperf\Framework\Bootstrap\TaskCallback::class, 'onTask'],
  15. SwooleEvent::ON_FINISH => [Hyperf\Framework\Bootstrap\FinishCallback::class, 'onFinish'],
  16. ],
  17. ];

使用

Task 组件提供了 主动方法投递注解投递 两种使用方法。

主动方法投递

  1. <?php
  2. use Hyperf\Utils\Coroutine;
  3. use Hyperf\Utils\ApplicationContext;
  4. use Hyperf\Task\TaskExecutor;
  5. use Hyperf\Task\Task;
  6. class MethodTask
  7. {
  8. public function handle($cid)
  9. {
  10. return [
  11. 'worker.cid' => $cid,
  12. // task_enable_coroutine 为 false 时返回 -1,反之 返回对应的协程 ID
  13. 'task.cid' => Coroutine::id(),
  14. ];
  15. }
  16. }
  17. $container = ApplicationContext::getContainer();
  18. $exec = $container->get(TaskExecutor::class);
  19. $result = $exec->execute(new Task([MethodTask::class, 'handle'], [Coroutine::id()]));

使用注解

通过 主动方法投递 时,并不是特别直观,这里我们实现了对应的 @Task 注解,并通过 AOP 重写了方法调用。当在 Worker 进程时,自动投递到 Task 进程,并协程等待 数据返回。

  1. <?php
  2. use Hyperf\Utils\Coroutine;
  3. use Hyperf\Utils\ApplicationContext;
  4. use Hyperf\Task\Annotation\Task;
  5. class AnnotationTask
  6. {
  7. /**
  8. * @Task
  9. */
  10. public function handle($cid)
  11. {
  12. return [
  13. 'worker.cid' => $cid,
  14. // task_enable_coroutine=false 时返回 -1,反之 返回对应的协程 ID
  15. 'task.cid' => Coroutine::id(),
  16. ];
  17. }
  18. }
  19. $container = ApplicationContext::getContainer();
  20. $task = $container->get(AnnotationTask::class);
  21. $result = $task->handle(Coroutine::id());

使用 @Task 注解时需 use Hyperf\Task\Annotation\Task;

附录

Swoole 暂时没有协程化的函数列表

  • mysql,底层使用 libmysqlclient, 不推荐使用, 推荐使用已经实现协程化的 pod_mysql/mysqli
  • curl,底层使用 libcurl,在 Swoole 4.4 后底层进行了协程化(beta)
  • mongo,底层使用 mongo-c-client
  • pdo_pgsql
  • pdo_ori
  • pdo_odbc
  • pdo_firebird

MongoDB

因为 MongoDB 没有办法被 hook,所以我们可以通过 Task 来调用,下面就简单介绍一下如何通过注解方式调用 MongoDB

以下我们实现两个方法 insertquery,其中需要注意的是 manager 方法不能使用 Task,因为 Task 会在对应的 Task 进程 中处理,然后将数据从 Task 进程 返回到 Worker 进程 。所以 Task 方法 的入参和出参最好不要携带任何 IO,比如返回一个实例化后的 Redis 等等。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Task;
  4. use Hyperf\Task\Annotation\Task;
  5. use MongoDB\Driver\BulkWrite;
  6. use MongoDB\Driver\Manager;
  7. use MongoDB\Driver\Query;
  8. use MongoDB\Driver\WriteConcern;
  9. class MongoTask
  10. {
  11. /**
  12. * @var Manager
  13. */
  14. public $manager;
  15. /**
  16. * @Task
  17. */
  18. public function insert(string $namespace, array $document)
  19. {
  20. $writeConcern = new WriteConcern(WriteConcern::MAJORITY, 1000);
  21. $bulk = new BulkWrite();
  22. $bulk->insert($document);
  23. $result = $this->manager()->executeBulkWrite($namespace, $bulk, $writeConcern);
  24. return $result->getUpsertedCount();
  25. }
  26. /**
  27. * @Task
  28. */
  29. public function query(string $namespace, array $filter = [], array $options = [])
  30. {
  31. $query = new Query($filter, $options);
  32. $cursor = $this->manager()->executeQuery($namespace, $query);
  33. return $cursor->toArray();
  34. }
  35. protected function manager()
  36. {
  37. if ($this->manager instanceof Manager) {
  38. return $this->manager;
  39. }
  40. $uri = 'mongodb://127.0.0.1:27017';
  41. return $this->manager = new Manager($uri, []);
  42. }
  43. }

使用如下

<?php
use App\Task\MongoTask;
use Hyperf\Utils\ApplicationContext;

$client = ApplicationContext::getContainer()->get(MongoTask::class);
$client->insert('hyperf.test', ['id' => rand(0, 99999999)]);

$result = $client->query('hyperf.test', [], [
    'sort' => ['id' => -1],
    'limit' => 5,
]);