Messenger: Sync & Queued Message Handling

Messenger provides a message bus with the ability to send messages and then handle them immediately in your application or send them through transports (e.g. queues) to be handled later. To learn more deeply about it, read the Messenger component docs.

Installation

In applications using Symfony Flex, run this command to install messenger:

    $ composer require symfony/messenger

Creating a Message & Handler

Messenger centers around two different classes that you’ll create: (1) a message class that holds data and (2) one or more handler classes that are called when that message is dispatched. The handler reads the message and performs some task.

There are no specific requirements for a message class, except that it can be serialized:

    // src/Message/SmsNotification.php
    namespace App\Message;

    class SmsNotification
    {
        private $content;

        public function __construct(string $content)
        {
            $this->content = $content;
        }

        public function getContent(): string
        {
            return $this->content;
        }
    }
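
As a quick check of the serialization requirement, the sketch below round-trips a message through PHP's native serialize() and unserialize(). It is only an illustration: the serializer Messenger actually uses depends on how your transport is configured, and the class is repeated here (without its namespace) so the snippet runs on its own.

```php
<?php

declare(strict_types=1);

// Same shape as App\Message\SmsNotification above, repeated without the
// namespace so that this snippet is self-contained.
class SmsNotification
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

$original = new SmsNotification('Look! I created a message!');

// Round-trip the message through PHP's native serialization.
$payload = serialize($original);
$restored = unserialize($payload);

echo $restored->getContent(), PHP_EOL;
```

If a message holds something that cannot be serialized (a closure or an open resource, for example), this round trip fails, which is a hint that the message should carry plain data instead.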

A message handler is a PHP callable. The recommended way to create one is to write a class that implements Symfony\Component\Messenger\Handler\MessageHandlerInterface and has an __invoke() method that’s type-hinted with the message class (or a message interface):

    // src/MessageHandler/SmsNotificationHandler.php
    namespace App\MessageHandler;

    use App\Message\SmsNotification;
    use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

    class SmsNotificationHandler implements MessageHandlerInterface
    {
        public function __invoke(SmsNotification $message)
        {
            // ... do some work - like sending an SMS message!
        }
    }

Thanks to autoconfiguration and the SmsNotification type-hint, Symfony knows that this handler should be called when an SmsNotification message is dispatched. Most of the time, this is all you need to do. But you can also manually configure message handlers. To see all the configured handlers, run:

    $ php bin/console debug:messenger
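
To make the role of the type-hint more concrete, here is a minimal sketch of how a message class could be derived from a handler's __invoke() signature using reflection. The guessHandledMessage() helper is hypothetical and is not Symfony's implementation; the two classes are bare stand-ins so that the snippet runs without the framework.

```php
<?php

declare(strict_types=1);

// Bare stand-ins for the message and handler classes shown above.
final class SmsNotification
{
}

final class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message): void
    {
        // ... do some work
    }
}

/**
 * Hypothetical helper: returns the class name type-hinted on the first
 * argument of the handler's __invoke() method, or null if there is none.
 */
function guessHandledMessage(string $handlerClass): ?string
{
    $method = new ReflectionMethod($handlerClass, '__invoke');
    $parameters = $method->getParameters();

    if ([] === $parameters) {
        return null;
    }

    $type = $parameters[0]->getType();

    // Only a single class or interface type can identify a message.
    if (!$type instanceof ReflectionNamedType || $type->isBuiltin()) {
        return null;
    }

    return $type->getName();
}

echo guessHandledMessage(SmsNotificationHandler::class), PHP_EOL;
```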

Dispatching the Message

You’re ready! To dispatch the message (and call the handler), inject the messenger.default_bus service (via the MessageBusInterface), for example in a controller:

    // src/Controller/DefaultController.php
    namespace App\Controller;

    use App\Message\SmsNotification;
    use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
    use Symfony\Component\Messenger\MessageBusInterface;

    class DefaultController extends AbstractController
    {
        public function index(MessageBusInterface $bus)
        {
            // will cause the SmsNotificationHandler to be called
            $bus->dispatch(new SmsNotification('Look! I created a message!'));

            // or use the shortcut
            $this->dispatchMessage(new SmsNotification('Look! I created a message!'));

            // ...
        }
    }

Transports: Async/Queued Messages

By default, messages are handled as soon as they are dispatched. If you want to handle a message asynchronously, you can configure a transport. A transport is capable of sending messages (e.g. to a queueing system) and then receiving them via a worker. Messenger supports multiple transports.

Note

If you want to use a transport that’s not supported, check out Enqueue’s transport, which supports things like Kafka and Google Pub/Sub.

A transport is registered using a “DSN”. Thanks to Messenger’s Flex recipe, your .env file already has a few examples.

    # MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
    # MESSENGER_TRANSPORT_DSN=doctrine://default
    # MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

Uncomment whichever transport you want (or set it in .env.local). See Transport Configuration for more details.

Next, in config/packages/messenger.yaml, let’s define a transport called async that uses this configuration:

  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async: "%env(MESSENGER_TRANSPORT_DSN)%"

                # or expanded to configure more options
                #async:
                #    dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                #    options: []

  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <framework:messenger>
                <framework:transport name="async">%env(MESSENGER_TRANSPORT_DSN)%</framework:transport>

                <!-- or expanded to configure more options -->
                <framework:transport name="async"
                    dsn="%env(MESSENGER_TRANSPORT_DSN)%"
                >
                    <option key="...">...</option>
                </framework:transport>
            </framework:messenger>
        </framework:config>
    </container>

  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $framework->messenger()
            ->transport('async')
                ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
        ;

        $framework->messenger()
            ->transport('async')
                ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
                ->options([])
        ;
    };

Routing Messages to a Transport

Now that you have a transport configured, you can configure messages to be sent to it instead of being handled immediately:

  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async: "%env(MESSENGER_TRANSPORT_DSN)%"

            routing:
                # async is whatever name you gave your transport above
                'App\Message\SmsNotification': async

  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <framework:messenger>
                <framework:routing message-class="App\Message\SmsNotification">
                    <!-- async is whatever name you gave your transport above -->
                    <framework:sender service="async"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>

  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $framework->messenger()
            // async is whatever name you gave your transport above
            ->routing('App\Message\SmsNotification')->senders(['async'])
        ;
    };

Thanks to this, the App\Message\SmsNotification will be sent to the async transport and its handler(s) will not be called immediately. Any messages not matched under routing will still be handled immediately.

You can also route classes by their parent class or interface, or send messages to multiple transports:

  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            routing:
                # route all messages that extend this example base class or interface
                'App\Message\AbstractAsyncMessage': async
                'App\Message\AsyncMessageInterface': async

                'My\Message\ToBeSentToTwoSenders': [async, audit]

  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <framework:messenger>
                <!-- route all messages that extend this example base class or interface -->
                <framework:routing message-class="App\Message\AbstractAsyncMessage">
                    <framework:sender service="async"/>
                </framework:routing>
                <framework:routing message-class="App\Message\AsyncMessageInterface">
                    <framework:sender service="async"/>
                </framework:routing>
                <framework:routing message-class="My\Message\ToBeSentToTwoSenders">
                    <framework:sender service="async"/>
                    <framework:sender service="audit"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>

  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();

        // route all messages that extend this example base class or interface
        $messenger->routing('App\Message\AbstractAsyncMessage')->senders(['async']);
        $messenger->routing('App\Message\AsyncMessageInterface')->senders(['async']);
        $messenger->routing('My\Message\ToBeSentToTwoSenders')->senders(['async', 'audit']);
    };

Note

If you configure routing for both a child and parent class, both rules are used. E.g. if you have an SmsNotification object that extends from Notification, both the routing for Notification and SmsNotification will be used.
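
The way parent and child rules combine can be pictured with a small model. The resolveSenders() helper below is hypothetical and much simpler than Messenger's real sender lookup; it only shows that rules for the message class, its parent classes and its interfaces are all collected. The audit transport name is likewise just an example.

```php
<?php

declare(strict_types=1);

// Stand-in classes: SmsNotification extends Notification, as in the note above.
interface AsyncMessageInterface
{
}

class Notification
{
}

class SmsNotification extends Notification implements AsyncMessageInterface
{
}

/**
 * Hypothetical helper: collects the sender names for a message by looking up
 * its own class, its parent classes and its interfaces in the routing map.
 *
 * @param array<string, string[]> $routing sender names keyed by class name
 *
 * @return string[]
 */
function resolveSenders(object $message, array $routing): array
{
    $candidates = array_merge(
        [get_class($message)],
        array_values(class_parents($message)),
        array_values(class_implements($message))
    );

    $senders = [];
    foreach ($candidates as $candidate) {
        foreach ($routing[$candidate] ?? [] as $sender) {
            $senders[$sender] = true; // de-duplicate sender names
        }
    }

    return array_keys($senders);
}

$routing = [
    Notification::class => ['audit'],
    SmsNotification::class => ['async'],
];

print_r(resolveSenders(new SmsNotification(), $routing));
```

A message whose class, parents and interfaces match no rule yields an empty list, which corresponds to the message being handled immediately.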

Doctrine Entities in Messages

If you need to pass a Doctrine entity in a message, it’s better to pass the entity’s primary key (or whatever relevant information the handler actually needs, like email, etc) instead of the object:

    // src/Message/NewUserWelcomeEmail.php
    namespace App\Message;

    class NewUserWelcomeEmail
    {
        private $userId;

        public function __construct(int $userId)
        {
            $this->userId = $userId;
        }

        public function getUserId(): int
        {
            return $this->userId;
        }
    }

Then, in your handler, you can query for a fresh object:

    // src/MessageHandler/NewUserWelcomeEmailHandler.php
    namespace App\MessageHandler;

    use App\Message\NewUserWelcomeEmail;
    use App\Repository\UserRepository;
    use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

    class NewUserWelcomeEmailHandler implements MessageHandlerInterface
    {
        private $userRepository;

        public function __construct(UserRepository $userRepository)
        {
            $this->userRepository = $userRepository;
        }

        public function __invoke(NewUserWelcomeEmail $welcomeEmail)
        {
            $user = $this->userRepository->find($welcomeEmail->getUserId());

            // ... send an email!
        }
    }

This guarantees the entity contains fresh data.

Handling Messages Synchronously

If a message doesn’t match any routing rules, it won’t be sent to any transport and will be handled immediately. In some cases (like when binding handlers to different transports), it’s easier or more flexible to handle this explicitly, by creating a sync transport and “sending” messages there to be handled immediately:

  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                # ... other transports

                sync: 'sync://'

            routing:
                App\Message\SmsNotification: sync

  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <framework:messenger>
                <!-- ... other transports -->

                <framework:transport name="sync" dsn="sync://"/>

                <framework:routing message-class="App\Message\SmsNotification">
                    <framework:sender service="sync"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>

  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();

        // ... other transports

        $messenger->transport('sync')->dsn('sync://');
        $messenger->routing('App\Message\SmsNotification')->senders(['sync']);
    };

Creating your Own Transport

You can also create your own transport if you need to send or receive messages from something that is not supported. See How to Create Your own Messenger Transport.

Consuming Messages (Running the Worker)

Once your messages have been routed, in most cases, you’ll need to “consume” them. You can do this with the messenger:consume command:

    $ php bin/console messenger:consume async

    # use -vv to see details about what's happening
    $ php bin/console messenger:consume async -vv

The first argument is the receiver’s name (or service id if you routed to a custom service). By default, the command will run forever: looking for new messages on your transport and handling them. This command is called your “worker”.

Deploying to Production

On production, there are a few important things to think about:

Use Supervisor to keep your worker(s) running

You’ll want one or more “workers” running at all times. To do that, use a process control system like Supervisor.

Don’t Let Workers Run Forever

Some services (like Doctrine’s EntityManager) will consume more memory over time. So, instead of allowing your worker to run forever, use a flag like messenger:consume --limit=10 to tell your worker to only handle 10 messages before exiting (then Supervisor will create a new process). There are also other options like --memory-limit=128M and --time-limit=3600.
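
The idea behind these limits can be sketched as a loop that stops as soon as any one of them is reached. The runWorker() function below is a hypothetical illustration, not the code behind messenger:consume; its three limit parameters only loosely correspond to --limit, --memory-limit and --time-limit.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical worker loop: handles queued messages until the queue is empty
 * or one of the limits is reached, then returns the number handled.
 */
function runWorker(
    array $queue,
    callable $handler,
    int $messageLimit,
    int $memoryLimitBytes,
    int $timeLimitSeconds
): int {
    $startedAt = time();
    $handled = 0;

    foreach ($queue as $message) {
        $handler($message);
        ++$handled;

        if ($handled >= $messageLimit) {
            break; // similar in spirit to --limit
        }
        if (memory_get_usage(true) > $memoryLimitBytes) {
            break; // similar in spirit to --memory-limit
        }
        if (time() - $startedAt >= $timeLimitSeconds) {
            break; // similar in spirit to --time-limit
        }
    }

    return $handled;
}

// 25 queued messages, but the worker exits after the first 10.
$handled = runWorker(range(1, 25), static function (int $message): void {
    // ... handle the message
}, 10, 128 * 1024 * 1024, 3600);

echo $handled, PHP_EOL;
```

When the loop returns, the process exits and a process manager such as Supervisor starts a fresh one with a clean memory footprint.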

Restart Workers on Deploy

Each time you deploy, you’ll need to restart all your worker processes so that they see the newly deployed code. To do this, run messenger:stop-workers on deploy. This will signal to each worker that it should finish the message it’s currently handling and shut down gracefully. Then, Supervisor will create new worker processes. The command uses the app cache internally - so make sure this is configured to use an adapter you like.

Use the Same Cache Between Deploys

If your deploy strategy involves the creation of new target directories, you should set a value for the cache.prefix.seed configuration option in order to use the same cache namespace between deployments. Otherwise, the cache.app pool will use the value of the kernel.project_dir parameter as base for the namespace, which will lead to different namespaces each time a new deployment is made.

Prioritized Transports

Sometimes certain types of messages should have a higher priority and be handled before others. To make this possible, you can create multiple transports and route different messages to them. For example:

  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async_priority_high:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    options:
                        # queue_name is specific to the doctrine transport
                        queue_name: high

                        # for AMQP send to a separate exchange then queue
                        #exchange:
                        #    name: high
                        #queues:
                        #    messages_high: ~
                        # or redis try "group"
                async_priority_low:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    options:
                        queue_name: low

            routing:
                'App\Message\SmsNotification': async_priority_low
                'App\Message\NewUserWelcomeEmail': async_priority_high

  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <framework:messenger>
                <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
                    <framework:options>
                        <framework:queue>
                            <framework:name>Queue</framework:name>
                        </framework:queue>
                    </framework:options>
                </framework:transport>
                <framework:transport name="async_priority_low" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
                    <option key="queue_name">low</option>
                </framework:transport>

                <framework:routing message-class="App\Message\SmsNotification">
                    <framework:sender service="async_priority_low"/>
                </framework:routing>
                <framework:routing message-class="App\Message\NewUserWelcomeEmail">
                    <framework:sender service="async_priority_high"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>

  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();

        $messenger->transport('async_priority_high')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            ->options(['queue_name' => 'high']);

        $messenger->transport('async_priority_low')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            ->options(['queue_name' => 'low']);

        $messenger->routing('App\Message\SmsNotification')->senders(['async_priority_low']);
        $messenger->routing('App\Message\NewUserWelcomeEmail')->senders(['async_priority_high']);
    };

You can then run individual workers for each transport or instruct one worker to handle messages in a priority order:

    $ php bin/console messenger:consume async_priority_high async_priority_low

The worker will always first look for messages waiting on async_priority_high. If there are none, then it will consume messages from async_priority_low.
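
This "high first, low only when high is empty" behavior can be modeled with plain arrays standing in for the two transports. The nextMessage() helper and the message names below are hypothetical; the sketch only illustrates the ordering described above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns the next message, always checking the
 * receivers in the order they were listed (highest priority first).
 *
 * @param array<string, string[]> $receivers queued messages keyed by transport name
 */
function nextMessage(array &$receivers): ?string
{
    foreach ($receivers as &$queue) {
        if ([] !== $queue) {
            return array_shift($queue);
        }
    }

    return null;
}

$receivers = [
    'async_priority_high' => ['welcome-email-1'],
    'async_priority_low' => ['sms-1', 'sms-2'],
];

$order = [];
while (null !== $message = nextMessage($receivers)) {
    $order[] = $message;

    // A high-priority message arriving mid-run jumps ahead of the
    // low-priority messages that are still waiting.
    if ('sms-1' === $message) {
        $receivers['async_priority_high'][] = 'welcome-email-2';
    }
}

print_r($order);
```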

Limit Consuming to Specific Queues

Some transports (notably AMQP) have the concept of exchanges and queues. A Symfony transport is always bound to an exchange. By default, the worker consumes from all queues attached to the exchange of the specified transport. However, in some cases you may want a worker to consume only from specific queues.

You can limit the worker to only process messages from specific queues:

    $ php bin/console messenger:consume my_transport --queues=fasttrack

To allow using the queues option, the receiver must implement the Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface.

New in version 5.3: Limiting the worker to specific queues was introduced in Symfony 5.3.

Supervisor Configuration

Supervisor is a great tool to guarantee that your worker processes are always running (even if one exits due to a failure, hitting a message limit or thanks to messenger:stop-workers). You can install it on Ubuntu, for example, via:

    $ sudo apt-get install supervisor

Supervisor configuration files typically live in a /etc/supervisor/conf.d directory. For example, you can create a new messenger-worker.conf file there to make sure that 2 instances of messenger:consume are running at all times:

    ;/etc/supervisor/conf.d/messenger-worker.conf
    [program:messenger-consume]
    command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
    user=ubuntu
    numprocs=2
    startsecs=0
    autostart=true
    autorestart=true
    process_name=%(program_name)s_%(process_num)02d

Change the async argument to use the name of your transport (or transports) and user to the Unix user on your server.

If you use the Redis Transport, note that each worker needs a unique consumer name to avoid the same message being handled by multiple workers. One way to achieve this is to set an environment variable in the Supervisor configuration file, which you can then refer to in messenger.yaml (see the Redis transport section):

    environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d

Next, tell Supervisor to read your config and start your workers:

    $ sudo supervisorctl reread

    $ sudo supervisorctl update

    $ sudo supervisorctl start messenger-consume:*

See the Supervisor docs for more details.

Graceful Shutdown

If you install the PCNTL PHP extension in your project, workers will handle the SIGTERM POSIX signal to finish processing their current message before exiting.

In some cases the SIGTERM signal is sent by Supervisor itself (e.g. stopping a Docker container having Supervisor as its entrypoint). In these cases you need to add a stopwaitsecs key to the program configuration (with a value of the desired grace period in seconds) in order to perform a graceful shutdown:

    [program:x]
    stopwaitsecs=20

Retries & Failures

If an exception is thrown while consuming a message from a transport, the message will automatically be re-sent to the transport to be tried again. By default, a message will be retried 3 times before being discarded or sent to the failure transport. Each retry will also be delayed, in case the failure was due to a temporary issue. All of this is configurable for each transport:

  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async_priority_high:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'

                    # default configuration
                    retry_strategy:
                        max_retries: 3
                        # milliseconds delay
                        delay: 1000
                        # causes the delay to be higher before each retry
                        # e.g. 1 second delay, 2 seconds, 4 seconds
                        multiplier: 2
                        max_delay: 0
                        # override all of this with a service that
                        # implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
                        # service: null

  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <framework:messenger>
                <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority">
                    <framework:retry-strategy max-retries="3" delay="1000" multiplier="2" max-delay="0"/>
                </framework:transport>
            </framework:messenger>
        </framework:config>
    </container>

  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();

        $messenger->transport('async_priority_high')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            // default configuration
            ->retryStrategy()
                ->maxRetries(3)
                // milliseconds delay
                ->delay(1000)
                // causes the delay to be higher before each retry
                // e.g. 1 second delay, 2 seconds, 4 seconds
                ->multiplier(2)
                ->maxDelay(0)
                // override all of this with a service that
                // implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
                ->service(null)
        ;
    };
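
To see how delay, multiplier and max_delay interact, the following sketch computes the waiting time before each retry. The retryDelay() function is a hypothetical helper based on the comments in the configuration above (1 second, then 2, then 4), with a max_delay of 0 taken to mean "no cap"; treat it as an illustration rather than the component's own code.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: delay in milliseconds before the next retry, where
 * $retryCount is the number of retries already attempted.
 * A $maxDelay of 0 is treated as "no upper bound".
 */
function retryDelay(int $retryCount, int $delay = 1000, float $multiplier = 2.0, int $maxDelay = 0): int
{
    // delay * multiplier^retryCount
    $computed = $delay * $multiplier ** $retryCount;

    if (0 !== $maxDelay && $computed > $maxDelay) {
        return $maxDelay;
    }

    return (int) ceil($computed);
}

// With the default configuration shown above:
foreach ([0, 1, 2] as $retryCount) {
    echo retryDelay($retryCount), PHP_EOL;
}
```

Setting max_delay to a non-zero value caps the growth: with the same defaults and a max_delay of 3000, the third delay would be 3000 instead of 4000 milliseconds.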

Tip

Symfony triggers a Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent when a message is retried so you can run your own logic.

New in version 5.2: The WorkerMessageRetriedEvent class was introduced in Symfony 5.2.

Avoiding Retrying

Sometimes handling a message might fail in a way that you know is permanent and should not be retried. If you throw Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException, the message will not be retried.

Forcing Retrying

New in version 5.1: The RecoverableMessageHandlingException was introduced in Symfony 5.1.

Sometimes handling a message might fail in a way that you know is temporary and should be retried. If you throw Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException, the message will always be retried.
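
The combined effect of the two exception types on the retry decision can be summarized in a few lines. In this sketch the exception classes are local stand-ins that merely share the short names of the Symfony classes (so the snippet runs without the component), and shouldRetry() is a hypothetical helper, not part of Messenger.

```php
<?php

declare(strict_types=1);

// Local stand-ins for the two Symfony exception classes named above.
class UnrecoverableMessageHandlingException extends RuntimeException
{
}

class RecoverableMessageHandlingException extends RuntimeException
{
}

/**
 * Hypothetical helper: decides whether a failed message is retried.
 */
function shouldRetry(Throwable $error, int $retryCount, int $maxRetries = 3): bool
{
    if ($error instanceof RecoverableMessageHandlingException) {
        return true; // always retried
    }

    if ($error instanceof UnrecoverableMessageHandlingException) {
        return false; // never retried
    }

    // Any other exception follows the normal max_retries rule.
    return $retryCount < $maxRetries;
}

var_dump(shouldRetry(new RuntimeException('temporary glitch'), 1));
var_dump(shouldRetry(new UnrecoverableMessageHandlingException('invalid phone number'), 0));
var_dump(shouldRetry(new RecoverableMessageHandlingException('gateway busy'), 10));
```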

Saving & Retrying Failed Messages

If a message fails, it is retried multiple times (max_retries) and then discarded. To avoid losing it, you can instead configure a failure_transport:

  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            # after retrying, messages will be sent to the "failed" transport
            failure_transport: failed

            transports:
                # ... other transports

                failed: 'doctrine://default?queue_name=failed'

  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <!-- after retrying, messages will be sent to the "failed" transport -->
            <framework:messenger failure-transport="failed">
                <!-- ... other transports -->

                <framework:transport name="failed" dsn="doctrine://default?queue_name=failed"/>
            </framework:messenger>
        </framework:config>
    </container>

  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();

        // after retrying, messages will be sent to the "failed" transport
        $messenger->failureTransport('failed');

        // ... other transports

        $messenger->transport('failed')
            ->dsn('doctrine://default?queue_name=failed');
    };

In this example, if handling a message fails 3 times (default max_retries), it will then be sent to the failed transport. While you can use messenger:consume failed to consume this like a normal transport, you’ll usually want to manually view the messages in the failure transport and choose to retry them:

    # see all messages in the failure transport
    $ php bin/console messenger:failed:show

    # see details about a specific failure
    $ php bin/console messenger:failed:show 20 -vv

    # view and retry messages one-by-one
    $ php bin/console messenger:failed:retry -vv

    # retry specific messages
    $ php bin/console messenger:failed:retry 20 30 --force

    # remove a message without retrying it
    $ php bin/console messenger:failed:remove 20

    # remove messages without retrying them and show each message before removing it
    $ php bin/console messenger:failed:remove 20 30 --show-messages

New in version 5.1: The --show-messages option was introduced in Symfony 5.1.

If the message fails again, it will be re-sent to the failure transport according to the normal retry rules. Once the maximum number of retries has been reached, the message will be discarded permanently.

Multiple Failed Transports

New in version 5.3: The possibility to use multiple failed transports was introduced in Symfony 5.3.

Sometimes it is not enough to have a single, global failed transport configured because some messages are more important than others. In those cases, you can override the failure transport for only specific transports:

  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            # after retrying, messages will be sent to the "failed_default" transport
            # by default if no "failure_transport" is configured inside a transport
            failure_transport: failed_default

            transports:
                async_priority_high:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    failure_transport: failed_high_priority

                # since no failure transport is configured, the one used will be
                # the global "failure_transport" set
                async_priority_low:
                    dsn: 'doctrine://default?queue_name=async_priority_low'

                failed_default: 'doctrine://default?queue_name=failed_default'
                failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'

  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <!-- after retrying, messages will be sent to the "failed_default" transport
            by default if no "failure-transport" is configured inside a transport -->
            <framework:messenger failure-transport="failed_default">
                <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%" failure-transport="failed_high_priority"/>

                <!-- since no "failure-transport" is configured, the one used will be
                the global "failure-transport" set -->
                <framework:transport name="async_priority_low" dsn="doctrine://default?queue_name=async_priority_low"/>

                <framework:transport name="failed_default" dsn="doctrine://default?queue_name=failed_default"/>
                <framework:transport name="failed_high_priority" dsn="doctrine://default?queue_name=failed_high_priority"/>
            </framework:messenger>
        </framework:config>
    </container>

  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();

        // after retrying, messages will be sent to the "failed_default" transport
        // by default if no "failure_transport" is configured inside a transport
        $messenger->failureTransport('failed_default');

        $messenger->transport('async_priority_high')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            ->failureTransport('failed_high_priority');

        // since no failure transport is configured, the one used will be
        // the global failure_transport set
        $messenger->transport('async_priority_low')
            ->dsn('doctrine://default?queue_name=async_priority_low');

        $messenger->transport('failed_default')
            ->dsn('doctrine://default?queue_name=failed_default');

        $messenger->transport('failed_high_priority')
            ->dsn('doctrine://default?queue_name=failed_high_priority');
    };

If there is no failure_transport defined globally or on the transport level, the messages will be discarded after the number of retries.
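
The fallback order (transport-level setting first, then the global one, otherwise discard) can be written down as a short lookup. The resolveFailureTransport() helper and the array layout below are hypothetical; they simply mirror the YAML structure shown above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: picks the failure transport for a message that has
 * exhausted its retries on $transport. Null means the message is discarded.
 */
function resolveFailureTransport(string $transport, array $config): ?string
{
    return $config['transports'][$transport]['failure_transport']
        ?? $config['failure_transport']
        ?? null;
}

// Mirrors the example configuration above.
$config = [
    'failure_transport' => 'failed_default',
    'transports' => [
        'async_priority_high' => ['failure_transport' => 'failed_high_priority'],
        'async_priority_low' => [],
    ],
];

echo resolveFailureTransport('async_priority_high', $config), PHP_EOL;
echo resolveFailureTransport('async_priority_low', $config), PHP_EOL;
```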

The failed commands have an optional option --transport to specify the failure_transport configured at the transport level.

  1. # see all messages in "failure_transport" transport
  2. $ php bin/console messenger:failed:show --transport=failure_transport
  3. # retry specific messages from "failure_transport"
  4. $ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force
  5. # remove a message without retrying it from "failure_transport"
  6. $ php bin/console messenger:failed:remove 20 --transport=failure_transport

Transport Configuration

Messenger supports a number of different transport types, each with their own options. Options can be passed to the transport via a DSN string or configuration.

  # .env
  MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                my_transport:
                    dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                    options:
                        auto_setup: false
  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <framework:messenger>
                <framework:transport name="my_transport" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
                    <framework:options auto-setup="false"/>
                </framework:transport>
            </framework:messenger>
        </framework:config>
    </container>
  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
        $messenger->transport('my_transport')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            ->options(['auto_setup' => false]);
    };

Options defined under options take precedence over ones defined in the DSN.
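The precedence rule can be pictured with a small plain-PHP sketch. It is only an illustration: mergeTransportOptions() is a hypothetical helper, not part of Symfony, and the real transports parse and validate their options themselves.

```php
/**
 * Hypothetical helper (not part of Symfony): merges the query-string options
 * of a DSN with an explicit options array, letting the explicit options win.
 */
function mergeTransportOptions(string $dsn, array $options): array
{
    $query = parse_url($dsn, PHP_URL_QUERY);
    $dsnOptions = [];
    if (is_string($query)) {
        // parse_str() turns "a=1&b=2" into ['a' => '1', 'b' => '2']
        parse_str($query, $dsnOptions);
    }

    // with array_replace(), later arrays override earlier ones key by key
    return array_replace($dsnOptions, $options);
}

$merged = mergeTransportOptions(
    'amqp://localhost/%2f/messages?auto_setup=true&heartbeat=30',
    ['auto_setup' => false]
);
```

Here auto_setup ends up false because it comes from the options array, while heartbeat, which is only set in the DSN, is kept.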

AMQP Transport

The AMQP transport uses the AMQP PHP extension to send messages to queues like RabbitMQ.

New in version 5.1: Starting from Symfony 5.1, the AMQP transport has moved to a separate package. Install it by running:

  $ composer require symfony/amqp-messenger

The AMQP transport DSN may look like this:

  # .env
  MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages

  # or use the AMQPS protocol
  MESSENGER_TRANSPORT_DSN=amqps://guest:guest@localhost/%2f/messages

New in version 5.2: The AMQPS protocol support was introduced in Symfony 5.2.

If you want to use TLS/SSL encrypted AMQP, you must also provide a CA certificate. Define the certificate path in the amqp.cacert php.ini setting (e.g. amqp.cacert = /etc/ssl/certs) or in the cacert parameter of the DSN (e.g. amqps://localhost?cacert=/etc/ssl/certs/).

The default port used by TLS/SSL encrypted AMQP is 5671, but you can override it in the port parameter of the DSN (e.g. amqps://localhost?cacert=/etc/ssl/certs/&port=12345).

Note

By default, the transport will automatically create any exchanges, queues and binding keys that are needed. That can be disabled, but some functionality may not work correctly (like delayed queues).

Note

With Symfony 5.3 or newer, you can limit the consumer of an AMQP transport to only process messages from some queues of an exchange. See Limit Consuming to Specific Queues.

The transport has a number of other options, including ways to configure the exchange, queues, binding keys and more. See the documentation on Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection.

The transport has a number of options:

| Option | Description | Default |
| --- | --- | --- |
| auto_setup | Whether the exchanges and queues should be created automatically during send / get. | true |
| cacert | Path to the CA cert file in PEM format. | |
| cert | Path to the client certificate in PEM format. | |
| channel_max | Specifies the highest channel number that the server permits. 0 means standard extension limit. | |
| confirm_timeout | Timeout in seconds for confirmation; if none is specified, the transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional. | |
| connect_timeout | Connection timeout. Note: 0 or greater seconds. May be fractional. | |
| frame_max | The largest frame size that the server proposes for the connection, including frame header and end-byte. 0 means standard extension limit (depends on the librabbitmq default frame size limit). | |
| heartbeat | The delay, in seconds, of the connection heartbeat that the server wants. 0 means the server does not want a heartbeat. Note: librabbitmq has limited heartbeat support, which means heartbeats are checked only during blocking calls. | |
| host | Hostname of the AMQP service | |
| key | Path to the client key in PEM format. | |
| password | Password to use to connect to the AMQP service | |
| persistent | | 'false' |
| port | Port of the AMQP service | |
| prefetch_count | | |
| read_timeout | Timeout for incoming activity. Note: 0 or greater seconds. May be fractional. | |
| retry | | |
| sasl_method | | |
| user | Username to use to connect to the AMQP service | |
| verify | Enable or disable peer verification. If peer verification is enabled then the common name in the server certificate must match the server name. Peer verification is enabled by default. | |
| vhost | Virtual Host to use with the AMQP service | |
| write_timeout | Timeout for outgoing activity. Note: 0 or greater seconds. May be fractional. | |
| delay[queue_name_pattern] | Pattern to use to create the queues | delay_%exchange_name%_%routing_key%_%delay% |
| delay[exchange_name] | Name of the exchange to be used for the delayed/retried messages | delays |
| queues[name][arguments] | Extra arguments | |
| queues[name][binding_arguments] | Arguments to be used while binding the queue. | |
| queues[name][binding_keys] | The binding keys (if any) to bind to this queue | |
| queues[name][flags] | Queue flags | AMQP_DURABLE |
| exchange[arguments] | Extra arguments for the exchange (e.g. alternate-exchange) | |
| exchange[default_publish_routing_key] | Routing key to use when publishing, if none is specified on the message | |
| exchange[flags] | Exchange flags | AMQP_DURABLE |
| exchange[name] | Name of the exchange | |
| exchange[type] | Type of exchange | fanout |

New in version 5.2: The confirm_timeout option was introduced in Symfony 5.2.

Deprecated since version 5.3: The prefetch_count option was deprecated in Symfony 5.3 because it has no effect on the AMQP Messenger transport.

You can also configure AMQP-specific settings on your message by adding Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp to your Envelope:

  use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
  // ...

  $attributes = [];
  // SmsNotification requires its content as a constructor argument
  $bus->dispatch(new SmsNotification('...'), [
      new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
  ]);

Caution

The consumers do not show up in an admin panel as this transport does not rely on \AmqpQueue::consume(), which is blocking. Having a blocking receiver makes the --time-limit/--memory-limit options of the messenger:consume command as well as the messenger:stop-workers command ineffective, as they all rely on the fact that the receiver returns immediately no matter if it finds a message or not. The consume worker is responsible for iterating until it receives a message to handle and/or until one of the stop conditions is reached. Thus, the worker’s stop logic cannot be reached if it is stuck in a blocking call.

Doctrine Transport

The Doctrine transport can be used to store messages in a database table.

New in version 5.1: Starting from Symfony 5.1, the Doctrine transport has moved to a separate package. Install it by running:

  $ composer require symfony/doctrine-messenger

The Doctrine transport DSN may look like this:

  # .env
  MESSENGER_TRANSPORT_DSN=doctrine://default

The format is doctrine://<connection_name>, in case you have multiple connections and want to use one other than the “default”. The transport will automatically create a table named messenger_messages.

New in version 5.1: The ability to automatically generate a migration for the messenger_messages table was introduced in Symfony 5.1 and DoctrineBundle 2.1.

Or, to create the table yourself, set the auto_setup option to false and generate a migration.

Caution

The datetime property of the messages stored in the database uses the timezone of the current system. This may cause issues if multiple machines with different timezone configuration use the same storage.

The transport has a number of options:

| Option | Description | Default |
| --- | --- | --- |
| table_name | Name of the table | messenger_messages |
| queue_name | Name of the queue (a column in the table, to use one table for multiple transports) | default |
| redeliver_timeout | Timeout, in seconds, before retrying a message that is in the queue but still in the “handling” state (this happens if a worker stopped for some reason; eventually the message should be retried). | 3600 |
| auto_setup | Whether the table should be created automatically during send / get. | true |
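The effect of redeliver_timeout can be sketched in plain PHP. This is an illustration under stated assumptions, not the transport's actual query: isAvailableForDelivery() and its $deliveredAt argument (the moment a worker last picked the message up) are hypothetical names.

```php
/**
 * Hypothetical check (illustration only): may a stored message be handed
 * to a worker at time $now?
 *
 * @param \DateTimeImmutable|null $deliveredAt when a worker last picked the
 *                                             message up, or null if never
 */
function isAvailableForDelivery(?\DateTimeImmutable $deliveredAt, \DateTimeImmutable $now, int $redeliverTimeout = 3600): bool
{
    if (null === $deliveredAt) {
        return true; // still waiting in the queue
    }

    // a message stuck in the "handling" state becomes available again once
    // it was delivered more than $redeliverTimeout seconds ago
    $cutOff = $now->modify(sprintf('-%d seconds', $redeliverTimeout));

    return $deliveredAt < $cutOff;
}

$now = new \DateTimeImmutable('2021-06-01 12:00:00', new \DateTimeZone('UTC'));
$fresh = isAvailableForDelivery($now->modify('-10 minutes'), $now); // picked up recently
$stale = isAvailableForDelivery($now->modify('-2 hours'), $now);    // older than the timeout
```

With the default of 3600 seconds, the message picked up ten minutes ago is left alone, while the one picked up two hours ago is considered abandoned and can be delivered again.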

New in version 5.1: The ability to leverage PostgreSQL’s LISTEN/NOTIFY was introduced in Symfony 5.1.

When using PostgreSQL, you have access to the following options to leverage the LISTEN/NOTIFY feature. This allows for a more performant approach than the default polling behavior of the Doctrine transport because PostgreSQL will directly notify the workers when a new message is inserted in the table.

| Option | Description | Default |
| --- | --- | --- |
| use_notify | Whether to use LISTEN/NOTIFY. | true |
| check_delayed_interval | The interval to check for delayed messages, in milliseconds. Set to 0 to disable checks. | 1000 |
| get_notify_timeout | The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. | 0 |

Beanstalkd Transport

New in version 5.2: The Beanstalkd transport was introduced in Symfony 5.2.

The Beanstalkd transport sends messages directly to a Beanstalkd work queue. Install it by running:

  $ composer require symfony/beanstalkd-messenger

The Beanstalkd transport DSN may look like this:

  # .env
  MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300?tube_name=foo&timeout=4&ttr=120

  # If no port, it will default to 11300
  MESSENGER_TRANSPORT_DSN=beanstalkd://localhost

The transport has a number of options:

| Option | Description | Default |
| --- | --- | --- |
| tube_name | Name of the queue | default |
| timeout | Message reservation timeout, in seconds. | 0 (the server immediately returns a response, or a TransportException is thrown) |
| ttr | The message time to run before it is put back in the ready queue, in seconds. | 90 |

Redis Transport

The Redis transport uses streams to queue messages. This transport requires the Redis PHP extension (>=4.3) and a running Redis server (^5.0).

New in version 5.1: Starting from Symfony 5.1, the Redis transport has moved to a separate package. Install it by running:

  $ composer require symfony/redis-messenger

The Redis transport DSN may look like this:

  # .env
  MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

  # Full DSN Example
  MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0

  # Redis Cluster Example
  MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379

  # Unix Socket Example
  MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock

New in version 5.1: The Unix socket DSN was introduced in Symfony 5.1.

A number of options can be configured via the DSN or via the options key under the transport in messenger.yaml:

| Option | Description | Default |
| --- | --- | --- |
| stream | The Redis stream name | messages |
| group | The Redis consumer group name | symfony |
| consumer | Consumer name used in Redis | consumer |
| auto_setup | Create the Redis group automatically? | true |
| auth | The Redis password | |
| delete_after_ack | If true, messages are deleted automatically after processing them | false |
| delete_after_reject | If true, messages are deleted automatically if they are rejected | true |
| lazy | Connect only when a connection is really needed | false |
| serializer | How to serialize the final payload in Redis (the Redis::OPT_SERIALIZER option) | Redis::SERIALIZER_PHP |
| stream_max_entries | The maximum number of entries which the stream will be trimmed to. Set it to a large enough number to avoid losing pending messages | 0 (which means “no trimming”) |
| tls | Enable TLS support for the connection | false |
| redeliver_timeout | Timeout, in seconds, before retrying a pending message which is owned by an abandoned consumer (this happens if a worker died for some reason; eventually the message should be retried). | 3600 |
| claim_interval | Interval, in milliseconds, at which pending/abandoned messages are checked for claiming | 60000 (1 minute) |

Caution

There should never be more than one messenger:consume command running with the same combination of stream, group and consumer, or messages could end up being handled more than once. If you run multiple queue workers, consumer can be set to an environment variable (like %env(MESSENGER_CONSUMER_NAME)%) set by Supervisor (example below) or any other service used to manage the worker processes. In a container environment, the HOSTNAME can be used as the consumer name, since there is only one worker per container/host. If using Kubernetes to orchestrate the containers, consider using a StatefulSet to have stable names.

Tip

Set delete_after_ack to true (if you use a single group) or define stream_max_entries (if you can estimate how many entries are acceptable in your case) to avoid memory leaks. Otherwise, all messages will remain forever in Redis.

New in version 5.1: The delete_after_ack, redeliver_timeout and claim_interval options were introduced in Symfony 5.1.

New in version 5.2: The delete_after_reject and lazy options were introduced in Symfony 5.2.

In Memory Transport

The in-memory transport does not actually deliver messages. Instead, it holds them in memory during the request, which can be useful for testing. For example, if you have an async_priority_normal transport, you could override it in the test environment to use this transport:

  • YAML

    # config/packages/test/messenger.yaml
    framework:
        messenger:
            transports:
                async_priority_normal: 'in-memory://'
  • XML

    <!-- config/packages/test/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <framework:messenger>
                <framework:transport name="async_priority_normal" dsn="in-memory://"/>
            </framework:messenger>
        </framework:config>
    </container>
  • PHP

    // config/packages/test/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
        $messenger->transport('async_priority_normal')
            ->dsn('in-memory://');
    };

Then, while testing, messages will not be delivered to the real transport. Even better, in a test, you can check that exactly one message was sent during a request:

  // tests/Controller/DefaultControllerTest.php
  namespace App\Tests\Controller;

  use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
  use Symfony\Component\Messenger\Transport\InMemoryTransport;

  class DefaultControllerTest extends WebTestCase
  {
      public function testSomething()
      {
          $client = static::createClient();
          // ...

          $this->assertSame(200, $client->getResponse()->getStatusCode());

          /** @var InMemoryTransport $transport */
          $transport = $this->getContainer()->get('messenger.transport.async_priority_normal');
          $this->assertCount(1, $transport->getSent());
      }
  }

The transport has a number of options:

serialize (boolean, default: false)

Whether to serialize messages or not. This is useful to test an additional layer, especially when you use your own message serializer.

New in version 5.3: The serialize option was introduced in Symfony 5.3.

Note

All in-memory transports will be reset automatically after each test in test classes extending Symfony\Bundle\FrameworkBundle\Test\KernelTestCase or Symfony\Bundle\FrameworkBundle\Test\WebTestCase.

Amazon SQS

New in version 5.1: The Amazon SQS transport was introduced in Symfony 5.1.

The Amazon SQS transport is well suited to applications hosted on AWS. Install it by running:

  $ composer require symfony/amazon-sqs-messenger

The SQS transport DSN may look like this:

  # .env
  MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
  MESSENGER_TRANSPORT_DSN=sqs://localhost:9494/messages?sslmode=disable

Note

The transport will automatically create queues that are needed. This can be disabled by setting the auto_setup option to false.

Tip

Before sending or receiving a message, Symfony needs to convert the queue name into an AWS queue URL by calling the GetQueueUrl API in AWS. This extra API call can be avoided by providing a DSN which is the queue URL.

New in version 5.2: The feature to provide the queue URL in the DSN was introduced in Symfony 5.2.

The transport has a number of options:

| Option | Description | Default |
| --- | --- | --- |
| access_key | AWS access key (must be URL-encoded) | |
| account | Identifier of the AWS account | The owner of the credentials |
| auto_setup | Whether the queue should be created automatically during send / get. | true |
| buffer_size | Number of messages to prefetch | 9 |
| debug | If true, it logs all HTTP requests and responses (it impacts performance) | false |
| endpoint | Absolute URL to the SQS service | https://sqs.eu-west-1.amazonaws.com |
| poll_timeout | Wait for new message duration in seconds | 0.1 |
| queue_name | Name of the queue | messages |
| region | Name of the AWS region | eu-west-1 |
| secret_key | AWS secret key (must be URL-encoded) | |
| visibility_timeout | Number of seconds the message will not be visible (Visibility Timeout) | Queue’s configuration |
| wait_time | Long polling duration in seconds | 20 |

New in version 5.3: The debug option was introduced in Symfony 5.3.

Note

The wait_time parameter defines the maximum duration Amazon SQS should wait until a message is available in a queue before sending a response. It helps reduce the cost of using Amazon SQS by reducing the number of empty responses.

The poll_timeout parameter defines the duration the receiver should wait before returning null. It avoids blocking other receivers from being called.

Note

If the queue name is suffixed by .fifo, AWS will create a FIFO queue. Use the stamp Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp to define the Message group ID and the Message deduplication ID.

FIFO queues don’t support setting a delay per message, so a value of delay: 0 is required in the retry strategy settings.

Serializing Messages

When messages are sent to (and received from) a transport, they’re serialized using PHP’s native serialize() & unserialize() functions. You can change this globally (or for each transport) to a service that implements Symfony\Component\Messenger\Transport\Serialization\SerializerInterface:

  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            serializer:
                default_serializer: messenger.transport.symfony_serializer
                symfony_serializer:
                    format: json
                    context: { }

            transports:
                async_priority_normal:
                    dsn: # ...
                    serializer: messenger.transport.symfony_serializer
  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <framework:messenger>
                <framework:serializer default-serializer="messenger.transport.symfony_serializer">
                    <framework:symfony-serializer format="json">
                        <framework:context/>
                    </framework:symfony-serializer>
                </framework:serializer>
                <framework:transport name="async_priority_normal" dsn="..." serializer="messenger.transport.symfony_serializer"/>
            </framework:messenger>
        </framework:config>
    </container>
  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();

        $messenger->serializer()
            ->defaultSerializer('messenger.transport.symfony_serializer')
            ->symfonySerializer()
                ->format('json')
                ->context('foo', 'bar');

        $messenger->transport('async_priority_normal')
            ->dsn(...)
            ->serializer('messenger.transport.symfony_serializer');
    };

The messenger.transport.symfony_serializer is a built-in service that uses the Serializer component and can be configured in a few ways. If you do choose to use the Symfony serializer, you can control the context on a case-by-case basis via the Symfony\Component\Messenger\Stamp\SerializerStamp (see Envelopes & Stamps).

Tip

When sending/receiving messages to/from another application, you may need more control over the serialization process. Using a custom serializer provides that control. See SymfonyCasts’ message serializer tutorial for details.

Customizing Handlers

Manually Configuring Handlers

Symfony will normally find and register your handler automatically. But you can also configure a handler manually, and pass it some extra config, by tagging the handler service with messenger.message_handler:

  • YAML

    # config/services.yaml
    services:
        App\MessageHandler\SmsNotificationHandler:
            tags: [messenger.message_handler]

            # or configure with options
            tags:
                -
                    name: messenger.message_handler
                    # only needed if can't be guessed by type-hint
                    handles: App\Message\SmsNotification
  • XML

    <!-- config/services.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd">
        <services>
            <service id="App\MessageHandler\SmsNotificationHandler">
                <!-- handles is only needed if it can't be guessed by type-hint -->
                <tag name="messenger.message_handler"
                    handles="App\Message\SmsNotification"/>
            </service>
        </services>
    </container>
  • PHP

    // config/services.php
    use App\Message\SmsNotification;
    use App\MessageHandler\SmsNotificationHandler;

    $container->register(SmsNotificationHandler::class)
        ->addTag('messenger.message_handler', [
            // only needed if can't be guessed by type-hint
            'handles' => SmsNotification::class,
        ]);

Possible options to configure with tags are:

  • bus
  • from_transport
  • handles
  • method
  • priority

Handler Subscriber & Options

A handler class can handle multiple messages or configure itself by implementing Symfony\Component\Messenger\Handler\MessageSubscriberInterface:

  // src/MessageHandler/SmsNotificationHandler.php
  namespace App\MessageHandler;

  use App\Message\OtherSmsNotification;
  use App\Message\SmsNotification;
  use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;

  class SmsNotificationHandler implements MessageSubscriberInterface
  {
      public function __invoke(SmsNotification $message)
      {
          // ...
      }

      public function handleOtherSmsNotification(OtherSmsNotification $message)
      {
          // ...
      }

      public static function getHandledMessages(): iterable
      {
          // handle this message on __invoke
          yield SmsNotification::class;

          // also handle this message on handleOtherSmsNotification
          yield OtherSmsNotification::class => [
              'method' => 'handleOtherSmsNotification',
              //'priority' => 0,
              //'bus' => 'messenger.bus.default',
          ];
      }
  }
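The generator above mixes two forms of yield. The plain-PHP sketch below shows how both forms can be read into one map; normalizeHandledMessages() is a hypothetical helper rather than a Symfony function, class names are written as strings so the snippet runs without the framework, and the __invoke and priority 0 defaults are assumptions taken from the comments in the example above.

```php
// stands in for getHandledMessages() from the example above
function handledMessages(): iterable
{
    yield 'App\Message\SmsNotification';
    yield 'App\Message\OtherSmsNotification' => [
        'method' => 'handleOtherSmsNotification',
    ];
}

// hypothetical helper: turns both yield forms into "message class => options"
function normalizeHandledMessages(iterable $handled): array
{
    $normalized = [];
    foreach ($handled as $key => $value) {
        // "yield Foo::class" produces an integer key and the class as value;
        // "yield Foo::class => [...]" produces the class as key
        [$messageClass, $options] = is_int($key) ? [$value, []] : [$key, $value];

        // assumed defaults; explicitly yielded options take precedence
        $normalized[$messageClass] = $options + ['method' => '__invoke', 'priority' => 0];
    }

    return $normalized;
}

$map = normalizeHandledMessages(handledMessages());
```

In the resulting map, SmsNotification falls back to __invoke while OtherSmsNotification keeps its explicitly configured method.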

Binding Handlers to Different Transports

Each message can have multiple handlers, and when a message is consumed all of its handlers are called. But you can also configure a handler to only be called when it’s received from a specific transport. This allows you to have a single message where each handler is called by a different “worker” that’s consuming a different transport.

Suppose you have an UploadedImage message with two handlers:

  • ThumbnailUploadedImageHandler: you want this to be handled by a transport called image_transport
  • NotifyAboutNewUploadedImageHandler: you want this to be handled by a transport called async_priority_normal

To do this, add the from_transport option to each handler. For example:

  // src/MessageHandler/ThumbnailUploadedImageHandler.php
  namespace App\MessageHandler;

  use App\Message\UploadedImage;
  use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;

  class ThumbnailUploadedImageHandler implements MessageSubscriberInterface
  {
      public function __invoke(UploadedImage $uploadedImage)
      {
          // do some thumbnailing
      }

      public static function getHandledMessages(): iterable
      {
          yield UploadedImage::class => [
              'from_transport' => 'image_transport',
          ];
      }
  }

And similarly:

  // src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
  // ...

  class NotifyAboutNewUploadedImageHandler implements MessageSubscriberInterface
  {
      // ...

      public static function getHandledMessages(): iterable
      {
          yield UploadedImage::class => [
              'from_transport' => 'async_priority_normal',
          ];
      }
  }

Then, make sure to “route” your message to both transports:

  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async_priority_normal: # ...
                image_transport: # ...

            routing:
                # ...
                'App\Message\UploadedImage': [image_transport, async_priority_normal]
  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <framework:messenger>
                <framework:transport name="async_priority_normal" dsn="..."/>
                <framework:transport name="image_transport" dsn="..."/>
                <framework:routing message-class="App\Message\UploadedImage">
                    <framework:sender service="image_transport"/>
                    <framework:sender service="async_priority_normal"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>
  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
        $messenger->transport('async_priority_normal')->dsn(...);
        $messenger->transport('image_transport')->dsn(...);
        $messenger->routing('App\Message\UploadedImage')
            ->senders(['image_transport', 'async_priority_normal']);
    };

That’s it! You can now consume each transport:

  # will only call ThumbnailUploadedImageHandler when handling the message
  $ php bin/console messenger:consume image_transport -vv

  # will only call NotifyAboutNewUploadedImageHandler when handling the message
  $ php bin/console messenger:consume async_priority_normal -vv

Caution

If a handler does not have from_transport config, it will be executed on every transport that the message is received from.
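The selection rule can be sketched in plain PHP. This is an illustration only: handlersFor() and AuditUploadedImageHandler are hypothetical names, and handlers are represented by simple strings instead of real services.

```php
/**
 * Hypothetical filter (illustration only): which handlers run for a message
 * received from the transport named $receivedFrom?
 *
 * @param array<string, array> $handlers handler name => handler options
 *
 * @return string[]
 */
function handlersFor(array $handlers, string $receivedFrom): array
{
    $selected = [];
    foreach ($handlers as $handlerName => $options) {
        $only = $options['from_transport'] ?? null;

        // without from_transport, the handler runs for every transport
        if (null === $only || $only === $receivedFrom) {
            $selected[] = $handlerName;
        }
    }

    return $selected;
}

$handlers = [
    'ThumbnailUploadedImageHandler' => ['from_transport' => 'image_transport'],
    'NotifyAboutNewUploadedImageHandler' => ['from_transport' => 'async_priority_normal'],
    'AuditUploadedImageHandler' => [], // hypothetical handler without from_transport
];

$forImages = handlersFor($handlers, 'image_transport');
$forNormal = handlersFor($handlers, 'async_priority_normal');
```

The handler without from_transport appears in both results, which is the behavior the caution above warns about.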

Extending Messenger

Envelopes & Stamps

A message can be any PHP object. Sometimes, you may need to configure something extra about the message - like the way it should be handled inside AMQP or adding a delay before the message should be handled. You can do that by adding a “stamp” to your message:

  use Symfony\Component\Messenger\Envelope;
  use Symfony\Component\Messenger\MessageBusInterface;
  use Symfony\Component\Messenger\Stamp\DelayStamp;

  public function index(MessageBusInterface $bus)
  {
      $bus->dispatch(new SmsNotification('...'), [
          // wait 5 seconds before processing
          new DelayStamp(5000),
      ]);

      // or explicitly create an Envelope
      $bus->dispatch(new Envelope(new SmsNotification('...'), [
          new DelayStamp(5000),
      ]));

      // ...
  }

Internally, each message is wrapped in an Envelope, which holds the message and stamps. You can create this manually or allow the message bus to do it. There are a variety of different stamps for different purposes and they’re used internally to track information about a message - like the message bus that’s handling it or if it’s being retried after failure.

Middleware

What happens when you dispatch a message to a message bus depends on its collection of middleware and their order. By default, the middleware configured for each bus looks like this:

  1. add_bus_name_stamp_middleware - adds a stamp to record which bus this message was dispatched into;
  2. dispatch_after_current_bus - see Transactional Messages: Handle New Messages After Handling is Done;
  3. failed_message_processing_middleware - processes messages that are being retried via the failure transport to make them properly function as if they were being received from their original transport;
  4. Your own collection of middleware;
  5. send_message - if routing is configured for the message, this sends it to the configured transport(s) and stops the middleware chain;
  6. handle_message - calls the message handler(s) for the given message.

Note

These middleware names are actually shortcut names. The real service ids are prefixed with messenger.middleware. (e.g. messenger.middleware.handle_message).

The middleware are executed when the message is dispatched but also again when a message is received via the worker (for messages that were sent to a transport to be handled asynchronously). Keep this in mind if you create your own middleware.
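The two passes through the chain can be sketched without the framework. Everything below is a simplified stand-in, not Symfony's implementation: envelopes are plain arrays with hypothetical routed and received flags, and runChain() is a hypothetical helper that nests the middleware so that each one decides whether to call the next.

```php
/**
 * Runs $envelope through $middleware in order. Each middleware receives the
 * envelope and a $next callable; not calling $next stops the chain.
 */
function runChain(array $middleware, array $envelope): array
{
    $next = static function (array $envelope): array {
        return $envelope; // end of the chain
    };
    foreach (array_reverse($middleware) as $current) {
        $next = static function (array $envelope) use ($current, $next): array {
            return $current($envelope, $next);
        };
    }

    return $next($envelope);
}

$log = [];
$custom = static function (array $envelope, callable $next) use (&$log): array {
    $log[] = 'custom';

    return $next($envelope);
};
$send = static function (array $envelope, callable $next) use (&$log): array {
    // on dispatch, a routed message goes to its transport and the chain stops;
    // a message coming back from a worker is passed on to the handlers
    if ($envelope['routed'] && !$envelope['received']) {
        $log[] = 'send_message';

        return $envelope;
    }

    return $next($envelope);
};
$handle = static function (array $envelope, callable $next) use (&$log): array {
    $log[] = 'handle_message';

    return $next($envelope);
};
$chain = [$custom, $send, $handle];

runChain($chain, ['routed' => true, 'received' => false]); // dispatch
$onDispatch = $log;

$log = [];
runChain($chain, ['routed' => true, 'received' => true]); // worker
$onReceive = $log;
```

The custom middleware appears in both logs, once at dispatch time and once in the worker, which is why custom middleware should be written with both passes in mind.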

You can add your own middleware to this list, or completely disable the default middleware and only include your own:

  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            buses:
                messenger.bus.default:
                    # disable the default middleware
                    default_middleware: false

                    # and/or add your own
                    middleware:
                        # service ids that implement Symfony\Component\Messenger\Middleware\MiddlewareInterface
                        - 'App\Middleware\MyMiddleware'
                        - 'App\Middleware\AnotherMiddleware'
  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <framework:messenger>
                <!-- default-middleware: disable the default middleware -->
                <framework:bus name="messenger.bus.default" default-middleware="false">
                    <!-- and/or add your own -->
                    <framework:middleware id="App\Middleware\MyMiddleware"/>
                    <framework:middleware id="App\Middleware\AnotherMiddleware"/>
                </framework:bus>
            </framework:messenger>
        </framework:config>
    </container>
  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();

        // disable the default middleware
        $bus = $messenger->bus('messenger.bus.default')
            ->defaultMiddleware(false);

        // and/or add your own
        $bus->middleware()->id('App\Middleware\MyMiddleware');
        $bus->middleware()->id('App\Middleware\AnotherMiddleware');
    };

Note

If a middleware service is abstract, a different instance of the service will be created per bus.

Middleware for Doctrine

New in version 1.11: The following Doctrine middleware were introduced in DoctrineBundle 1.11.

If you use Doctrine in your app, a number of optional middleware exist that you may want to use:

  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            buses:
                command_bus:
                    middleware:
                        # each time a message is handled, the Doctrine connection
                        # is "pinged" and reconnected if it's closed. Useful
                        # if your workers run for a long time and the database
                        # connection is sometimes lost
                        - doctrine_ping_connection
                        # after handling, the Doctrine connection is closed,
                        # which can free up database connections in a worker,
                        # instead of keeping them open forever
                        - doctrine_close_connection
                        # wraps all handlers in a single Doctrine transaction;
                        # handlers do not need to call flush() and an error
                        # in any handler will cause a rollback
                        - doctrine_transaction
                        # or pass a different entity manager to any
                        #- doctrine_transaction: ['custom']
  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <framework:messenger>
                <framework:bus name="command_bus">
                    <framework:middleware id="doctrine_transaction"/>
                    <framework:middleware id="doctrine_ping_connection"/>
                    <framework:middleware id="doctrine_close_connection"/>
                    <!-- or pass a different entity manager to any -->
                    <!--
                    <framework:middleware id="doctrine_transaction">
                        <framework:argument>custom</framework:argument>
                    </framework:middleware>
                    -->
                </framework:bus>
            </framework:messenger>
        </framework:config>
    </container>
  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();

        $bus = $messenger->bus('command_bus');
        $bus->middleware()->id('doctrine_transaction');
        $bus->middleware()->id('doctrine_ping_connection');
        $bus->middleware()->id('doctrine_close_connection');
        // or pass a different entity manager to any
        // $bus->middleware()->id('doctrine_transaction')
        //     ->arguments(['custom']);
    };

Other Middleware

New in version 5.3: The router_context middleware was introduced in Symfony 5.3.

Add the router_context middleware if you need to generate absolute URLs in the consumer (e.g. to render a template with links). This middleware stores the original request context (e.g. the host and the HTTP port), which is needed when building absolute URLs.

  • YAML

    # config/packages/messenger.yaml
    framework:
        messenger:
            buses:
                command_bus:
                    middleware:
                        - router_context
  • XML

    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
        <framework:config>
            <framework:messenger>
                <framework:bus name="command_bus">
                    <framework:middleware id="router_context"/>
                </framework:bus>
            </framework:messenger>
        </framework:config>
    </container>
  • PHP

    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;

    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();

        $bus = $messenger->bus('command_bus');
        $bus->middleware()->id('router_context');
    };
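
With router_context enabled on the bus that dispatches the message, a handler running in the consumer can build absolute URLs in the usual way. The sketch below is only an illustration: the route name app_homepage is hypothetical, and it assumes the message was dispatched through the bus configured above during a web request.

```php
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Routing\Generator\UrlGeneratorInterface;

class SmsNotificationHandler implements MessageHandlerInterface
{
    private $urlGenerator;

    public function __construct(UrlGeneratorInterface $urlGenerator)
    {
        $this->urlGenerator = $urlGenerator;
    }

    public function __invoke(SmsNotification $message)
    {
        // 'app_homepage' is a hypothetical route name used for illustration.
        // The host and scheme come from the request context that was stored
        // with the message when it was dispatched.
        $url = $this->urlGenerator->generate(
            'app_homepage',
            [],
            UrlGeneratorInterface::ABSOLUTE_URL
        );

        // ... include $url in the SMS text and send it
    }
}
```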

Messenger Events

In addition to middleware, Messenger also dispatches several events. You can create an event listener to hook into various parts of the process. For each, the event class is the event name:

  • Symfony\Component\Messenger\Event\WorkerStartedEvent
  • Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent
  • Symfony\Component\Messenger\Event\SendMessageToTransportsEvent
  • Symfony\Component\Messenger\Event\WorkerMessageFailedEvent
  • Symfony\Component\Messenger\Event\WorkerMessageHandledEvent
  • Symfony\Component\Messenger\Event\WorkerRunningEvent
  • Symfony\Component\Messenger\Event\WorkerStoppedEvent
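
As one possible way to hook into these events, the subscriber sketched below logs messages that failed and will not be retried. The class name FailedMessageSubscriber and the log wording are assumptions made for this example, and it assumes a PSR-3 logger is available for injection.

```php
// src/EventSubscriber/FailedMessageSubscriber.php
namespace App\EventSubscriber;

use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;

class FailedMessageSubscriber implements EventSubscriberInterface
{
    private $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public static function getSubscribedEvents(): array
    {
        // The event class name is the event name.
        return [
            WorkerMessageFailedEvent::class => 'onMessageFailed',
        ];
    }

    public function onMessageFailed(WorkerMessageFailedEvent $event): void
    {
        // Skip failures that the retry strategy will attempt again.
        if ($event->willRetry()) {
            return;
        }

        $this->logger->error('Message {class} failed on receiver {receiver}: {error}', [
            'class' => get_class($event->getEnvelope()->getMessage()),
            'receiver' => $event->getReceiverName(),
            'error' => $event->getThrowable()->getMessage(),
        ]);
    }
}
```

With autoconfiguration enabled, a class implementing EventSubscriberInterface is registered as a subscriber without extra configuration.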

Multiple Buses, Command & Event Buses

Messenger gives you a single message bus service by default, but you can configure as many as you want, creating “command”, “query” or “event” buses and controlling their middleware. See Multiple Buses.

Learn more

This work, including the code samples, is licensed under a Creative Commons BY-SA 3.0 license.