Custom Queue

To use a custom queue, implement the EasySwoole\Queue\QueueDriverInterface interface. The component's default fast-cache queue serves as an example:

    namespace EasySwoole\Spider\Queue;

    use EasySwoole\FastCache\Cache;
    use EasySwoole\Queue\QueueDriverInterface;
    use EasySwoole\Queue\Job;

    class FastCacheQueue implements QueueDriverInterface
    {
        // Key of the fast-cache queue that holds the serialized jobs.
        private const FASTCACHE_JOB_QUEUE_KEY = 'FASTCACHE_JOB_QUEUE_KEY';

        // Take the next job off the queue; returns null when the queue is
        // empty or the payload cannot be restored. $timeout is not used here.
        public function pop(float $timeout = 3): ?Job
        {
            $job = Cache::getInstance()->deQueue(self::FASTCACHE_JOB_QUEUE_KEY);
            if (empty($job)) {
                return null;
            }
            $job = unserialize($job);
            // Guard against corrupted or foreign payloads.
            if (!$job instanceof Job) {
                return null;
            }
            return $job;
        }

        // Serialize the job and append it to the queue.
        public function push(Job $job): bool
        {
            $res = Cache::getInstance()->enQueue(self::FASTCACHE_JOB_QUEUE_KEY, serialize($job));
            return !empty($res);
        }

        // Number of pending jobs, or null if it cannot be determined.
        // Assumes FastCache's queueSize(); the original listing left this
        // method empty, which violates the ?int return type.
        public function size(): ?int
        {
            $size = Cache::getInstance()->queueSize(self::FASTCACHE_JOB_QUEUE_KEY);
            return is_numeric($size) ? (int)$size : null;
        }
    }
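
The push/pop/size contract used by the driver above can be tried out without EasySwoole installed. The following is only a minimal sketch: DemoJob and InMemoryQueue are hypothetical stand-ins (they are not part of EasySwoole), and an SplQueue replaces fast-cache as the storage. A real driver would implement EasySwoole\Queue\QueueDriverInterface and accept EasySwoole\Queue\Job instead.

```php
<?php
// Hypothetical stand-in for EasySwoole\Queue\Job, for illustration only.
class DemoJob
{
    private $data;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function getData()
    {
        return $this->data;
    }
}

// Hypothetical in-memory driver that mirrors the push/pop/size contract.
class InMemoryQueue
{
    /** @var SplQueue stores jobs in serialized form, like the fast-cache driver */
    private $queue;

    public function __construct()
    {
        $this->queue = new SplQueue();
    }

    public function push(DemoJob $job): bool
    {
        $this->queue->enqueue(serialize($job));
        return true;
    }

    // $timeout is accepted for signature parity but unused in this sketch.
    public function pop(float $timeout = 3.0): ?DemoJob
    {
        if ($this->queue->isEmpty()) {
            return null;
        }
        $job = unserialize($this->queue->dequeue());
        return $job instanceof DemoJob ? $job : null;
    }

    public function size(): ?int
    {
        return $this->queue->count();
    }
}

$queue = new InMemoryQueue();
$queue->push(new DemoJob('https://example.com/page/1'));
$queue->push(new DemoJob('https://example.com/page/2'));

$sizeBefore = $queue->size(); // two jobs pending
$first = $queue->pop();       // jobs come back in FIFO order
$second = $queue->pop();
$third = $queue->pop();       // queue is empty now, so pop() yields null
```

Storing the serialized form, rather than the object itself, keeps the sketch close to what a shared backend such as fast-cache or Redis would require, since jobs must survive a trip between processes.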

Distributed

To run in distributed mode, use the component's built-in Redis communication or a custom communication mechanism.