如何实现队列消费/自定义进程

可能我们会经常遇见需要不断消费队列内内容的场景,我们以EasySwoole中自定义进程的方式,来实现这一功能。

实现代码

定义消费进程逻辑

  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Tioncico
  5. * Date: 2018/10/18 0018
  6. * Time: 9:43
  7. */
  8. namespace App\Process;
  9. use EasySwoole\Component\Process\AbstractProcess;
  10. use Swoole\Process;
  11. class Consumer extends AbstractProcess
  12. {
  13. private $isRun = false;
  14. public function run($arg)
  15. {
  16. // TODO: Implement run() method.
  17. /*
  18. * 举例,消费redis中的队列数据
  19. * 定时500ms检测有没有任务,有的话就while死循环执行
  20. */
  21. $this->addTick(500,function (){
  22. if(!$this->isRun){
  23. $this->isRun = true;
  24. $redis = new \redis();//此处为伪代码,请自己建立连接或者维护redis连接
  25. while (true){
  26. try{
  27. $task = $redis->lPop('task_list');
  28. if($task){
  29. // do you task
  30. }else{
  31. break;
  32. }
  33. }catch (\Throwable $throwable){
  34. break;
  35. }
  36. }
  37. $this->isRun = false;
  38. }
  39. var_dump($this->getProcessName().' task run check');
  40. });
  41. }
  42. public function onShutDown()
  43. {
  44. // TODO: Implement onShutDown() method.
  45. }
  46. public function onReceive(string $str, ...$args)
  47. {
  48. // TODO: Implement onReceive() method.
  49. }
  50. }

注册消费进程

在EasySwoole的全局事件中,注册消费进程。

  1. <?php
  2. use App\Consumer;
  3. use EasySwoole\EasySwoole\ServerManager;
  4. public static function mainServerCreate(EventRegister $register)
  5. {
  6. $allNum = 3;
  7. for ($i = 0 ;$i < $allNum;$i++){
  8. ServerManager::getInstance()->getSwooleServer()->addProcess((new Consumer("consumer_{$i}"))->getProcess());
  9. }
  10. }

爬虫例子:https://github.com/HeKunTong/easyswoole3_demo