Queues

Queues are a powerful design pattern that help you deal with common application scaling and performance challenges. Some examples of problems that Queues can help you solve are:

  • Smooth out processing peaks. For example, if users can initiate resource-intensive tasks at arbitrary times, you can add these tasks to a queue instead of performing them synchronously. Then you can have worker processes pull tasks from the queue in a controlled manner. You can easily add new Queue consumers to scale up the back-end task handling as the application scales up.
  • Break up monolithic tasks that may otherwise block the Node.js event loop. For example, if a user request requires CPU intensive work like audio transcoding, you can delegate this task to other processes, freeing up user-facing processes to remain responsive.
  • Provide a reliable communication channel across various services. For example, you can queue tasks (jobs) in one process or service, and consume them in another. You can be notified (by listening for status events) upon completion, error or other state changes in the job life cycle from any process or service. When Queue producers or consumers fail, their state is preserved and task handling can restart automatically when nodes are restarted.

Nest provides the @nestjs/bull package as an abstraction/wrapper on top of Bull, a popular, well supported, high performance Node.js based Queue system implementation. The package makes it easy to integrate Bull Queues in a Nest-friendly way to your application.

Bull uses Redis to persist job data, so you’ll need to have Redis installed on your system. Because it is Redis-backed, your Queue architecture can be completely distributed and platform-independent. For example, you can have some Queue producers and consumers and listeners running in Nest on one (or several) nodes, and other producers, consumers and listeners running on other Node.js platforms on other network nodes.

This chapter covers the @nestjs/bull package. We also recommend reading the Bull documentation for more background and specific implementation details.

Installation

To begin using it, we first install the required dependencies.

  1. $ npm install --save @nestjs/bull bull
  2. $ npm install --save-dev @types/bull

Once the installation process is complete, we can import the BullModule into the root AppModule.

app.module.ts

  1. import { Module } from '@nestjs/common';
  2. import { BullModule } from '@nestjs/bull';
  3. @Module({
  4. imports: [
  5. BullModule.registerQueue({
  6. name: 'audio',
  7. redis: {
  8. host: 'localhost',
  9. port: 6379,
  10. },
  11. }),
  12. ],
  13. })
  14. export class AppModule {}

The registerQueue() method is used to instantiate and/or register queues. Queues are shared across modules and processes that connect to the same underlying Redis database with the same credentials. Each queue is unique by its name property (see below). When sharing queues (across modules/processes), the first registerQueue() method to run both instantiates the queue and registers it for that module. Other modules (in the same or separate processes) simply register the queue. Queue registration creates an injection token that can be used to access the queue in a given Nest module.

For each queue, pass a configuration object containing the following properties:

  • name: string - A queue name, which will be used as both an injection token (for injecting the queue into controllers/providers), and as an argument to decorators to associate consumer classes and listeners with queues. Required.
  • limiter: RateLimiter - Options to control the rate at which the queue’s jobs are processed. See RateLimiter for more information. Optional.
  • redis: RedisOpts - Options to configure the Redis connection. See RedisOpts for more information. Optional.
  • prefix: string - Prefix for all queue keys. Optional.
  • defaultJobOptions: JobOpts - Options to control the default settings for new jobs. See JobOpts for more information. Optional.
  • settings: AdvancedSettings - Advanced Queue configuration settings. These should usually not be changed. See AdvancedSettings for more information. Optional.

As noted, the name property is required. The rest of the options are optional, providing detailed control over queue behavior. These are passed directly to the Bull Queue constructor. Read more about these options here. When registering a queue in a second or subsequent module, it is best practice to omit all options but the name property from the configuration object. These options should be specified only in the module that instantiates the queue.

Hint Create multiple queues by passing multiple comma-separated configuration objects to the registerQueue() method.

Since jobs are persisted in Redis, each time a specific named queue is instantiated (e.g., when an app is started/restarted), it attempts to process any old jobs that may exist from a previous unfinished session.

Each queue can have one or many producers, consumers, and listeners. Consumers retrieve jobs from the queue in a specific order: FIFO (the default), LIFO, or according to priorities. Controlling queue processing order is discussed here.

Official enterprise support

  • Queues - 图1 Providing technical guidance
  • Queues - 图2 Performing in-depth code reviews
  • Queues - 图3 Mentoring team members
  • Queues - 图4 Advising best practices

Explore more

Producers

Job producers add jobs to queues. Producers are typically application services (Nest providers). To add jobs to a queue, first inject the queue into the service as follows:

  1. import { Injectable } from '@nestjs/common';
  2. import { Queue } from 'bull';
  3. import { InjectQueue } from '@nestjs/bull';
  4. @Injectable()
  5. export class AudioService {
  6. constructor(@InjectQueue('audio') private audioQueue: Queue) {}
  7. }

Hint The @InjectQueue() decorator identifies the queue by its name, as provided in the registerQueue() method call (e.g., 'audio').

Now, add a job by calling the queue’s add() method, passing a user-defined job object. Jobs are represented as serializable JavaScript objects (since that is how they are stored in the Redis database). The shape of the job you pass is arbitrary; use it to represent the semantics of your job object.

  1. const job = await this.audioQueue.add({
  2. foo: 'bar',
  3. });

Named jobs

Jobs may have unique names. This allows you to create specialized consumers that will only process jobs with a given name.

  1. const job = await this.audioQueue.add('transcode', {
  2. foo: 'bar',
  3. });

Warning When using named jobs, you must create processors for each unique name added to a queue, or the queue will complain that you are missing a processor for the given job. See here for more information on consuming named jobs.

Job options

Jobs can have additional options associated with them. Pass an options object after the job argument in the Queue.add() method. Job options properties are:

  • priority: number - Optional priority value. Ranges from 1 (highest priority) to MAX_INT (lowest priority). Note that using priorities has a slight impact on performance, so use them with caution.
  • delay: number - An amount of time (milliseconds) to wait until this job can be processed. Note that for accurate delays, both server and clients should have their clocks synchronized.
  • attempts: number - The total number of attempts to try the job until it completes.
  • repeat: RepeatOpts - Repeat job according to a cron specification. See RepeatOpts.
  • backoff: number | BackoffOpts - Backoff setting for automatic retries if the job fails. See BackoffOpts.
  • lifo: boolean - If true, adds the job to the right end of the queue instead of the left (default false).
  • timeout: number - The number of milliseconds after which the job should fail with a timeout error.
  • jobId: number | string - Override the job ID - by default, the job ID is a unique integer, but you can use this setting to override it. If you use this option, it is up to you to ensure the jobId is unique. If you attempt to add a job with an id that already exists, it will not be added.
  • removeOnComplete: boolean | number - If true, removes the job when it successfully completes. A number specifies the amount of jobs to keep. Default behavior is to keep the job in the completed set.
  • removeOnFail: boolean | number - If true, removes the job when it fails after all attempts. A number specifies the amount of jobs to keep. Default behavior is to keep the job in the failed set.
  • stackTraceLimit: number - Limits the amount of stack trace lines that will be recorded in the stacktrace.

Here are a few examples of customizing jobs with job options.

To delay the start of a job, use the delay configuration property.

  1. const job = await this.audioQueue.add(
  2. {
  3. foo: 'bar',
  4. },
  5. { delay: 3000 }, // 3 seconds delayed
  6. );

To add a job to the right end of the queue (process the job as LIFO (Last In First Out)), set the lifo property of the configuration object to true.

  1. const job = await this.audioQueue.add(
  2. {
  3. foo: 'bar',
  4. },
  5. { lifo: true },
  6. );

To prioritize a job, use the priority property.

  1. const job = await this.audioQueue.add(
  2. {
  3. foo: 'bar',
  4. },
  5. { priority: 2 },
  6. );

Consumers

A consumer is a class defining methods that either process jobs added into the queue, or listen for events on the queue, or both. Declare a consumer class using the @Processor() decorator as follows:

  1. import { Processor } from '@nestjs/bull';
  2. @Processor('audio')
  3. export class AudioConsumer {}

Where the decorator’s string argument (e.g., 'audio') is the name of the queue to be associated with the class methods.

Within a consumer class, declare job handlers by decorating handler methods with the @Process() decorator.

  1. import { Processor, Process } from '@nestjs/bull';
  2. import { Job } from 'bull';
  3. @Processor('audio')
  4. export class AudioConsumer {
  5. @Process()
  6. async transcode(job: Job<unknown>) {
  7. let progress = 0;
  8. for (i = 0; i < 100; i++) {
  9. await doSomething(job.data);
  10. progress += 10;
  11. job.progress(progress);
  12. }
  13. return {};
  14. }
  15. }

The decorated method (e.g., transcode()) is called whenever the worker is idle and there are jobs to process in the queue. This handler method receives the job object as its only argument. The value returned by the handler method is stored in the job object and can be accessed later on, for example in a listener for the completed event.

Job objects have multiple methods that allow you to interact with their state. For example, the above code uses the progress() method to update the job’s progress. See here for the complete Job object API reference.

You can designate that a job handler method will handle only jobs of a certain type (jobs with a specific name) by passing that name to the @Process() decorator as shown below. You can have multiple @Process() handlers in a given consumer class, corresponding to each job type (name). When you use named jobs, be sure to have a handler corresponding to each name.

  1. @Process('transcode')
  2. async transcode(job: Job<unknown>) { ... }

Event listeners

Bull generates a set of useful events when queue and/or job state changes occur. Nest provides a set of decorators that allow subscribing to a core set of standard events. These are exported from the @nestjs/bull package.

Event listeners must be declared within a consumer class (i.e., within a class decorated with the @Processor() decorator). To listen for an event, use one of the decorators in the table below to declare a handler for the event. For example, to listen to the event emitted when a job enters the active state in the audio queue, use the following construct:

  1. import { Processor, Process } from '@nestjs/bull';
  2. import { Job } from 'bull';
  3. @Processor('audio')
  4. export class AudioConsumer {
  5. @OnQueueActive()
  6. onActive(job: Job) {
  7. console.log(
  8. `Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
  9. );
  10. }
  11. ...

Since Bull operates in a distributed (multi-node) environment, it defines the concept of event locality. This concept recognizes that events may be triggered either entirely within a single process, or on shared queues from different processes. A local event is one that is produced when an action or state change is triggered on a queue in the local process. In other words, when your event producers and consumers are local to a single process, all events happening on queues are local.

When a queue is shared across multiple processes, we encounter the possibility of global events. For a listener in one process to receive an event notification triggered by another process, it must register for a global event.

Event handlers are invoked whenever their corresponding event is emitted. The handler is called with the signature shown in the table below, providing access to information relevant to the event. We discuss one key difference between local and global event handler signatures below.

Local event listenersGlobal event listenersHandler method signature / When fired
@OnQueueError()@OnGlobalQueueError()handler(error: Error) - An error occurred. error contains the triggering error.
@OnQueueWaiting()@OnGlobalQueueWaiting()handler(jobId: number | string) - A Job is waiting to be processed as soon as a worker is idling. jobId contains the id for the job that has entered this state.
@OnQueueActive()@OnGlobalQueueActive()handler(job: Job) - Job jobhas started.
@OnQueueStalled()@OnGlobalQueueStalled()handler(job: Job) - Job job has been marked as stalled. This is useful for debugging job workers that crash or pause the event loop.
@OnQueueProgress()@OnGlobalQueueProgress()handler(job: Job, progress: number) - Job job‘s progress was updated to value progress.
@OnQueueCompleted()@OnGlobalQueueCompleted()handler(job: Job, result: any) Job job successfully completed with a result result.
@OnQueueFailed()@OnGlobalQueueFailed()handler(job: Job, err: Error) Job job failed with reason err.
@OnQueuePaused()@OnGlobalQueuePaused()handler() The queue has been paused.
@OnQueueResumed()@OnGlobalQueueResumed()handler(job: Job) The queue has been resumed.
@OnQueueCleaned()@OnGlobalQueueCleaned()handler(jobs: Job[], type: string) Old jobs have been cleaned from the queue. jobs is an array of cleaned jobs, and type is the type of jobs cleaned.
@OnQueueDrained()@OnGlobalQueueDrained()handler() Emitted whenever the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed).
@OnQueueRemoved()@OnGlobalQueueRemoved()handler(job: Job) Job job was successfully removed.

When listening for global events, the method signatures can be slightly different from their local counterpart. Specifically, any method signature that receives job objects in the local version, instead receives a jobId (number) in the global version. To get a reference to the actual job object in such a case, use the Queue#getJob method. This call should be awaited, and therefore the handler should be declared async. For example:

  1. @OnGlobalQueueCompleted()
  2. async onGlobalCompleted(jobId: number, result: any) {
  3. const job = await this.immediateQueue.getJob(jobId);
  4. console.log('(Global) on completed: job ', job.id, ' -> result: ', result);
  5. }

Hint To access the Queue object (to make a getJob() call), you must of course inject it. Also, the Queue must be registered in the module where you are injecting it.

In addition to the specific event listener decorators, you can also use the generic @OnQueueEvent() decorator in combination with either BullQueueEvents or BullQueueGlobalEvents enums. Read more about events here.

Queue management

Queue’s have an API that allows you to perform management functions like pausing and resuming, retrieving the count of jobs in various states, and several more. You can find the full queue API here. Invoke any of these methods directly on the Queue object, as shown below with the pause/resume examples.

Pause a queue with the pause() method call. A paused queue will not process new jobs until resumed, but current jobs being processed will continue until they are finalized.

  1. await audioQueue.pause();

To resume a paused queue, use the resume() method, as follows:

  1. await audioQueue.resume();

Async configuration

You may want to pass your queue options asynchronously instead of statically. In this case, use the registerQueueAsync() method, which provides several ways to deal with async configuration.

One approach is to use a factory function:

  1. BullModule.registerQueueAsync({
  2. name: 'audio',
  3. useFactory: () => ({
  4. redis: {
  5. host: 'localhost',
  6. port: 6379,
  7. },
  8. }),
  9. });

Our factory behaves like any other asynchronous provider (e.g., it can be async and it’s able to inject dependencies through inject).

  1. BullModule.registerQueueAsync({
  2. name: 'audio',
  3. imports: [ConfigModule],
  4. useFactory: async (configService: ConfigService) => ({
  5. redis: {
  6. host: configService.get('QUEUE_HOST'),
  7. port: +configService.get('QUEUE_PORT'),
  8. },
  9. }),
  10. inject: [ConfigService],
  11. });

Alternatively, you can use the useClass syntax:

  1. BullModule.registerQueueAsync({
  2. name: 'audio',
  3. useClass: BullConfigService,
  4. });

The construction above will instantiate BullConfigService inside BullModule and use it to provide an options object by calling createBullOptions(). Note that this means that the BullConfigService has to implement the BullOptionsFactory interface, as shown below:

  1. @Injectable()
  2. class BullConfigService implements BullOptionsFactory {
  3. createBullOptions(): BullModuleOptions {
  4. return {
  5. redis: {
  6. host: 'localhost',
  7. port: 6379,
  8. },
  9. };
  10. }
  11. }

In order to prevent the creation of BullConfigService inside BullModule and use a provider imported from a different module, you can use the useExisting syntax.

  1. BullModule.registerQueueAsync({
  2. name: 'audio',
  3. imports: [ConfigModule],
  4. useExisting: ConfigService,
  5. });

This construction works the same as useClass with one critical difference - BullModule will lookup imported modules to reuse an existing ConfigService instead of instantiating a new one.

Example

A working example is available here.