Task

EasySwoole3.3.0+异步任务放弃了Swoole的原生task,采用独立组件实现实现。 相对于原生swoole taskeasyswoole/task组件实现了以下功能:

  • 可以投递闭包任务
  • 可以在TaskWorker等其他自定义进程继续投递任务
  • 实现任务限流与状态监控

安装

composer require easyswoole/task

框架中使用

同步调用:

  1. \EasySwoole\EasySwoole\Task\TaskManager::getInstance()->sync(function (){
  2. echo 'sync';
  3. });

异步调用:

  1. \EasySwoole\EasySwoole\Task\TaskManager::getInstance()->async(function (){
  2. echo 'async';
  3. },function ($reply,$taskId,$workerIndex){
  4. // $reply 返回的执行结果
  5. // $taskId 任务id
  6. echo 'async success';
  7. });

由于php本身就不能序列化闭包,该闭包投递是通过反射该闭包函数,获取php代码直接序列化php代码,然后直接eval代码实现的。
所以投递闭包无法使用外部的对象引用,以及资源句柄,复杂任务请使用任务模板方法。

任务模版

自定义一个任务模版

  1. <?php
  2. namespace App\Task;
  3. use EasySwoole\Task\AbstractInterface\TaskInterface;
  4. class CustomTask implements TaskInterface
  5. {
  6. protected $data;
  7. public function __construct($data)
  8. {
  9. // 保存投递过来的数据
  10. $this->data = $data;
  11. }
  12. public function run(int $taskId, int $workerIndex)
  13. {
  14. // 执行逻辑
  15. }
  16. public function onException(\Throwable $throwable, int $taskId, int $workerIndex)
  17. {
  18. // 异常处理
  19. }
  20. }

如何使用

  1. $task = \EasySwoole\EasySwoole\Task\TaskManager::getInstance();
  2. $task->async(new CustomTask(['user'=>'custom']));
  3. $data = $task->sync(new CustomTask(['user'=>'custom']));

投递返回值

  • >0 投递成功(异步任务专属,返回taskId,同步任务直接返回return值)
  • -1 task进程繁忙,投递失败(已经到达最大运行数量maxRunningNum)
  • -2 投递数据解包失败,当投递数据传输时数据异常时会报错,此错误为组件底层错误,一般不会出现
  • -3 任务出错(该任务执行时出现异常错误,被组件拦截并输出错误)

独立使用

该组件可独立使用,代码如下:

  1. use EasySwoole\Task\Config;
  2. use EasySwoole\Task\Task;
  3. /*
  4. 配置项中可以修改工作进程数、临时目录,进程名,最大并发执行任务数,异常回调等
  5. */
  6. $config = new Config();
  7. $task = new Task($config);
  8. //添加swoole 服务
  9. $http = new \Swoole\Http\Server("0.0.0.0", 9501);
  10. //注入swoole服务,进行创建task进程
  11. $task->attachToServer($http);
  12. //在onrequest事件中调用task(其他地方也可以,这只是示例)
  13. $http->on("request", function (Swoole\Http\Request $request, $response)use($task){
  14. if(isset($request->get['sync'])){
  15. //同步调用task
  16. $ret = $task->sync(function ($taskId,$workerIndex){
  17. return "{$taskId}.{$workerIndex}";
  18. });
  19. $response->end("sync result ".$ret);
  20. }else if(isset($request->get['status'])) {
  21. var_dump($task->status());
  22. }else{
  23. //异步调用task
  24. $id = $task->async(function ($taskId,$workerIndex){
  25. \co::sleep(1);
  26. var_dump("async id {$taskId} task run");
  27. });
  28. $response->end("async id {$id} ");
  29. }
  30. });
  31. //启动服务
  32. $http->start();

版本强调

框架低版本升级为EasySwoole3.3.0+,需要手动进行配置修改。
需要删除MAIN_SERVER.SETTING.task_worker_numMAIN_SERVER.SETTING.task_enable_coroutine配置项。
请在MAIN_SERVER配置项中,增加TASK子配置项:

  1. [
  2. 'MAIN_SERVER' => [
  3. 'TASK'=>[
  4. 'workerNum'=>4,
  5. 'maxRunningNum'=>128,
  6. 'timeout'=>15
  7. ],
  8. ],
  9. ];

Task管理

查看所有Task进程的状态

php easyswoole task status