Step 18: Going Async

Checking for spam while handling the form submission can cause problems. If the Akismet API becomes slow, our website will also be slow for users. Worse, if we hit a timeout or if the Akismet API is unavailable, we might lose comments.

Ideally, we should store the submitted data without publishing it, and immediately return a response. Checking for spam can then be done out of band.

Flagging Comments

We need to introduce a state for comments: submitted, spam, and published.
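
To make the intended life cycle concrete, here is a minimal sketch of the three states and the transitions this step relies on. The CommentState class and its canTransition() method are hypothetical helpers used for illustration only; the code in this step stores the state as a plain string.

```php
<?php
// Hypothetical helper, not part of the application code: it only names the
// three states and the transitions used in this step.
final class CommentState
{
    public const SUBMITTED = 'submitted';
    public const SPAM = 'spam';
    public const PUBLISHED = 'published';

    // In this step, a submitted comment becomes either spam or published;
    // nothing moves a comment out of those two states.
    private const TRANSITIONS = [
        self::SUBMITTED => [self::SPAM, self::PUBLISHED],
        self::SPAM => [],
        self::PUBLISHED => [],
    ];

    public static function canTransition(string $from, string $to): bool
    {
        // Unknown source states have no allowed transitions.
        return in_array($to, self::TRANSITIONS[$from] ?? [], true);
    }
}

var_dump(CommentState::canTransition(CommentState::SUBMITTED, CommentState::PUBLISHED));
var_dump(CommentState::canTransition(CommentState::SPAM, CommentState::PUBLISHED));
```

A new comment starts as submitted and only becomes visible once something moves it to published.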

Add the state property to the Comment class:

    $ symfony console make:entity Comment

Create a database migration:

    $ symfony console make:migration

Modify the migration to update all existing comments to be published by default:

    --- a/migrations/Version00000000000000.php
    +++ b/migrations/Version00000000000000.php
    @@ -20,7 +20,9 @@ final class Version20200714155905 extends AbstractMigration
         public function up(Schema $schema) : void
         {
             // this up() migration is auto-generated, please modify it to your needs
    -        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) NOT NULL');
    +        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255)');
    +        $this->addSql("UPDATE comment SET state='published'");
    +        $this->addSql('ALTER TABLE comment ALTER COLUMN state SET NOT NULL');
         }

         public function down(Schema $schema) : void

Migrate the database:

    $ symfony console doctrine:migrations:migrate

We should also make sure that, by default, the state is set to submitted:

    --- a/src/Entity/Comment.php
    +++ b/src/Entity/Comment.php
    @@ -55,9 +55,9 @@ class Comment
         private $photoFilename;

         /**
    -     * @ORM\Column(type="string", length=255)
    +     * @ORM\Column(type="string", length=255, options={"default": "submitted"})
          */
    -    private $state;
    +    private $state = 'submitted';

         public function __toString(): string
         {

Update the EasyAdmin configuration to be able to see the comment’s state:

    --- a/src/Controller/Admin/CommentCrudController.php
    +++ b/src/Controller/Admin/CommentCrudController.php
    @@ -51,6 +51,7 @@ class CommentCrudController extends AbstractCrudController
                 ->setLabel('Photo')
                 ->onlyOnIndex()
             ;
    +        yield TextField::new('state');

             $createdAt = DateTimeField::new('createdAt')->setFormTypeOptions([
                 'html5' => true,

Don’t forget to also update the tests by setting the state of the fixtures:

    --- a/src/DataFixtures/AppFixtures.php
    +++ b/src/DataFixtures/AppFixtures.php
    @@ -37,8 +37,16 @@ class AppFixtures extends Fixture
             $comment1->setAuthor('Fabien');
             $comment1->setEmail('[email protected]');
             $comment1->setText('This was a great conference.');
    +        $comment1->setState('published');
             $manager->persist($comment1);

    +        $comment2 = new Comment();
    +        $comment2->setConference($amsterdam);
    +        $comment2->setAuthor('Lucas');
    +        $comment2->setEmail('[email protected]');
    +        $comment2->setText('I think this one is going to be moderated.');
    +        $manager->persist($comment2);
    +
             $admin = new Admin();
             $admin->setRoles(['ROLE_ADMIN']);
             $admin->setUsername('admin');

For the controller tests, simulate the validation:

    --- a/tests/Controller/ConferenceControllerTest.php
    +++ b/tests/Controller/ConferenceControllerTest.php
    @@ -2,6 +2,8 @@

     namespace App\Tests\Controller;

    +use App\Repository\CommentRepository;
    +use Doctrine\ORM\EntityManagerInterface;
     use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

     class ConferenceControllerTest extends WebTestCase
    @@ -22,10 +24,16 @@ class ConferenceControllerTest extends WebTestCase
             $client->submitForm('Submit', [
                 'comment_form[author]' => 'Fabien',
                 'comment_form[text]' => 'Some feedback from an automated functional test',
    -            'comment_form[email]' => '[email protected]',
    +            'comment_form[email]' => $email = '[email protected]',
                 'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
             ]);
             $this->assertResponseRedirects();
    +
    +        // simulate comment validation
    +        $comment = self::$container->get(CommentRepository::class)->findOneByEmail($email);
    +        $comment->setState('published');
    +        self::$container->get(EntityManagerInterface::class)->flush();
    +
             $client->followRedirect();
             $this->assertSelectorExists('div:contains("There are 2 comments")');
         }

From a PHPUnit test, you can get any service from the container via self::$container->get(); it also gives access to non-public services.

Understanding Messenger

Managing asynchronous code with Symfony is the job of the Messenger Component:

    $ symfony composer req messenger

When some logic should be executed asynchronously, send a message to a messenger bus. The bus stores the message in a queue and returns immediately to let the flow of operations resume as fast as possible.

A consumer runs continuously in the background to read new messages on the queue and execute the associated logic. The consumer can run on the same server as the web application or on a separate one.

It is very similar to the way HTTP requests are handled, except that we don’t have responses.
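
The dispatch-then-consume flow can be sketched in a few lines of plain PHP. This is a toy model, not the Messenger API: the InMemoryQueue and Ping classes are made up for the illustration, and a real transport stores messages outside of the PHP process.

```php
<?php
// Toy illustration only: these classes are hypothetical and are not the
// Messenger component's API.
final class InMemoryQueue
{
    /** @var string[] serialized messages waiting to be consumed */
    private array $messages = [];

    public function push(object $message): void
    {
        // Store a serialized copy, as a queue would.
        $this->messages[] = serialize($message);
    }

    public function pop(): ?object
    {
        // array_shift() returns null when the queue is empty.
        $payload = array_shift($this->messages);

        return null === $payload ? null : unserialize($payload);
    }
}

final class Ping
{
    public string $text;

    public function __construct(string $text)
    {
        $this->text = $text;
    }
}

$queue = new InMemoryQueue();

// "Web request" side: dispatching only stores the message and returns.
$queue->push(new Ping('first'));
$queue->push(new Ping('second'));

// "Consumer" side: a separate loop reads messages and runs the logic.
$handled = [];
while (null !== ($message = $queue->pop())) {
    $handled[] = strtoupper($message->text);
}

print_r($handled);
```

The producing side never waits for the handling logic; it only pays the cost of storing the message.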

Coding a Message Handler

A message is a data object class that should not hold any logic. It will be serialized to be stored in a queue, so only store “simple” serializable data.

Create the CommentMessage class:

src/Message/CommentMessage.php

    namespace App\Message;

    class CommentMessage
    {
        private $id;
        private $context;

        public function __construct(int $id, array $context = [])
        {
            $this->id = $id;
            $this->context = $context;
        }

        public function getId(): int
        {
            return $this->id;
        }

        public function getContext(): array
        {
            return $this->context;
        }
    }
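
To see why a message should only carry simple data, here is a rough sketch of the round trip a message goes through, assuming PHP's native serialize() and unserialize() functions. DemoCommentMessage is a renamed copy of the class above so that the snippet runs on its own, and the context values are invented.

```php
<?php
// Stand-in for the CommentMessage class, renamed so the sketch is
// self-contained. The values used below are made up.
final class DemoCommentMessage
{
    private $id;
    private $context;

    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

$message = new DemoCommentMessage(42, ['user_ip' => '127.0.0.1']);

// Roughly what happens between dispatch and consumption: the object is
// turned into a string, stored, then rebuilt on the consumer side.
$stored = serialize($message);
$restored = unserialize($stored);

var_dump($restored->getId());
var_dump($restored->getContext());
```

Passing the comment id rather than the Comment entity keeps the stored payload small; the handler can load a fresh entity from the database when it runs.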

In the Messenger world, we don’t have controllers, but message handlers.

Create a CommentMessageHandler class under a new App\MessageHandler namespace that knows how to handle CommentMessage messages:

src/MessageHandler/CommentMessageHandler.php

    namespace App\MessageHandler;

    use App\Message\CommentMessage;
    use App\Repository\CommentRepository;
    use App\SpamChecker;
    use Doctrine\ORM\EntityManagerInterface;
    use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

    class CommentMessageHandler implements MessageHandlerInterface
    {
        private $spamChecker;
        private $entityManager;
        private $commentRepository;

        public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
        {
            $this->entityManager = $entityManager;
            $this->spamChecker = $spamChecker;
            $this->commentRepository = $commentRepository;
        }

        public function __invoke(CommentMessage $message)
        {
            $comment = $this->commentRepository->find($message->getId());
            if (!$comment) {
                return;
            }

            if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
                $comment->setState('spam');
            } else {
                $comment->setState('published');
            }

            $this->entityManager->flush();
        }
    }

MessageHandlerInterface is a marker interface. It only helps Symfony auto-register and auto-configure the class as a Messenger handler. By convention, the logic of a handler lives in a method called __invoke(). The CommentMessage type hint on this method’s single argument tells Messenger which message class the handler processes.
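
The following sketch shows how a handled message class can be read from the __invoke() type hint with PHP's reflection API. It only illustrates the idea and makes no claim about how Messenger implements it internally; DemoMessage and DemoHandler are hypothetical stand-ins.

```php
<?php
// Hypothetical stand-ins for a message class and its handler.
final class DemoMessage
{
}

final class DemoHandler
{
    public function __invoke(DemoMessage $message): void
    {
        // Handling logic would go here.
    }
}

// Read the declared type of the first __invoke() argument.
$method = new ReflectionMethod(DemoHandler::class, '__invoke');
$parameter = $method->getParameters()[0];
$handledClass = $parameter->getType()->getName();

var_dump($handledClass);
```

Because the handler is an invokable object, calling it is as simple as `$handler($message)`.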

Update the controller to use the new system:

    --- a/src/Controller/ConferenceController.php
    +++ b/src/Controller/ConferenceController.php
    @@ -5,14 +5,15 @@ namespace App\Controller;
     use App\Entity\Comment;
     use App\Entity\Conference;
     use App\Form\CommentFormType;
    +use App\Message\CommentMessage;
     use App\Repository\CommentRepository;
     use App\Repository\ConferenceRepository;
    -use App\SpamChecker;
     use Doctrine\ORM\EntityManagerInterface;
     use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
     use Symfony\Component\HttpFoundation\File\Exception\FileException;
     use Symfony\Component\HttpFoundation\Request;
     use Symfony\Component\HttpFoundation\Response;
    +use Symfony\Component\Messenger\MessageBusInterface;
     use Symfony\Component\Routing\Annotation\Route;
     use Twig\Environment;

    @@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
     {
         private $twig;
         private $entityManager;
    +    private $bus;

    -    public function __construct(Environment $twig, EntityManagerInterface $entityManager)
    +    public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
         {
             $this->twig = $twig;
             $this->entityManager = $entityManager;
    +        $this->bus = $bus;
         }

         #[Route('/', name: 'homepage')]
    @@ -36,7 +39,7 @@ class ConferenceController extends AbstractController
         }

         #[Route('/conference/{slug}', name: 'conference')]
    -    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir): Response
    +    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir): Response
         {
             $comment = new Comment();
             $form = $this->createForm(CommentFormType::class, $comment);
    @@ -54,6 +57,7 @@ class ConferenceController extends AbstractController
                 }

                 $this->entityManager->persist($comment);
    +            $this->entityManager->flush();

                 $context = [
                     'user_ip' => $request->getClientIp(),
    @@ -61,11 +65,8 @@ class ConferenceController extends AbstractController
                     'referrer' => $request->headers->get('referer'),
                     'permalink' => $request->getUri(),
                 ];
    -            if (2 === $spamChecker->getSpamScore($comment, $context)) {
    -                throw new \RuntimeException('Blatant spam, go away!');
    -            }

    -            $this->entityManager->flush();
    +            $this->bus->dispatch(new CommentMessage($comment->getId(), $context));

                 return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
             }

Instead of depending on the Spam Checker, we now dispatch a message on the bus. The handler then decides what to do with it.

We have achieved something unexpected. We have decoupled our controller from the Spam Checker and moved the logic to a new class, the handler. It is a perfect use case for the bus. Test the code: it still works. Everything is still done synchronously, but the code is probably already “better”.

Restricting Displayed Comments

Update the display logic to prevent non-published comments from appearing on the frontend:

    --- a/src/Repository/CommentRepository.php
    +++ b/src/Repository/CommentRepository.php
    @@ -27,7 +27,9 @@ class CommentRepository extends ServiceEntityRepository
         {
             $query = $this->createQueryBuilder('c')
                 ->andWhere('c.conference = :conference')
    +            ->andWhere('c.state = :state')
                 ->setParameter('conference', $conference)
    +            ->setParameter('state', 'published')
                 ->orderBy('c.createdAt', 'DESC')
                 ->setMaxResults(self::PAGINATOR_PER_PAGE)
                 ->setFirstResult($offset)

Going Async for Real

By default, handlers are called synchronously. To go async, you need to explicitly configure which queue to use for each handler in the config/packages/messenger.yaml configuration file:

    --- a/.env
    +++ b/.env
    @@ -29,7 +29,7 @@ DATABASE_URL="postgresql://127.0.0.1:5432/db?serverVersion=13&charset=utf8"

     ###> symfony/messenger ###
     # Choose one of the transports below
    -# MESSENGER_TRANSPORT_DSN=doctrine://default
    +MESSENGER_TRANSPORT_DSN=doctrine://default
     # MESSENGER_TRANSPORT_DSN=amqp://guest:[email protected]:5672/%2f/messages
     # MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
     ###< symfony/messenger ###
    --- a/config/packages/messenger.yaml
    +++ b/config/packages/messenger.yaml
    @@ -5,10 +5,15 @@ framework:

             transports:
                 # https://symfony.com/doc/current/messenger.html#transport-configuration
    -            # async: '%env(MESSENGER_TRANSPORT_DSN)%'
    +            async:
    +                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
    +                options:
    +                    auto_setup: false
    +                    use_notify: true
    +                    check_delayed_interval: 60000
                 # failed: 'doctrine://default?queue_name=failed'
                 # sync: 'sync://'

             routing:
                 # Route your messages to the transports
    -            # 'App\Message\YourMessage': async
    +            App\Message\CommentMessage: async

The configuration tells the bus to send instances of App\Message\CommentMessage to the async queue. That queue is defined by a DSN (MESSENGER_TRANSPORT_DSN), which points to Doctrine as configured in .env. In plain English, we are using PostgreSQL as a queue for our messages.
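
As a rough illustration of how such a DSN breaks down, the sketch below splits the two Doctrine DSNs from the configuration with PHP's parse_url(). The reading of the parts given in the comments (scheme as transport type, host as Doctrine connection name) is our understanding of the Doctrine transport, not something stated by the configuration file itself.

```php
<?php
// DSN of the async transport, as set in .env.
$async = parse_url('doctrine://default');

// DSN of the commented-out "failed" transport in messenger.yaml.
$failed = parse_url('doctrine://default?queue_name=failed');
parse_str($failed['query'], $failedOptions);

var_dump($async['scheme']); // assumed meaning: the transport type
var_dump($async['host']);   // assumed meaning: the Doctrine connection name
var_dump($failedOptions);   // extra options passed in the query string
```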

Set up PostgreSQL tables and triggers:

    $ symfony console make:migration

And migrate the database:

    $ symfony console doctrine:migrations:migrate

Tip

Behind the scenes, Symfony uses the PostgreSQL built-in, performant, scalable, and transactional pub/sub system (LISTEN/NOTIFY). You can also read the RabbitMQ chapter if you want to use it instead of PostgreSQL as a message broker.

Consuming Messages

If you try to submit a new comment, the spam checker won’t be called anymore. Add an error_log() call in the getSpamScore() method to confirm. Instead, a message is waiting in the queue, ready to be consumed by some processes.

As you might imagine, Symfony comes with a consumer command. Run it now:

    $ symfony console messenger:consume async -vv

It should immediately consume the message dispatched for the submitted comment:

    [OK] Consuming messages from transports "async".
    // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.
    // Quit the worker with CONTROL-C.
    11:30:20 INFO [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
    11:30:20 INFO [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
    11:30:20 INFO [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
    11:30:20 INFO [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
    11:30:20 INFO [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]

The message consumer activity is logged, but you get instant feedback on the console by passing the -vv flag. You should even be able to spot the call to the Akismet API.

To stop the consumer, press Ctrl+C.

Running Workers in the Background

Instead of launching the consumer every time we post a comment and stopping it immediately after, we want to run it continuously without having too many terminal windows or tabs open.

The Symfony CLI can manage such background commands or workers by using the daemon flag (-d) on the run command.

Run the message consumer again, but send it in the background:

    $ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async

The --watch option tells Symfony that the command must be restarted whenever there is a filesystem change in the config/, src/, templates/, or vendor/ directories.

Note

Do not use -vv here, as it would duplicate messages in server:log (logged messages and console messages).

If the consumer stops working for some reason (memory limit, bug, …), it will be restarted automatically. And if the consumer fails too fast, the Symfony CLI will give up.

Logs are streamed via symfony server:log with all the other logs coming from PHP, the web server, and the application:

    $ symfony server:log

Use the server:status command to list all background workers managed for the current project:

    $ symfony server:status
    Web server listening on https://127.0.0.1:8000
    Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)

To stop a worker, stop the web server or kill the PID given by the server:status command:

    $ kill 15774

Retrying Failed Messages

What if Akismet is down while consuming a message? There is no impact for people submitting comments, but the message is lost and spam is not checked.

Messenger has a retry mechanism for when an exception occurs while handling a message. Let’s configure it:

    --- a/config/packages/messenger.yaml
    +++ b/config/packages/messenger.yaml
    @@ -1,7 +1,7 @@
     framework:
         messenger:
             # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
    -        # failure_transport: failed
    +        failure_transport: failed

             transports:
                 # https://symfony.com/doc/current/messenger.html#transport-configuration
    @@ -10,7 +10,10 @@ framework:
                     options:
                         use_notify: true
                         check_delayed_interval: 60000
    -            # failed: 'doctrine://default?queue_name=failed'
    +                retry_strategy:
    +                    max_retries: 3
    +                    multiplier: 2
    +            failed: 'doctrine://default?queue_name=failed'
                 # sync: 'sync://'

             routing:

If a problem occurs while handling a message, the consumer will retry 3 times before giving up. But instead of discarding the message, it will store it permanently in the failed queue, which uses another database table.
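
To get a feel for the retry schedule, the sketch below computes the wait before each retry. It assumes a base delay of 1000 milliseconds, which is not set in the configuration above and is taken here as the default, and it assumes each wait is the previous one multiplied by the multiplier. Check the Messenger documentation before relying on the exact figures.

```php
<?php
$maxRetries = 3;  // max_retries from the configuration above
$multiplier = 2;  // multiplier from the configuration above
$delayMs = 1000;  // assumption: base delay, not set in the configuration above

$delays = [];
for ($retry = 0; $retry < $maxRetries; ++$retry) {
    // Each retry waits $multiplier times longer than the previous one.
    $delays[] = $delayMs * $multiplier ** $retry;
}

print_r($delays);
```

Under these assumptions, a message that keeps failing waits 1, 2, and then 4 seconds between attempts before it is moved to the failed queue.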

Inspect failed messages and retry them via the following commands:

    $ symfony console messenger:failed:show
    $ symfony console messenger:failed:retry

Running Workers on SymfonyCloud

To consume messages from PostgreSQL, we need to run the messenger:consume command continuously. On SymfonyCloud, this is the role of a worker:

    --- a/.symfony.cloud.yaml
    +++ b/.symfony.cloud.yaml
    @@ -50,3 +50,8 @@ hooks:
             set -x -e

             (>&2 symfony-deploy)
    +
    +workers:
    +    messages:
    +        commands:
    +            start: symfony console messenger:consume async -vv --time-limit=3600 --memory-limit=128M

As with the Symfony CLI, SymfonyCloud manages restarts and logs.

To get logs for a worker, use:

    $ symfony logs --worker=messages all


This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.