Quarkus - Asynchronous messages between beans

Quarkus allows different beans to interact using asynchronous messages, enforcing loose-coupling.The messages are sent to virtual addresses.It offers 3 types of delivery mechanism:

  • point-to-point - send the message, one consumer receives it. If several consumers listen to the address, a round robin is applied;

  • publish/subscribe - publish a message, all the consumers listening to the address are receiving the message;

  • request/reply - send the message and expect a response. The receiver can respond to the message in an asynchronous-fashion

All these delivery mechanism are non-blocking, and are providing one of the fundamental brick to build reactive applications.

The asynchronous message passing feature allows replying to messages which is not supported by Reactive Messaging.However, it is limited to single-event behavior (no stream) and to local messages.

Installing

This mechanism uses the Vert.x EventBus, so you need to enable the vertx extension to use this feature.If you are creating a new project, set the extensions parameter are follows:

  1. mvn io.quarkus:quarkus-maven-plugin:1.0.0.CR1:create \
  2. -DprojectGroupId=org.acme \
  3. -DprojectArtifactId=vertx-quickstart \
  4. -Dextensions="vertx"
  5. cd vertx-quickstart

If you have an already created project, the vertx extension can be added to an existing Quarkus project withthe add-extension command:

  1. ./mvnw quarkus:add-extension -Dextensions="vertx"

Otherwise, you can manually add this to the dependencies section of your pom.xml file:

  1. <dependency>
  2. <groupId>io.quarkus</groupId>
  3. <artifactId>quarkus-vertx</artifactId>
  4. </dependency>

Consuming events

To consume events, use the io.quarkus.vertx.ConsumeEvent annotation:

  1. package org.acme.vertx;
  2. import io.quarkus.vertx.ConsumeEvent;
  3. import javax.enterprise.context.ApplicationScoped;
  4. @ApplicationScoped
  5. public class GreetingService {
  6. @ConsumeEvent (1)
  7. public String consume(String name) { (2)
  8. return name.toUpperCase();
  9. }
  10. }
1If not set, the address is the fully qualified name of the bean, for instance, in this snippet it’s org.acme.vertx.GreetingService.
2The method parameter is the message body. If the method returns something it’s the message response.
By default, the code consuming the event must be non-blocking, as it’s called on the Vert.x event loop.If your processing is blocking, use the blocking arttribute:
  1. @ConsumeEvent(value = "blocking-consumer", blocking = true)void consumeBlocking(String message) { // Something blocking}

Configuring the address

The @ConsumeEvent annotation can be configured to set the address:

  1. @ConsumeEvent("greeting") (1)
  2. public String consume(String name) {
  3. return name.toUpperCase();
  4. }
1Receive the messages sent to the greeting address

Replying

The return value of a method annotated with @ConsumeEvent is used as response to the incoming message.For instance, in the following snippet, the returned String is the response.

  1. @ConsumeEvent("greeting")
  2. public String consume(String name) {
  3. return name.toUpperCase();
  4. }

You can also return a CompletionStage<T> to handle asynchronous reply:

  1. @ConsumeEvent("greeting")
  2. public CompletionStage<String> consume2(String name) {
  3. return CompletableFuture.supplyAsync(name::toUpperCase, executor);
  4. }

Implementing fire and forget interactions

You don’t have to reply to received messages.Typically for a fire and forget interaction, the messages are consumed and the sender does not need to know about it.To implement this, your consumer method just returns void

  1. @ConsumeEvent("greeting")
  2. public void consume(String event) {
  3. // Do something with the event
  4. }

Dealing with messages

As said above, this mechanism is based on the Vert.x event bus. So, you can also use Message directly:

  1. @ConsumeEvent("greeting")
  2. public void consume(Message<String> msg) {
  3. System.out.println(msg.address());
  4. System.out.println(msg.body());
  5. }

Sending messages

Ok, we have seen how to receive messages, let’s now switch to the other side: the sender.Sending and publishing messages use the Vert.x event bus:

  1. package org.acme;
  2. import io.vertx.axle.core.eventbus.EventBus;
  3. import io.vertx.axle.core.eventbus.Message;
  4. import javax.inject.Inject;
  5. import javax.ws.rs.GET;
  6. import javax.ws.rs.Path;
  7. import javax.ws.rs.PathParam;
  8. import java.util.concurrent.CompletionStage;
  9. @Path("/async")
  10. public class EventResource {
  11. @Inject
  12. EventBus bus; (1)
  13. @GET
  14. @Path("/{name}")
  15. public CompletionStage<String> hello(String name) {
  16. return bus.<String>send("greeting", name) (2)
  17. .thenApply(Message::body);
  18. }
  19. }
1Inject the Event bus
2Send a message to the address greeting. Message payload is name

The EventBus object provides methods to:

  • send a message to a specific address - one single consumer receives the message.

  • publish a message to a specific address - all consumers receive the messages.

  • send a message and expect reply

  1. // Case 1
  2. bus.send("address", "hello");
  3. // Case 2
  4. bus.publish("address", "hello");
  5. // Case 3
  6. bus.send("address", "hello, how are you?").thenAccept(message -> {
  7. // reponse
  8. });

Putting things together - bridging HTTP and messages

Let’s revisit a greeting HTTP endpoint and use asynchronous message passing to delegate the call to a separated bean.It uses the request/reply dispatching mechanism.Instead of implementing the business logic inside the JAX-RS endpoint, we are sending a message.This message is consumed by another bean and the response is sent using the reply mechanism.

First create a new project using:

  1. mvn io.quarkus:quarkus-maven-plugin:1.0.0.CR1:create \
  2. -DprojectGroupId=org.acme \
  3. -DprojectArtifactId=vertx-http-quickstart \
  4. -Dextensions="vertx"
  5. cd vertx-http-quickstart

You can already start the application in dev mode using ./mvnw compile quarkus:dev.

Then, creates a new JAX-RS resource with the following content:

src/main/java/org/acme/vertx/EventResource.java

  1. package org.acme.vertx;
  2. import io.vertx.axle.core.eventbus.EventBus;
  3. import io.vertx.axle.core.eventbus.Message;
  4. import javax.inject.Inject;
  5. import javax.ws.rs.GET;
  6. import javax.ws.rs.Path;
  7. import javax.ws.rs.PathParam;
  8. import java.util.concurrent.CompletionStage;
  9. @Path("/hello")
  10. public class EventResource {
  11. @Inject EventBus bus;
  12. @GET
  13. @Path("/async/{name}")
  14. public CompletionStage<String> hello(@PathParam("name") String name) {
  15. return bus.<String>send("greeting", name) (1)
  16. .thenApply(Message::body); (2)
  17. }
  18. }
1send the name to the greeting address
2when we get the reply, extract the body and send this as response to the user

If you call this endpoint, you will wait and get a timeout. Indeed, no one is listening.So, we need a consumer listening on the greeting address. Create a GreetingService bean with the following content:

src/main/java/org/acme/vertx/GreetingService.java

  1. package org.acme.vertx;
  2. import io.quarkus.vertx.ConsumeEvent;
  3. import javax.enterprise.context.ApplicationScoped;
  4. @ApplicationScoped
  5. public class GreetingService {
  6. @ConsumeEvent("greeting")
  7. public String greeting(String name) {
  8. return "Hello " + name;
  9. }
  10. }

This bean receives the name, and returns the greeting message.

Now, open your browser to http://localhost:8080/async/Quarkus, and you should see:

  1. Hello Quarkus

To better understand, let’s detail how the HTTP request/response has been handled:

  • The request is received by the hello method

  • a message containing the name is sent to the event bus

  • Another bean receives this message and computes the response

  • This response is sent back using the reply mechanism

  • Once the reply is received by the sender, the content is written to the HTTP response

This application can be packaged using:

  1. ./mvnw clean package

You can also compile it as a native executable with:

  1. ./mvnw clean package -Pnative