Flask 和 Celery 一起工作

Flask 与 Celery 整合是十分简单,不需要任何插件。一个 Flask 应用需要使用 Celery 的话只需要初始化 Celery 客户端像这样:

  1. from flask import Flask
  2. from celery import Celery
  3. app = Flask(__name__)
  4. app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
  5. app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
  6. celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
  7. celery.conf.update(app.config)

正如你所见,Celery 通过创建一个 Celery 类对象来初始化,传入应用名称以及消息代理的连接 URL,这个 URL 我把它放在 app.config 中的 CELERY_BROKER_URL 的键值。URL 告诉 Celery 代理服务在哪里运行。如果你运行的不是 Redis,或者代理服务运行在一个不同的机器上,相应地你需要改变 URL。

Celery 其它任何配置可以直接用 celery.conf.update() 通过 Flask 的配置直接传递。CELERY_RESULT_BACKEND 选项只有在你必须要 Celery 任务的存储状态和运行结果的时候才是必须的。展示的第一个示例是不需要这个功能的,但是第二个示例是需要的,因此最好从一开始就配置好。

任何你需要作为后台任务的函数需要用 celery.task 装饰器装饰。例如:

  1. @celery.task
  2. def my_background_task(arg1, arg2):
  3. # some long running task here
  4. return result

接着 Flask 应用能够请求这个后台任务的执行,像这样:

  1. task = my_background_task.delay(10, 20)

delay() 方法是强大的 apply_async() 调用的快捷方式。这样相当于使用 apply_async():

  1. task = my_background_task.apply_async(args=[10, 20])

当使用 apply_async(),你可以给 Celery 后台任务如何执行的更详细的说明。一个有用的选项就是要求任务在未来的某一时刻执行。例如,这个调用将安排任务运行在大约一分钟后:

  1. task = my_background_task.apply_async(args=[10, 20], countdown=60)

delay() 和 apply_async() 的返回值是一个表示任务的对象,这个对象可以用于获取任务状态。我将会在本文的后面展示如何获取任务状态等信息,但现在让我们保持简单些,不用担心任务的执行结果。

更多可用的选项请参阅 Celery 文档