分析

度量操作和其它状态化操作一样,都是 Stream API 的一部分。实际上,熟悉 Spark 的用户能够认出一些方法。ScanAction 也提供了一些常用的同 reduce()scan() 相关的累积功能。

使用键/值型数据和度量操作
  1. Broadcaster<Integer> source = Broadcaster.<Integer> create(Environment.get());
  2. long avgTime = 50l;
  3. Promise<Long> result = source
  4. .throttle(avgTime) (1)
  5. .elapsed() (2)
  6. .nest() (3)
  7. .flatMap(self ->
  8. BiStreams.reduceByKey(self, (prev, next) -> prev + 1) (4)
  9. )
  10. .sort((a,b) -> a.t1.compareTo(b.t1)) (5)
  11. .log("elapsed")
  12. .reduce(-1L, (acc, next) ->
  13. acc > 0l ? ((next.t1 + acc) / 2) : next.t1 (6)
  14. )
  15. .next(); (7)
  16. for (int i = 0; i < 10; i++) {
  17. source.onNext(1);
  18. }
  19. source.onComplete();
  • 将传入的订阅者(Publisher)减速至每 50 毫秒一次,逐个等待数据发出。
  • onSubscribe 和 第一个信号之间,或是在两个信号之间产生一个拥有时间增量有效载荷Tuple2
  • 使当前流可以接收 onNext 信号,以便我们将其同 flatMap组合。
  • 累积所有数据,直到以 Tuple2.t1Tuple2.t2 为键值对的内部 Map 发出 onComplete() 信号。下一个匹配的主键将为累加器 BiFunction 提供前一次的值和新发出的 onNext 信号。这样我们就可以每个键增加一个有效载荷。
  • 累积所有数据,直到内部 PriorityQueue 发出 onComplete() 信号,并使用给定比较器对流逝的时间 t1 进行排序。在 onComplete() 之后,所有的数据都会按顺序发出,然后就完成了。
  • 累积所有数据,直到 onComplete() 信号的平均传送时间为默认的首次被接收的时间。
  • 发出下一个信号,并且只计算平均值。

输出

  1. 03:14:42.013 [main] INFO elapsed - subscribe: ScanAction
  2. 03:14:42.021 [main] INFO elapsed - onSubscribe: {push}
  3. 03:14:42.022 [main] INFO elapsed - request: 9223372036854775807
  4. 03:14:42.517 [hash-wheel-timer-run-3] INFO elapsed - onNext: 44,1
  5. 03:14:42.518 [hash-wheel-timer-run-3] INFO elapsed - onNext: 48,1
  6. 03:14:42.518 [hash-wheel-timer-run-3] INFO elapsed - onNext: 49,2
  7. 03:14:42.518 [hash-wheel-timer-run-3] INFO elapsed - onNext: 50,3
  8. 03:14:42.518 [hash-wheel-timer-run-3] INFO elapsed - onNext: 51,3
  9. 03:14:42.519 [hash-wheel-timer-run-3] INFO elapsed - complete: SortAction
  10. 03:14:42.520 [hash-wheel-timer-run-3] INFO elapsed - cancel: SortAction
表20,度量操作和其它状态化累积操作可用的操作
Stream<T> API 或工厂函数输出值作用
count() Long 在观测到 onComplete() 信号后,产生观测到的 onNext(T) 信号的总量。对定时的窗体(Windows)很有用,对限制大小的窗体(Windows)意义不大。比如说 stream.window(5).flatMap(w -> w.count()) 的结果是 5,棒棒的。
scan(BiFunction<T,T>) T
scan(A, BiFunction<A,T>) A
reduce(BiFunction<T,T>) T
reduce(A, BiFunction<A,T>) A
BiStreams.reduceByKey()
BiStreams.scanByKey()
timestamp() Tuple2<Long,T>
elapsed() Tuple2<Long,T>
materialize() dematerialize() Signal<T> 将上游信号转换为 Signal<T> ,并将其视为 onNext(Signal<T>) 信号进行处理。直接效果就是:它将接收错误信号和完结信号,因此可以用来高效的处理错误。一旦有错误产生,我们可以将 dematerialize() 回调中的 Signal<T> 转化到 Reactive Streams,保证服务的运行。