创建非阻塞服务

首先要做的就是隔离微服务的访问。现在,我们将不再返回 TFuture<T>,取而代之的将是 Publisher<T> 以及特定的 Stream<T>Promise<T>。立竿见影的好处就是我们可以不必费心考虑错误处理或是线程(仅是现在)了:错误将在 onError 调用(非冒泡)中传递,而线程可以在后续过程中使用类似 dispatchOn 的方法进行调整。另一个好处就是它能让我们的代码更加实用。Java 8 Lambdas 也可以正常使用。我们的目标是减少控制语句中的括号噪音(if、for、while 等),并限制上下文中分享的需求。我们的最终设计目标就是鼓励在大型数据集中流式轮询:函数将适用于序列,一个一个返回结果,避免循环重复。

我们提倡使用实现的部件且非 Publisher<T> 在编译时访问所有的 Reactor Stream API,除非你希望你的 API 隐藏(图书馆的开发者可能有此需求)。`Streams.wrap(Publisher<T>) 将会使尽解数将这种通用的返回类型转化成一个合适的 Stream<T>

表15,进化成响应型微服务,第一部分,UserService 中的错误隔离和非阻塞操作
不是很好的很好的
  1. public User get(String name)
  2. throws Exception {
  3. Result r = userDB.findByName(name);
  4. return convert(r);
  5. }
  6. public List<User> allFriends(User user)
  7. throws Exception {
  8. ResultSet rs = userDB.findAllFriends(user);
  9. return convertToList(r);
  10. }
  11. public Future<List<User>> filteredFind(String name)
  12. throws Exception {
  13. User user = get(name);
  14. if(user == null || !user.isAdmin()){
  15. return CompletedFuture.completedFuture(null);
  16. } else {
  17. //could be in an async thread if wanted
  18. return CompletedFuture.completedFuture(allFriends(user));
  19. }
  20. }
  1. public Promise<User> get(final String name) {
  2. return Promises
  3. .task( () -> userDB.findByName(name))
  4. .timeout(3, TimeUnit.Seconds)
  5. .map(this::convert)
  6. .subscribeOn(workDispatcher());
  7. }
  8. public Stream<User> allFriends(final User user) {
  9. return Streams
  10. .defer(() ->
  11. Streams.just(userDB.findAllFriends(user)))
  12. .timeout(3, TimeUnit.Seconds)
  13. .map(this::convertToList)
  14. .flatMap(Streams::from)
  15. .dispatchOn(cachedDispatcher());
  16. .subscribeOn(workDispatcher());
  17. }
  18. public Stream<User> filteredFind(String name){
  19. return get(name)
  20. .stream()
  21. .filter(User::isAdmin)
  22. .flatMap(this::allFriends);
  23. }
结果
- 所有的查询方法 时: - 不再有异常抛出,异常都在管道中传递。 - 不再有控制逻辑,我们使用例如 map 或 filter 这样的预定义操作。 - 只返回发布者(Stream 或 Promise)。 - 通过超时机制限制阻塞的查询(之后可以进行重试或者回退等操作)。 - 在订阅时使用一个分配好的 workDispatcher。 - get(name): 时: - 使用类型定义的单一数据发布者,或是Promise。 - 在订阅时,调用task回调函数。 - allFriends(user): 时: - 在 onSubscribe 线程中使用 defer() 惰性的调用数据库查询。 - 现在还没有背压策略,我们在一个阻塞(但异步)的调用中读取所有的结果。 - 在 FlatMap 中,我们将返回的列表转化为一个数据流。 - 在一个异步调度器中分派数据,这样下游处理不会影响读取操作。 - filteredFind(name): 时: - 我们使用 stream() 将第一个 get 操作获取到的 Promise 转化为 Stream。 - 如果用户可用,我们只需要调用 allFriends() sub-stream。 - 返回的 Stream 将在第一次 allFriends() 信号发出后恢复。