7.1.4 Streaming JSON over HTTP

Micronaut’s HTTP client supports streaming data over HTTP via the ReactorStreamingHttpClient interface, which adds the following streaming-specific methods:

Table 1. HTTP Streaming Methods

Method | Description
dataStream(HttpRequest<I> request) | Returns a stream of data as a Flux of ByteBuffer
exchangeStream(HttpRequest<I> request) | Returns the HttpResponse wrapping a Flux of ByteBuffer
jsonStream(HttpRequest<I> request) | Returns a non-blocking stream of JSON objects

To use JSON streaming, declare a controller method on the server that produces an application/x-json-stream response, that is, a stream of JSON objects. For example:

Streaming JSON on the Server (Java)

    import io.micronaut.http.MediaType;
    import io.micronaut.http.annotation.Controller;
    import io.micronaut.http.annotation.Get;
    import org.reactivestreams.Publisher;
    import reactor.core.publisher.Mono;

    import java.time.Duration;
    import java.time.ZonedDateTime;
    import java.time.temporal.ChronoUnit;

    // Class name is illustrative; the base path is inferred from the
    // client example, which requests /streaming/headlines.
    @Controller("/streaming")
    public class HeadlineController {

        @Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) // (1)
        Publisher<Headline> streamHeadlines() {
            return Mono.fromCallable(() -> { // (2)
                Headline headline = new Headline();
                headline.setText("Latest Headline at " + ZonedDateTime.now());
                return headline;
            }).repeat(100) // (3)
              .delayElements(Duration.of(1, ChronoUnit.SECONDS)); // (4)
        }
    }

Streaming JSON on the Server (Groovy)

    import io.micronaut.http.MediaType
    import io.micronaut.http.annotation.Controller
    import io.micronaut.http.annotation.Get
    import reactor.core.publisher.Flux
    import reactor.core.publisher.Mono

    import java.time.Duration
    import java.time.ZonedDateTime
    import java.time.temporal.ChronoUnit

    // Class name is illustrative; the base path is inferred from the
    // client example, which requests /streaming/headlines.
    @Controller("/streaming")
    class HeadlineController {

        @Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) // (1)
        Flux<Headline> streamHeadlines() {
            Mono.fromCallable({ // (2)
                new Headline(text: "Latest Headline at ${ZonedDateTime.now()}")
            }).repeat(100) // (3)
              .delayElements(Duration.of(1, ChronoUnit.SECONDS)) // (4)
        }
    }

Streaming JSON on the Server (Kotlin)

    import io.micronaut.http.MediaType.APPLICATION_JSON_STREAM
    import io.micronaut.http.annotation.Controller
    import io.micronaut.http.annotation.Get
    import reactor.core.publisher.Flux
    import reactor.core.publisher.Mono
    import java.time.Duration
    import java.time.ZonedDateTime
    import java.time.temporal.ChronoUnit

    // Class name is illustrative; the base path is inferred from the
    // client example, which requests /streaming/headlines.
    @Controller("/streaming")
    class HeadlineController {

        @Get(value = "/headlines", processes = [APPLICATION_JSON_STREAM]) // (1)
        internal fun streamHeadlines(): Flux<Headline> {
            return Mono.fromCallable { // (2)
                val headline = Headline()
                headline.text = "Latest Headline at ${ZonedDateTime.now()}"
                headline
            }.repeat(100) // (3)
                .delayElements(Duration.of(1, ChronoUnit.SECONDS)) // (4)
        }
    }

(1) The streamHeadlines method produces application/x-json-stream
(2) A Mono is created from a Callable. No blocking occurs within the function, so this is fine; if the function did block, you should subscribeOn an I/O thread pool.
(3) The Mono is repeated 100 times, which turns it into a Flux
(4) The Flux emits items with a delay of one second between each

The server does not have to be written in Micronaut; any server that supports JSON streaming will do.
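
To make the wire format concrete, here is a minimal sketch of how a consumer could recover individual objects from such a stream. It assumes the body consists of top-level JSON objects written back to back, optionally separated by whitespace, and that network chunks may end in the middle of an object. JsonObjectSplitter is a hypothetical helper written for this illustration only; it is not part of Micronaut, whose client performs this decoding for you.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Micronaut class): splits concatenated JSON objects. */
class JsonObjectSplitter {
    private final StringBuilder current = new StringBuilder();
    private int depth = 0;            // nesting level of { } outside string literals
    private boolean inString = false; // true while inside a JSON string literal
    private boolean escaped = false;  // true right after a backslash inside a string

    /** Feeds one chunk and returns every JSON object completed by that chunk. */
    List<String> feed(String chunk) {
        List<String> complete = new ArrayList<>();
        for (int i = 0; i < chunk.length(); i++) {
            char c = chunk.charAt(i);
            if (depth == 0 && c != '{') {
                continue; // ignore whitespace or separators between objects
            }
            current.append(c);
            if (inString) {
                if (escaped) {
                    escaped = false;      // the escaped character is consumed as-is
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{') {
                depth++;
            } else if (c == '}') {
                depth--;
                if (depth == 0) {         // a top-level object just closed
                    complete.add(current.toString());
                    current.setLength(0);
                }
            }
        }
        return complete;
    }
}
```

Feeding the chunks `{"text":"A"}{"te` and `xt":"B"}` yields one object from the first call and one from the second, which mirrors how a streaming client can hand over each Headline as soon as its closing brace arrives instead of waiting for the whole response.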

On the client, subscribe to the stream using jsonStream; every time the server emits a JSON object, the client decodes and consumes it:

Streaming JSON on the Client (Java)

    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.Flux;

    import java.util.concurrent.CompletableFuture;

    import static io.micronaut.http.HttpRequest.GET;

    // client is a ReactorStreamingHttpClient
    Flux<Headline> headlineStream = Flux.from(client.jsonStream(
            GET("/streaming/headlines"), Headline.class)); // (1)
    CompletableFuture<Headline> future = new CompletableFuture<>(); // (2)
    headlineStream.subscribe(new Subscriber<Headline>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(1); // (3)
        }

        @Override
        public void onNext(Headline headline) {
            System.out.println("Received Headline = " + headline.getText());
            future.complete(headline); // (4)
        }

        @Override
        public void onError(Throwable t) {
            future.completeExceptionally(t); // (5)
        }

        @Override
        public void onComplete() {
            // no-op (6)
        }
    });

Streaming JSON on the Client (Groovy)

    import org.reactivestreams.Subscriber
    import org.reactivestreams.Subscription
    import reactor.core.publisher.Flux

    import java.util.concurrent.CompletableFuture

    import static io.micronaut.http.HttpRequest.GET

    // client is a ReactorStreamingHttpClient
    Flux<Headline> headlineStream = Flux.from(client.jsonStream(
            GET("/streaming/headlines"), Headline)) // (1)
    CompletableFuture<Headline> future = new CompletableFuture<>() // (2)
    headlineStream.subscribe(new Subscriber<Headline>() {
        @Override
        void onSubscribe(Subscription s) {
            s.request(1) // (3)
        }

        @Override
        void onNext(Headline headline) {
            println "Received Headline = $headline.text"
            future.complete(headline) // (4)
        }

        @Override
        void onError(Throwable t) {
            future.completeExceptionally(t) // (5)
        }

        @Override
        void onComplete() {
            // no-op (6)
        }
    })

Streaming JSON on the Client (Kotlin)

    import io.micronaut.http.HttpRequest.GET
    import org.reactivestreams.Subscriber
    import org.reactivestreams.Subscription
    import java.util.concurrent.CompletableFuture

    // client is a ReactorStreamingHttpClient
    val headlineStream = client.jsonStream(
            GET<Any>("/streaming/headlines"), Headline::class.java) // (1)
    val future = CompletableFuture<Headline>() // (2)
    headlineStream.subscribe(object : Subscriber<Headline> {
        override fun onSubscribe(s: Subscription) {
            s.request(1) // (3)
        }

        override fun onNext(headline: Headline) {
            println("Received Headline = ${headline.text!!}")
            future.complete(headline) // (4)
        }

        override fun onError(t: Throwable) {
            future.completeExceptionally(t) // (5)
        }

        override fun onComplete() {
            // no-op (6)
        }
    })

(1) The jsonStream method returns a Flux
(2) A CompletableFuture is used to receive a value, but what you do with each emitted item is application-specific
(3) The Subscription requests a single item, so only the first Headline is delivered here. You can use the Subscription to regulate back pressure and demand.
(4) The onNext method is called when an item is emitted
(5) The onError method is called when an error occurs
(6) The onComplete method is called when all Headline instances have been emitted
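
The demand mechanism described in callout 3 can be explored without a running server. The following sketch uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams Subscriber and Subscription types used above, with a SubmissionPublisher standing in for the HTTP stream. The class and method names are hypothetical and exist only for this illustration. Unlike the client example, the subscriber requests one more item after each onNext, so it receives the whole stream while never having more than one item outstanding.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class: consumes a stream with a demand of one item at a time. */
class OneAtATimeDemo {

    static List<String> consumeOneByOne(List<String> items) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // SubmissionPublisher stands in for the streaming HTTP response.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);            // initial demand: a single item
                }

                @Override
                public void onNext(String item) {
                    received.add(item);
                    subscription.request(1); // ask for the next item only when ready
                }

                @Override
                public void onError(Throwable t) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            items.forEach(publisher::submit);
        } // closing the publisher signals onComplete once buffered items are delivered

        try {
            done.await(5, TimeUnit.SECONDS); // wait for the asynchronous delivery to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }
}
```

Calling `OneAtATimeDemo.consumeOneByOne(List.of("a", "b", "c"))` returns the three items in order. Removing the `request(1)` call in onNext reproduces the behaviour of the client example above, where only the first item is delivered.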

Note that neither the server nor the client in the example above performs any blocking I/O.