复杂例子:显示状态更新和结果

上面的示例过于简单,后台作业启动然后应用忘记它。大部分 Celery 针对网页开发的教程就到此为止,但是事实上许多应用程序有必要监控它的后台任务并且获取运行结果。

我现在将要做的就是扩展上面的应用程序成为第二个示例,这个示例展示一个虚构的长时间运行的任务。用户点击按钮启动一个或者更多的长时间运行的任务,在浏览器上的页面使用 ajax 轮询服务器更新所有任务的状态。每一个任务,页面都会显示一个图形的状态栏,进度条,一个状态消息,并且当任务完成的时候,也会显示任务的执行结果。示例的截图在本文的最开始。

状态更新的后台任务

让我向你们展示我在第二个示例中使用的后台任务:

  1. @celery.task(bind=True)
  2. def long_task(self):
  3. """Background task that runs a long function with progress reports."""
  4. verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
  5. adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
  6. noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
  7. message = ''
  8. total = random.randint(10, 50)
  9. for i in range(total):
  10. if not message or random.random() < 0.25:
  11. message = '{0} {1} {2}...'.format(random.choice(verb),
  12. random.choice(adjective),
  13. random.choice(noun))
  14. self.update_state(state='PROGRESS',
  15. meta={'current': i, 'total': total,
  16. 'status': message})
  17. time.sleep(1)
  18. return {'current': 100, 'total': 100, 'status': 'Task completed!',
  19. 'result': 42}

对于这个任务,我在 Celery 装饰器中添加了 bind=True 参数。这个参数告诉 Celery 发送一个 self 参数到我的函数,我能够使用它(self)来记录状态更新。

因为这个任务真没有干什么有用的事情,我决定使用随机的动词,形容词和名词组合的幽默状态信息。你可以在代码上看到我用来生成上述信息的毫无意义的列表。

self.update_state() 调用是 Celery 如何接受这些任务更新。有一些内置的状态,比如 STARTED, SUCCESS 等等,但是 Celery 也支持自定义状态。这里我使用一个叫做 PROGRESS 的自定义状态。连同状态,还有一个附件的元数据,该元数据是 Python 字典形式,包含目前和总的迭代数以及随机生成的状态消息。客户端可以使用这些元素来显示一个漂亮的进度条。每迭代一次休眠一秒,以模拟正在做一些工作。

当循环退出,一个 Python 字典作为函数结果返回。这个字典包含了更新迭代计数器,最后的状态消息和幽默的结果。

上面的 long_task() 函数在一个 Celery worker 进程中运行。下面你能看到启动这个后台作业的 Flask 应用路由:

  1. @app.route('/longtask', methods=['POST'])
  2. def longtask():
  3. task = long_task.apply_async()
  4. return jsonify({}), 202, {'Location': url_for('taskstatus',
  5. task_id=task.id)}

正如你所见,客户端需要发起一个 POST 请求到 /longtask 来掀开这些任务中的一个的序幕。服务器启动任务,并且存储返回值。对于响应我使用状态码 202,这个状态码通常是在 REST APIs 中使用用来表明一个请求正在进行中。我也添加了 Location 头,值为一个客户端用来获取状态信息的 URL。这个 URL 指向另一个叫做 taskstatus 的 Flask 路由,并且有 task.id 作为动态的要素。

从 Flask 应用中访问任务状态

上面提及到 taskstatus 路由负责报告有后台任务提供的状态更新。这里就是这个路由的实现:

  1. @app.route('/status/<task_id>')
  2. def taskstatus(task_id):
  3. task = long_task.AsyncResult(task_id)
  4. if task.state == 'PENDING':
  5. // job did not start yet
  6. response = {
  7. 'state': task.state,
  8. 'current': 0,
  9. 'total': 1,
  10. 'status': 'Pending...'
  11. }
  12. elif task.state != 'FAILURE':
  13. response = {
  14. 'state': task.state,
  15. 'current': task.info.get('current', 0),
  16. 'total': task.info.get('total', 1),
  17. 'status': task.info.get('status', '')
  18. }
  19. if 'result' in task.info:
  20. response['result'] = task.info['result']
  21. else:
  22. # something went wrong in the background job
  23. response = {
  24. 'state': task.state,
  25. 'current': 1,
  26. 'total': 1,
  27. 'status': str(task.info), # this is the exception raised
  28. }
  29. return jsonify(response)

这个路由生成一个 JSON 响应,该响应包含任务的状态以及设置在 update_state() 调用中作为 meta 的参数的所有值,客户端可以使用这些构建一个进度条。遗憾地是这个函数需要检查一些条件,因此代码有些长。为了能够访问任务的数据,我重新创建了任务对象,该对象是 AsyncResult 类的实例,使用了 URL 中给的任务 id。

第一个 if 代码块是当任务还没有开始的时候(PENDING 状态)。在这种情况下暂时没有状态信息,因此我人为地制造了些数据。接下来的 elif 代码块返回后台的任务的状态信息。任务提供的信息可以通过访问 task.info 获得。如果数据中包含键 result ,这就意味着这是最终的结果并且任务已经结束,因此我把这些信息也加到响应中。最后的 else 代码块是任务执行失败的情况,这种情况下 task.info 中会包含异常的信息。

不管你是否相信,服务器所有要做的事情已经完成了。剩下的部分就是需要客户端需要实现的,在这里也就是用 JavaScript 脚本的网页来实现。

客户端的 Javascript

这一部分就不是本文的重点,如果你有兴趣的话,可以自己研究研究。

对于图形进度条我使用 nanobar.js,我从 CDN 上引用它。同样还需要引入 jQuery,它能够简化 ajax 的调用。

  1. <script src="//cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js"></script>
  2. <script src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>

启动连接后台作业的按钮的 Javascript 处理程序如下:

  1. function start_long_task() {
  2. // add task status elements
  3. div = $('0%...&nbsp;<hr>');
  4. $('#progress').append(div);
  5. // create a progress bar
  6. var nanobar = new Nanobar({
  7. bg: '#44f',
  8. target: div[0].childNodes[0]
  9. });
  10. // send ajax POST request to start background job
  11. $.ajax({
  12. type: 'POST',
  13. url: '/longtask',
  14. success: function(data, status, request) {
  15. status_url = request.getResponseHeader('Location');
  16. update_progress(status_url, nanobar, div[0]);
  17. },
  18. error: function() {
  19. alert('Unexpected error');
  20. }
  21. });
  22. }

div 的代码:

  1. <-- Progress bar
  2. 0% <-- Percentage
  3. ... <-- Status message
  4. &nbsp; <-- Result
  5. <hr>

最后 Javascript 的 update_progress 函数代码如下:

  1. function update_progress(status_url, nanobar, status_div) {
  2. // send GET request to status URL
  3. $.getJSON(status_url, function(data) {
  4. // update UI
  5. percent = parseInt(data['current'] * 100 / data['total']);
  6. nanobar.go(percent);
  7. $(status_div.childNodes[1]).text(percent + '%');
  8. $(status_div.childNodes[2]).text(data['status']);
  9. if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
  10. if ('result' in data) {
  11. // show result
  12. $(status_div.childNodes[3]).text('Result: ' + data['result']);
  13. }
  14. else {
  15. // something unexpected happened
  16. $(status_div.childNodes[3]).text('Result: ' + data['state']);
  17. }
  18. }
  19. else {
  20. // rerun in 2 seconds
  21. setTimeout(function() {
  22. update_progress(status_url, nanobar, status_div);
  23. }, 2000);
  24. }
  25. });
  26. }

这一部分的代码就不一一解释了。