组合多个服务调用
第二步,我们将思维扩大到消费方面。在过渡阶段,请牢记我们的 Stream
可以使用运算符来阻塞 。 我们有两个问题亟需处理:一个是鲁棒性(网络分裂问题等),一个是如何避免在新服务执行前等待前一服务:
表16,进化成响应型微服务,第二部分,RickAndMortyService 中的并行请求和永续性
不是很好的 | 很好的 |
---|
int tries = 0;while(tries < 3){ try{ Future<List<User>> rickFriends = userService.fitleredFind("Rick"); Future<List<User>> mortyFriends = userService.fitleredFind("Morty"); System.out.println( rickFriends.get(3, TimeUnit.SECONDS) .addAll( mortyFriends.get(3, TimeUnit.SECONDS)) ); }catch(Exception e){ if(tries++ >= 3) throw e; Thread.sleep(tries*1000); }}
| return Streams.merge( userService.filteredFind("Rick"), userService.filteredFind("Morty")).buffer().retryWhen( errors -> errors .zipWith(Streams.range(1,3), t -> t.getT2()) .flatMap( tries -> Streams.timer(tries) )).consume(System.out::println);
|
结果 |
---|
- Streams.merge() 将两个查询合并,是一个非阻塞的协调操作。- buffer() 将聚合所有结果,直到运行完结或失败(在之前计数的)。- retryWhen(Function<Stream<Throwable>, Publisher<?>> 将在出错时保证重新订阅。 - zipWith 将合并错误,并进行至多 3 次的重试。 - zipWith 只返回元组重试的数量。 - flatMap + Streams.timer(long) 将每次重试转化为延迟信号(使用默认时间)。 - 每当此出错信号返回到发布者 时,取消并重新订阅,直到一个 onComplete 信号或 onError 信号被发出。 - flatMap 只在内部计数器和上游都完结时结束,也就是在 3次重试之后,或是在错误序列之后,它才会总结。 |