6.13 Reactive HTTP Request Processing

As mentioned previously, Micronaut is built on Netty which is designed around an Event loop model and non-blocking I/O. Micronaut will execute code defined in @Controller beans in the same thread as the request thread (an Event Loop thread).

This makes it critical that if you do any blocking I/O operations (for example interactions with Hibernate/JPA or JDBC) that you offload those tasks to a separate thread pool that does not block the Event loop.

For example the following configuration will configure the I/O thread pool as a fixed thread pool with 75 threads (similar to what a traditional blocking server such as Tomcat uses in the thread per request model):

Configuring the IO thread pool

  1. micronaut:
  2. executors:
  3. io:
  4. type: fixed
  5. nThreads: 75

To use this thread pool in a @Controller bean you have a number of options. The simplest option is to use the @ExecuteOn annotation which can be declared at the type or method level to indicate which configured thread pool you wish to run the method or methods of the controller on:

Using @ExecuteOn

  1. import io.micronaut.http.annotation.*;
  2. import io.micronaut.scheduling.TaskExecutors;
  3. import io.micronaut.scheduling.annotation.ExecuteOn;
  4. @Controller("/executeOn/people")
  5. public class PersonController {
  6. private final PersonService personService;
  7. PersonController(
  8. PersonService personService) {
  9. this.personService = personService;
  10. }
  11. @Get("/{name}")
  12. @ExecuteOn(TaskExecutors.IO) (1)
  13. Person byName(String name) {
  14. return personService.findByName(name);
  15. }
  16. }

Using @ExecuteOn

  1. import io.micronaut.http.annotation.*
  2. import io.micronaut.scheduling.TaskExecutors
  3. import io.micronaut.scheduling.annotation.ExecuteOn
  4. @Controller("/executeOn/people")
  5. class PersonController {
  6. private final PersonService personService
  7. PersonController(
  8. PersonService personService) {
  9. this.personService = personService
  10. }
  11. @Get("/{name}")
  12. @ExecuteOn(TaskExecutors.IO) (1)
  13. Person byName(String name) {
  14. return personService.findByName(name)
  15. }
  16. }

Using @ExecuteOn

  1. import io.micronaut.http.annotation.*
  2. import io.micronaut.scheduling.TaskExecutors
  3. import io.micronaut.scheduling.annotation.ExecuteOn
  4. @Controller("/executeOn/people")
  5. class PersonController (private val personService: PersonService) {
  6. @Get("/{name}")
  7. @ExecuteOn(TaskExecutors.IO) (1)
  8. fun byName(name: String): Person {
  9. return personService.findByName(name)
  10. }
  11. }
1The @ExecuteOn annotation is used to execute the operation on the I/O thread pool

The value of the @ExecuteOn annotation can be any named executor defined under micronaut.executors.

Generally speaking for database operations you will want a thread pool configured that matches maximum number of connections you have specified in the database connection pool.

An alternative to the @ExecuteOn annotation is to use the facility provided by the reactive library you have chosen. RxJava for example features a subscribeOn method which allows you to alter which thread executes user code. For example:

RxJava subscribeOn Example

  1. import io.micronaut.http.annotation.*;
  2. import io.micronaut.scheduling.TaskExecutors;
  3. import io.reactivex.*;
  4. import io.reactivex.schedulers.Schedulers;
  5. import javax.inject.Named;
  6. import java.util.concurrent.ExecutorService;
  7. @Controller("/subscribeOn/people")
  8. public class PersonController {
  9. private final Scheduler scheduler;
  10. private final PersonService personService;
  11. PersonController(
  12. @Named(TaskExecutors.IO) ExecutorService executorService, (1)
  13. PersonService personService) {
  14. this.scheduler = Schedulers.from(executorService);
  15. this.personService = personService;
  16. }
  17. @Get("/{name}")
  18. Single<Person> byName(String name) {
  19. return Single.fromCallable(() ->
  20. personService.findByName(name) (2)
  21. ).subscribeOn(scheduler); (3)
  22. }
  23. }

RxJava subscribeOn Example

  1. import io.micronaut.http.annotation.*
  2. import io.micronaut.scheduling.TaskExecutors
  3. import io.reactivex.*
  4. import io.reactivex.schedulers.Schedulers
  5. import javax.inject.Named
  6. import java.util.concurrent.ExecutorService
  7. @Controller("/subscribeOn/people")
  8. class PersonController {
  9. private final Scheduler scheduler
  10. private final PersonService personService
  11. PersonController(
  12. @Named(TaskExecutors.IO) ExecutorService executorService, (1)
  13. PersonService personService) {
  14. this.scheduler = Schedulers.from(executorService)
  15. this.personService = personService
  16. }
  17. @Get("/{name}")
  18. Single<Person> byName(String name) {
  19. return Single.fromCallable({ -> (2)
  20. personService.findByName(name)
  21. }).subscribeOn(scheduler) (3)
  22. }
  23. }

RxJava subscribeOn Example

  1. import io.micronaut.http.annotation.*
  2. import io.micronaut.scheduling.TaskExecutors
  3. import io.reactivex.*
  4. import io.reactivex.schedulers.Schedulers
  5. import java.util.concurrent.ExecutorService
  6. import javax.inject.Named
  7. @Controller("/subscribeOn/people")
  8. class PersonController internal constructor(
  9. @Named(TaskExecutors.IO) executorService: ExecutorService, (1)
  10. val personService: PersonService) {
  11. private val scheduler: Scheduler
  12. init {
  13. scheduler = Schedulers.from(executorService)
  14. }
  15. @Get("/{name}")
  16. fun byName(name: String): Single<Person> {
  17. return Single.fromCallable { personService.findByName(name) } (2)
  18. .subscribeOn(scheduler) (3)
  19. }
  20. }
1The configured I/O executor service is injected
2RxJava’s fromCallable method is used to wrap the blocking operation
3RxJava’s subscribeOn method is used to schedule the operation on the I/O thread pool