绑定一个 Stream

Streams 操作 — 除了一些像终端动作和 broadcast() 的异常 — 将永远不会直接订阅,而是将懒惰地预备自己被订阅。这在函数式编程中常常被称为提升。

基本的意思就是 Reactor Stream 用户会明确的调用 Stream.subscribe(Subscriber) 或者可选的终端动作,比如 Stream.consume(Consumer) 来实现所有注册了的操作。在那之前 Actions 并不是真的存在的。我们使用 Stream.lift(Supplier) 来将这些 Action 的创建延迟到 Stream.subscribe(Subscriber) 被明确调用。

当所有的东西都绑定好了,每个动作都会维持一个上行流 Subscription 和一个下行流 Subscription 而 Reactive Streams 所有的约定都会应用到管道。

通常终端的动作会返回一个 Control 对象而不是 Stream。这是一个你可以用来请求或者取消一个管道的组件,不用再一个 Subscriber 上下文里面或者是实现整个 Subscriber 约定。

绑定两个管道

  1. import static reactor.Environment.*;
  2. import reactor.rx.Streams;
  3. import reactor.rx.Stream;
  4. //...
  5. Stream<String> stream = Streams.just("a","b","c","d","e","f","g","h");
  6. //prepare two unique pipelines
  7. Stream<String> actionChain1 = stream.map(String::toUpperCase).filter(w -> w.equals("C"));
  8. Stream<Long> actionChain2 = stream.dispatchOn(sharedDispatcher()).take(5).count();
  9. actionChain1.consume(System.out::println); //start chain1
  10. Control c = actionChain2.consume(System.out::println); //start chain2
  11. //...
  12. c.cancel(); //force this consumer to stop receiving data

enter description here

图 10. 绑定之后

发布/订阅(Publish/Subscribe)

要从一个统一管道向订阅者输出, 可以使用 Stream.process(Processor)Stream.broadcast()Stream.broadcastOn()Stream.broadcastTo()

共享一个上行流管道,并绑定两个下行流管道

  1. import static reactor.Environment.*;
  2. import reactor.rx.Streams;
  3. import reactor.rx.Stream;
  4. //...
  5. Stream<String> stream = Streams.just("a","b","c","d","e","f","g","h");
  6. //prepare a shared pipeline
  7. Stream<String> sharedStream = stream.observe(System.out::println).broadcast();
  8. //prepare two unique pipelines
  9. Stream<String> actionChain1 = sharedStream.map(String::toUpperCase).filter(w -> w.equals("C"));
  10. Stream<Long> actionChain2 = sharedStream.take(5).count();
  11. actionChain1.consume(System.out::println); //start chain1
  12. actionChain2.consume(System.out::println); //start chain2

enter description here

图 11. 在绑定一个共享的Stream之后
表 8. 考虑终端的或者明确的订阅的操作
Stream<T> 方法返回类型作用
subscribe(Subscriber<T>) subscribeOn void 订阅传入的 Subscriber<T> 并将任何附带的上行流实例化, 进行懒绑定(针对非终端操作的非明确的提升)。注意一个 Subscriber 必须请求数据,如果它期望一些数据的话。dispatchOnsubscribeOn 是提供给用传入的 Dispatcher 发送 onSubscribe 信号的可选项。
consume(Consumer<T>,Consumer<T>,Consumer<T>) consumeOn Control 每当有相关的信号被侦测到,就会使用一个同每个传入的 Consumer 交互的 ConsumerAction 来调用 subscribe。它将会向接收的 Subscription 请求 request(Streams.capacity()) 获取容量限制, 默认是 Long.MAX_VALUE, 这会导致无节制的使用。subscribeOnconsumeOn 是提供给传入的 Dispatcher 发送 onSubscribe 信号的可选项。如有必要,就返回一个 Control 组件来取消 实例化的 Stream。 注意如果 onNext(T) 信号触发了一个阻塞请求, ConsumeAction 就会维持一个无限的递归。
consumeLater() Control 类似于 consume,但并不会启动一个初始的 Subscription.request(long)。 返回的 Control 可以被用来在任何时候执行 request(long)
tap() TapAndControls 类似于 consume 但返回的是一个 TapAndControls,它将会在每次 onNext(T) 接收到信号或者被取消时动态的更新。
batchConsume(Consumer<T>, Consumer<T>, Consumer<T>, Function<Long,Long>)batchConsumeOn Control 类似于 consume 但将会请求映射的 Long 需求,给定了之前的需求并且以默认的 Stream.capacity() 开始。 对于要动态适配各种因素的需求很有用。
adaptiveConsume(Consumer<T>, Consumer<T>, Consumer<T>, Function<Stream<Long>,Publisher<Long>>) adaptiveConsumeOn Control 类似于 batchConsume,但是将会请求需要的 Long 值计算出来的序列。它可以被用来插入流程控制,比如延迟需求的 Streams.timer()
next() Promise<T> 返回一个 Promise<T> ,它会积极的向 Stream 发出订阅,对其进行实例化,并在解除注册之前请求一个信号数据。最近的 onNext(T), onComplete() or onError(Throwable)信号 将会满足约定。
toList() Promise<List<T>> 类似于 next(),但是会等待直到整个序列已经产生 (onComplete()) 并且会在一个单独的 List<T> 中传递累积的 onNext(T) 以满足返回的约定。
Stream.toBlockingQueue() CompletableBlockingQueue<T> Stream 发出信号并返回一个累积所有 onNext 信号的阻塞 Queue<T>CompletableBlockingQueue.isTerminated() 可以被用来作为一个阻塞 poll() 循环退出的条件。
cache() Stream<T> 将任何 Stream 转成 Stream,能够针对每个 Subscriber 单独回放所有的信号序列。因为动作的无边界属性,所以你可能应该只将它用于小的 (ish) 序列上。
broadcast() broadcastOn(Environment, Dispatcher) Stream<T> 将任何 Stream 转成 Stream。这将会阻止管道通过当前的 Stream 的实例化发生重复,并准备向 N 个 Subscriber 下行流发布该信号。 需求将会从所有的子 Subscriber 处聚集。
broadcastTo(Subscriber<T>) Subscriber<T> 一个 Stream.subscribe 替代,因为返回的实体就是传入的参数,所以允许方法链。
process(Processor<T, O>) Stream<O> 类似于 broadcast() 但接受任何给定的 Processor<T, O>。这里有一个对于 Core Processor 的完美介绍!