6.13 Reactive HTTP Request Processing

As mentioned previously, Micronaut is built on Netty, which is designed around an event loop model and non-blocking I/O. Micronaut executes code defined in @Controller beans on the same thread that handles the request (an event loop thread).

It is therefore critical that you offload any blocking I/O operations (for example interactions with Hibernate/JPA or JDBC) to a separate thread pool so that they do not block the event loop.

For example, the following configuration sets up the I/O thread pool as a fixed thread pool with 75 threads (similar to what a traditional blocking server such as Tomcat uses in the thread-per-request model):

Configuring the IO thread pool

    micronaut:
      executors:
        io:
          type: fixed
          nThreads: 75
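To make the offloading idea concrete without any framework, the following is a minimal plain-JDK sketch. It assumes that a fixed pool of 75 threads behaves roughly like `Executors.newFixedThreadPool(75)`; the class name `OffloadSketch`, the method `blockingLookup`, and the thread names `event-loop` and `io-worker` are hypothetical and are not part of Micronaut's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadSketch {

    /** Hypothetical stand-in for a blocking call such as a JDBC query; returns the thread it ran on. */
    static String blockingLookup() {
        try {
            Thread.sleep(50); // simulate blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    /** Hands the blocking call from a single "event loop" thread to the I/O pool. */
    static String runOffloaded() {
        // One thread plays the role of the event loop (an assumption for illustration).
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        // Fixed pool sized like the configuration above.
        ExecutorService ioPool = Executors.newFixedThreadPool(75, r -> new Thread(r, "io-worker"));
        try {
            return CompletableFuture
                    // The event loop thread only schedules the work; it never sleeps itself.
                    .supplyAsync(() -> CompletableFuture.supplyAsync(OffloadSketch::blockingLookup, ioPool), eventLoop)
                    .thenCompose(f -> f)
                    .join();
        } finally {
            eventLoop.shutdown();
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking call ran on: " + runOffloaded());
    }
}
```

The blocking sleep happens on an `io-worker` thread, so the single `event-loop` thread stays free to accept further work while the lookup is in progress.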

To use this thread pool in a @Controller bean you have a number of options. The simplest is to use the @ExecuteOn annotation, which can be declared at the type or method level to indicate which configured thread pool to run the method(s) of the controller on:

Using @ExecuteOn (Java)

    import io.micronaut.docs.http.server.reactive.PersonService;
    import io.micronaut.docs.ioc.beans.Person;
    import io.micronaut.http.annotation.Controller;
    import io.micronaut.http.annotation.Get;
    import io.micronaut.scheduling.TaskExecutors;
    import io.micronaut.scheduling.annotation.ExecuteOn;

    @Controller("/executeOn/people")
    public class PersonController {

        private final PersonService personService;

        PersonController(PersonService personService) {
            this.personService = personService;
        }

        @Get("/{name}")
        @ExecuteOn(TaskExecutors.IO) // (1)
        Person byName(String name) {
            return personService.findByName(name);
        }
    }

Using @ExecuteOn (Groovy)

    import io.micronaut.docs.http.server.reactive.PersonService
    import io.micronaut.docs.ioc.beans.Person
    import io.micronaut.http.annotation.Controller
    import io.micronaut.http.annotation.Get
    import io.micronaut.scheduling.TaskExecutors
    import io.micronaut.scheduling.annotation.ExecuteOn

    @Controller("/executeOn/people")
    class PersonController {

        private final PersonService personService

        PersonController(PersonService personService) {
            this.personService = personService
        }

        @Get("/{name}")
        @ExecuteOn(TaskExecutors.IO) // (1)
        Person byName(String name) {
            personService.findByName(name)
        }
    }

Using @ExecuteOn (Kotlin)

    import io.micronaut.docs.http.server.reactive.PersonService
    import io.micronaut.docs.ioc.beans.Person
    import io.micronaut.http.annotation.Controller
    import io.micronaut.http.annotation.Get
    import io.micronaut.scheduling.TaskExecutors
    import io.micronaut.scheduling.annotation.ExecuteOn

    @Controller("/executeOn/people")
    class PersonController(private val personService: PersonService) {

        @Get("/{name}")
        @ExecuteOn(TaskExecutors.IO) // (1)
        fun byName(name: String): Person {
            return personService.findByName(name)
        }
    }

(1) The @ExecuteOn annotation is used to execute the operation on the I/O thread pool

The value of the @ExecuteOn annotation can be any named executor defined under micronaut.executors.

Generally speaking, for database operations you want a thread pool whose size matches the maximum number of connections specified in the database connection pool.
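The sizing advice can be illustrated with a small plain-JDK sketch. It is only a model under stated assumptions: a `Semaphore` stands in for a database connection pool, `tryAcquire` is used so that contention becomes visible as a counted "miss" (a real connection pool would typically make the caller wait instead), and the names `PoolSizingSketch` and `countConnectionMisses` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSizingSketch {

    /**
     * Runs the given number of simulated queries on a fixed pool of worker threads
     * against a modelled connection pool, and returns how many queries found no
     * free connection at the moment they started.
     */
    static int countConnectionMisses(int threads, int maxConnections, int tasks) {
        Semaphore connections = new Semaphore(maxConnections); // stand-in for a connection pool
        AtomicInteger misses = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(() -> {
                    if (!connections.tryAcquire()) {
                        misses.incrementAndGet(); // a worker thread had no connection to use
                        return;
                    }
                    try {
                        Thread.sleep(10); // simulate the query
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        connections.release();
                    }
                }));
            }
            for (Future<?> f : futures) {
                try {
                    f.get(); // wait for every simulated query to finish
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }
            return misses.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("misses with matched sizes: " + countConnectionMisses(4, 4, 40));
    }
}
```

When the thread count equals the connection count, at most that many queries run at once, so every worker finds a free connection. With more threads than connections, some workers may start while all connections are in use, which in a real application shows up as threads waiting on the connection pool.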

An alternative to the @ExecuteOn annotation is to use the facility provided by the reactive library you have chosen. Reactive implementations such as Project Reactor or RxJava feature a subscribeOn method which lets you alter which thread executes user code. For example:

Reactive subscribeOn Example (Java)

    import io.micronaut.docs.ioc.beans.Person;
    import io.micronaut.http.annotation.Controller;
    import io.micronaut.http.annotation.Get;
    import io.micronaut.scheduling.TaskExecutors;
    import jakarta.inject.Named;
    import org.reactivestreams.Publisher;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Scheduler;
    import reactor.core.scheduler.Schedulers;
    import io.micronaut.core.async.annotation.SingleResult;
    import java.util.concurrent.ExecutorService;

    @Controller("/subscribeOn/people")
    public class PersonController {

        private final Scheduler scheduler;
        private final PersonService personService;

        PersonController(
                @Named(TaskExecutors.IO) ExecutorService executorService, // (1)
                PersonService personService) {
            this.scheduler = Schedulers.fromExecutorService(executorService);
            this.personService = personService;
        }

        @Get("/{name}")
        @SingleResult
        Publisher<Person> byName(String name) {
            return Mono
                    .fromCallable(() -> personService.findByName(name)) // (2)
                    .subscribeOn(scheduler); // (3)
        }
    }

Reactive subscribeOn Example (Groovy)

    import io.micronaut.docs.ioc.beans.Person
    import io.micronaut.http.annotation.Controller
    import io.micronaut.http.annotation.Get
    import io.micronaut.scheduling.TaskExecutors
    import jakarta.inject.Named
    import reactor.core.publisher.Mono
    import reactor.core.scheduler.Scheduler
    import reactor.core.scheduler.Schedulers
    import java.util.concurrent.ExecutorService

    @Controller("/subscribeOn/people")
    class PersonController {

        private final Scheduler scheduler
        private final PersonService personService

        PersonController(
                @Named(TaskExecutors.IO) ExecutorService executorService, // (1)
                PersonService personService) {
            this.scheduler = Schedulers.fromExecutorService(executorService)
            this.personService = personService
        }

        @Get("/{name}")
        Mono<Person> byName(String name) {
            return Mono
                    .fromCallable({ -> personService.findByName(name) }) // (2)
                    .subscribeOn(scheduler) // (3)
        }
    }

Reactive subscribeOn Example (Kotlin)

    import io.micronaut.docs.ioc.beans.Person
    import io.micronaut.http.annotation.Controller
    import io.micronaut.http.annotation.Get
    import io.micronaut.scheduling.TaskExecutors
    import java.util.concurrent.ExecutorService
    import jakarta.inject.Named
    import reactor.core.publisher.Mono
    import reactor.core.scheduler.Scheduler
    import reactor.core.scheduler.Schedulers

    @Controller("/subscribeOn/people")
    class PersonController internal constructor(
        @Named(TaskExecutors.IO) executorService: ExecutorService, // (1)
        private val personService: PersonService) {

        private val scheduler: Scheduler = Schedulers.fromExecutorService(executorService)

        @Get("/{name}")
        fun byName(name: String): Mono<Person> {
            return Mono
                .fromCallable { personService.findByName(name) } // (2)
                .subscribeOn(scheduler) // (3)
        }
    }

(1) The configured I/O executor service is injected
(2) The Mono::fromCallable method wraps the blocking operation
(3) The Project Reactor subscribeOn method schedules the operation on the I/O thread pool
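
The two ideas behind these callouts, deferring the blocking call until something subscribes and choosing the thread it runs on, can be mimicked with the JDK alone. The sketch below is an analogy only and is not how Project Reactor is implemented; the class `DeferredTaskSketch`, its methods, and the thread name `io-worker` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class DeferredTaskSketch<T> {

    private final Callable<T> work;
    private final Executor executor;

    private DeferredTaskSketch(Callable<T> work, Executor executor) {
        this.work = work;
        this.executor = executor;
    }

    /** Wraps the work without running it; by default it would run on the subscribing thread. */
    static <T> DeferredTaskSketch<T> fromCallable(Callable<T> work) {
        return new DeferredTaskSketch<>(work, Runnable::run);
    }

    /** Returns a copy whose work runs on the given executor once subscribed. */
    DeferredTaskSketch<T> subscribeOn(Executor executor) {
        return new DeferredTaskSketch<>(work, executor);
    }

    /** Only now is the work submitted for execution. */
    CompletableFuture<T> subscribe() {
        CompletableFuture<T> result = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                result.complete(work.call());
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    /** Returns "callsBeforeSubscribe:callsAfterSubscribe:threadName". */
    static String demo() {
        ExecutorService io = Executors.newFixedThreadPool(2, r -> new Thread(r, "io-worker"));
        try {
            AtomicInteger calls = new AtomicInteger();
            DeferredTaskSketch<String> task = DeferredTaskSketch
                    .<String>fromCallable(() -> {
                        calls.incrementAndGet();
                        return Thread.currentThread().getName();
                    })
                    .subscribeOn(io);
            int before = calls.get();                // nothing has run yet
            String thread = task.subscribe().join(); // runs on the I/O pool
            return before + ":" + calls.get() + ":" + thread;
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Building the task does not invoke the callable; the call happens once, on an `io-worker` thread, only after `subscribe()` is called. In the controller examples it is the framework, rather than user code, that subscribes to the returned publisher.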