创建 Stream 和 Promise

如果你是数据源的拥有者,并且想要使其 Reactive 化,带有能直接访问各种 Reactive ExtensionReactive Stream 的能力,这里就是起点。

有时的情况是要使用 Stream 的 API 来扩展已经存在的 Reactive Stream Publisher ,而幸运的是我们也提供了一站式的静态 API 来处理这种情况。

就像我们利用 IterableStreamSingleValueStream 等等所做的那样创建使用 Reactor API 注入的 Publisher 源来扩展现有的 Reactor Stream 也是一种受到鼓励的选择

Stream 和 Promise 的花费相对也不大,我们的基准测试套件在商用硬件上成功创造了超过 150M/s 的记录。大多数的 Stream 都坚持使用 无分享模式(Share-Nothing ),只在需要时才创建新的不可变对象。

每一个操作都将返回新的实体:

  1. Stream<A> stream = Streams.just(a);
  2. Stream<B> transformedStream = stream.map(transformationToB);
  3. Assert.isTrue(transformationStream != stream);
  4. stream.subscribe(subscriber1); //subscriber1 将会看到数据 A 保持不变
  5. transformedStream.subscribe(subscriber2); //subscriber2 将会看到转换自 A 之后的数据 B。
  6. //注意这两个订阅者将实现独立的流管道,这是一个我们称之为提升的过程