6.25 Server Sent Events

The Micronaut HTTP server supports emitting Server Sent Events (SSE) using the Event API.

To emit events from the server, return a Reactive Streams Publisher that emits objects of type Event.

The Publisher itself could publish events from a background task, via an event system, etc.

Imagine for an example a event stream of news headlines; you may define a data class as follows:

Headline

  1. public class Headline {
  2. private String title;
  3. private String description;
  4. public Headline() {}
  5. public Headline(String title, String description) {
  6. this.title = title;
  7. this.description = description;
  8. }
  9. public String getTitle() {
  10. return title;
  11. }
  12. public String getDescription() {
  13. return description;
  14. }
  15. public void setTitle(String title) {
  16. this.title = title;
  17. }
  18. public void setDescription(String description) {
  19. this.description = description;
  20. }
  21. }

Headline

  1. class Headline {
  2. String title
  3. String description
  4. Headline() {}
  5. Headline(String title, String description) {
  6. this.title = title;
  7. this.description = description;
  8. }
  9. }

Headline

  1. class Headline {
  2. var title: String? = null
  3. var description: String? = null
  4. constructor()
  5. constructor(title: String, description: String) {
  6. this.title = title
  7. this.description = description
  8. }
  9. }

To emit news headline events, write a controller that returns a Publisher of Event instances using whichever Reactive library you prefer. The example below uses Project Reactor‘s reactor:Flux[] via the generate method:

Publishing Server Sent Events from a Controller

  1. import io.micronaut.http.MediaType;
  2. import io.micronaut.http.annotation.Controller;
  3. import io.micronaut.http.annotation.Get;
  4. import io.micronaut.http.sse.Event;
  5. import io.micronaut.scheduling.TaskExecutors;
  6. import io.micronaut.scheduling.annotation.ExecuteOn;
  7. import org.reactivestreams.Publisher;
  8. import reactor.core.publisher.Flux;
  9. @Controller("/headlines")
  10. public class HeadlineController {
  11. @ExecuteOn(TaskExecutors.IO)
  12. @Get(produces = MediaType.TEXT_EVENT_STREAM)
  13. public Publisher<Event<Headline>> index() { (1)
  14. String[] versions = {"1.0", "2.0"}; (2)
  15. return Flux.generate(() -> 0, (i, emitter) -> { (3)
  16. if (i < versions.length) {
  17. emitter.next( (4)
  18. Event.of(new Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
  19. );
  20. } else {
  21. emitter.complete(); (5)
  22. }
  23. return ++i;
  24. });
  25. }
  26. }

Publishing Server Sent Events from a Controller

  1. import io.micronaut.http.MediaType
  2. import io.micronaut.http.annotation.Controller
  3. import io.micronaut.http.annotation.Get
  4. import io.micronaut.http.sse.Event
  5. import io.micronaut.scheduling.TaskExecutors
  6. import io.micronaut.scheduling.annotation.ExecuteOn
  7. import org.reactivestreams.Publisher
  8. import reactor.core.publisher.Flux
  9. @Controller("/headlines")
  10. class HeadlineController {
  11. @ExecuteOn(TaskExecutors.IO)
  12. @Get(produces = MediaType.TEXT_EVENT_STREAM)
  13. Publisher<Event<Headline>> index() { (1)
  14. String[] versions = ["1.0", "2.0"] (2)
  15. Flux.generate(() -> 0, (i, emitter) -> {
  16. if (i < versions.length) {
  17. emitter.next( (4)
  18. Event.of(new Headline("Micronaut ${versions[i]} Released", "Come and get it"))
  19. )
  20. } else {
  21. emitter.complete() (5)
  22. }
  23. return i + 1
  24. })
  25. }
  26. }

Publishing Server Sent Events from a Controller

  1. import io.micronaut.http.MediaType
  2. import io.micronaut.http.annotation.Controller
  3. import io.micronaut.http.annotation.Get
  4. import io.micronaut.http.sse.Event
  5. import io.micronaut.scheduling.TaskExecutors
  6. import io.micronaut.scheduling.annotation.ExecuteOn
  7. import org.reactivestreams.Publisher
  8. import reactor.core.publisher.Flux
  9. import reactor.core.publisher.SynchronousSink
  10. import java.util.concurrent.Callable
  11. import java.util.function.BiFunction
  12. @Controller("/headlines")
  13. class HeadlineController {
  14. @ExecuteOn(TaskExecutors.IO)
  15. @Get(produces = [MediaType.TEXT_EVENT_STREAM])
  16. fun index(): Publisher<Event<Headline>> { (1)
  17. val versions = arrayOf("1.0", "2.0") (2)
  18. return Flux.generate(
  19. { 0 },
  20. BiFunction { i: Int, emitter: SynchronousSink<Event<Headline>> -> (3)
  21. if (i < versions.size) {
  22. emitter.next( (4)
  23. Event.of(
  24. Headline(
  25. "Micronaut " + versions[i] + " Released", "Come and get it"
  26. )
  27. )
  28. )
  29. } else {
  30. emitter.complete() (5)
  31. }
  32. return@BiFunction i + 1
  33. })
  34. }
  35. }
1The controller method returns a Publisher of Event
2A headline is emitted for each version of Micronaut
3The reactor:Flux[] type’s generate method generates a Publisher. The generate method accepts an initial value and a lambda that accepts the value and a Emitter. Note that this example executes on the same thread as the controller action, but you could use subscribeOn or map an existing “hot” Flux.
4The Emitter interface onNext method emits objects of type Event. The Event.of(ET) factory method constructs the event.
5The Emitter interface onComplete method indicates when to finish sending server sent events.
You typically want to schedule SSE event streams on a separate executor. The previous example uses @ExecuteOn to execute the stream on the I/O executor.

The above example sends back a response of type text/event-stream and for each Event emitted the Headline type previously will be converted to JSON resulting in responses such as:

Server Sent Event Response Output

  1. data: {"title":"Micronaut 1.0 Released","description":"Come and get it"}
  2. data: {"title":"Micronaut 2.0 Released","description":"Come and get it"}

You can use the methods of the Event interface to customize the Server Sent Event data sent back, including associating event ids, comments, retry timeouts, etc.