celery.worker.consumer

celery.worker.consumer

This module contains the components responsible for consuming messages from the broker, processing the messages and keeping the broker connections up and running.

class celery.worker.consumer.Consumer(on_task_request, init_callback=<function noop>, hostname=None, pool=None, app=None, timer=None, controller=None, hub=None, amqheartbeat=None, worker_options=None, disable_rate_limits=False, initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)[source]

  • class Blueprint(steps=None, name=None, app=None, on_start=None, on_close=None, on_stopped=None)

    • default_steps = ['celery.worker.consumer:Connection', 'celery.worker.consumer:Mingle', 'celery.worker.consumer:Events', 'celery.worker.consumer:Gossip', 'celery.worker.consumer:Heart', 'celery.worker.consumer:Control', 'celery.worker.consumer:Tasks', 'celery.worker.consumer:Evloop', 'celery.worker.consumer:Agent']

    • name = 'Consumer'

    • shutdown(parent)

  • Consumer.Strategies

    alias of dict

  • Consumer.add_task_queue(queue, exchange=None, exchange_type=None, routing_key=None, **options)[source]

  • Consumer.apply_eta_task(task)[source]

    Method called by the timer to apply a task with an ETA/countdown.

  • Consumer.bucket_for_task(type)

  • Consumer.cancel_task_queue(queue)[source]

  • Consumer.connect()

    Establish the broker connection.

    Will retry establishing the connection if the BROKER_CONNECTION_RETRY setting is enabled.

  • Consumer.create_task_handler()

  • Consumer.in_shutdown = False

    Set when the consumer is shutting down.

  • Consumer.init_callback = None

    Optional callback called the first time the worker is ready to receive tasks.

  • Consumer.loop_args()

  • Consumer.on_close()

  • Consumer.on_decode_error(message, exc)[source]

    Callback called if an error occurs while decoding a message received.

    Simply logs the error and acknowledges the message so it doesn’t enter a loop.

    Parameters:
    • message – The message with errors.
    • exc – The original exception instance.
  • Consumer.on_invalid_task(body, message, exc)

  • Consumer.on_ready()

  • Consumer.on_unknown_message(body, message)

  • Consumer.on_unknown_task(body, message, exc)

  • Consumer.pool = None

    The current worker pool instance.

  • Consumer.register_with_event_loop(hub)

  • Consumer.reset_rate_limits()

  • Consumer.restart_count = -1

  • Consumer.shutdown()

  • Consumer.start()[source]

  • Consumer.stop()[source]

  • Consumer.timer = None

    A timer used for high-priority internal tasks, such as sending heartbeats.

  • Consumer.update_strategies()[source]

class celery.worker.consumer.Connection(c, **kwargs)

  • info(c, params='N/A')

  • name = u'celery.worker.consumer.Connection'

  • requires = ()

  • shutdown(c)

  • start(c)

class celery.worker.consumer.Events(c, send_events=None, **kwargs)

  • name = u'celery.worker.consumer.Events'

  • requires = (step:celery.worker.consumer.Connection{()},)

  • shutdown(c)

  • start(c)

  • stop(c)

class celery.worker.consumer.Heart(c, without_heartbeat=False, **kwargs)

  • name = u'celery.worker.consumer.Heart'

  • requires = (step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)

  • shutdown(c)

  • start(c)

  • stop(c)

class celery.worker.consumer.Control(c, **kwargs)

  • include_if(c)

  • name = u'celery.worker.consumer.Control'

  • requires = (step:celery.worker.consumer.Mingle{(step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)},)

class celery.worker.consumer.Tasks(c, **kwargs)

  • info(c)

  • name = u'celery.worker.consumer.Tasks'

  • requires = (step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)

  • shutdown(c)

  • start(c)

  • stop(c)

class celery.worker.consumer.Evloop(parent, **kwargs)

  • label = 'event loop'

  • last = True

  • name = u'celery.worker.consumer.Evloop'

  • patch_all(c)

  • requires = ()

  • start(c)

class celery.worker.consumer.Agent(c, **kwargs)

  • conditional = True

  • create(c)

  • name = u'celery.worker.consumer.Agent'

  • requires = (step:celery.worker.consumer.Connection{()},)

class celery.worker.consumer.Mingle(c, without_mingle=False, **kwargs)

  • compatible_transport(app)

  • compatible_transports = set(['redis', 'amqp'])

  • label = 'Mingle'

  • name = u'celery.worker.consumer.Mingle'

  • requires = (step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)

  • start(c)

class celery.worker.consumer.Gossip(c, without_gossip=False, interval=5.0, **kwargs)

  • call_task(task)

  • compatible_transport(app)

  • compatible_transports = set(['redis', 'amqp'])

  • election(id, topic, action=None)

  • get_consumers(channel)

  • label = 'Gossip'

  • name = u'celery.worker.consumer.Gossip'

  • on_elect(event)

  • on_elect_ack(event)

  • on_message(prepare, message)

  • on_node_join(worker)

  • on_node_leave(worker)

  • on_node_lost(worker)

  • periodic()

  • register_timer()

  • requires = (step:celery.worker.consumer.Mingle{(step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)},)

  • start(c)

celery.worker.consumer.dump_body(m, body)[source]