Writing a Filter

Consider a hypothetical use case in which you wish to trace each request to the Micronaut “Hello World” example using some external system. The external system could be a database or a distributed tracing service, and it may require I/O operations.

What you don’t want to do is block the underlying Netty event loop within your filter. Instead, you want the filter to proceed with execution once any I/O is complete.
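To make the idea concrete, here is a minimal sketch that uses only the JDK (no Micronaut, Netty, or RxJava). It assumes a hypothetical `slowTrace` operation and a hypothetical `IO_POOL` executor standing in for a dedicated I/O thread pool. The calling thread hands the blocking work to that pool and gets a future back immediately, which is the behaviour you want from a thread that must not block.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class NonBlockingSketch {

    // Hypothetical stand-in for a dedicated I/O pool, separate from the caller's thread.
    static final ExecutorService IO_POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "io-worker");
        t.setDaemon(true); // allow the JVM to exit without an explicit shutdown
        return t;
    });

    // Hypothetical slow operation, e.g. writing a trace record to an external system.
    // Returns the name of the thread that executed it.
    static String slowTrace(String uri) {
        try {
            Thread.sleep(50); // simulated blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Returns immediately; the blocking call runs on the I/O pool.
    static CompletableFuture<String> traceAsync(String uri) {
        return CompletableFuture.supplyAsync(() -> slowTrace(uri), IO_POOL);
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = traceAsync("/hello");
        System.out.println("caller continues on: " + Thread.currentThread().getName());
        // join() is used here only so the demo can print the result.
        System.out.println("trace ran on: " + result.join());
    }
}
```

In a real filter you would not call `join()` on the event loop. You would compose further steps onto the returned future or publisher instead.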

As an example, consider the following TraceService, which uses RxJava to compose an I/O operation:

A TraceService Example using RxJava (Java)

    import io.micronaut.http.HttpRequest;
    import io.reactivex.Flowable;
    import io.reactivex.schedulers.Schedulers;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import javax.inject.Singleton;

    @Singleton
    public class TraceService {

        private static final Logger LOG = LoggerFactory.getLogger(TraceService.class);

        Flowable<Boolean> trace(HttpRequest<?> request) {
            return Flowable.fromCallable(() -> { // (1)
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Tracing request: " + request.getUri());
                }
                // trace logic here, potentially performing I/O (2)
                return true;
            }).subscribeOn(Schedulers.io()); // (3)
        }
    }

A TraceService Example using RxJava (Groovy)

    import io.micronaut.http.HttpRequest
    import io.reactivex.Flowable
    import io.reactivex.schedulers.Schedulers
    import org.slf4j.Logger
    import org.slf4j.LoggerFactory

    import javax.inject.Singleton

    @Singleton
    class TraceService {

        private static final Logger LOG = LoggerFactory.getLogger(TraceService.class)

        Flowable<Boolean> trace(HttpRequest<?> request) {
            Flowable.fromCallable({ -> // (1)
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Tracing request: " + request.getUri())
                }
                // trace logic here, potentially performing I/O (2)
                return true
            }).subscribeOn(Schedulers.io()) // (3)
        }
    }

A TraceService Example using RxJava (Kotlin)

    import io.micronaut.http.HttpRequest
    import io.reactivex.Flowable
    import io.reactivex.schedulers.Schedulers
    import org.slf4j.LoggerFactory

    import javax.inject.Singleton

    @Singleton
    class TraceService {

        private val LOG = LoggerFactory.getLogger(TraceService::class.java)

        internal fun trace(request: HttpRequest<*>): Flowable<Boolean> {
            return Flowable.fromCallable { // (1)
                if (LOG.isDebugEnabled) {
                    LOG.debug("Tracing request: " + request.uri)
                }
                // trace logic here, potentially performing I/O (2)
                true
            }.subscribeOn(Schedulers.io()) // (3)
        }
    }

(1) The Flowable type is used to create logic that executes potentially blocking operations to write the trace data from the request.
(2) Since this is just an example, the logic does nothing and a placeholder comment is used.
(3) The RxJava I/O scheduler is used to execute the logic.

You can then inject this implementation into your filter definition:

An Example HttpServerFilter (Java)

    import io.micronaut.http.*;
    import io.micronaut.http.annotation.Filter;
    import io.micronaut.http.filter.*;
    import org.reactivestreams.Publisher;

    @Filter("/hello/**") // (1)
    public class TraceFilter implements HttpServerFilter { // (2)

        private final TraceService traceService;

        public TraceFilter(TraceService traceService) { // (3)
            this.traceService = traceService;
        }

        // doFilter is implemented in the final step
    }

An Example HttpServerFilter (Groovy)

    import io.micronaut.http.HttpRequest
    import io.micronaut.http.MutableHttpResponse
    import io.micronaut.http.annotation.Filter
    import io.micronaut.http.filter.HttpServerFilter
    import io.micronaut.http.filter.ServerFilterChain
    import org.reactivestreams.Publisher

    @Filter("/hello/**") // (1)
    class TraceFilter implements HttpServerFilter { // (2)

        private final TraceService traceService

        TraceFilter(TraceService traceService) { // (3)
            this.traceService = traceService
        }

        // doFilter is implemented in the final step
    }

An Example HttpServerFilter (Kotlin)

    import io.micronaut.http.HttpRequest
    import io.micronaut.http.MutableHttpResponse
    import io.micronaut.http.annotation.Filter
    import io.micronaut.http.filter.HttpServerFilter
    import io.micronaut.http.filter.ServerFilterChain
    import org.reactivestreams.Publisher

    @Filter("/hello/**") // (1)
    class TraceFilter( // (2)
        private val traceService: TraceService // (3)
    ) : HttpServerFilter {
        // doFilter is implemented in the final step
    }

(1) The Filter annotation is used to define the URI patterns the filter matches.
(2) The class implements the HttpServerFilter interface.
(3) The previously defined TraceService is injected via a constructor argument.

The final step is to write the doFilter implementation of the HttpServerFilter interface.

The doFilter implementation (Java)

    @Override
    public Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request, ServerFilterChain chain) {
        return traceService.trace(request) // (1)
                .switchMap(aBoolean -> chain.proceed(request)) // (2)
                .doOnNext(res -> // (3)
                        res.getHeaders().add("X-Trace-Enabled", "true")
                );
    }

The doFilter implementation (Groovy)

    @Override
    Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request, ServerFilterChain chain) {
        traceService.trace(request) // (1)
                .switchMap({ aBoolean -> chain.proceed(request) }) // (2)
                .doOnNext({ res -> // (3)
                    res.getHeaders().add("X-Trace-Enabled", "true")
                })
    }

The doFilter implementation (Kotlin)

    override fun doFilter(request: HttpRequest<*>, chain: ServerFilterChain): Publisher<MutableHttpResponse<*>> {
        return traceService.trace(request) // (1)
                .switchMap { aBoolean -> chain.proceed(request) } // (2)
                .doOnNext { res -> // (3)
                    res.headers.add("X-Trace-Enabled", "true")
                }
    }

(1) The previously defined TraceService is called to trace the request.
(2) If the trace call succeeds, the filter resumes request processing using RxJava’s switchMap method, which invokes the proceed method of the ServerFilterChain.
(3) Finally, RxJava’s doOnNext method is used to add a header called X-Trace-Enabled to the response.
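
The ordering of these three steps can be sketched with JDK types alone. The following is an analogy, not the Micronaut or RxJava API: `CompletableFuture` stands in for the publisher, `thenCompose` plays a role similar to `switchMap`, and `thenApply` is used where the filter uses `doOnNext`. The `Response` class, the `chain` function, and the `traceSucceeds` flag are hypothetical names introduced only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class FilterFlowSketch {

    // Hypothetical, simplified response: just a mutable header map.
    static final class Response {
        final Map<String, String> headers = new LinkedHashMap<>();
    }

    // Step (1): the trace runs asynchronously; 'succeed' simulates its outcome.
    static CompletableFuture<Boolean> trace(String uri, boolean succeed) {
        return CompletableFuture.supplyAsync(() -> {
            if (!succeed) {
                throw new IllegalStateException("trace failed for " + uri);
            }
            return true;
        });
    }

    // Steps (2) and (3): continue the chain only after a successful trace,
    // then decorate the response with the header.
    static CompletableFuture<Response> doFilter(
            String uri,
            boolean traceSucceeds,
            Function<String, CompletableFuture<Response>> chain) {
        return trace(uri, traceSucceeds)
                .thenCompose(ok -> chain.apply(uri)) // skipped if the trace failed
                .thenApply(res -> {
                    res.headers.put("X-Trace-Enabled", "true");
                    return res;
                });
    }

    public static void main(String[] args) {
        // Hypothetical downstream handler that returns an empty response.
        Function<String, CompletableFuture<Response>> chain =
                uri -> CompletableFuture.completedFuture(new Response());

        Response response = doFilter("/hello", true, chain).join();
        System.out.println(response.headers); // prints {X-Trace-Enabled=true}
    }
}
```

If the trace stage fails, the downstream `chain` function is never invoked and the returned future completes exceptionally, which mirrors the "if the trace call succeeds" condition in callout (2).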

The previous example demonstrates some key concepts, such as executing logic in a non-blocking manner before proceeding with the request, and modifying the outgoing response.

The examples use RxJava; however, you can use any reactive framework that supports the Reactive Streams specification.