Context Propagation in Quarkus

Traditional blocking code uses ThreadLocal variables to store contextual objects in order to avoidpassing them as parameters everywhere. Many Quarkus extensions require those contextual objects to operateproperly: RESTEasy, ArC and Transactionfor example.

If you write reactive/async code, you have to cut your work into a pipeline of code blocks that get executed"later", and in practice after the method you defined them in have returned. As such, try/finally blocksas well as ThreadLocal variables stop working, because your reactive code gets executed in another thread,after the caller ran its finally block.

Microprofile Context Propagation was made tomake those Quarkus extensions work properly in reactive/async settings. It works by capturing those contextualvalues that used to be in thread-locals, and restoring them when your code is called.

Setting it up

If you are using SmallRye Reactive Streams Operators(the quarkus-smallrye-reactive-streams-operators module),you get automatic context propagation for all your streams, because the quarkus-smallrye-context-propagationmodule is automatically imported.

If you are not using SmallRye Reactive Streams Operators, add the following to your pom.xml:

  1. <dependencies>
  2. <!-- Context Propagation extension -->
  3. <dependency>
  4. <groupId>io.quarkus</groupId>
  5. <artifactId>quarkus-smallrye-context-propagation</artifactId>
  6. </dependency>
  7. </dependencies>

With this, you will get context propagation for ArC, RESTEasy and transactions, if you are using them.

Usage example with SmallRye Reactive Streams Operators

If you are using SmallRye Reactive Streams Operators (the quarkus-smallrye-reactive-streams-operators module),you get context propagation out of the box, so, for example, let’s write a REST endpoint that reads the next3 items from a Kafka topic, stores them in a database usingHibernate ORM with Panache (all in the same transaction) before returningthem to the client, you can do it like this:

  1. // Get the price-stream Kafka topic
  2. @Inject
  3. @Channel("price-stream") Publisher<Double> prices;
  4. @Transactional
  5. @GET
  6. @Path("/prices")
  7. public Publisher<Double> prices() throws SystemException {
  8. // get the next three prices from the price stream
  9. return ReactiveStreams.fromPublisher(prices)
  10. .limit(3)
  11. .map(price -> {
  12. // store each price before we send them
  13. Price priceEntity = new Price();
  14. priceEntity.value = price;
  15. // here we are all in the same transaction
  16. // thanks to context propagation
  17. priceEntity.persist();
  18. return price;
  19. })
  20. .buildRs();
  21. }

Notice that thanks to automatic support for context propagation with Reactive Streams Operators this works out of the box.

Usage example for CompletionStage

If you are using CompletionStageyou need manual context propagation. You can do that by injecting a ThreadContextor ManagedExecutor that will propagate every context. For example, here we use the Vert.x Web Clientto get the list of Star Wars people, then store them in the database usingHibernate ORM with Panache (all in the same transaction) before returningthem to the client:

  1. @Inject ThreadContext threadContext;
  2. @Inject Vertx vertx;
  3. @Transactional
  4. @GET
  5. @Path("/people")
  6. public CompletionStage<Person> people() throws SystemException {
  7. // Create a REST client to the Star Wars API
  8. WebClient client = WebClient.create(vertx,
  9. new WebClientOptions()
  10. .setDefaultHost("swapi.co")
  11. .setDefaultPort(443)
  12. .setSsl(true));
  13. // get the list of Star Wars people, with context capture
  14. return threadContext.withContextCapture(client.get("/api/people/").send())
  15. .thenApply(response -> {
  16. JsonObject json = response.bodyAsJsonObject();
  17. List<Person> persons = new ArrayList<>(json.getInteger("count"));
  18. // Store them in the DB
  19. // Note that we're still in the same transaction as the outer method
  20. for (Object element : json.getJsonArray("results")) {
  21. Person person = new Person();
  22. person.name = ((JsonObject)element).getString("name");
  23. person.persist();
  24. persons.add(person);
  25. }
  26. return persons;
  27. });
  28. }

Using ThreadContext or ManagedExecutor you can wrap most useful functional types and CompletionStagein order to get context propagated.

The injected ManagedExecutor uses the Quarkus thread pool.

Adding support for RxJava2

If you use Reactive Streams Operators (the quarkus-smallrye-reactive-streams-operators module),you get support for RxJava2 context propagation automatically, but if you don’t, youmay want to include the following modules to get RxJava2 support:

  1. <dependencies>
  2. <!-- Automatic context propagation for RxJava2 -->
  3. <dependency>
  4. <groupId>io.smallrye</groupId>
  5. <artifactId>smallrye-context-propagation-propagators-rxjava2</artifactId>
  6. </dependency>
  7. <!--
  8. Required if you want transactions extended to the end of methods returning
  9. an RxJava2 type.
  10. -->
  11. <dependency>
  12. <groupId>io.smallrye.reactive</groupId>
  13. <artifactId>smallrye-reactive-converter-rxjava2</artifactId>
  14. </dependency>
  15. <!-- Required if you return RxJava2 types from your REST endpoints -->
  16. <dependency>
  17. <groupId>org.jboss.resteasy</groupId>
  18. <artifactId>resteasy-rxjava2</artifactId>
  19. </dependency>
  20. </dependencies>