从冷数据源创建

你可以从各种来源创建 Stream,包括一个已知值的 Iterable 对象,一个用来作为基础任务流的单一值,或者甚至是来自于诸如 Future 或者 Supplier 这样的块结构。

Streams.just()

  1. Stream<String> st = Streams.just("Hello ", "World", "!"); (1)
  2. st.dispatchOn(Environment.cachedDispatcher()) (2)
  3. .map(String::toUpperCase) (3)
  4. .consume(s -> System.out.printf("%s greeting = %s%n", Thread.currentThread(), s)); (4)
  • 从一个已知值创建 Stream,但并不指定一个默认的 调度器(Dispatcher)
  • .dispatchOn(Dispatcher) 告诉 Stream 在哪一个线程上执行任务。用这来将任务的执行从一个线程转移到另外一个线程。
  • 使用常见的约定 —— map() 方法 —— 来对输入进行转化。
  • 在管道上产生需求,这意味着“处理开始了”。这是一个对 subscribe(Subscriber) 进行了优化后的快捷方式,默认只请求 Long.MAX_VALUE 次。

冷数据源从一开始就会因为每一个传入 Stream.subscribe(Subscriber) 的新的 Subscriber 而被重新发放而因此就可能发生重复的消耗。

表5, 创建预先确定的 Stream 和 Promise
工厂方法数据类型作用
Streams.<T>empty() T 只在被订阅者请求时,发出一次 onComplete()
Streams.<T>never() T 从不发出任何东西。 对于保持活动状态的行为很有用。
Streams.<T, Throwable>fail(Throwable) T 只发出 onError(Throwable)
Streams.from(Future<T>) T 在传入的可能会发出 onNext(T)onComplete() ,或者异常时发出 onError(Throwable)Future.get() 上阻止 Subscription.request(long)
Streams.from(T[]) T 每次 Subscription.request(N) 被调用到时发出 N 个 onNext(T) 元素。 如果 N == Long.MAXVALUE, 就发出所有数据。 一旦整个数组都已经被读取了一遍,就发出 onComplete()
Streams.from(Iterable<T>) T 每次 Subscription.request(N) 被调用到时发出 N 个 onNext(T) 元素。 如果 N == Long.MAX_VALUE, 就发出所有数据。一旦整个数组都已经被读取了一遍,就发出 onComplete()
Streams.range(long, _long) Long 每次 Subscription.request(N) 被调用到时就发出有 N 个 onNext(Long) 的一个序列。如果 N == Long.MAXVALUE, 就发出所有东西。 一旦读取达到所能包容的上限,就发出 onComplete()
Streams.just(T, _T, T, T, T, T, T, T) T 在只是行为相似的 Streams.from(Iterable) 的一种优化。 用来发送没有和 Streams.from() 签名相冲突的 Iterable, Array 或者 Future 也很有用。
Streams.generate(Supplier<T>) T 每当 Subscription.request(N) 被调用时就发送从 Supplier.get() 工厂产出的 onNext(T)。忽略掉要求的数量 N,因为只有一个数据会被发送。当返回一个 null 值时,就发送 onComplete()
Promises.syncTask(Supplier<T>), Promises.task(, Supplier<T>) T Subscription.request(N) 第一次接收时发送一个从 Supplier.get() 产生的 onNext(T)onComplete() 。忽略掉数量 N。
Promises.success(T) T 无论一个订阅者何时被提供给 Promise.subscribe(Subscriber),都发送 onNext(T)onComplete()
Promises.<T>error(Throwable) T 无论一个被订阅了的订阅者何时被提供给 Promise.subscribe(Subscriber),都发送 onError(Throwable)