2.1 服务调用方式

Dubbo 支持同步和异步两种调用方式,其中异步调用还可细分为“有返回值”的异步调用和“无返回值”的异步调用。所谓“无返回值”异步调用是指服务消费方只管调用,但不关心调用结果,此时 Dubbo 会直接返回一个空的 RpcResult。若要使用异步特性,需要服务消费方手动进行配置。默认情况下,Dubbo 使用同步调用方式。

本节以及其他章节将会使用 Dubbo 官方提供的 Demo 分析整个调用过程,下面我们从 DemoService 接口的代理类开始进行分析。Dubbo 默认使用 Javassist 框架为服务接口生成动态代理类,因此我们需要先将代理类进行反编译才能看到源码。这里使用阿里开源 Java 应用诊断工具 Arthas 反编译代理类,结果如下:

  1. /**
  2. * Arthas 反编译步骤:
  3. * 1. 启动 Arthas
  4. * java -jar arthas-boot.jar
  5. *
  6. * 2. 输入编号选择进程
  7. * Arthas 启动后,会打印 Java 应用进程列表,如下:
  8. * [1]: 11232 org.jetbrains.jps.cmdline.Launcher
  9. * [2]: 22370 org.jetbrains.jps.cmdline.Launcher
  10. * [3]: 22371 com.alibaba.dubbo.demo.consumer.Consumer
  11. * [4]: 22362 com.alibaba.dubbo.demo.provider.Provider
  12. * [5]: 2074 org.apache.zookeeper.server.quorum.QuorumPeerMain
  13. * 这里输入编号 3,让 Arthas 关联到启动类为 com.....Consumer 的 Java 进程上
  14. *
  15. * 3. 由于 Demo 项目中只有一个服务接口,因此此接口的代理类类名为 proxy0,此时使用 sc 命令搜索这个类名。
  16. * $ sc *.proxy0
  17. * com.alibaba.dubbo.common.bytecode.proxy0
  18. *
  19. * 4. 使用 jad 命令反编译 com.alibaba.dubbo.common.bytecode.proxy0
  20. * $ jad com.alibaba.dubbo.common.bytecode.proxy0
  21. *
  22. * 更多使用方法请参考 Arthas 官方文档:
  23. * https://alibaba.github.io/arthas/quick-start.html
  24. */
  25. public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
  26. // 方法数组
  27. public static Method[] methods;
  28. private InvocationHandler handler;
  29. public proxy0(InvocationHandler invocationHandler) {
  30. this.handler = invocationHandler;
  31. }
  32. public proxy0() {
  33. }
  34. public String sayHello(String string) {
  35. // 将参数存储到 Object 数组中
  36. Object[] arrobject = new Object[]{string};
  37. // 调用 InvocationHandler 实现类的 invoke 方法得到调用结果
  38. Object object = this.handler.invoke(this, methods[0], arrobject);
  39. // 返回调用结果
  40. return (String)object;
  41. }
  42. /** 回声测试方法 */
  43. public Object $echo(Object object) {
  44. Object[] arrobject = new Object[]{object};
  45. Object object2 = this.handler.invoke(this, methods[1], arrobject);
  46. return object2;
  47. }
  48. }

如上,代理类的逻辑比较简单。首先将运行时参数存储到数组中,然后调用 InvocationHandler 接口实现类的 invoke 方法,得到调用结果,最后将结果转型并返回给调用方。关于代理类的逻辑就说这么多,继续向下分析。

  1. public class InvokerInvocationHandler implements InvocationHandler {
  2. private final Invoker<?> invoker;
  3. public InvokerInvocationHandler(Invoker<?> handler) {
  4. this.invoker = handler;
  5. }
  6. @Override
  7. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  8. String methodName = method.getName();
  9. Class<?>[] parameterTypes = method.getParameterTypes();
  10. // 拦截定义在 Object 类中的方法(未被子类重写),比如 wait/notify
  11. if (method.getDeclaringClass() == Object.class) {
  12. return method.invoke(invoker, args);
  13. }
  14. // 如果 toString、hashCode 和 equals 等方法被子类重写了,这里也直接调用
  15. if ("toString".equals(methodName) && parameterTypes.length == 0) {
  16. return invoker.toString();
  17. }
  18. if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
  19. return invoker.hashCode();
  20. }
  21. if ("equals".equals(methodName) && parameterTypes.length == 1) {
  22. return invoker.equals(args[0]);
  23. }
  24. // 将 method 和 args 封装到 RpcInvocation 中,并执行后续的调用
  25. return invoker.invoke(new RpcInvocation(method, args)).recreate();
  26. }
  27. }

InvokerInvocationHandler 中的 invoker 成员变量类型为 MockClusterInvoker,MockClusterInvoker 内部封装了服务降级逻辑。下面简单看一下:

  1. public class MockClusterInvoker<T> implements Invoker<T> {
  2. private final Invoker<T> invoker;
  3. public Result invoke(Invocation invocation) throws RpcException {
  4. Result result = null;
  5. // 获取 mock 配置值
  6. String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
  7. if (value.length() == 0 || value.equalsIgnoreCase("false")) {
  8. // 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法,
  9. // 比如 FailoverClusterInvoker
  10. result = this.invoker.invoke(invocation);
  11. } else if (value.startsWith("force")) {
  12. // force:xxx 直接执行 mock 逻辑,不发起远程调用
  13. result = doMockInvoke(invocation, null);
  14. } else {
  15. // fail:xxx 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常
  16. try {
  17. // 调用其他 Invoker 对象的 invoke 方法
  18. result = this.invoker.invoke(invocation);
  19. } catch (RpcException e) {
  20. if (e.isBiz()) {
  21. throw e;
  22. } else {
  23. // 调用失败,执行 mock 逻辑
  24. result = doMockInvoke(invocation, e);
  25. }
  26. }
  27. }
  28. return result;
  29. }
  30. // 省略其他方法
  31. }

服务降级不是本文重点,因此这里就不分析 doMockInvoke 方法了。考虑到前文已经详细分析过 FailoverClusterInvoker,因此本节略过 FailoverClusterInvoker,直接分析 DubboInvoker。

  1. public abstract class AbstractInvoker<T> implements Invoker<T> {
  2. public Result invoke(Invocation inv) throws RpcException {
  3. if (destroyed.get()) {
  4. throw new RpcException("Rpc invoker for service ...");
  5. }
  6. RpcInvocation invocation = (RpcInvocation) inv;
  7. // 设置 Invoker
  8. invocation.setInvoker(this);
  9. if (attachment != null && attachment.size() > 0) {
  10. // 设置 attachment
  11. invocation.addAttachmentsIfAbsent(attachment);
  12. }
  13. Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
  14. if (contextAttachments != null && contextAttachments.size() != 0) {
  15. // 添加 contextAttachments 到 RpcInvocation#attachment 变量中
  16. invocation.addAttachments(contextAttachments);
  17. }
  18. if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
  19. // 设置异步信息到 RpcInvocation#attachment 中
  20. invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
  21. }
  22. RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
  23. try {
  24. // 抽象方法,由子类实现
  25. return doInvoke(invocation);
  26. } catch (InvocationTargetException e) {
  27. // ...
  28. } catch (RpcException e) {
  29. // ...
  30. } catch (Throwable e) {
  31. return new RpcResult(e);
  32. }
  33. }
  34. protected abstract Result doInvoke(Invocation invocation) throws Throwable;
  35. // 省略其他方法
  36. }

上面的代码来自 AbstractInvoker 类,其中大部分代码用于添加信息到 RpcInvocation#attachment 变量中,添加完毕后,调用 doInvoke 执行后续的调用。doInvoke 是一个抽象方法,需要由子类实现,下面到 DubboInvoker 中看一下。

  1. public class DubboInvoker<T> extends AbstractInvoker<T> {
  2. private final ExchangeClient[] clients;
  3. protected Result doInvoke(final Invocation invocation) throws Throwable {
  4. RpcInvocation inv = (RpcInvocation) invocation;
  5. final String methodName = RpcUtils.getMethodName(invocation);
  6. // 设置 path 和 version 到 attachment 中
  7. inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
  8. inv.setAttachment(Constants.VERSION_KEY, version);
  9. ExchangeClient currentClient;
  10. if (clients.length == 1) {
  11. // 从 clients 数组中获取 ExchangeClient
  12. currentClient = clients[0];
  13. } else {
  14. currentClient = clients[index.getAndIncrement() % clients.length];
  15. }
  16. try {
  17. // 获取异步配置
  18. boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
  19. // isOneway 为 true,表示“单向”通信
  20. boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
  21. int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
  22. // 异步无返回值
  23. if (isOneway) {
  24. boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
  25. // 发送请求
  26. currentClient.send(inv, isSent);
  27. // 设置上下文中的 future 字段为 null
  28. RpcContext.getContext().setFuture(null);
  29. // 返回一个空的 RpcResult
  30. return new RpcResult();
  31. }
  32. // 异步有返回值
  33. else if (isAsync) {
  34. // 发送请求,并得到一个 ResponseFuture 实例
  35. ResponseFuture future = currentClient.request(inv, timeout);
  36. // 设置 future 到上下文中
  37. RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
  38. // 暂时返回一个空结果
  39. return new RpcResult();
  40. }
  41. // 同步调用
  42. else {
  43. RpcContext.getContext().setFuture(null);
  44. // 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待
  45. return (Result) currentClient.request(inv, timeout).get();
  46. }
  47. } catch (TimeoutException e) {
  48. throw new RpcException(..., "Invoke remote method timeout....");
  49. } catch (RemotingException e) {
  50. throw new RpcException(..., "Failed to invoke remote method: ...");
  51. }
  52. }
  53. // 省略其他方法
  54. }

上面的代码包含了 Dubbo 对同步和异步调用的处理逻辑,搞懂了上面的代码,会对 Dubbo 的同步和异步调用方式有更深入的了解。Dubbo 实现同步和异步调用比较关键的一点就在于由谁调用 ResponseFuture 的 get 方法。同步调用模式下,由框架自身调用 ResponseFuture 的 get 方法。异步调用模式下,则由用户调用该方法。ResponseFuture 是一个接口,下面我们来看一下它的默认实现类 DefaultFuture 的源码。

  1. public class DefaultFuture implements ResponseFuture {
  2. private static final Map<Long, Channel> CHANNELS =
  3. new ConcurrentHashMap<Long, Channel>();
  4. private static final Map<Long, DefaultFuture> FUTURES =
  5. new ConcurrentHashMap<Long, DefaultFuture>();
  6. private final long id;
  7. private final Channel channel;
  8. private final Request request;
  9. private final int timeout;
  10. private final Lock lock = new ReentrantLock();
  11. private final Condition done = lock.newCondition();
  12. private volatile Response response;
  13. public DefaultFuture(Channel channel, Request request, int timeout) {
  14. this.channel = channel;
  15. this.request = request;
  16. // 获取请求 id,这个 id 很重要,后面还会见到
  17. this.id = request.getId();
  18. this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
  19. // 存储 <requestId, DefaultFuture> 映射关系到 FUTURES 中
  20. FUTURES.put(id, this);
  21. CHANNELS.put(id, channel);
  22. }
  23. @Override
  24. public Object get() throws RemotingException {
  25. return get(timeout);
  26. }
  27. @Override
  28. public Object get(int timeout) throws RemotingException {
  29. if (timeout <= 0) {
  30. timeout = Constants.DEFAULT_TIMEOUT;
  31. }
  32. // 检测服务提供方是否成功返回了调用结果
  33. if (!isDone()) {
  34. long start = System.currentTimeMillis();
  35. lock.lock();
  36. try {
  37. // 循环检测服务提供方是否成功返回了调用结果
  38. while (!isDone()) {
  39. // 如果调用结果尚未返回,这里等待一段时间
  40. done.await(timeout, TimeUnit.MILLISECONDS);
  41. // 如果调用结果成功返回,或等待超时,此时跳出 while 循环,执行后续的逻辑
  42. if (isDone() || System.currentTimeMillis() - start > timeout) {
  43. break;
  44. }
  45. }
  46. } catch (InterruptedException e) {
  47. throw new RuntimeException(e);
  48. } finally {
  49. lock.unlock();
  50. }
  51. // 如果调用结果仍未返回,则抛出超时异常
  52. if (!isDone()) {
  53. throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
  54. }
  55. }
  56. // 返回调用结果
  57. return returnFromResponse();
  58. }
  59. @Override
  60. public boolean isDone() {
  61. // 通过检测 response 字段为空与否,判断是否收到了调用结果
  62. return response != null;
  63. }
  64. private Object returnFromResponse() throws RemotingException {
  65. Response res = response;
  66. if (res == null) {
  67. throw new IllegalStateException("response cannot be null");
  68. }
  69. // 如果调用结果的状态为 Response.OK,则表示调用过程正常,服务提供方成功返回了调用结果
  70. if (res.getStatus() == Response.OK) {
  71. return res.getResult();
  72. }
  73. // 抛出异常
  74. if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
  75. throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
  76. }
  77. throw new RemotingException(channel, res.getErrorMessage());
  78. }
  79. // 省略其他方法
  80. }

如上,当服务消费者还未接收到调用结果时,用户线程调用 get 方法会被阻塞住。同步调用模式下,框架获得 DefaultFuture 对象后,会立即调用 get 方法进行等待。而异步模式下则是将该对象封装到 FutureAdapter 实例中,并将 FutureAdapter 实例设置到 RpcContext 中,供用户使用。FutureAdapter 是一个适配器,用于将 Dubbo 中的 ResponseFuture 与 JDK 中的 Future 进行适配。这样当用户线程调用 Future 的 get 方法时,经过 FutureAdapter 适配,最终会调用 ResponseFuture 实现类对象的 get 方法,也就是 DefaultFuture 的 get 方法。

到这里关于 Dubbo 几种调用方式的代码逻辑就分析完了,下面来分析请求数据的发送与接收,以及响应数据的发送与接收过程。