reactor-stream

Take note: you should never use Future.get() again. - Stephane Maldini, with a banking customer

First, let's look at how a Stream works in a Java 8 example:

  import static reactor.Environment.*;

  import java.util.concurrent.TimeUnit;

  import reactor.Environment;
  import reactor.fn.tuple.Tuple;
  import reactor.rx.BiStreams;
  import reactor.rx.Streams;

  //...

  Environment.initialize();

  // Find the 10 most frequent words in a list of Strings
  Streams.from(aListOfString)
    .dispatchOn(sharedDispatcher())
    // Split each sentence into words on a separate dispatcher
    .flatMap(sentence ->
      Streams
        .from(sentence.split(" "))
        .dispatchOn(cachedDispatcher())
        .filter(word -> !word.trim().isEmpty())
        .observe(word -> doSomething(word))
    )
    // Pair each word with an initial count of 1
    .map(word -> Tuple.of(word, 1))
    // Group the pairs into one-second windows
    .window(1, TimeUnit.SECONDS)
    // Per window: sum the counts by word, sort descending, keep the top 10
    .flatMap(words ->
      BiStreams.reduceByKey(words, (prev, next) -> prev + next)
        .sort((wordWithCountA, wordWithCountB) -> -wordWithCountA.t2.compareTo(wordWithCountB.t2))
        .take(10)
        .finallyDo(event -> LOG.info("---- window complete! ----"))
    )
    .consume(
      wordWithCount -> LOG.info(wordWithCount.t1 + ": " + wordWithCount.t2),
      error -> LOG.error("", error)
    );
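
For readers more familiar with java.util.stream, the aggregation applied to each window (sum the counts per word, sort by descending count, keep 10) can be sketched without Reactor roughly as follows. This is only an illustration of the counting logic, not the Reactor API: it runs synchronously on an in-memory list, with no dispatchers and no time windows. The class name TopWords, the method topWords, and the alphabetical tie-break are assumptions introduced here for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class TopWords {

    // Hypothetical helper (not part of Reactor): returns the `limit` most
    // frequent words across all sentences, most frequent first.
    static List<Map.Entry<String, Integer>> topWords(List<String> sentences, int limit) {
        // Same idea as map(word -> Tuple.of(word, 1)) followed by reduceByKey:
        // every word starts at 1 and counts for equal keys are summed.
        Map<String, Integer> counts = sentences.stream()
            .flatMap(sentence -> Arrays.stream(sentence.split(" ")))
            .filter(word -> !word.trim().isEmpty())
            .collect(Collectors.toMap(Function.identity(), word -> 1, Integer::sum));

        return counts.entrySet().stream()
            .sorted((a, b) -> {
                // Descending by count, as in the sort(...) step above.
                int byCount = b.getValue().compareTo(a.getValue());
                // Assumption: break ties alphabetically so the result is deterministic.
                return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
            })
            .limit(limit)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList("a b a", "b a c");
        for (Map.Entry<String, Integer> entry : topWords(sentences, 2)) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }
}
```

The Reactor pipeline differs in that it processes words as they arrive, hands work off to dispatchers, and emits a fresh top-10 for every one-second window instead of a single result for the whole list.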