使用缓冲区

将数据 T 按照序列分组为列表 List<T> 的主要目的有二:

  • 将匹配分界条件的序列暴露给一个 JVM API 常用的Iterable 结构体。
  • 减少 onNext(T) 的信号量,类如 buffer(5) 会将一个有10元素的序列转换成2个列表(每个列表有5个元素)。

收集数据将会产生内存甚或 CPU 的开销,应当适当的调整大小。建议使用小巧且定时的分界,以避免任何长时间的聚合。

如果一个 buffer() 被标记为定时的,却并未提供 Timer 参数时,必须先为其初始化一个环境(Environment)

  1. long timeout = 100;
  2. final int batchsize = 4;
  3. CountDownLatch latch = new CountDownLatch(1);
  4. final Broadcaster<Integer> streamBatcher = Broadcaster.<Integer>create(env);
  5. streamBatcher
  6. .buffer(batchsize, timeout, TimeUnit.MILLISECONDS)
  7. .consume(i -> latch.countDown());
  8. streamBatcher.onNext(12);
  9. streamBatcher.onNext(123);
  10. Thread.sleep(200);
  11. streamBatcher.onNext(42);
  12. streamBatcher.onNext(666);
  13. latch.await(2, TimeUnit.SECONDS);
表 10,使用Stream buffers进行块处理(返回Stream<List<T>>):
Stream<T> API作用
buffer(int) 聚合数据,直到 onComplete() 被调用;或是到达给定 int 参数的值,然后开始一个新的聚合。
buffer(Publisher<?>, Supplier<? extends Publisher<?>>) 聚合数据,直到 onComplete() 被调用,或是第一个 Publisher<?> 参数发出信号。可选参数 Supplier<? extends Publisher<?>> 提供了一个序列,它的第一个信号将终止其所链接的聚合。这意味着重叠的(位移的缓冲区)或脱节的聚合数据可以发送给子订阅者 Subscriber<List<T>>
buffer(Supplier<? extends Publisher<?>>) 聚合数据,直到 onComplete() 被调用;或是与提供的 Publisher<?> 协调。Supplier<? extends Publisher<?>> 提供了一个序列,它的第一个信号将终止其所链接的聚合并立即开始一个新的聚合。
buffer(int, int) 聚合数据,直到 onComplete() 被调用;或是到达给定的忽略值(第二个参数),然后开始一个新的聚合。第一个尺寸参数int 将界定缓冲区聚合元素的最大数量。这意味着重叠的(位移的缓冲区)或脱节的聚合数据可以发送给子订阅者 Subscriber<List<T>>
buffer(long, TimeUnit, Timer) 聚合数据,直到 onComplete() 被调用;或是到达等待时长(第一个长整型参数),然后开始一个新的聚合。
buffer(long, long, TimeUnit, Timer) 聚合数据,直到 onComplete() 被调用;或是到达给定的时移(第二个长整型参数),然后开始一个新的聚合。时间跨度(第一个长整型参数)将界定缓冲区聚合元素的最大数量。这意味着重叠的(位移的缓冲区)或脱节的聚合数据可以发送给子订阅者 Subscriber<List<T>>
buffer(int, long, TimeUnit, Timer) buffer(int) buffer(long, TimeUnit, Timer) 条件的组合。数据聚合的过程在到达给定大小时间跨度耗尽时完成。