celery.contrib.batches

Experimental task class that buffers messages and processes them as a list.

Warning

For this to work you have to set CELERYD_PREFETCH_MULTIPLIER to zero, or to some value where the resulting prefetch count (the multiplier times the number of worker processes) is higher than flush_every.
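
For example, a minimal configuration sketch (the project name and broker URL are illustrative) that disables the prefetch limit so enough messages can be buffered to fill a batch:

    from celery import Celery

    app = Celery('proj', broker='amqp://')

    # 0 disables the prefetch limit, so the worker can buffer enough
    # messages for flush_every to be reached before flushing.
    app.conf.CELERYD_PREFETCH_MULTIPLIER = 0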

In the future we hope to add the ability to direct batching tasks to a channel with different QoS requirements than the task channel.

Simple Example

A click counter that flushes the buffer every 100 messages, or every 10 seconds. It doesn't do anything with the data, but it can easily be modified to store it in a database.

    from celery.contrib.batches import Batches

    # Flush after 100 messages, or 10 seconds.
    @app.task(base=Batches, flush_every=100, flush_interval=10)
    def count_click(requests):
        from collections import Counter
        count = Counter(request.kwargs['url'] for request in requests)
        for url, clicks in count.items():
            print('>>> Clicks: {0} -> {1}'.format(url, clicks))

Then you can ask for a click to be counted by doing:

    >>> count_click.delay(url='http://example.com')

Example returning results

An interface to the Web of Trust API that flushes the buffer every 100 messages, or every 10 seconds.

    import requests
    from urlparse import urlparse

    from celery.contrib.batches import Batches

    wot_api_target = "https://api.mywot.com/0.4/public_link_json"

    @app.task(base=Batches, flush_every=100, flush_interval=10)
    def wot_api(requests):
        sig = lambda url: url
        responses = wot_api_real(
            (sig(*request.args, **request.kwargs) for request in requests)
        )
        # use mark_as_done to manually return response data
        for response, request in zip(responses, requests):
            app.backend.mark_as_done(request.id, response)

    def wot_api_real(urls):
        domains = [urlparse(url).netloc for url in urls]
        response = requests.get(
            wot_api_target,
            params={"hosts": ('/').join(set(domains)) + '/'}
        )
        return [response.json()[domain] for domain in domains]

Using the API is done as follows:

    >>> wot_api.delay('http://example.com')
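
Because wot_api stores each response with mark_as_done, the AsyncResult returned by delay() can be used to fetch that response once the batch has been flushed; this requires a result backend to be configured. A minimal sketch (the timeout value is illustrative):

    >>> result = wot_api.delay('http://example.com')
    >>> result.get(timeout=30)  # blocks until the batch is flushed and the result stored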

Note

If you don’t have an app instance then use the current app proxy instead:

    from celery import current_app

    current_app.backend.mark_as_done(request.id, response)

API

class celery.contrib.batches.Batches

  • Strategy(task, app, consumer)

  • apply_buffer(requests, args=(), kwargs={})

  • flush(requests)

  • flush_every = 10

    Maximum number of messages in the buffer.

  • flush_interval = 30

    Timeout in seconds before the buffer is flushed anyway.

  • run(requests)
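
As an alternative to passing flush_every and flush_interval to the task decorator, they can be overridden as class attributes on a Batches subclass used as the task base. A minimal sketch, assuming app is the application instance from the examples above (the subclass and task names are illustrative):

    from celery.contrib.batches import Batches

    class SmallBatches(Batches):
        # Flush after 20 buffered messages, or after 5 seconds.
        flush_every = 20
        flush_interval = 5

    @app.task(base=SmallBatches)
    def log_requests(requests):
        for request in requests:
            print(request.id)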

class celery.contrib.batches.SimpleRequest(id, name, args, kwargs, delivery_info, hostname)

Pickleable request.

  • args = ()

    positional arguments

  • delivery_info = None

    message delivery information.

  • classmethod from_request(request)

  • hostname = None

    worker node name

  • id = None

    task id

  • kwargs = {}

    keyword arguments

  • name = None

    task name
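
Inside a batch task each item in requests is a SimpleRequest, so the attributes documented above can be read directly. A minimal sketch, assuming app is the application instance from the examples above (the task name is illustrative):

    from celery.contrib.batches import Batches

    @app.task(base=Batches, flush_every=100, flush_interval=10)
    def inspect_batch(requests):
        for request in requests:
            # Each SimpleRequest carries the details of one buffered call.
            print(request.id, request.name, request.hostname)
            print(request.args, request.kwargs, request.delivery_info)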