7.1.4 Streaming JSON over HTTP

Micronaut’s HTTP client includes support for streaming data over HTTP via the RxStreamingHttpClient interface which includes methods specific to HTTP streaming including:

Table 1. HTTP Streaming Methods
MethodDescription

dataStream(HttpRequest<I> request)

Returns a stream of data as a Flowable of ByteBuffer

exchangeStream(HttpRequest<I> request)

Returns the HttpResponse wrapping a Flowable of ByteBuffer

jsonStream(HttpRequest<I> request)

Returns a non-blocking stream of JSON objects

In order to do JSON streaming you should on the server side declare a controller method that returns a application/x-json-stream of JSON objects. For example:

Streaming JSON on the Server

  1. import io.micronaut.http.MediaType;
  2. import io.micronaut.http.annotation.Controller;
  3. import io.micronaut.http.annotation.Get;
  4. import io.reactivex.Flowable;
  5. import java.time.ZonedDateTime;
  6. import java.util.concurrent.TimeUnit;
  7. @Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) (1)
  8. Flowable<Headline> streamHeadlines() {
  9. return Flowable.fromCallable(() -> { (2)
  10. Headline headline = new Headline();
  11. headline.setText("Latest Headline at " + ZonedDateTime.now());
  12. return headline;
  13. }).repeat(100) (3)
  14. .delay(1, TimeUnit.SECONDS); (4)
  15. }

Streaming JSON on the Server

  1. import io.micronaut.http.MediaType
  2. import io.micronaut.http.annotation.Controller
  3. import io.micronaut.http.annotation.Get
  4. import io.reactivex.Flowable
  5. import java.time.ZonedDateTime
  6. import java.util.concurrent.TimeUnit
  7. @Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) (1)
  8. Flowable<Headline> streamHeadlines() {
  9. Flowable.fromCallable({ (2)
  10. Headline headline = new Headline()
  11. headline.setText("Latest Headline at " + ZonedDateTime.now())
  12. return headline
  13. }).repeat(100) (3)
  14. .delay(1, TimeUnit.SECONDS) (4)
  15. }

Streaming JSON on the Server

  1. import io.micronaut.http.MediaType
  2. import io.micronaut.http.annotation.Controller
  3. import io.micronaut.http.annotation.Get
  4. import io.reactivex.Flowable
  5. import java.time.ZonedDateTime
  6. import java.util.concurrent.TimeUnit
  7. @Get(value = "/headlines", processes = [MediaType.APPLICATION_JSON_STREAM]) (1)
  8. internal fun streamHeadlines(): Flowable<Headline> {
  9. return Flowable.fromCallable {
  10. (2)
  11. val headline = Headline()
  12. headline.text = "Latest Headline at " + ZonedDateTime.now()
  13. headline
  14. }.repeat(100) (3)
  15. .delay(1, TimeUnit.SECONDS) (4)
  16. }
1A method streamHeadlines is defined that produces application/x-json-stream
2A Flowable is created from a Callable function (note no blocking occurs within the function so this is ok, otherwise you would want to subscribeOn an I/O thread pool).
3The Flowable is set to repeat 100 times
4The Flowable will emit items with a delay of 1 second between each item
The server does not have to be written in Micronaut, any server that supports JSON streaming will do.

Then on the client simply subscribe to the stream using jsonStream and every time the server emits a JSON object the client will decode and consume it:

Streaming JSON on the Client

  1. Flowable<Headline> headlineStream = client.jsonStream(GET("/streaming/headlines"), Headline.class); (1)
  2. CompletableFuture<Headline> future = new CompletableFuture<>(); (2)
  3. headlineStream.subscribe(new Subscriber<Headline>() {
  4. @Override
  5. public void onSubscribe(Subscription s) {
  6. s.request(1); (3)
  7. }
  8. @Override
  9. public void onNext(Headline headline) {
  10. System.out.println("Received Headline = " + headline.getText());
  11. future.complete(headline); (4)
  12. }
  13. @Override
  14. public void onError(Throwable t) {
  15. future.completeExceptionally(t); (5)
  16. }
  17. @Override
  18. public void onComplete() {
  19. // no-op (6)
  20. }
  21. });

Streaming JSON on the Client

  1. Flowable<Headline> headlineStream = client.jsonStream(GET("/streaming/headlines"), Headline.class) (1)
  2. CompletableFuture<Headline> future = new CompletableFuture<>() (2)
  3. headlineStream.subscribe(new Subscriber<Headline>() {
  4. @Override
  5. void onSubscribe(Subscription s) {
  6. s.request(1) (3)
  7. }
  8. @Override
  9. void onNext(Headline headline) {
  10. System.out.println("Received Headline = " + headline.getText())
  11. future.complete(headline) (4)
  12. }
  13. @Override
  14. void onError(Throwable t) {
  15. future.completeExceptionally(t) (5)
  16. }
  17. @Override
  18. void onComplete() {
  19. // no-op (6)
  20. }
  21. })

Streaming JSON on the Client

  1. val headlineStream = client.jsonStream(GET<Any>("/streaming/headlines"), Headline::class.java) (1)
  2. val future = CompletableFuture<Headline>() (2)
  3. headlineStream.subscribe(object : Subscriber<Headline> {
  4. override fun onSubscribe(s: Subscription) {
  5. s.request(1) (3)
  6. }
  7. override fun onNext(headline: Headline) {
  8. println("Received Headline = " + headline.text!!)
  9. future.complete(headline) (4)
  10. }
  11. override fun onError(t: Throwable) {
  12. future.completeExceptionally(t) (5)
  13. }
  14. override fun onComplete() {
  15. // no-op (6)
  16. }
  17. })
1The jsonStream method is used return a Flowable
2A CompletableFuture is used in the example to receive a value, but what you do with each emitted item is application specific
3The Subscription is used to request a single item. You can use the Subscription to regulate back pressure and demand.
4The onNext method is called when an item is emitted
5The onError method is called when an error occurs
6The onComplete method is called when all Headline instances have been emitted

Note neither the server or the client in the example above perform blocking I/O at any point.