6.25 Server Sent Events

The Micronaut HTTP server supports emitting Server Sent Events (SSE) using the Event API.

To emit events from the server you simply return a Reactive Streams Publisher that emits objects of type Event.

The Publisher itself could publish events from a background task, via an event system or whatever.

Imagine for an example a event stream of news headlines, you may define a data class as follows:

Headline

  1. public class Headline {
  2. private String title;
  3. private String description;
  4. public Headline() { }
  5. public Headline(String title, String description) {
  6. this.title = title;
  7. this.description = description;
  8. }
  9. public String getTitle() {
  10. return title;
  11. }
  12. public String getDescription() {
  13. return description;
  14. }
  15. public void setTitle(String title) {
  16. this.title = title;
  17. }
  18. public void setDescription(String description) {
  19. this.description = description;
  20. }
  21. }

Headline

  1. class Headline {
  2. String title;
  3. String description;
  4. Headline() {}
  5. Headline(String title, String description) {
  6. this.title = title;
  7. this.description = description;
  8. }
  9. }

Headline

  1. class Headline {
  2. var title: String? = null
  3. var description: String? = null
  4. constructor() {}
  5. constructor(title: String, description: String) {
  6. this.title = title
  7. this.description = description
  8. }
  9. }

To emit news headline events you can write a controller that returns a Publisher of Event instances using which ever Reactive library you prefer. The example below uses RxJava 2’s Flowable via the generate method:

Publishing Server Sent Events from a Controller

  1. import io.micronaut.http.MediaType;
  2. import io.micronaut.http.annotation.*;
  3. import io.micronaut.http.sse.Event;
  4. import io.micronaut.scheduling.TaskExecutors;
  5. import io.micronaut.scheduling.annotation.ExecuteOn;
  6. import io.reactivex.Flowable;
  7. import org.reactivestreams.Publisher;
  8. @Controller("/headlines")
  9. public class HeadlineController {
  10. @ExecuteOn(TaskExecutors.IO)
  11. @Get(produces = MediaType.TEXT_EVENT_STREAM)
  12. public Publisher<Event<Headline>> index() { (1)
  13. String[] versions = new String[]{"1.0", "2.0"}; (2)
  14. return Flowable.generate(() -> 0, (i, emitter) -> { (3)
  15. if (i < versions.length) {
  16. emitter.onNext( (4)
  17. Event.of(new Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
  18. );
  19. } else {
  20. emitter.onComplete(); (5)
  21. }
  22. return ++i;
  23. });
  24. }
  25. }

Publishing Server Sent Events from a Controller

  1. import io.micronaut.http.annotation.Controller
  2. import io.micronaut.http.annotation.Get
  3. import io.micronaut.http.sse.Event
  4. import io.micronaut.scheduling.TaskExecutors
  5. import io.micronaut.scheduling.annotation.ExecuteOn
  6. import io.reactivex.Emitter
  7. import io.reactivex.Flowable
  8. import io.reactivex.functions.BiFunction
  9. import org.reactivestreams.Publisher
  10. @Controller("/headlines")
  11. class HeadlineController {
  12. @ExecuteOn(TaskExecutors.IO)
  13. @Get(produces = MediaType.TEXT_EVENT_STREAM)
  14. Publisher<Event<Headline>> index() { (1)
  15. String[] versions = ["1.0", "2.0"] (2)
  16. def initialState = { -> 0 }
  17. def emitterFunction = { Integer i, Emitter emitter -> (3)
  18. if (i < versions.length) {
  19. emitter.onNext( (4)
  20. Event.of(new Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
  21. )
  22. } else {
  23. emitter.onComplete() (5)
  24. }
  25. return ++i
  26. }
  27. return Flowable.generate(initialState, emitterFunction as BiFunction<Integer,Emitter<Event<Headline>>,Integer>)
  28. }
  29. }

Publishing Server Sent Events from a Controller

  1. import io.micronaut.http.MediaType
  2. import io.micronaut.http.annotation.Controller
  3. import io.micronaut.http.annotation.Get
  4. import io.micronaut.http.sse.Event
  5. import io.micronaut.scheduling.TaskExecutors
  6. import io.micronaut.scheduling.annotation.ExecuteOn
  7. import io.reactivex.Emitter
  8. import io.reactivex.Flowable
  9. import io.reactivex.functions.BiFunction
  10. import org.reactivestreams.Publisher
  11. import java.util.concurrent.Callable
  12. @Controller("/headlines")
  13. class HeadlineController {
  14. @ExecuteOn(TaskExecutors.IO)
  15. @Get(produces = [MediaType.TEXT_EVENT_STREAM])
  16. fun index(): Publisher<Event<Headline>> { (1)
  17. val versions = arrayOf("1.0", "2.0") (2)
  18. return Flowable.generate<Event<Headline>, Int>(Callable<Int>{ 0 }, BiFunction { (3)
  19. i: Int, emitter: Emitter<Event<Headline>> ->
  20. var nextInt: Int = i
  21. if (i < versions.size) {
  22. emitter.onNext( (4)
  23. Event.of<Headline>(Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
  24. )
  25. } else {
  26. emitter.onComplete() (5)
  27. }
  28. ++nextInt
  29. })
  30. }
  31. }
1The controller method returns a Publisher of Event
2For each version of Micronaut a headline is emitted
3The Flowable type’s generate method is used to generate a Publisher. The generate method accepts an initial value and a lambda that accepts the value and a Emitter. Note that this example executes on the same thread as the controller action, but you could use subscribeOn or map and existing “hot” Flowable.
4The Emitter interface’s onNext method is used to emit objects of type Event. The Event.of(ET) factory method is used to construct the event.
5The Emitter interface’s onComplete method is used to indicate when to finish sending server sent events.
You typically want to schedule SSE event streams on a separate executor. The previous example uses @ExecuteOn to execute the stream on the I/O executor.

The above example will send back a response of type text/event-stream and for each Event emitted the Headline type previously will be converted to JSON resulting in responses such as:

Server Sent Event Response Output

  1. data: {"title":"Micronaut 1.0 Released","description":"Come and get it"}
  2. data: {"title":"Micronaut 2.0 Released","description":"Come and get it"}

You can use the methods of the Event interface to customize the Server Sent Event data sent back including associating event ids, comments, retry timeouts etc.