队列

简介

Tip:Laravel 现在为你的 Redis 队列 提供了 Horizon,一个漂亮的仪表盘和配置系统。查看完整的 Horizon documentation 文档 了解更多信息。

Laravel 队列为不同的后台队列服务提供统一的 API,例如 Beanstalk,Amazon SQS,Redis,甚至其他基于关系型数据库的队列。队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和响应的时间。

队列配置文件存放在 config/queue.php 文件中。每一种队列驱动的配置都可以在该文件中找到,包括数据库, Beanstalkd, Amazon SQS, Redis,以及同步(本地使用)驱动。其中还包含了一个 null 队列驱动用于那些放弃队列的任务。

连接 Vs. 队列

在开始使用 Laravel 队列前,弄明白 「连接」 和 「队列」 的区别是很重要的。在你的 config/queue.php 配置文件里,有一个 connections 配置选项。这个选项给 Amazon SQS,Beanstalk,或者 Redis 这样的后端服务定义了一个特有的连接。不管是哪一种,一个给定的连接可能会有多个 「队列」,而 「队列」 可以被认为是不同的栈或者大量的队列任务。

要注意的是,queue 配置文件中每个连接的配置示例中都包含一个 queue 属性。队列任务被发给指定连接的时候会被分发到 queue 属性和指定连接相同的队列中。换句话说,如果你分发任务的时候没有定义分配到哪个队列,那么它就会被放到连接配置中 queue 属性所定义的默认队列中,默认队列由env文件 ‘ QUEUE_DRIVER‘ 参数定义 :

  1. // 这个任务将被分发到默认队列...
  2. Job::dispatch();
  3. // 这个任务将被发送到「emails」队列...
  4. Job::dispatch()->onQueue('emails');

有些应用可能不需要把任务发到不同的队列,而只发到一个简单的队列中就行了。但是把任务推到不同的队列仍然是非常有用的,因为 Laravel 队列处理器允许你定义队列的优先级,所以你能给不同的队列划分不同的优先级或者区分不同任务的不同处理方式了。比如说,如果你把任务推到 high 队列中,你就能让队列处理器优先处理这些任务了:

  1. php artisan queue:work --queue=high,default

驱动的必要设置

Database

为了使用 database 队列驱动,你需要一张数据表来存储任务。运行 queue:table Artisan 命令来创建这张表的迁移文件。当迁移文件创建好后,你就可以使用 migrate 命令来进行迁移:

  1. php artisan queue:table
  2. php artisan migrate

Redis

为了使用 redis 队列驱动,你需要在 config/database.php 配置文件中配置 Redis 的数据库连接。

Redis 集群

如果你的 Redis 队列驱动使用了 Redis 集群,你的队列名必须包含一个 key hash tag 。这是为了确保所有的 Redis 键对于一个队列都被放在同一哈希中。

  1. 'redis' => [
  2. 'driver' => 'redis',
  3. 'connection' => 'default',
  4. 'queue' => '{default}',
  5. 'retry_after' => 90,
  6. ],

阻塞

当使用 Redis 队列时,你可以用 block_for 配置项来具体说明驱动应该在将任务重新放入 Redis 数据库以及处理器轮询之前阻塞多久。

基于你的队列加载来调整这个值比把新任务放入 Redis 数据库轮询要更有效率的多。例如,你可以将这个值设置为 5 来表明这个驱动应该在等待任务可用时阻塞5秒。

  1. 'redis' => [
  2. 'driver' => 'redis',
  3. 'connection' => 'default',
  4. 'queue' => 'default',
  5. 'retry_after' => 90,
  6. 'block_for' => 5,
  7. ],

其它队列驱动的依赖扩展包

在使用列表里的队列服务前,必须安装以下依赖扩展包:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~4.0
  • Redis: predis/predis ~1.0

创建任务

生成任务类

在你的应用程序中,队列的任务类都默认放在 app/Jobs 目录下。如果这个目录不存在,那当你运行 make:job Artisan 命令时目录就会被自动创建。你可以用以下的 Artisan 命令来生成一个新的队列任务:

  1. php artisan make:job ProcessPodcast

生成的类继承了 Illuminate\Contracts\Queue\ShouldQueue 接口,这意味着这个任务将会被推送到队列中,而不是同步执行。

任务类结构

任务类的结构很简单,一般来说只会包含一个让队列用来调用此任务的 handle 方法。我们来看一个示例的任务类。这个示例里,假设我们管理着一个播客发布服务,在发布之前需要处理上传播客文件:

  1. <?php
  2. namespace App\Jobs;
  3. use App\Podcast;
  4. use App\AudioProcessor;
  5. use Illuminate\Bus\Queueable;
  6. use Illuminate\Queue\SerializesModels;
  7. use Illuminate\Queue\InteractsWithQueue;
  8. use Illuminate\Contracts\Queue\ShouldQueue;
  9. use Illuminate\Foundation\Bus\Dispatchable;
  10. class ProcessPodcast implements ShouldQueue
  11. {
  12. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  13. protected $podcast;
  14. /**
  15. * 新建一个任务实例
  16. *
  17. * @param Podcast $podcast
  18. * @return void
  19. */
  20. public function __construct(Podcast $podcast)
  21. {
  22. $this->podcast = $podcast;
  23. }
  24. /**
  25. * 执行任务
  26. *
  27. * @param AudioProcessor $processor
  28. * @return void
  29. */
  30. public function handle(AudioProcessor $processor)
  31. {
  32. // 处理上传播客...
  33. }
  34. }

注意,在这个例子中,我们在任务类的构造器中直接传递了一个 Eloquent 模型 。因为我们在任务类里引用了 SerializesModels 这个 trait,使得 Eloquent 模型在处理任务时可以被优雅地序列化和反序列化。如果你的队列任务类在构造器中接收了一个 Eloquent 模型,那么只有可识别出该模型的属性会被序列化到队列里。当任务被实际运行时,队列系统便会自动从数据库中重新取回完整的模型。这整个过程对你的应用程序来说是完全透明的,这样可以避免在序列化完整的 Eloquent 模式实例时所带来的一些问题。

在队列处理任务时,会调用 handle 方法,而这里我们也可以通过 handle 方法的参数类型提示,让 Laravel 的 服务容器 自动注入依赖对象。

如果你想完全控制容器如何将依赖对象注入至 handle 方法,可以使用容器的 bindMethod 方法。bindMethod 方法接受一个任务和容器的回调。虽然可以直接在回调中可以调用 handle 方法,但建议应该从 service provider 调用为佳:

  1. use App\Jobs\ProcessPodcast;
  2. $this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) {
  3. return $job->handle($app->make(AudioProcessor::class));
  4. });

注意:像图片内容这种二进制数据,在放入队列任务之前必须使用 base64_encode 方法转换一下。否则,当这项任务放置到队列中时,可能无法正确序列化为 JSON。

任务中间件

任务中间件允许您针对队列任务的执行包装自定义逻辑,从而减少任务本身的样板代码。 例如,考虑下面的 handle 方法,该方法利用了 Laravel 的 Redis 速率限制功能,使得每5秒只允许处理一个任务:

  1. /**
  2. * 执行队列.
  3. *
  4. * @return void
  5. */
  6. public function handle()
  7. {
  8. Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
  9. info('Lock obtained...');
  10. // 处理队列...
  11. }, function () {
  12. // 无法获取锁…
  13. return $this->release(5);
  14. });
  15. }

虽然这段代码有效,但 handle 方法的结构变得有噪声,因为它与 Redis 速率限制逻辑混杂在一起。此外,对于我们要进行速率限制的任何其他任务,必须复制此速率限制逻辑。

和 handle 方法中的速率限制不同,我们可以定义一个处理速率限制的任务中间件。Laravel 没有任务中间件的默认位置,因此欢迎您将应用程序中间件放在应用程序的任何位置。在此示例中,我们将中间件放在 app/Jobs/Middleware 目录中:

  1. <?php
  2. namespace App\Jobs\Middleware;
  3. use Illuminate\Support\Facades\Redis;
  4. class RateLimited
  5. {
  6. /**
  7. * 处理队列中的任务.
  8. *
  9. * @param mixed $job
  10. * @param callable $next
  11. * @return mixed
  12. */
  13. public function handle($job, $next)
  14. {
  15. Redis::throttle('key')
  16. ->block(0)->allow(1)->every(5)
  17. ->then(function () use ($job, $next) {
  18. // 锁定…
  19. $next($job);
  20. }, function () use ($job) {
  21. // 无法获取锁…
  22. $job->release(5);
  23. });
  24. }
  25. }

正如您所看到的,就像 路由中间件 ,任务中间件接收正在处理的任务以及应该调用以继续处理任务的回调。

创建任务中间件后,可以通过从任务的 middleware 方法返回它们来将它们附加到任务。此方法不存在于由 make:job Artisan 命令构建的任务上,因此您需要将其添加到自己的任务类定义中:

  1. use App\Jobs\Middleware\RateLimited;
  2. /**
  3. * 得到任务应该经过的中间人。
  4. *
  5. * @return array
  6. */
  7. public function middleware()
  8. {
  9. return [new RateLimited];
  10. }

分发任务

一旦你写完了你的任务类你就可以使用它自带的 dispatch 方法分发它。传递给 dispatch 方法的参数将会被传递给任务的构造函数:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Jobs\ProcessPodcast;
  4. use Illuminate\Http\Request;
  5. use App\Http\Controllers\Controller;
  6. class PodcastController extends Controller
  7. {
  8. /**
  9. * 存储一个新的播客节目。
  10. *
  11. * @param Request $request
  12. * @return Response
  13. */
  14. public function store(Request $request)
  15. {
  16. // 创建播客...
  17. ProcessPodcast::dispatch($podcast);
  18. }
  19. }

延迟分发

如果你想延迟你的队列任务的执行,你可以在分发任务的时候使用 delay 方法。例如,让我们详细说明一个十分钟之后才会执行的任务:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Jobs\ProcessPodcast;
  4. use Illuminate\Http\Request;
  5. use App\Http\Controllers\Controller;
  6. class PodcastController extends Controller
  7. {
  8. /**
  9. * 存储一个新的播客节目.
  10. *
  11. * @param Request $request
  12. * @return Response
  13. */
  14. public function store(Request $request)
  15. {
  16. // 创建播客...
  17. ProcessPodcast::dispatch($podcast)
  18. ->delay(now()->addMinutes(10));
  19. }
  20. }

注意:Amazon SQS 队列服务最大延迟 15 分钟的时间。

同步调度

如果您想立即(同步)执行队列任务,可以使用 dispatchNow 方法。 使用此方法时,队列任务将不会排队,并立即在当前进程中运行:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use Illuminate\Http\Request;
  4. use App\Jobs\ProcessPodcast;
  5. use App\Http\Controllers\Controller;
  6. class PodcastController extends Controller
  7. {
  8. /**
  9. * 存储一个新的播客节目。
  10. *
  11. * @param Request $request
  12. * @return Response
  13. */
  14. public function store(Request $request)
  15. {
  16. // 创建播客...
  17. ProcessPodcast::dispatchNow($podcast);
  18. }
  19. }

任务链

任务链允许你具体定义一个按序列执行队列任务的列表。一旦序列中的任务失败了,剩余的工作将不会执行。要运行一个任务链,你可以对可分发的任务使用 withChain 方法:

  1. ProcessPodcast::withChain([
  2. new OptimizePodcast,
  3. new ReleasePodcast
  4. ])->dispatch();

注意:使用 $this->delete() 方法删除队列任务不会阻止任务链任务执行。只有当任务链中的任务执行失败时,任务链才会停止执行。

链连接和队列

如果你想定义用于任务链的默认连接和队列,你可以使用 allOnConnectionallOnQueue 方法。 这些方法指定了所需队列的连接和队列 —— 除非队列任务被明确指定给了不同的连接 / 队列:

  1. ProcessPodcast::withChain([
  2. new OptimizePodcast,
  3. new ReleasePodcast
  4. ])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');

自定义连接 & 队列

分发任务到指定队列

通过将任务分发到不同队列,你可以将你的队列任务「分类」,甚至指定给不同队列分配的任务数量。记住,这不是推送任务到你定义的队列配置文件的不同的连接里,而是一个单一的连接。要指定队列,在分发任务时使用 onQueue 方法:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Jobs\ProcessPodcast;
  4. use Illuminate\Http\Request;
  5. use App\Http\Controllers\Controller;
  6. class PodcastController extends Controller
  7. {
  8. /**
  9. * 存储一个新的播客节目.
  10. *
  11. * @param Request $request
  12. * @return Response
  13. */
  14. public function store(Request $request)
  15. {
  16. // 创建播客...
  17. ProcessPodcast::dispatch($podcast)->onQueue('processing');
  18. }
  19. }

分发任务到指定连接

如果你在多队列连接中工作,你可以指定将任务分发到哪个连接。要指定连接,在分发任务时使用 onConnection 方法:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Jobs\ProcessPodcast;
  4. use Illuminate\Http\Request;
  5. use App\Http\Controllers\Controller;
  6. class PodcastController extends Controller
  7. {
  8. /**
  9. * 存储一个新播客节目。
  10. *
  11. * @param Request $request
  12. * @return Response
  13. */
  14. public function store(Request $request)
  15. {
  16. // 创建播客...
  17. ProcessPodcast::dispatch($podcast)->onConnection('sqs');
  18. }
  19. }

当然,你可以链式调用 onConnectiononQueue 方法来指定连接和队列:

  1. ProcessPodcast::dispatch($podcast)
  2. ->onConnection('sqs')
  3. ->onQueue('processing');

或者,可以将 connection 指定为任务类的属性:

  1. <?php
  2. namespace App\Jobs;
  3. class ProcessPodcast implements ShouldQueue
  4. {
  5. /**
  6. * 应该处理任务的队列连接.
  7. *
  8. * @var string
  9. */
  10. public $connection = 'sqs';
  11. }

指定最大任务尝试次数/超时值

最大尝试次数

在一个任务重指定最大尝试次数可以通过 Artisan 命令的 —tries 选项 指定:

  1. php artisan queue:work --tries=3

你可能想通过任务类自身对最大任务尝试次数进行一个更颗粒化的处理。如果最大尝试次数是在任务类中定义的,它将优先于命令行中的值提供:

  1. <?php
  2. namespace App\Jobs;
  3. class ProcessPodcast implements ShouldQueue
  4. {
  5. /**
  6. * 任务可以尝试的最大次数。
  7. *
  8. * @var int
  9. */
  10. public $tries = 5;
  11. }

基于时间的尝试

作为另外一个选择来定义任务在失败前会尝试多少次,你可以定义一个任务超时时间。这样的话,在给定的时间范围内,任务可以无限次尝试。要定义一个任务的超时时间,在你的任务类中新增一个 retryUntil 方法:

  1. /**
  2. * 定义任务超时时间
  3. *
  4. * @return \DateTime
  5. */
  6. public function retryUntil()
  7. {
  8. return now()->addSeconds(5);
  9. }

Tip:你也可以在你的队列事件监听器中使用 retryUntil 方法。

超时

注意:timeout 特性对于 PHP 7.1+ 和 pcntl PHP 扩展进行了优化.

同样的,任务执行最大秒数的数值可以通过 Artisan 命令行的 —timeout 选项指定。

  1. php artisan queue:work --timeout=30

然而,你可能也想在任务类自身定义一个超时时间。如果在任务类中指定,优先级将会高于命令行:

  1. <?php
  2. namespace App\Jobs;
  3. class ProcessPodcast implements ShouldQueue
  4. {
  5. /**
  6. * 任务可以执行的最大秒数 (超时时间)。
  7. *
  8. * @var int
  9. */
  10. public $timeout = 120;
  11. }

频率限制

注意:这个特性要求你的应用可以使用 Redis 服务器.

如果你的应用使用了 Redis,你可以通过时间或并发限制你的队列任务。当你的队列任务通过同样有速率限制的 API 使用时,这个特性将很有帮助。

例如,使用 throttle 方法,你可以限制一个给定类型的任务每 60 秒只执行 10 次。如果没有获得锁,一般情况下你应该将任务放回队列以使其可以被稍后重试。

  1. Redis::throttle('key')->allow(10)->every(60)->then(function () {
  2. // 任务逻辑...
  3. }, function () {
  4. // 无法获得锁...
  5. return $this->release(10);
  6. });

Tip:在上述的例子里,key 可以是任何你想要限制频率的任务类型的唯一识别字符串。例如,使用构件基于任务类名的 key,或它操作的 Eloquent 模型的 ID。

注意:将受限制的作业释放回队列,仍然会增加工作的总数 attempts

或者,你可以指定一个任务可以同时执行的最大数量。在如下情况时这会很有用处:当一个队列中的任务正在修改资源时,一次只能被一个任务修改。例如,使用 funnel 方法,你可以限制一个给定类型的任务一次只能执行一个处理器:

  1. Redis::funnel('key')->limit(1)->then(function () {
  2. // 任务逻辑...
  3. }, function () {
  4. // 无法获得锁...
  5. return $this->release(10);
  6. });

Tip:当使用频率限制时,任务执行成功的尝试的次数可能会难以确定。所以,将频率限制与 时间限制 组合是很有作用的。

错误处理

如果在任务执行的时候出现异常,任务会被自动释放到队列中以再次尝试。任务将会一直被释放直到达到应用允许的最大重试次数。最大重试的数值由 queue:work Artisan 命令的 —tries 选项定义,或者在任务类中定义。更多执行队列处理器的信息可以 在以下找到

排队闭包

你也可以直接调用闭包,而不是将任务类调度到队列中。这对于需要执行的快速、简单的任务非常有用:

  1. $podcast = App\Podcast::find(1);
  2. dispatch(function () use ($podcast) {
  3. $podcast->publish();
  4. });

将闭包分派给队列时,闭包的代码内容将以加密方式签名,因此无法在传输过程中对其进行修改。

运行队列处理器

Laravel 包含了一个队列处理器以将推送到队列中的任务执行。你可以使用 queue:work Artisan 命令运行处理器。 注意一旦 queue:work 命令开始执行,它会一直运行直到它被手动停止或终端被关闭。

  1. php artisan queue:work

Tip:要使 queue:work 进程一直在后台运行,你应该使用进程管理器比如 Supervisor 来确保队列处理器不会停止运行

记住,队列处理器是一个常驻的进程并且在内存中保存着已经启动的应用状态。因此,它们并不会在启动后注意到你代码的更改。所以,在你的重新部署过程中,请记得 重启你的队列处理器.

指定连接&队列

你也可以具体说明队列处理器应该使用哪个队列连接。 传递给 work 的连接名应该与你的 config/queue.php 配置文件中定义的连接之一相符。

  1. php artisan queue:work redis

你甚至可以自定义你的队列处理器使其只执行连接中指定的队列。例如,如果你的所有邮件都由 redis 连接的 emails 队列处理,你可以使用如下的命令启动一个仅执行此队列的处理器:

  1. php artisan queue:work redis --queue=emails

执行单一任务

—once 选项用于使队列处理器只处理队列中的单一任务。

  1. php artisan queue:work --once

处理所有队列的任务然后退出

—stop-when-empty 选项可用于处理队列处理器处理所有作业然后优雅地退出。如果您希望在队列为空后关闭容器,则在 Docker 容器中运行 Laravel 队列时,此选项很有用:

  1. php artisan queue:work --stop-when-empty

资源注意事项

后台驻留的队列处理器不会在执行完每个任务后「重启」框架。因此,你应该在每个任务完成后释放任何占用过大的资源。例如,如果你正在用 GD 库执行图像处理,你应该在完成后使用 imagedestroy 释放内存。

队列优先级

有时你可能想确定队列执行的优先顺序。例如在 config/queue.php 中你可以将 redis 连接的 queue 队列的优先级从 default 设置为 low。然而, 偶尔你也想像如下方式将一个任务推送到 high 队列:

  1. dispatch((new Job)->onQueue('high'));

要运行一个处理器来确认 low 队列中的任务在全部的 high 队列任务完成后才继续执行,你可以传递一个逗号分隔的队列名列表作为 work 命令的参数。

  1. php artisan queue:work --queue=high,low

队列处理器&部署

因为队列处理器是常驻进程,他们在重启前不会应用你代码的更改。因此,部署使用队列处理器的应用最简单的方法是在部署进程中重启队列处理器。你可以平滑地重启所有队列处理器通过使用 queue:restart 方法:

  1. php artisan queue:restart

这个命令将会引导所有的队列处理器在完成当前任务后平滑「中止」,这样不会有丢失的任务。由于在执行 queue:restart 后队列处理器将会中止,所以你应该运行一个进程管理器例如 Supervisor 来自动重启队列处理器。

Tip:队列使用 缓存 存储重启信号,所以你应该确定在使用这个功能之前配置好缓存驱动。

任务过期&超时

任务过期

在你的 config/queue.php 配置文件中,每个队列连接都定义了一个 retry_after 选项。这个选项指定了队列连接在重试一个任务前应该等它执行多久。例如,如果 retry_after 的值设置为 90 ,那么任务在执行了 90 秒后将会被放回队列而不是删除它。一般情况下,你应该将 retry_after 的值设置为你认为你的任务可能会执行需要最长时间的值。

注意:只有在 Amazon SQS 中不存在 retry_after 这个值。 SQS将会以 AWS 控制台配置的 默认可见超时值 作为重试任务的依据。

处理器超时

queue:work Artisan 命令包含一个 —timeout 选项。 —timeout 选项指定了 Laravel 的队列主进程在中止一个执行任务的子进程之前需要等到多久。有时一个子进程可能会因为各种原因「冻结」,比如一个外部的 HTTP 请求失去响应。 —timeout 选项会移除那些超过指定时间被冻结的进程。

  1. php artisan queue:work --timeout=60

retry_after 配置项和 —timeout 命令行配置并不同,但将它们同时使用可以确保任务不会丢失并且任务只会成功执行一次。

注意:—timeout 的值应该比你在 retry_after 中配置的值至少短几秒。这会确保处理器永远会在一个任务被重试之前中止。如果你的 —timeout 值比 retry_after 的值长的话,你的任务可能会被执行两次。

队列进程睡眠时间

当任务在队列中可用时,处理器将会一直无间隔地处理任务。 然而, sleep 选项定义了如果没有新任务的时候处理器将会「睡眠」多长时间。在处理器睡眠时,它不会处理任何新任务 —— 任务将会在队列处理器再次启动后执行。

  1. php artisan queue:work --sleep=3

Supervisor 配置

安装 Supervisor

Supervisor 是 Linux 操作系统下中的一个进程监控器,它可以在queue:work 挂掉时自动重启之。在 Ubuntu 上安装 Supervisor,你可以使用如下命令:

  1. sudo apt-get install supervisor

{小提醒}如果觉得配置 Supervisor 难于登天,可以考虑使用Laravel Forge,它将自动为你的 Laravel 项目安装和配置Supervisor。

配置 Supervisor

Supervisor 的配置文件通常位于 /etc/supervisor/conf.d 目录下。在该目录中,你可以创建任意数量的配置文件,用来控制 supervisor 将如何监控你的进程。例如,创建一个laravel-worker.conf 文件使之启动和监控一个 queue:work 进程:

  1. [program:laravel-worker]
  2. process_name=%(program_name)s_%(process_num)02d
  3. command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
  4. autostart=true
  5. autorestart=true
  6. user=forge
  7. numprocs=8
  8. redirect_stderr=true
  9. stdout_logfile=/home/forge/app.com/worker.log

在这个例子中,numprocs指令将指定 Supervisor 运行 8 个 queue:work 进程并对其进行监控,如果它们挂掉就自动重启它们。你应该更改command选项中的queue:work sqs部分以表示你所需的队列连接。

启动 Supervisor

配置文件创建完毕后,你就可以使用如下命令更新 Supervisor 配置并启动进程了:

  1. sudo supervisorctl reread
  2. sudo supervisorctl update
  3. sudo supervisorctl start laravel-worker:*

获取关于 Supervisor 的更多信息,可以查阅Supervisor 文档.

处理失败的任务

有时你的队列化任务会执行失败。放平心态,好事多磨。 Laravel 包含了一种方便的方法来指定任务应该尝试的最大次数。如果一个任务已经到达了最大尝试次数,它就会被插入到failed_jobs 数据库表中。要创建 failed_jobs 数据库迁移表,你可以使用 queue:failed-table 命令:

  1. php artisan queue:failed-table
  2. php artisan migrate

然后,当你运行 queue worker,你应该使用queue:work命令中的—tries开关指定应尝试运行任务的最大次数。 如果没有为—tries选项指定值,则将死循环尝试运行任务:

  1. php artisan queue:work redis --tries=3

任务失败后清理

你可以直接在任务类中定义 failed 方法,允许你在任务失败时执行针对于该任务的清理工作。 这是向用户发送警报或恢复任务执行的任何操作的绝佳位置。导致任务失败的Exception将被传递给failed方法:

  1. <?php
  2. namespace App\Jobs;
  3. use Exception;
  4. use App\Podcast;
  5. use App\AudioProcessor;
  6. use Illuminate\Bus\Queueable;
  7. use Illuminate\Queue\SerializesModels;
  8. use Illuminate\Queue\InteractsWithQueue;
  9. use Illuminate\Contracts\Queue\ShouldQueue;
  10. class ProcessPodcast implements ShouldQueue
  11. {
  12. use InteractsWithQueue, Queueable, SerializesModels;
  13. protected $podcast;
  14. /**
  15. * 创建任务实例
  16. *
  17. * @param Podcast $podcast
  18. * @return void
  19. */
  20. public function __construct(Podcast $podcast)
  21. {
  22. $this->podcast = $podcast;
  23. }
  24. /**
  25. * 执行任务
  26. *
  27. * @param AudioProcessor $processor
  28. * @return void
  29. */
  30. public function handle(AudioProcessor $processor)
  31. {
  32. // 上传播客……
  33. }
  34. /**
  35. * 任务失败的处理过程
  36. *
  37. * @param Exception $exception
  38. * @return void
  39. */
  40. public function failed(Exception $exception)
  41. {
  42. // 给用户发送任务失败的通知,等等……
  43. }
  44. }

任务失败事件

如果你想在任务失败时注册一个可调用的事件,你可以使用Queue::failing 方法。该事件是通过 email 或 Slack 通知你团队的绝佳时机。例如,我们可以在 Laravel 中的 AppServiceProvider 中附加一个回调事件:

  1. <?php
  2. namespace App\Providers;
  3. use Illuminate\Support\Facades\Queue;
  4. use Illuminate\Queue\Events\JobFailed;
  5. use Illuminate\Support\ServiceProvider;
  6. class AppServiceProvider extends ServiceProvider
  7. {
  8. /**
  9. * 启动任意服务。
  10. *
  11. * @return void
  12. */
  13. public function boot()
  14. {
  15. Queue::failing(function (JobFailed $event) {
  16. // $event->connectionName
  17. // $event->job
  18. // $event->exception
  19. });
  20. }
  21. /**
  22. * 注册服务提供者。
  23. *
  24. * @return void
  25. */
  26. public function register()
  27. {
  28. //
  29. }
  30. }

重试失败的任务

要想查看所有被放入 failed_jobs 数据表中的任务,你可以使用 Artisan 命令 queue:failed

  1. php artisan queue:failed

queue:failed 命令会列出任务 ID ,队列,以及失败的时间。任务 ID 可能会被用于重试失败的任务。例如,要重试一个任务 ID 为 5 的任务,使用如下命令:

  1. php artisan queue:retry 5

要重试所有失败的任务,执行 queue:retry 命令,将 all 作为 ID 传入:

  1. php artisan queue:retry all

如果你想删除一个失败的任务,使用 queue:forget 命令:

  1. php artisan queue:forget 5

要清空所有失败的任务,使用 queue:flush 命令:

  1. php artisan queue:flush

忽略缺失的模型

在向任务中注入 Eloquent 模型时,模型被放入队列前将被自动序列化并在执行任务时还原。但是,如果在任务等待执行时删除了模型,任务可能会失败并抛出 ModelNotFoundException

为了方便,你可以选择设置任务的 deleteWhenMissingModels 属性为 true 来自动地删除缺失模型的任务。

  1. /**
  2. * 如果模型缺失即删除任务。
  3. *
  4. * @var bool
  5. */
  6. public $deleteWhenMissingModels = true;

任务事件

通过在 Queue facade中使用 beforeafter 方法,你可以指定一个队列任务被执行前后的回调。这些回调是添加额外的日志或增加统计的绝好时机。通常,你应该在 服务提供者中调用这些方法。例如,我们可以使用 Laravel 的 AppServiceProvider

  1. <?php
  2. namespace App\Providers;
  3. use Illuminate\Support\Facades\Queue;
  4. use Illuminate\Support\ServiceProvider;
  5. use Illuminate\Queue\Events\JobProcessed;
  6. use Illuminate\Queue\Events\JobProcessing;
  7. class AppServiceProvider extends ServiceProvider
  8. {
  9. /**
  10. * 引导启动任意应用服务。
  11. *
  12. * @return void
  13. */
  14. public function boot()
  15. {
  16. Queue::before(function (JobProcessing $event) {
  17. // $event->connectionName
  18. // $event->job
  19. // $event->job->payload()
  20. });
  21. Queue::after(function (JobProcessed $event) {
  22. // $event->connectionName
  23. // $event->job
  24. // $event->job->payload()
  25. });
  26. }
  27. /**
  28. * 注册服务提供者。
  29. *
  30. * @return void
  31. */
  32. public function register()
  33. {
  34. //
  35. }
  36. }

Queue facade 使用 looping 方法可以在处理器尝试获取任务之前执行回调。例如,你也许想用一个闭包来回滚之前失败的任务尚未关闭的事务:

  1. Queue::looping(function () {
  2. while (DB::transactionLevel() > 0) {
  3. DB::rollBack();
  4. }
  5. });

本文章首发在 LearnKu.com 网站上。

本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接 我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。