Messenger: Sync & Queued Message Handling

Messenger provides a message bus with the ability to send messages and thenhandle them immediately in your application or send them through transports(e.g. queues) to be handled later. To learn more deeply about it, read theMessenger component docs.

Installation

In applications using Symfony Flex, run this command toinstall messenger:

  1. $ composer require messenger

Creating a Message & Handler

Messenger centers around two different classes that you'll create: (1) a messageclass that holds data and (2) a handler(s) class that will be called when thatmessage is dispatched. The handler class will read the message class and performsome task.

There are no specific requirements for a message class, except that it can beserialized:

  1. // src/Message/SmsNotification.php
  2. namespace App\Message;
  3.  
  4. class SmsNotification
  5. {
  6. private $content;
  7.  
  8. public function __construct(string $content)
  9. {
  10. $this->content = $content;
  11. }
  12.  
  13. public function getContent(): string
  14. {
  15. return $this->content;
  16. }
  17. }

A message handler is a PHP callable, the easiest way to create it is to create a class that implementsMessageHandlerInterface and has an __invoke() method that'stype-hinted with the message class (or a message interface):

  1. // src/MessageHandler/SmsNotificationHandler.php
  2. namespace App\MessageHandler;
  3.  
  4. use App\Message\SmsNotification;
  5. use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
  6.  
  7. class SmsNotificationHandler implements MessageHandlerInterface
  8. {
  9. public function __invoke(SmsNotification $message)
  10. {
  11. // ... do some work - like sending an SMS message!
  12. }
  13. }

Thanks to autoconfiguration and the SmsNotificationtype-hint, Symfony knows that this handler should be called when an SmsNotificationmessage is dispatched. Most of the time, this is all you need to do. But you canalso manually configure message handlers. Tosee all the configured handlers, run:

  1. $ php bin/console debug:messenger

Dispatching the Message

You're ready! To dispatch the message (and call the handler), inject themessage_bus service (via the MessageBusInterface), like in a controller:

  1. // src/Controller/DefaultController.php
  2. namespace App\Controller;
  3.  
  4. use App\Message\SmsNotification;
  5. use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
  6. use Symfony\Component\Messenger\MessageBusInterface;
  7.  
  8. class DefaultController extends AbstractController
  9. {
  10. public function index(MessageBusInterface $bus)
  11. {
  12. // will cause the SmsNotificationHandler to be called
  13. $bus->dispatch(new SmsNotification('Look! I created a message!'));
  14.  
  15. // or use the shortcut
  16. $this->dispatchMessage(new SmsNotification('Look! I created a message!'));
  17.  
  18. // ...
  19. }
  20. }

Transports: Async/Queued Messages

By default, messages are handled as soon as they are dispatched. If you wantto handle a message asynchronously, you can configure a transport. A transportis capable of sending messages (e.g. to a queueing system) and thenreceiving them via a worker. Messenger supportsmultiple transports.

Note

If you want to use a transport that's not supported, check out theEnqueue's transport, which supports things like Kafka, Amazon SQS andGoogle Pub/Sub.

A transport is registered using a "DSN". Thanks to Messenger's Flex recipe, your.env file already has a few examples.

  1. # MESSENGER_TRANSPORT_DSN=amqp://guest:[email protected]:5672/%2f/messages
  2. # MESSENGER_TRANSPORT_DSN=doctrine://default
  3. # MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

Uncomment whichever transport you want (or set it in .env.local). SeeTransport Configuration for more details.

Next, in config/packages/messenger.yaml, let's define a transport called asyncthat uses this configuration:

  • YAML
  1. # config/packages/messenger.yaml
  2. framework:
  3. messenger:
  4. transports:
  5. async: "%env(MESSENGER_TRANSPORT_DSN)%"
  6.  
  7. # or expanded to configure more options
  8. #async:
  9. # dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
  10. # options: []
  • XML
  1. <!-- config/packages/messenger.xml -->
  2. <?xml version="1.0" encoding="UTF-8" ?>
  3. <container xmlns="http://symfony.com/schema/dic/services"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xmlns:framework="http://symfony.com/schema/dic/symfony"
  6. xsi:schemaLocation="http://symfony.com/schema/dic/services
  7. https://symfony.com/schema/dic/services/services-1.0.xsd
  8. http://symfony.com/schema/dic/symfony
  9. https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
  10.  
  11. <framework:config>
  12. <framework:messenger>
  13. <framework:transport name="async">%env(MESSENGER_TRANSPORT_DSN)%</framework:transport>
  14.  
  15. <!-- or expanded to configure more options -->
  16. <framework:transport name="async"
  17. dsn="%env(MESSENGER_TRANSPORT_DSN)%"
  18. >
  19. <option key="...">...</option>
  20. </framework:transport>
  21. </framework:messenger>
  22. </framework:config>
  23. </container>
  • PHP
  1. // config/packages/messenger.php
  2. $container->loadFromExtension('framework', [
  3. 'messenger' => [
  4. 'transports' => [
  5. 'async' => '%env(MESSENGER_TRANSPORT_DSN)%',
  6.  
  7. // or expanded to configure more options
  8. 'async' => [
  9. 'dsn' => '%env(MESSENGER_TRANSPORT_DSN)%',
  10. 'options' => []
  11. ],
  12. ],
  13. ],
  14. ]);

Routing Messages to a Transport

Now that you have a transport configured, instead of handling a message immediately,you can configure them to be sent to a transport:

  • YAML
  1. # config/packages/messenger.yaml
  2. framework:
  3. messenger:
  4. transports:
  5. async: "%env(MESSENGER_TRANSPORT_DSN)%"
  6.  
  7. routing:
  8. # async is whatever name you gave your transport above
  9. 'App\Message\SmsNotification': async
  • XML
  1. <!-- config/packages/messenger.xml -->
  2. <?xml version="1.0" encoding="UTF-8" ?>
  3. <container xmlns="http://symfony.com/schema/dic/services"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xmlns:framework="http://symfony.com/schema/dic/symfony"
  6. xsi:schemaLocation="http://symfony.com/schema/dic/services
  7. https://symfony.com/schema/dic/services/services-1.0.xsd
  8. http://symfony.com/schema/dic/symfony
  9. https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
  10.  
  11. <framework:config>
  12. <framework:messenger>
  13. <framework:routing message-class="App\Message\SmsNotification">
  14. <!-- async is whatever name you gave your transport above -->
  15. <framework:sender service="async"/>
  16. </framework:routing>
  17. </framework:messenger>
  18. </framework:config>
  19. </container>
  • PHP
  1. // config/packages/messenger.php
  2. $container->loadFromExtension('framework', [
  3. 'messenger' => [
  4. 'routing' => [
  5. // async is whatever name you gave your transport above
  6. 'App\Message\SmsNotification' => 'async',
  7. ],
  8. ],
  9. ]);

Thanks to this, the App\Message\SmsNotification will be sent to the asynctransport and its handler(s) will not be called immediately. Any messages notmatched under routing will still be handled immediately.

You can also route classes by their parent class or interface. Or send messagesto multiple transport:

  • YAML
  1. # config/packages/messenger.yaml
  2. framework:
  3. messenger:
  4. routing:
  5. # route all messages that extend this example base class or interface
  6. 'App\Message\AbstractAsyncMessage': async
  7. 'App\Message\AsyncMessageInterface': async
  8.  
  9. 'My\Message\ToBeSentToTwoSenders': [async, audit]
  • XML
  1. <!-- config/packages/messenger.xml -->
  2. <?xml version="1.0" encoding="UTF-8" ?>
  3. <container xmlns="http://symfony.com/schema/dic/services"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xmlns:framework="http://symfony.com/schema/dic/symfony"
  6. xsi:schemaLocation="http://symfony.com/schema/dic/services
  7. https://symfony.com/schema/dic/services/services-1.0.xsd
  8. http://symfony.com/schema/dic/symfony
  9. https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
  10.  
  11. <framework:config>
  12. <framework:messenger>
  13. <!-- route all messages that extend this example base class or interface -->
  14. <framework:routing message-class="App\Message\AbstractAsyncMessage">
  15. <framework:sender service="async"/>
  16. </framework:routing>
  17. <framework:routing message-class="App\Message\AsyncMessageInterface">
  18. <framework:sender service="async"/>
  19. </framework:routing>
  20. <framework:routing message-class="My\Message\ToBeSentToTwoSenders">
  21. <framework:sender service="async"/>
  22. <framework:sender service="audit"/>
  23. </framework:routing>
  24. </framework:messenger>
  25. </framework:config>
  26. </container>
  • PHP
  1. // config/packages/messenger.php
  2. $container->loadFromExtension('framework', [
  3. 'messenger' => [
  4. 'routing' => [
  5. // route all messages that extend this example base class or interface
  6. 'App\Message\AbstractAsyncMessage' => 'async',
  7. 'App\Message\AsyncMessageInterface' => 'async',
  8. 'My\Message\ToBeSentToTwoSenders' => ['async', 'audit'],
  9. ],
  10. ],
  11. ]);

Doctrine Entities in Messages

If you need to pass a Doctrine entity in a message, it's better to pass the entity'sprimary key (or whatever relevant information the handler actually needs, like email,etc) instead of the object:

  1. class NewUserWelcomeEmail
  2. {
  3. private $userId;
  4.  
  5. public function __construct(int $userId)
  6. {
  7. $this->userId = $userId;
  8. }
  9.  
  10. public function getUserId(): int
  11. {
  12. return $this->userId;
  13. }
  14. }

Then, in your handler, you can query for a fresh object:

  1. // src/MessageHandler/NewUserWelcomeEmailHandler.php
  2. namespace App\MessageHandler;
  3.  
  4. use App\Message\NewUserWelcomeEmail;
  5. use App\Repository\UserRepository;
  6. use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
  7.  
  8. class NewUserWelcomeEmailHandler implements MessageHandlerInterface
  9. {
  10. private $userRepository;
  11.  
  12. public function __construct(UserRepository $userRepository)
  13. {
  14. $this->userRepository = $userRepository;
  15. }
  16.  
  17. public function __invoke(NewUserWelcomeEmail $welcomeEmail)
  18. {
  19. $user = $this->userRepository->find($welcomeEmail->getUserId());
  20.  
  21. // ... send an email!
  22. }
  23. }

This guarantees the entity contains fresh data.

Handling Messages Synchronously

If a message doesn't match any routing rules, it won'tbe sent to any transport and will be handled immediately. In some cases (likewhen binding handlers to different transports),it's easier or more flexible to handle this explicitly: by creating a synctransport and "sending" messages there to be handled immediately:

  • YAML
  1. # config/packages/messenger.yaml
  2. framework:
  3. messenger:
  4. transports:
  5. # ... other transports
  6.  
  7. sync: 'sync://'
  8.  
  9. routing:
  10. App\Message\SmsNotification: sync
  • XML
  1. <!-- config/packages/messenger.xml -->
  2. <?xml version="1.0" encoding="UTF-8" ?>
  3. <container xmlns="http://symfony.com/schema/dic/services"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xmlns:framework="http://symfony.com/schema/dic/symfony"
  6. xsi:schemaLocation="http://symfony.com/schema/dic/services
  7. https://symfony.com/schema/dic/services/services-1.0.xsd
  8. http://symfony.com/schema/dic/symfony
  9. https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
  10.  
  11. <framework:config>
  12. <framework:messenger>
  13. <!-- ... other transports -->
  14.  
  15. <framework:transport name="sync" dsn="sync://"/>
  16.  
  17. <framework:routing message-class="App\Message\SmsNotification">
  18. <framework:sender service="sync"/>
  19. </framework:routing>
  20. </framework:messenger>
  21. </framework:config>
  22. </container>
  • PHP
  1. // config/packages/messenger.php
  2. $container->loadFromExtension('framework', [
  3. 'messenger' => [
  4. 'transports' => [
  5. // ... other transports
  6.  
  7. 'sync' => 'sync://',
  8. ],
  9. 'routing' => [
  10. 'App\Message\SmsNotification' => 'sync',
  11. ],
  12. ],
  13. ]);

Creating your Own Transport

You can also create your own transport if you need to send or receive messagesfrom something that is not supported. See How to Create Your own Messenger Transport.

Consuming Messages (Running the Worker)

Once your messages have been routed, in most cases, you'll need to "consume" them.You can do this with the messenger:consume command:

  1. $ php bin/console messenger:consume async
  2.  
  3. # use -vv to see details about what's happening
  4. $ php bin/console messenger:consume async -vv

New in version 4.3: The messenger:consume command was renamed in Symfony 4.3 (previously itwas called messenger:consume-messages).

The first argument is the receiver's name (or service id if you routed to acustom service). By default, the command will run forever: looking for new messageson your transport and handling them. This command is called your "worker".

Deploying to Production

On production, there are a few important things to think about:

  • Use Supervisor to keep your worker(s) running
  • You'll want one or more "workers" running at all times. To do that, use aprocess control system like Supervisor.
  • Don't Let Workers Run Forever
  • Some services (like Doctrine's EntityManager) will consume more memoryover time. So, instead of allowing your worker to run forever, use a flaglike messenger:consume —limit=10 to tell your worker to only handle 10messages before exiting (then Supervisor will create a new process). Thereare also other options like —memory-limit=128M and —time-limit=3600.
  • Restart Workers on Deploy
  • Each time you deploy, you'll need to restart all your worker processes sothat they see the newly deployed code. To do this, run messenger:stop-workerson deploy. This will signal to each worker that it should finish the messageit's currently handling and shut down gracefully. Then, Supervisor will createnew worker processes. The command uses the appcache internally - so make sure this is configured to use an adapter you like.

Prioritized Transports

Sometimes certain types of messages should have a higher priority and be handledbefore others. To make this possible, you can create multiple transports and routedifferent messages to them. For example:

  • YAML
  1. # config/packages/messenger.yaml
  2. framework:
  3. messenger:
  4. transports:
  5. async_priority_high:
  6. dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
  7. options:
  8. # queue_name is specific to the doctrine transport
  9. queue_name: high
  10.  
  11. # for amqp send to a separate exchange then queue
  12. #exchange:
  13. # name: high
  14. #queues:
  15. # messages_high: ~
  16. # or redis try "group"
  17. async_priority_low:
  18. dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
  19. options:
  20. queue_name: low
  21.  
  22. routing:
  23. 'App\Message\SmsNotification': async_priority_low
  24. 'App\Message\NewUserWelcomeEmail': async_priority_high
  • XML
  1. <!-- config/packages/messenger.xml -->
  2. <?xml version="1.0" encoding="UTF-8" ?>
  3. <container xmlns="http://symfony.com/schema/dic/services"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xmlns:framework="http://symfony.com/schema/dic/symfony"
  6. xsi:schemaLocation="http://symfony.com/schema/dic/services
  7. https://symfony.com/schema/dic/services/services-1.0.xsd
  8. http://symfony.com/schema/dic/symfony
  9. https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
  10.  
  11. <framework:config>
  12. <framework:messenger>
  13. <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
  14. <option key="queue_name">high</option>
  15. </framework:transport>
  16. <framework:transport name="async_priority_low" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
  17. <option key="queue_name">low</option>
  18. </framework:transport>
  19.  
  20. <framework:routing message-class="App\Message\SmsNotification">
  21. <framework:sender service="async_priority_low"/>
  22. </framework:routing>
  23. <framework:routing message-class="App\Message\NewUserWelcomeEmail">
  24. <framework:sender service="async_priority_high"/>
  25. </framework:routing>
  26. </framework:messenger>
  27. </framework:config>
  28. </container>
  • PHP
  1. // config/packages/messenger.php
  2. $container->loadFromExtension('framework', [
  3. 'messenger' => [
  4. 'transports' => [
  5. 'async_priority_high' => [
  6. 'dsn' => '%env(MESSENGER_TRANSPORT_DSN)%',
  7. 'options' => [
  8. 'queue_name' => 'high',
  9. ],
  10. ],
  11. 'async_priority_low' => [
  12. 'dsn' => '%env(MESSENGER_TRANSPORT_DSN)%',
  13. 'options' => [
  14. 'queue_name' => 'low',
  15. ],
  16. ],
  17. ],
  18. 'routing' => [
  19. 'App\Message\SmsNotification' => 'async_priority_low',
  20. 'App\Message\NewUserWelcomeEmail' => 'async_priority_high',
  21. ],
  22. ],
  23. ]);

You can then run individual workers for each transport or instruct one workerto handle messages in a priority order:

  1. $ php bin/console messenger:consume async_priority_high async_priority_low

The worker will always first look for messages waiting on asyncpriority_high. Ifthere are none, _then it will consume messages from async_priority_low.

Supervisor Configuration

Supervisor is a great tool to guarantee that your worker process(es) isalways running (even if it closes due to failure, hitting a message limitor thanks to messenger:stop-workers). You can install it on Ubuntu, forexample, via:

  1. $ sudo apt-get install supervisor

Supervisor configuration files typically live in a /etc/supervisor/conf.ddirectory. For example, you can create a new messenger-worker.conf filethere to make sure that 2 instances of messenger:consume are running at alltimes:

  1. ;/etc/supervisor/conf.d/messenger-worker.conf
  2. [program:messenger-consume]
  3. command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
  4. user=ubuntu
  5. numprocs=2
  6. autostart=true
  7. autorestart=true
  8. process_name=%(program_name)s_%(process_num)02d

Change the async argument to use the name of your transport (or transports)and user to the Unix user on your server. Next, tell Supervisor to read yourconfig and start your workers:

  1. $ sudo supervisorctl reread
  2.  
  3. $ sudo supervisorctl update
  4.  
  5. $ sudo supervisorctl start messenger-consume:*

See the Supervisor docs for more details.

Retries & Failures

If an exception is thrown while consuming a message from a transport it willautomatically be re-sent to the transport to be tried again. By default, a messagewill be retried 3 times before being discarded orsent to the failure transport. Each retrywill also be delayed, in case the failure was due to a temporary issue. All ofthis is configurable for each transport:

  • YAML
  1. # config/packages/messenger.yaml
  2. framework:
  3. messenger:
  4. transports:
  5. async_priority_high:
  6. dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
  7.  
  8. # default configuration
  9. retry_strategy:
  10. max_retries: 3
  11. # milliseconds delay
  12. delay: 1000
  13. # causes the delay to be higher before each retry
  14. # e.g. 1 second delay, 2 seconds, 4 seconds
  15. multiplier: 2
  16. max_delay: 0
  17. # override all of this with a service that
  18. # implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
  19. # service: null

Avoiding Retrying

Sometimes handling a message might fail in a way that you know is permanentand should not be retried. If you throwUnrecoverableMessageHandlingException,the message will not be retried.

Saving & Retrying Failed Messages

If a message fails it is retried multiple times (max_retries) and then willbe discarded. To avoid this happening, you can instead configure a failure_transport:

  • YAML
  1. # config/packages/messenger.yaml
  2. framework:
  3. messenger:
  4. # after retrying, messages will be sent to the "failed" transport
  5. failure_transport: failed
  6.  
  7. transports:
  8. # ... other transports
  9.  
  10. failed: 'doctrine://default?queue_name=failed'

In this example, if handling a message fails 3 times (default maxretries),it will then be sent to the failed transport. While you _can usemessenger:consume failed to consume this like a normal transport, you'llusually want to manually view the messages in the failure transport and chooseto retry them:

  1. # see all messages in the failure transport
  2. $ php bin/console messenger:failed:show
  3.  
  4. # see details about a specific failure
  5. $ php bin/console messenger:failed:show 20 -vv
  6.  
  7. # view and retry messages one-by-one
  8. $ php bin/console messenger:failed:retry -vv
  9.  
  10. # retry specific messages
  11. $ php bin/console messenger:failed:retry 20 30 --force
  12.  
  13. # remove a message without retrying it
  14. $ php bin/console messenger:failed:remove 20

If the message fails again, it will be re-sent back to the failure transportdue to the normal retry rules. Once the maxretry has been hit, the message will be discarded permanently.

Transport Configuration

Messenger supports a number of different transport types, each with their ownoptions.

AMQP Transport

The amqp transport configuration looks like this:

  1. # .env
  2. MESSENGER_TRANSPORT_DSN=amqp://guest:[email protected]:5672/%2f/messages

To use Symfony's built-in AMQP transport, you need the AMQP PHP extension.

Note

By default, the transport will automatically create any exchanges, queues andbinding keys that are needed. That can be disabled, but some functionalitymay not work correctly (like delayed queues).

The transport has a number of other options, including ways to configurethe exchange, queues binding keys and more. See the documentation onConnection.

You can also configure AMQP-specific settings on your message by addingAmqpStamp toyour Envelope:

  1. use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
  2. // ...
  3.  
  4. $attributes = [];
  5. $bus->dispatch(new SmsNotification(), [
  6. new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes)
  7. ]);

Caution

The consumers do not show up in an admin panel as this transport does not rely on\AmqpQueue::consume() which is blocking. Having a blocking receiver makesthe —time-limit/—memory-limit options of the messenger:consume command as well asthe messenger:stop-workers command inefficient, as they all rely on the fact thatthe receiver returns immediately no matter if it finds a message or not. The consumeworker is responsible for iterating until it receives a message to handle and/or until oneof the stop conditions is reached. Thus, the worker's stop logic cannot be reached if itis stuck in a blocking call.

Doctrine Transport

New in version 4.3: The Doctrine transport was introduced in Symfony 4.3.

The Doctrine transport can be used to store messages in a database table.

  1. # .env
  2. MESSENGER_TRANSPORT_DSN=doctrine://default

The format is doctrine://<connection_name>, in case you have multiple connectionsand want to use one other than the "default". The transport will automatically createa table named messenger_messages (this is configurable) when the transport isfirst used. You can disable that with the auto_setup option and set the tableup manually by calling the messenger:setup-transports command.

Tip

To avoid tools like Doctrine Migrations from trying to remove this table becauseit's not part of your normal schema, you can set the schema_filter option:

  1. # config/packages/doctrine.yaml
  2. doctrine:
  3. dbal:
  4. schema_filter: '~^(?!messenger_messages)~'

The transport has a number of options:

  • YAML
  1. # config/packages/messenger.yaml
  2. framework:
  3. messenger:
  4. transports:
  5. async_priority_high: "%env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority"
  6. async_normal:
  7. dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
  8. options:
  9. queue_name: normal_priority
  • XML
  1. <!-- config/packages/messenger.xml -->
  2. <?xml version="1.0" encoding="UTF-8" ?>
  3. <container xmlns="http://symfony.com/schema/dic/services"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xmlns:framework="http://symfony.com/schema/dic/symfony"
  6. xsi:schemaLocation="http://symfony.com/schema/dic/services
  7. https://symfony.com/schema/dic/services/services-1.0.xsd
  8. http://symfony.com/schema/dic/symfony
  9. https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
  10.  
  11. <framework:config>
  12. <framework:messenger>
  13. <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority"/>
  14. <framework:transport name="async_priority_low" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
  15. <framework:option queue_name="normal_priority"/>
  16. </framework:transport>
  17. </framework:messenger>
  18. </framework:config>
  19. </container>
  • PHP
  1. // config/packages/messenger.php
  2. $container->loadFromExtension('framework', [
  3. 'messenger' => [
  4. 'transports' => [
  5. 'async_priority_high' => 'dsn' => '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority',
  6. 'async_priority_low' => [
  7. 'dsn' => '%env(MESSENGER_TRANSPORT_DSN)%',
  8. 'options' => [
  9. 'queue_name' => 'normal_priority'
  10. ]
  11. ],
  12. ],
  13. ],
  14. ]);

Options defined under options take precedence over ones defined in the DSN.

OptionDescriptionDefault
table_nameName of the tablemessenger_messages
queue_nameName of the queue (a column in thetable, to use one table formultiple transports)default
redeliver_timeoutTimeout before retrying a messagethat's in the queue but in the"handling" state (if a worker diedfor some reason, this will occur,eventually you should retry themessage) - in seconds.3600
auto_setupWhether the table should be createdautomatically during send / get.true

Redis Transport

New in version 4.3: The Redis transport was introduced in Symfony 4.3.

The Redis transport uses streams to queue messages.

  1. # .env
  2. MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
  3. # Full DSN Example
  4. MESSENGER_TRANSPORT_DSN=redis://[email protected]:6379/messages/symfony/consumer?auto_setup=true&serializer=1

To use the Redis transport, you will need the Redis PHP extension (^4.3) anda running Redis server (^5.0).

Caution

The Redis transport does not support "delayed" messages.

A number of options can be configured via the DSN or via the options keyunder the transport in messenger.yaml:

OptionDescriptionDefault
streamThe Redis stream namemessages
groupThe Redis consumer group namesymfony
consumerConsumer name used in Redisconsumer
auto_setupCreate the Redis group automatically?true
authThe Redis password
serializerHow to serialize the final payloadin Redis (theRedis::OPT_SERIALIZER option)Redis::SERIALIZER_PHP

In Memory Transport

New in version 4.3: The in-memory transport was introduced in Symfony 4.3.

The in-memory transport does not actually delivery messages. Instead, itholds them in memory during the request, which can be useful for testing.For example, if you have an async_priority_normal transport, you couldoverride it in the test environment to use this transport:

  1. # config/packages/test/messenger.yaml
  2. framework:
  3. messenger:
  4. transports:
  5. async_priority_normal: 'in-memory:///'

Then, while testing, messages will not be delivered to the real transport.Even better, in a test, you can check that exactly one message was sentduring a request:

  1. // tests/DefaultControllerTest.php
  2. namespace App\Tests;
  3.  
  4. use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
  5. use Symfony\Component\Messenger\Transport\InMemoryTransport;
  6.  
  7. class DefaultControllerTest extends WebTestCase
  8. {
  9. public function testSomething()
  10. {
  11. $client = static::createClient();
  12. // ...
  13.  
  14. $this->assertSame(200, $client->getResponse()->getStatusCode());
  15.  
  16. /* @var InMemoryTransport $transport */
  17. $transport = self::$container->get('messenger.transport.async_priority_normal');
  18. $this->assertCount(1, $transport->get());
  19. }
  20. }

Note

All in-memory transports will be reset automatically after each test intest classes extendingKernelTestCaseor WebTestCase.

Serializing Messages

New in version 4.3: The default serializer changed in 4.3 from the Symfony serializer to thenative PHP serializer. Existing applications should configure their transportsto use the Symfony serializer to avoid losing already-queued messages afterupgrading.

When messages are sent to (and received from) a transport, they're serializedusing PHP's native serialize() & unserialize() functions. You can changethis globally (or for each transport) to a service that implementsSerializerInterface:

  • YAML
  1. # config/packages/messenger.yaml
  2. framework:
  3. messenger:
  4. serializer:
  5. default_serializer: messenger.transport.symfony_serializer
  6. symfony_serializer:
  7. format: json
  8. context: { }
  9.  
  10. transports:
  11. async_priority_normal:
  12. dsn: # ...
  13. serializer: messenger.transport.symfony_serializer

The messenger.transport.symfonyserializer is a built-in service that usesthe [_Serializer component]($e4d319ced595d34c.md) and can be configured in a few ways.If you do choose to use the Symfony serializer, you can control the contexton a case-by-case basis via the SerializerStamp(see Envelopes & Stamps).

Customizing Handlers

Manually Configuring Handlers

Symfony will normally find and register your handler automatically.But, you can also configure a handler manually - and pass it some extra config -by tagging the handler service with messenger.message_handler

  • YAML
  1. # config/services.yaml
  2. services:
  3. App\MessageHandler\SmsNotificationHandler:
  4. tags: [messenger.message_handler]
  5.  
  6. # or configure with options
  7. tags:
  8. -
  9. name: messenger.message_handler
  10. # only needed if can't be guessed by type-hint
  11. handles: App\Message\SmsNotification
  12.  
  13. # options returned by getHandledMessages() are supported here
  • XML
  1. <!-- config/services.xml -->
  2. <?xml version="1.0" encoding="UTF-8" ?>
  3. <container xmlns="http://symfony.com/schema/dic/services"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xsi:schemaLocation="http://symfony.com/schema/dic/services
  6. https://symfony.com/schema/dic/services/services-1.0.xsd">
  7.  
  8. <services>
  9. <service id="App\MessageHandler\SmsNotificationHandler">
  10. <tag name="messenger.message_handler"/>
  11. </service>
  12. </services>
  13. </container>
  • PHP
// config/services.php
use App\MessageHandler\SmsNotificationHandler;

$container->register(SmsNotificationHandler::class)
    ->addTag('messenger.message_handler');

Handler Subscriber & Options

A handler class can handle multiple messages or configure itself by implementingMessageSubscriberInterface:

// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;

class SmsNotificationHandler implements MessageSubscriberInterface
{
    public function __invoke(SmsNotification $message)
    {
        // ...
    }

    public function handleOtherSmsNotification(OtherSmsNotification $message)
    {
        // ...
    }

    public static function getHandledMessages(): iterable
    {
        // handle this message on __invoke
        yield SmsNotification::class;

        // also handle this message on handleOtherSmsNotification
        yield OtherSmsNotification::class => [
            'method' => 'handleOtherSmsNotification',
            //'priority' => 0,
            //'bus' => 'messenger.bus.default',
        ];
    }
}

Binding Handlers to Different Transports

Each message can have multiple handlers, and when a message is consumedall of its handlers are called. But you can also configure a handler to onlybe called when it's received from a specific transport. This allows you tohave a single message where each handler is called by a different "worker"that's consuming a different transport.

Suppose you have an UploadedImage message with two handlers:

  • ThumbnailUploadedImageHandler: you want this to be handled bya transport called image_transport
  • NotifyAboutNewUploadedImageHandler: you want this to be handledby a transport called async_priority_normalTo do this, add the from_transport option to each handler. For example:
// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;

use App\Message\UploadedImage;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;

class ThumbnailUploadedImageHandler implements MessageSubscriberInterface
{
    public function __invoke(UploadedImage $uploadedImage)
    {
        // do some thumbnailing
    }

    public static function getHandledMessages(): iterable
    {
        yield UploadedImage::class => [
            'from_transport' => 'image_transport',
        ];
    }
}

And similarly:

// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...

class NotifyAboutNewUploadedImageHandler implements MessageSubscriberInterface
{
    // ...

    public static function getHandledMessages(): iterable
    {
        yield UploadedImage::class => [
            'from_transport' => 'async_priority_normal',
        ];
    }
}

Then, make sure to "route" your message to both transports:

  • YAML
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: # ...
            image_transport: # ...

        routing:
            # ...
            'App\Message\UploadedImage': [image_transport, async_priority_normal]
  • XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:framework="http://symfony.com/schema/dic/symfony"
    xsi:schemaLocation="http://symfony.com/schema/dic/services
        https://symfony.com/schema/dic/services/services-1.0.xsd
        http://symfony.com/schema/dic/symfony
        https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

    <framework:config>
        <framework:messenger>
            <framework:transport name="async_priority_normal" dsn="..."/>
            <framework:transport name="image_transport" dsn="..."/>

            <framework:routing message-class="App\Message\UploadedImage">
                <framework:sender service="image_transport"/>
                <framework:sender service="async_priority_normal"/>
            </framework:routing>
        </framework:messenger>
    </framework:config>
</container>
  • PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
    'messenger' => [
        'transports' => [
            'async_priority_normal' => '...',
            'image_transport' => '...',
        ],
        'routing' => [
            'App\Message\UploadedImage' => ['image_transport', 'async_priority_normal']
        ]
    ],
]);

That's it! You can now consume each transport:

# will only call ThumbnailUploadedImageHandler when handling the message
$ php bin/console messenger:consume image_transport -vv

$ php bin/console messenger:consume async_priority_normal -vv

Caution

If a handler does not have fromtransport config, it will be executedon _every transport that the message is received from.

Extending Messenger

Envelopes & Stamps

A message can be any PHP object. Sometimes, you may need to configure somethingextra about the message - like the way it should be handled inside Amqp or addinga delay before the message should be handled. You can do that by adding a "stamp"to your message:

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

public function index(MessageBusInterface $bus)
{
    $bus->dispatch(new SmsNotification('...'), [
        // wait 5 seconds before processing
        new DelayStamp(5000)
    ]);

    // or explicitly create an Envelope
    $bus->dispatch(new Envelope(new SmsNotification('...'), [
        new DelayStamp(5000)
    ]));

    // ...
}

Internally, each message is wrapped in an Envelope, which holds the messageand stamps. You can create this manually or allow the message bus to do it. Thereare a variety of different stamps for different purposes and they're used internallyto track information about a message - like the message bus that's handling itor if it's being retried after failure.

Middleware

What happens when you dispatch a message to a message bus depends on itscollection of middleware (and their order). By default, the middleware configuredfor each bus looks like this:

  • add_bus_name_stamp_middleware - adds a stamp to record which bus thismessage was dispatched into;
  • dispatchafter_current_bus- see [_Transactional Messages: Handle New Messages After Handling is Done](https://symfony.com/doc/current/messenger/message-recorder.html);
  • failed_message_processing_middleware - processes messages that are beingretried via the failure transport to makethem properly function as if they were being received from their original transport;
  • Your own collection of middleware;
  • send_message - if routing is configured for the transport, this sendsmessages to that transport and stops the middleware chain;
  • handle_message - calls the message handler(s) for the given message.

Note

These middleware names are actually shortcuts names. The real service idsare prefixed with messenger.middleware..

You can add your own middleware to this list, or completely disable the defaultmiddleware and only include your own:

  • YAML
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            messenger.bus.default:
                middleware:
                    # service ids that implement Symfony\Component\Messenger\Middleware
                    - 'App\Middleware\MyMiddleware'
                    - 'App\Middleware\AnotherMiddleware'

                #default_middleware: false
  • XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:framework="http://symfony.com/schema/dic/symfony"
    xsi:schemaLocation="http://symfony.com/schema/dic/services
        https://symfony.com/schema/dic/services/services-1.0.xsd
        http://symfony.com/schema/dic/symfony
        https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

    <framework:config>
        <framework:messenger>
            <framework:middleware id="App\Middleware\MyMiddleware"/>
            <framework:middleware id="App\Middleware\AnotherMiddleware"/>
            <framework:bus name="messenger.bus.default" default-middleware="false"/>
        </framework:messenger>
    </framework:config>
</container>
  • PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
    'messenger' => [
        'buses' => [
            'messenger.bus.default' => [
                'middleware' => [
                    'App\Middleware\MyMiddleware',
                    'App\Middleware\AnotherMiddleware',
                ],
                'default_middleware' => false,
            ],
        ],
    ],
]);

Note

If a middleware service is abstract, a different instance of the service willbe created per bus.

Middleware for Doctrine

If you use Doctrine in your app, a number of optional middleware exist that youmay want to use:

  • YAML
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    # each time a message is handled, the Doctrine connection
                    # is "pinged" and reconnected if it's closed. Useful
                    # if your workers run for a long time and the database
                    # connection is sometimes lost
                    - doctrine_ping_connection

                    # After handling, the Doctrine connection is closed,
                    # which can free up database connections in a worker,
                    # instead of keeping them open forever
                    - doctrine_close_connection

                    # wraps all handlers in a single Doctrine transaction
                    # handlers do not need to call flush() and an error
                    # in any handler will cause a rollback
                    - doctrine_transaction

                    # or pass a different entity manager to any
                    #- doctrine_transaction: ['custom']
  • XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:framework="http://symfony.com/schema/dic/symfony"
    xsi:schemaLocation="http://symfony.com/schema/dic/services
        https://symfony.com/schema/dic/services/services-1.0.xsd
        http://symfony.com/schema/dic/symfony
        https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

    <framework:config>
        <framework:messenger>
            <framework:bus name="command_bus">
                <framework:middleware id="doctrine_transaction"/>
                <framework:middleware id="doctrine_ping_connection"/>
                <framework:middleware id="doctrine_close_connection"/>

                <!-- or pass a different entity manager to any -->
                <!--
                <framework:middleware id="doctrine_transaction">
                    <framework:argument>custom</framework:argument>
                </framework:middleware>
                -->
            </framework:bus>
        </framework:messenger>
    </framework:config>
</container>
  • PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
    'messenger' => [
        'buses' => [
            'command_bus' => [
                'middleware' => [
                    'doctrine_transaction',
                    'doctrine_ping_connection',
                    'doctrine_close_connection',
                    // Using another entity manager
                    ['id' => 'doctrine_transaction', 'arguments' => ['custom']],
                ],
            ],
        ],
    ],
]);

Messenger Events

In addition to middleware, Messenger also dispatches several events. You cancreate an event listener to hook into various partsof the process. For each, the event class is the event name:

Multiple Buses, Command & Event Buses

Messenger gives you a single message bus service by default. But, you can configureas many as you want, creating "command", "query" or "event" buses and controllingtheir middleware. See Multiple Buses.

Learn more