7.3.2 Streaming with @Client

The @Client annotation can also handle streaming HTTP responses.

Streaming JSON with @Client

For example, to write a client that streams data from the controller defined in the JSON Streaming section of the documentation, you can define a client that returns an unbound Publisher such as Reactor’s reactor:Flux[] or a RxJava’s Flowable:

HeadlineClient.java

  1. import io.micronaut.http.annotation.Get;
  2. import io.micronaut.http.client.annotation.Client;
  3. import org.reactivestreams.Publisher;
  4. import reactor.core.publisher.Flux;
  5. import static io.micronaut.http.MediaType.APPLICATION_JSON_STREAM;
  6. @Client("/streaming")
  7. public interface HeadlineClient {
  8. @Get(value = "/headlines", processes = APPLICATION_JSON_STREAM) (1)
  9. Publisher<Headline> streamHeadlines(); (2)

HeadlineClient.java

  1. import io.micronaut.http.annotation.Get
  2. import io.micronaut.http.client.annotation.Client
  3. import org.reactivestreams.Publisher
  4. import static io.micronaut.http.MediaType.APPLICATION_JSON_STREAM
  5. @Client("/streaming")
  6. interface HeadlineClient {
  7. @Get(value = "/headlines", processes = APPLICATION_JSON_STREAM) (1)
  8. Publisher<Headline> streamHeadlines() (2)
  9. }

HeadlineClient.java

  1. import io.micronaut.http.MediaType.APPLICATION_JSON_STREAM
  2. import io.micronaut.http.annotation.Get
  3. import io.micronaut.http.client.annotation.Client
  4. import reactor.core.publisher.Flux
  5. @Client("/streaming")
  6. interface HeadlineClient {
  7. @Get(value = "/headlines", processes = [APPLICATION_JSON_STREAM]) (1)
  8. fun streamHeadlines(): Flux<Headline> (2)
1The @Get method processes responses of type APPLICATION_JSON_STREAM
2The return type is Publisher

The following example shows how the previously defined HeadlineClient can be invoked from a test:

Streaming HeadlineClient

  1. @Test
  2. public void testClientAnnotationStreaming() {
  3. try(EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class)) {
  4. HeadlineClient headlineClient = embeddedServer
  5. .getApplicationContext()
  6. .getBean(HeadlineClient.class); (1)
  7. Mono<Headline> firstHeadline = Mono.from(headlineClient.streamHeadlines()); (2)
  8. Headline headline = firstHeadline.block(); (3)
  9. assertNotNull(headline);
  10. assertTrue(headline.getText().startsWith("Latest Headline"));
  11. }
  12. }

Streaming HeadlineClient

  1. void "test client annotation streaming"() throws Exception {
  2. when:
  3. def headlineClient = embeddedServer.applicationContext
  4. .getBean(HeadlineClient) (1)
  5. Mono<Headline> firstHeadline = Mono.from(headlineClient.streamHeadlines()) (2)
  6. Headline headline = firstHeadline.block() (3)
  7. then:
  8. headline
  9. headline.text.startsWith("Latest Headline")
  10. }

Streaming HeadlineClient

  1. "test client annotation streaming" {
  2. val headlineClient = embeddedServer
  3. .applicationContext
  4. .getBean(HeadlineClient::class.java) (1)
  5. val firstHeadline = headlineClient.streamHeadlines().next() (2)
  6. val headline = firstHeadline.block() (3)
  7. headline shouldNotBe null
  8. headline.text shouldStartWith "Latest Headline"
  9. }
1The client is retrieved from the ApplicationContext
2The next method emits the first element emmited by the reactor:Flux[] into a reactor:Mono[].
3The block() method retrieves the result in the test.

Streaming Clients and Response Types

The example defined in the previous section expects the server to respond with a stream of JSON objects, and the content type to be application/x-json-stream. For example:

A JSON Stream

  1. {"title":"The Stand"}
  2. {"title":"The Shining"}

The reason for this is simple; a sequence of JSON object is not, in fact, valid JSON and hence the response content type cannot be application/json. For the JSON to be valid it would have to return an array:

A JSON Array

  1. [
  2. {"title":"The Stand"},
  3. {"title":"The Shining"}
  4. ]

Micronaut’s client does however support streaming of both individual JSON objects via application/x-json-stream and also JSON arrays defined with application/json.

If the server returns application/json and a non-single Publisher is returned (such as a Reactor’s reactor:Flux[] or a RxJava’s Flowable), the client streams the array elements as they become available.

Streaming Clients and Read Timeout

When streaming responses from servers, the underlying HTTP client will not apply the default readTimeout setting (which defaults to 10 seconds) of the HttpClientConfiguration since the delay between reads for streaming responses may differ from normal reads.

Instead, the read-idle-timeout setting (which defaults to 5 minutes) dictates when to close a connection after it becomes idle.

If you stream data from a server that defines a longer delay than 5 minutes between items, you should adjust readIdleTimeout. The following configuration in application.yml demonstrates how:

Adjusting the readIdleTimeout

  1. micronaut:
  2. http:
  3. client:
  4. read-idle-timeout: 10m

The above example sets the readIdleTimeout to ten minutes.

Streaming Server Sent Events

Micronaut features a native client for Server Sent Events (SSE) defined by the interface SseClient.

You can use this client to stream SSE events from any server that emits them.

Although SSE streams are typically consumed by a browser EventSource, there are cases where you may wish to consume an SSE stream via SseClient, such as in unit tests or when a Micronaut service acts as a gateway for another service.

The @Client annotation also supports consuming SSE streams. For example, consider the following controller method that produces a stream of SSE events:

SSE Controller

  1. @Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM) (1)
  2. Publisher<Event<Headline>> streamHeadlines() {
  3. return Flux.<Event<Headline>>create((emitter) -> { (2)
  4. Headline headline = new Headline();
  5. headline.setText("Latest Headline at " + ZonedDateTime.now());
  6. emitter.next(Event.of(headline));
  7. emitter.complete();
  8. }, FluxSink.OverflowStrategy.BUFFER)
  9. .repeat(100) (3)
  10. .delayElements(Duration.of(1, ChronoUnit.SECONDS)); (4)
  11. }

SSE Controller

  1. @Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM) (1)
  2. Flux<Event<Headline>> streamHeadlines() {
  3. Flux.<Event<Headline>>create( { emitter -> (2)
  4. Headline headline = new Headline(text: "Latest Headline at ${ZonedDateTime.now()}")
  5. emitter.next(Event.of(headline))
  6. emitter.complete()
  7. }, FluxSink.OverflowStrategy.BUFFER)
  8. .repeat(100) (3)
  9. .delayElements(Duration.of(1, ChronoUnit.SECONDS)) (4)
  10. }

SSE Controller

  1. @Get(value = "/headlines", processes = [TEXT_EVENT_STREAM]) (1)
  2. internal fun streamHeadlines(): Flux<Event<Headline>> {
  3. return Flux.create<Event<Headline>>( { emitter -> (2)
  4. val headline = Headline()
  5. headline.text = "Latest Headline at ${ZonedDateTime.now()}"
  6. emitter.next(Event.of(headline))
  7. emitter.complete()
  8. }, FluxSink.OverflowStrategy.BUFFER)
  9. .repeat(100) (3)
  10. .delayElements(Duration.of(1, ChronoUnit.SECONDS)) (4)
  11. }
1The controller defines a @Get annotation that produces a MediaType.TEXT_EVENT_STREAM
2The method uses Reactor to emit a Headline object
3The repeat method repeats the emission 100 times
4With a delay of one second between each

Notice that the return type of the controller is also Event and that the Event.of method creates events to stream to the client.

To define a client that consumes the events, define a method that processes MediaType.TEXT_EVENT_STREAM:

SSE Client

  1. @Client("/streaming/sse")
  2. public interface HeadlineClient {
  3. @Get(value = "/headlines", processes = TEXT_EVENT_STREAM)
  4. Publisher<Event<Headline>> streamHeadlines();
  5. }

SSE Client

  1. @Client("/streaming/sse")
  2. interface HeadlineClient {
  3. @Get(value = "/headlines", processes = TEXT_EVENT_STREAM)
  4. Flux<Event<Headline>> streamHeadlines()
  5. }

SSE Client

  1. @Client("/streaming/sse")
  2. interface HeadlineClient {
  3. @Get(value = "/headlines", processes = [TEXT_EVENT_STREAM])
  4. fun streamHeadlines(): Flux<Event<Headline>>
  5. }

The generic type of the Flux can be either an Event, in which case you will receive the full event object, or a POJO, in which case you will receive only the data contained within the event converted from JSON.