Queue 介绍

原理

EasySwoole 封装实现了一个轻量级的队列,默认使用 Redis 作为队列驱动器。

用户可以自己实现一个队列驱动器来实现队列,用 kafka 作为队列驱动器或者 其他驱动器方式 作为队列驱动器,来进行存储。

从上可知,Queue 并不是一个单独使用的组件,它更像一个对不同驱动的队列进行统一封装的门面组件。

Queue 组件当前最新稳定版本为 3.x。

旧版本 (2.1.x) 的 Queue 组件的使用,请看 Queue 2.1.x

组件要求

  • ext-swoole: >=4.4.0
  • easyswoole/component: ^2.0
  • easyswoole/redis-pool: ~2.2.0

安装方法

composer require easyswoole/queue 3.x

仓库地址

easyswoole/queue 3.x

基本使用

默认自带的队列驱动为 Redis 队列。这里简单列举 2 种用户可使用的方式:

  • 在框架的任意位置进行生产和消费队列任务。
  • 在框架的任意位置进行生产队列任务, 然后在自定义进程中进行消费任务。

在框架中进行生产和消费任务

创建队列

  1. use EasySwoole\Queue\Driver\RedisQueue;
  2. use EasySwoole\Queue\Job;
  3. use EasySwoole\Queue\Queue;
  4. use EasySwoole\Redis\Config\RedisConfig;
  5. // 配置 Redis 队列驱动器
  6. $redisConfig = new RedisConfig([
  7. 'host' => '127.0.0.1', // 服务端地址 默认为 '127.0.0.1'
  8. 'port' => 6379, // 端口 默认为 6379
  9. 'auth' => '', // 密码 默认为 不设置
  10. 'db' => 0, // 默认为 0 号库
  11. ]);
  12. // 创建队列
  13. $queue = new Queue(new RedisQueue($redisConfig));

普通生产任务

$queue 为上述创建队列中得到的队列对象。

  1. // 创建任务
  2. $job = new Job();
  3. // 设置任务数据
  4. $job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
  5. // 生产普通任务
  6. $queue->producer()->push($job);

普通消费任务

$queue 为上述创建队列中得到的队列对象。

  1. // 消费任务
  2. $job = $queue->consumer()->pop();
  3. // 或者是自定义进程中消费任务(具体使用请看下文自定义进程消费任务完整使用示例)
  4. $queue->consumer()->listen(function (Job $job){
  5. var_dump($job);
  6. });

生产延迟任务

$queue 为上述创建队列中得到的队列对象。

  1. // 创建任务
  2. $job = new Job();
  3. // 设置任务数据
  4. $job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
  5. // 设置任务延后执行时间
  6. $job->setDelayTime(5);
  7. // 生产延迟任务
  8. $queue->producer()->push($job);

生产可信任务

  1. // 创建任务
  2. $job = new Job();
  3. // 设置任务数据
  4. $job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
  5. // 设置任务重试次数为 3 次。任务如果没有确认,则会执行三次
  6. $job->setRetryTimes(3);
  7. // 如果5秒内没确认任务,会重新回到队列。默认为3秒
  8. $job->setWaitConfirmTime(5);
  9. // 投递任务
  10. $queue->producer()->push($job);
  11. // 确认一个任务
  12. $queue->consumer()->confirm($job);

完整使用示例

以在 http 服务中为例,使用示例代码如下:

  1. <?php
  2. namespace App\HttpController;
  3. use App\Utility\MyQueue;
  4. use EasySwoole\Http\AbstractInterface\Controller;
  5. use EasySwoole\Http\Message\Status;
  6. use EasySwoole\Queue\Driver\RedisQueue;
  7. use EasySwoole\Queue\Job;
  8. use EasySwoole\Queue\Queue;
  9. use EasySwoole\Redis\Config\RedisConfig;
  10. class Index extends Controller
  11. {
  12. // 创建队列
  13. public function createQueue()
  14. {
  15. // 配置 Redis 队列驱动器
  16. $redisConfig = new RedisConfig([
  17. 'host' => '127.0.0.1', // 服务端地址 默认为 '127.0.0.1'
  18. 'port' => 6379, // 端口 默认为 6379
  19. 'auth' => '', // 密码 默认为 不设置
  20. 'db' => 0, // 默认为 0 号库
  21. ]);
  22. // 创建队列
  23. $queue = new Queue(new RedisQueue($redisConfig, 'easyswoole_queue'));
  24. return $queue;
  25. }
  26. // 生产普通任务
  27. public function producer1()
  28. {
  29. // 获取队列
  30. $queue = $this->createQueue();
  31. // 创建任务
  32. $job = new Job();
  33. // 设置任务数据
  34. $job->setJobData("this is my job data time time " . date('Ymd h:i:s'));
  35. var_dump('producer1 => ');
  36. var_dump($job->getJobData());
  37. // 生产普通任务
  38. $produceRes = $queue->producer()->push($job);
  39. if (!$produceRes) {
  40. $this->writeJson(Status::CODE_OK, [], '队列生产普通任务失败!');
  41. } else {
  42. $this->writeJson(Status::CODE_OK, [], '队列生产普通任务成功!');
  43. }
  44. }
  45. // 生产延迟任务
  46. public function producer2()
  47. {
  48. // 获取队列
  49. $queue = $this->createQueue();
  50. // 创建任务
  51. $job = new Job();
  52. // 设置任务数据
  53. $job->setJobData("this is my job data time time " . date('Ymd h:i:s'));
  54. // 设置任务延后执行时间
  55. $job->setDelayTime(5);
  56. var_dump('producer2 => ');
  57. var_dump($job->getJobData());
  58. // 生产延迟任务
  59. $produceRes = $queue->producer()->push($job);
  60. if (!$produceRes) {
  61. $this->writeJson(Status::CODE_OK, [], '队列生产延迟任务失败!');
  62. } else {
  63. $this->writeJson(Status::CODE_OK, [], '队列生产延迟任务成功!');
  64. }
  65. }
  66. // 生产可信任务
  67. public function producer3()
  68. {
  69. // 获取队列
  70. $queue = $this->createQueue();
  71. // 创建任务
  72. $job = new Job();
  73. // 设置任务数据
  74. $job->setJobData("this is my job data time time " . date('Ymd h:i:s'));
  75. var_dump('producer3 => ');
  76. var_dump($job->getJobData());
  77. // 设置任务重试次数为 3 次。任务如果没有确认,则会执行三次
  78. $job->setRetryTimes(3);
  79. // 如果5秒内没确认任务,会重新回到队列。默认为3秒
  80. $job->setWaitConfirmTime(5);
  81. // 投递任务
  82. $queue->producer()->push($job);
  83. // 确认一个任务
  84. $queue->consumer()->confirm($job);
  85. }
  86. // 消费任务
  87. public function consumer()
  88. {
  89. // 获取队列
  90. $queue = $this->createQueue();
  91. ### 消费任务
  92. // 获取到需要消费的任务
  93. $job = $queue->consumer()->pop();
  94. if (!$job) {
  95. $this->writeJson(Status::CODE_OK, [], '没有队列任务需要消费了!');
  96. return false;
  97. }
  98. // 获取需要消费的任务的数据
  99. $jobData = $job->getJobData();
  100. var_dump($jobData);
  101. }
  102. }

在框架中生产任务和自定义进程中消费任务

  • 注册队列驱动器
  • 设置消费进程
  • 生产者投递任务

定义一个队列

  1. <?php
  2. namespace App\Utility;
  3. use EasySwoole\Component\Singleton;
  4. use EasySwoole\Queue\Queue;
  5. class MyQueue extends Queue
  6. {
  7. use Singleton;
  8. }

定义消费进程

  1. <?php
  2. namespace App\Utility;
  3. use EasySwoole\Component\Process\AbstractProcess;
  4. use EasySwoole\Queue\Job;
  5. class QueueProcess extends AbstractProcess
  6. {
  7. protected function run($arg)
  8. {
  9. go(function (){
  10. MyQueue::getInstance()->consumer()->listen(function (Job $job){
  11. var_dump($job->getJobData());
  12. });
  13. });
  14. }
  15. }

支持多进程、多协程消费

注册队列驱动器、消费进程及设置生产者投递任务

  1. <?php
  2. namespace EasySwoole\EasySwoole;
  3. use App\Utility\MyQueue;
  4. use App\Utility\QueueProcess;
  5. use EasySwoole\Component\Timer;
  6. use EasySwoole\EasySwoole\AbstractInterface\Event;
  7. use EasySwoole\EasySwoole\Swoole\EventRegister;
  8. use EasySwoole\Queue\Job;
  9. class EasySwooleEvent implements Event
  10. {
  11. public static function initialize()
  12. {
  13. date_default_timezone_set('Asia/Shanghai');
  14. }
  15. public static function mainServerCreate(EventRegister $register)
  16. {
  17. // redis pool 使用请看 redis 章节文档
  18. $redisConfig = new \EasySwoole\Redis\Config\RedisConfig(
  19. [
  20. 'host' => '127.0.0.1', // 服务端地址 默认为 '127.0.0.1'
  21. 'port' => 6379, // 端口 默认为 6379
  22. 'auth' => '', // 密码 默认为 不设置
  23. 'db' => 0, // 默认为 0 号库
  24. ]
  25. );
  26. // 配置 队列驱动器
  27. $driver = new \EasySwoole\Queue\Driver\RedisQueue($redisConfig, 'easyswoole_queue');
  28. MyQueue::getInstance($driver);
  29. // 注册一个消费进程
  30. $processConfig = new \EasySwoole\Component\Process\Config([
  31. 'processName' => 'QueueProcess', // 设置 自定义进程名称
  32. 'processGroup' => 'Queue', // 设置 自定义进程组名称
  33. 'enableCoroutine' => true, // 设置 自定义进程自动开启协程
  34. ]);
  35. \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new QueueProcess($processConfig));
  36. // 模拟生产者,可以在任意位置投递
  37. $register->add($register::onWorkerStart, function ($server, $id) {
  38. if ($id == 0) {
  39. Timer::getInstance()->loop(3000, function () {
  40. $job = new Job();
  41. $job->setJobData(['time' => \time()]);
  42. MyQueue::getInstance()->producer()->push($job);
  43. });
  44. }
  45. });
  46. }
  47. }

进程安全退出问题请看 自定义进程 章节

进阶使用

我们可以自定义驱动,实现 RabbitMQKafka 等消费队列软件的封装。

用户需要定义类,并实现 \EasySwoole\Queue\QueueDriverInterface 接口的几个方法即可。该接口的详细实现请看下文。

QueueDriverInterface 接口类实现

  1. <?php
  2. namespace EasySwoole\Queue;
  3. interface QueueDriverInterface
  4. {
  5. public function push(Job $job,float $timeout = 3.0): bool;
  6. public function pop(float $timeout = 3.0, array $params = []): ?Job;
  7. public function info(): ?array;
  8. public function confirm(Job $job,float $timeout = 3.0): bool;
  9. }

组件自带的 Redis 队列驱动器实现

  1. <?php
  2. namespace EasySwoole\Queue\Driver;
  3. use EasySwoole\Queue\Job;
  4. use EasySwoole\Queue\QueueDriverInterface;
  5. use EasySwoole\Redis\Config\RedisConfig;
  6. use EasySwoole\Redis\Redis;
  7. use EasySwoole\RedisPool\Pool;
  8. use Swoole\Coroutine;
  9. class RedisQueue implements QueueDriverInterface
  10. {
  11. protected $pool;
  12. protected $queueName;
  13. protected $lastCheckDelay = null;
  14. public function __construct(RedisConfig $config,string $queueName = 'es_q')
  15. {
  16. $this->pool = new Pool($config);
  17. $this->queueName = $queueName;
  18. }
  19. public function push(Job $job,float $timeout = 3.0): bool
  20. {
  21. if($job->getDelayTime() > 0){
  22. return $this->pool->invoke(function ($redis)use($job){
  23. /** @var $redis Redis */
  24. return $redis->zAdd("{$this->queueName}_d",time() + $job->getDelayTime(),serialize($job));
  25. },$timeout);
  26. }else{
  27. return $this->pool->invoke(function($redis)use($job){
  28. /** @var $redis Redis */
  29. return $redis->rPush($this->queueName,serialize($job));
  30. },$timeout);
  31. }
  32. }
  33. public function pop(float $timeout = 3.0, array $params = []): ?Job
  34. {
  35. // 检查当前秒数的延迟任务是否存在未执行任务。
  36. if($this->lastCheckDelay != time()){
  37. $this->lastCheckDelay = time();
  38. Coroutine::create(function ()use($timeout){
  39. $this->pool->invoke(function ($redis){
  40. /** @var $redis Redis */
  41. $list = $redis->zCount("{$this->queueName}_d",0,$this->lastCheckDelay);
  42. if($list > 0){
  43. $jobs = $redis->zPopmin("{$this->queueName}_d",$list);
  44. if(is_array($jobs)){
  45. foreach ($jobs as $tempJob => $time){
  46. if($time > $this->lastCheckDelay){
  47. $redis->zAdd("{$this->queueName}_d",$time,$tempJob);
  48. }else{
  49. //插入到队列头
  50. $redis->lPush($this->queueName,$tempJob);
  51. }
  52. }
  53. }
  54. }
  55. },$timeout);
  56. });
  57. }
  58. $job = $this->pool->invoke(function ($redis){
  59. /** @var $redis Redis */
  60. return $redis->lPop($this->queueName);
  61. },$timeout);
  62. if($job){
  63. $job = unserialize($job);
  64. }
  65. if(!$job instanceof Job){
  66. return null;
  67. }
  68. // 需要确认的任务
  69. if($job->getRetryTimes() != 0){
  70. // 到达最大的执行次数
  71. if($job->getRunTimes() >= $job->getRetryTimes()){
  72. $this->pool->invoke(function ($redis)use($job){
  73. /** @var $redis Redis */
  74. $redis->hDel("{$this->queueName}_c",$job->getJobId());
  75. },$timeout);
  76. return null;
  77. }
  78. // 如果不是第一次执行
  79. if($job->getRunTimes() !== 0){
  80. $hashConfirm = $this->pool->invoke(function ($redis)use($job){
  81. /** @var $redis Redis */
  82. return $redis->hGet("{$this->queueName}_c",$job->getJobId());
  83. },$timeout);
  84. // 说明该任务已经被确认删除
  85. if($hashConfirm != 1){
  86. return null;
  87. }
  88. }
  89. // 丢到延迟队列中。
  90. $temp = clone $job;
  91. $temp->setRunTimes($temp->getRunTimes() + 1);
  92. $temp->setDelayTime($temp->getWaitConfirmTime());
  93. $this->push($temp);
  94. // 标记为待确认。
  95. $this->pool->invoke(function ($redis)use($temp){
  96. /** @var $redis Redis */
  97. $redis->hSet("{$this->queueName}_c",$temp->getJobId(),1);
  98. },$timeout);
  99. }
  100. $job->setRunTimes($job->getRunTimes() + 1);
  101. return $job;
  102. }
  103. public function confirm(Job $job,float $timeout = 3.0): bool
  104. {
  105. if($job->getRetryTimes() != 0){
  106. $this->pool->invoke(function ($redis)use($job){
  107. /** @var $redis Redis */
  108. $redis->hDel("{$this->queueName}_c",$job->getJobId());
  109. });
  110. return true;
  111. }else{
  112. return false;
  113. }
  114. }
  115. public function info(): ?array
  116. {
  117. return $this->pool->invoke(function ($redis){
  118. /** @var $redis Redis */
  119. return [
  120. 'runningQueue'=>$redis->lLen($this->queueName),
  121. 'delayQueue'=>$redis->zCard("{$this->queueName}_c")
  122. ];
  123. });
  124. }
  125. }

相关仓库

EasySwoole 中利用 Redis 实现消息队列

如何利用 EasySwoole 多进程多协程 Redis 队列实现爬虫