2.3 服务提供方接收请求

前面说过,默认情况下 Dubbo 使用 Netty 作为底层的通信框架。Netty 检测到有数据入站后,首先会通过解码器对数据进行解码,并将解码后的数据传递给下一个入站处理器的指定方法。所以在进行后续的分析之前,我们先来看一下数据解码过程。

2.3.1 请求解码

这里直接分析请求数据的解码逻辑,忽略中间过程,如下:

  1. public class ExchangeCodec extends TelnetCodec {
  2. @Override
  3. public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
  4. int readable = buffer.readableBytes();
  5. // 创建消息头字节数组
  6. byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
  7. // 读取消息头数据
  8. buffer.readBytes(header);
  9. // 调用重载方法进行后续解码工作
  10. return decode(channel, buffer, readable, header);
  11. }
  12. @Override
  13. protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
  14. // 检查魔数是否相等
  15. if (readable > 0 && header[0] != MAGIC_HIGH
  16. || readable > 1 && header[1] != MAGIC_LOW) {
  17. int length = header.length;
  18. if (header.length < readable) {
  19. header = Bytes.copyOf(header, readable);
  20. buffer.readBytes(header, length, readable - length);
  21. }
  22. for (int i = 1; i < header.length - 1; i++) {
  23. if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
  24. buffer.readerIndex(buffer.readerIndex() - header.length + i);
  25. header = Bytes.copyOf(header, i);
  26. break;
  27. }
  28. }
  29. // 通过 telnet 命令行发送的数据包不包含消息头,所以这里
  30. // 调用 TelnetCodec 的 decode 方法对数据包进行解码
  31. return super.decode(channel, buffer, readable, header);
  32. }
  33. // 检测可读数据量是否少于消息头长度,若小于则立即返回 DecodeResult.NEED_MORE_INPUT
  34. if (readable < HEADER_LENGTH) {
  35. return DecodeResult.NEED_MORE_INPUT;
  36. }
  37. // 从消息头中获取消息体长度
  38. int len = Bytes.bytes2int(header, 12);
  39. // 检测消息体长度是否超出限制,超出则抛出异常
  40. checkPayload(channel, len);
  41. int tt = len + HEADER_LENGTH;
  42. // 检测可读的字节数是否小于实际的字节数
  43. if (readable < tt) {
  44. return DecodeResult.NEED_MORE_INPUT;
  45. }
  46. ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
  47. try {
  48. // 继续进行解码工作
  49. return decodeBody(channel, is, header);
  50. } finally {
  51. if (is.available() > 0) {
  52. try {
  53. StreamUtils.skipUnusedStream(is);
  54. } catch (IOException e) {
  55. logger.warn(e.getMessage(), e);
  56. }
  57. }
  58. }
  59. }
  60. }

上面方法通过检测消息头中的魔数是否与规定的魔数相等,提前拦截掉非常规数据包,比如通过 telnet 命令行发出的数据包。接着再对消息体长度,以及可读字节数进行检测。最后调用 decodeBody 方法进行后续的解码工作,ExchangeCodec 中实现了 decodeBody 方法,但因其子类 DubboCodec 覆写了该方法,所以在运行时 DubboCodec 中的 decodeBody 方法会被调用。下面我们来看一下该方法的代码。

  1. public class DubboCodec extends ExchangeCodec implements Codec2 {
  2. @Override
  3. protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
  4. // 获取消息头中的第三个字节,并通过逻辑与运算得到序列化器编号
  5. byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
  6. Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
  7. // 获取调用编号
  8. long id = Bytes.bytes2long(header, 4);
  9. // 通过逻辑与运算得到调用类型,0 - Response,1 - Request
  10. if ((flag & FLAG_REQUEST) == 0) {
  11. // 对响应结果进行解码,得到 Response 对象。这个非本节内容,后面再分析
  12. // ...
  13. } else {
  14. // 创建 Request 对象
  15. Request req = new Request(id);
  16. req.setVersion(Version.getProtocolVersion());
  17. // 通过逻辑与运算得到通信方式,并设置到 Request 对象中
  18. req.setTwoWay((flag & FLAG_TWOWAY) != 0);
  19. // 通过位运算检测数据包是否为事件类型
  20. if ((flag & FLAG_EVENT) != 0) {
  21. // 设置心跳事件到 Request 对象中
  22. req.setEvent(Request.HEARTBEAT_EVENT);
  23. }
  24. try {
  25. Object data;
  26. if (req.isHeartbeat()) {
  27. // 对心跳包进行解码,该方法已被标注为废弃
  28. data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
  29. } else if (req.isEvent()) {
  30. // 对事件数据进行解码
  31. data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
  32. } else {
  33. DecodeableRpcInvocation inv;
  34. // 根据 url 参数判断是否在 IO 线程上对消息体进行解码
  35. if (channel.getUrl().getParameter(
  36. Constants.DECODE_IN_IO_THREAD_KEY,
  37. Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
  38. inv = new DecodeableRpcInvocation(channel, req, is, proto);
  39. // 在当前线程,也就是 IO 线程上进行后续的解码工作。此工作完成后,可将
  40. // 调用方法名、attachment、以及调用参数解析出来
  41. inv.decode();
  42. } else {
  43. // 仅创建 DecodeableRpcInvocation 对象,但不在当前线程上执行解码逻辑
  44. inv = new DecodeableRpcInvocation(channel, req,
  45. new UnsafeByteArrayInputStream(readMessageData(is)), proto);
  46. }
  47. data = inv;
  48. }
  49. // 设置 data 到 Request 对象中
  50. req.setData(data);
  51. } catch (Throwable t) {
  52. // 若解码过程中出现异常,则将 broken 字段设为 true,
  53. // 并将异常对象设置到 Reqeust 对象中
  54. req.setBroken(true);
  55. req.setData(t);
  56. }
  57. return req;
  58. }
  59. }
  60. }

如上,decodeBody 对部分字段进行了解码,并将解码得到的字段封装到 Request 中。随后会调用 DecodeableRpcInvocation 的 decode 方法进行后续的解码工作。此工作完成后,可将调用方法名、attachment、以及调用参数解析出来。下面我们来看一下 DecodeableRpcInvocation 的 decode 方法逻辑。

  1. public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
  2. @Override
  3. public Object decode(Channel channel, InputStream input) throws IOException {
  4. ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
  5. .deserialize(channel.getUrl(), input);
  6. // 通过反序列化得到 dubbo version,并保存到 attachments 变量中
  7. String dubboVersion = in.readUTF();
  8. request.setVersion(dubboVersion);
  9. setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);
  10. // 通过反序列化得到 path,version,并保存到 attachments 变量中
  11. setAttachment(Constants.PATH_KEY, in.readUTF());
  12. setAttachment(Constants.VERSION_KEY, in.readUTF());
  13. // 通过反序列化得到调用方法名
  14. setMethodName(in.readUTF());
  15. try {
  16. Object[] args;
  17. Class<?>[] pts;
  18. // 通过反序列化得到参数类型字符串,比如 Ljava/lang/String;
  19. String desc = in.readUTF();
  20. if (desc.length() == 0) {
  21. pts = DubboCodec.EMPTY_CLASS_ARRAY;
  22. args = DubboCodec.EMPTY_OBJECT_ARRAY;
  23. } else {
  24. // 将 desc 解析为参数类型数组
  25. pts = ReflectUtils.desc2classArray(desc);
  26. args = new Object[pts.length];
  27. for (int i = 0; i < args.length; i++) {
  28. try {
  29. // 解析运行时参数
  30. args[i] = in.readObject(pts[i]);
  31. } catch (Exception e) {
  32. if (log.isWarnEnabled()) {
  33. log.warn("Decode argument failed: " + e.getMessage(), e);
  34. }
  35. }
  36. }
  37. }
  38. // 设置参数类型数组
  39. setParameterTypes(pts);
  40. // 通过反序列化得到原 attachment 的内容
  41. Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
  42. if (map != null && map.size() > 0) {
  43. Map<String, String> attachment = getAttachments();
  44. if (attachment == null) {
  45. attachment = new HashMap<String, String>();
  46. }
  47. // 将 map 与当前对象中的 attachment 集合进行融合
  48. attachment.putAll(map);
  49. setAttachments(attachment);
  50. }
  51. // 对 callback 类型的参数进行处理
  52. for (int i = 0; i < args.length; i++) {
  53. args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
  54. }
  55. // 设置参数列表
  56. setArguments(args);
  57. } catch (ClassNotFoundException e) {
  58. throw new IOException(StringUtils.toString("Read invocation data failed.", e));
  59. } finally {
  60. if (in instanceof Cleanable) {
  61. ((Cleanable) in).cleanup();
  62. }
  63. }
  64. return this;
  65. }
  66. }

上面的方法通过反序列化将诸如 path、version、调用方法名、参数列表等信息依次解析出来,并设置到相应的字段中,最终得到一个具有完整调用信息的 DecodeableRpcInvocation 对象。

到这里,请求数据解码的过程就分析完了。此时我们得到了一个 Request 对象,这个对象会被传送到下一个入站处理器中,我们继续往下看。

2.3.2 调用服务

解码器将数据包解析成 Request 对象后,NettyHandler 的 messageReceived 方法紧接着会收到这个对象,并将这个对象继续向下传递。这期间该对象会被依次传递给 NettyServer、MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler。最后由 AllChannelHandler 将该对象封装到 Runnable 实现类对象中,并将 Runnable 放入线程池中执行后续的调用逻辑。整个调用栈如下:

  1. NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
  2. —> AbstractPeer#received(Channel, Object)
  3. —> MultiMessageHandler#received(Channel, Object)
  4. —> HeartbeatHandler#received(Channel, Object)
  5. —> AllChannelHandler#received(Channel, Object)
  6. —> ExecutorService#execute(Runnable) // 由线程池执行后续的调用逻辑

考虑到篇幅,以及很多中间调用的逻辑并非十分重要,所以这里就不对调用栈中的每个方法都进行分析了。这里我们直接分析调用栈中的分析第一个和最后一个调用方法逻辑。如下:

  1. @Sharable
  2. public class NettyHandler extends SimpleChannelHandler {
  3. private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>();
  4. private final URL url;
  5. private final ChannelHandler handler;
  6. public NettyHandler(URL url, ChannelHandler handler) {
  7. if (url == null) {
  8. throw new IllegalArgumentException("url == null");
  9. }
  10. if (handler == null) {
  11. throw new IllegalArgumentException("handler == null");
  12. }
  13. this.url = url;
  14. // 这里的 handler 类型为 NettyServer
  15. this.handler = handler;
  16. }
  17. public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
  18. // 获取 NettyChannel
  19. NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
  20. try {
  21. // 继续向下调用
  22. handler.received(channel, e.getMessage());
  23. } finally {
  24. NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
  25. }
  26. }
  27. }

如上,NettyHandler 中的 messageReceived 逻辑比较简单。首先根据一些信息获取 NettyChannel 实例,然后将 NettyChannel 实例以及 Request 对象向下传递。下面再来看看 AllChannelHandler 的逻辑,在详细分析代码之前,我们先来了解一下 Dubbo 中的线程派发模型。

2.3.2.1 线程派发模型

Dubbo 将底层通信框架中接收请求的线程称为 IO 线程。如果一些事件处理逻辑可以很快执行完,比如只在内存打一个标记,此时直接在 IO 线程上执行该段逻辑即可。但如果事件的处理逻辑比较耗时,比如该段逻辑会发起数据库查询或者 HTTP 请求。此时我们就不应该让事件处理逻辑在 IO 线程上执行,而是应该派发到线程池中去执行。原因也很简单,IO 线程主要用于接收请求,如果 IO 线程被占满,将导致它不能接收新的请求。

以上就是线程派发的背景,下面我们再来通过 Dubbo 调用图,看一下线程派发器所处的位置。

2.3 服务提供方接收请求 - 图1

如上图,红框中的 Dispatcher 就是线程派发器。需要说明的是,Dispatcher 真实的职责创建具有线程派发能力的 ChannelHandler,比如 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其本身并不具备线程派发能力。Dubbo 支持 5 种不同的线程派发策略,下面通过一个表格列举一下。

策略用途
all所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等
direct所有消息都不派发到线程池,全部在 IO 线程上直接执行
message只有请求响应消息派发到线程池,其它消息均在 IO 线程上执行
execution只有请求消息派发到线程池,不含响应。其它消息均在 IO 线程上执行
connection在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池

默认配置下,Dubbo 使用 all 派发策略,即将所有的消息都派发到线程池中。下面我们来分析一下 AllChannelHandler 的代码。

  1. public class AllChannelHandler extends WrappedChannelHandler {
  2. public AllChannelHandler(ChannelHandler handler, URL url) {
  3. super(handler, url);
  4. }
  5. /** 处理连接事件 */
  6. @Override
  7. public void connected(Channel channel) throws RemotingException {
  8. // 获取线程池
  9. ExecutorService cexecutor = getExecutorService();
  10. try {
  11. // 将连接事件派发到线程池中处理
  12. cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
  13. } catch (Throwable t) {
  14. throw new ExecutionException(..., " error when process connected event .", t);
  15. }
  16. }
  17. /** 处理断开事件 */
  18. @Override
  19. public void disconnected(Channel channel) throws RemotingException {
  20. ExecutorService cexecutor = getExecutorService();
  21. try {
  22. cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
  23. } catch (Throwable t) {
  24. throw new ExecutionException(..., "error when process disconnected event .", t);
  25. }
  26. }
  27. /** 处理请求和响应消息,这里的 message 变量类型可能是 Request,也可能是 Response */
  28. @Override
  29. public void received(Channel channel, Object message) throws RemotingException {
  30. ExecutorService cexecutor = getExecutorService();
  31. try {
  32. // 将请求和响应消息派发到线程池中处理
  33. cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
  34. } catch (Throwable t) {
  35. if(message instanceof Request && t instanceof RejectedExecutionException){
  36. Request request = (Request)message;
  37. // 如果通信方式为双向通信,此时将 Server side ... threadpool is exhausted
  38. // 错误信息封装到 Response 中,并返回给服务消费方。
  39. if(request.isTwoWay()){
  40. String msg = "Server side(" + url.getIp() + "," + url.getPort()
  41. + ") threadpool is exhausted ,detail msg:" + t.getMessage();
  42. Response response = new Response(request.getId(), request.getVersion());
  43. response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
  44. response.setErrorMessage(msg);
  45. // 返回包含错误信息的 Response 对象
  46. channel.send(response);
  47. return;
  48. }
  49. }
  50. throw new ExecutionException(..., " error when process received event .", t);
  51. }
  52. }
  53. /** 处理异常信息 */
  54. @Override
  55. public void caught(Channel channel, Throwable exception) throws RemotingException {
  56. ExecutorService cexecutor = getExecutorService();
  57. try {
  58. cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
  59. } catch (Throwable t) {
  60. throw new ExecutionException(..., "error when process caught event ...");
  61. }
  62. }
  63. }

如上,请求对象会被封装 ChannelEventRunnable 中,ChannelEventRunnable 将会是服务调用过程的新起点。所以接下来我们以 ChannelEventRunnable 为起点向下探索。

2.3.2.2 调用服务

本小节,我们从 ChannelEventRunnable 开始分析,该类的主要代码如下:

  1. public class ChannelEventRunnable implements Runnable {
  2. private final ChannelHandler handler;
  3. private final Channel channel;
  4. private final ChannelState state;
  5. private final Throwable exception;
  6. private final Object message;
  7. @Override
  8. public void run() {
  9. // 检测通道状态,对于请求或响应消息,此时 state = RECEIVED
  10. if (state == ChannelState.RECEIVED) {
  11. try {
  12. // 将 channel 和 message 传给 ChannelHandler 对象,进行后续的调用
  13. handler.received(channel, message);
  14. } catch (Exception e) {
  15. logger.warn("... operation error, channel is ... message is ...");
  16. }
  17. }
  18. // 其他消息类型通过 switch 进行处理
  19. else {
  20. switch (state) {
  21. case CONNECTED:
  22. try {
  23. handler.connected(channel);
  24. } catch (Exception e) {
  25. logger.warn("... operation error, channel is ...");
  26. }
  27. break;
  28. case DISCONNECTED:
  29. // ...
  30. case SENT:
  31. // ...
  32. case CAUGHT:
  33. // ...
  34. default:
  35. logger.warn("unknown state: " + state + ", message is " + message);
  36. }
  37. }
  38. }
  39. }

如上,请求和响应消息出现频率明显比其他类型消息高,所以这里对该类型的消息进行了针对性判断。ChannelEventRunnable 仅是一个中转站,它的 run 方法中并不包含具体的调用逻辑,仅用于将参数传给其他 ChannelHandler 对象进行处理,该对象类型为 DecodeHandler。

  1. public class DecodeHandler extends AbstractChannelHandlerDelegate {
  2. public DecodeHandler(ChannelHandler handler) {
  3. super(handler);
  4. }
  5. @Override
  6. public void received(Channel channel, Object message) throws RemotingException {
  7. if (message instanceof Decodeable) {
  8. // 对 Decodeable 接口实现类对象进行解码
  9. decode(message);
  10. }
  11. if (message instanceof Request) {
  12. // 对 Request 的 data 字段进行解码
  13. decode(((Request) message).getData());
  14. }
  15. if (message instanceof Response) {
  16. // 对 Request 的 result 字段进行解码
  17. decode(((Response) message).getResult());
  18. }
  19. // 执行后续逻辑
  20. handler.received(channel, message);
  21. }
  22. private void decode(Object message) {
  23. // Decodeable 接口目前有两个实现类,
  24. // 分别为 DecodeableRpcInvocation 和 DecodeableRpcResult
  25. if (message != null && message instanceof Decodeable) {
  26. try {
  27. // 执行解码逻辑
  28. ((Decodeable) message).decode();
  29. } catch (Throwable e) {
  30. if (log.isWarnEnabled()) {
  31. log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
  32. }
  33. }
  34. }
  35. }
  36. }

DecodeHandler 主要是包含了一些解码逻辑。2.2.1 节分析请求解码时说过,请求解码可在 IO 线程上执行,也可在线程池中执行,这个取决于运行时配置。DecodeHandler 存在的意义就是保证请求或响应对象可在线程池中被解码。解码完毕后,完全解码后的 Request 对象会继续向后传递,下一站是 HeaderExchangeHandler。

  1. public class HeaderExchangeHandler implements ChannelHandlerDelegate {
  2. private final ExchangeHandler handler;
  3. public HeaderExchangeHandler(ExchangeHandler handler) {
  4. if (handler == null) {
  5. throw new IllegalArgumentException("handler == null");
  6. }
  7. this.handler = handler;
  8. }
  9. @Override
  10. public void received(Channel channel, Object message) throws RemotingException {
  11. channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
  12. ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
  13. try {
  14. // 处理请求对象
  15. if (message instanceof Request) {
  16. Request request = (Request) message;
  17. if (request.isEvent()) {
  18. // 处理事件
  19. handlerEvent(channel, request);
  20. }
  21. // 处理普通的请求
  22. else {
  23. // 双向通信
  24. if (request.isTwoWay()) {
  25. // 向后调用服务,并得到调用结果
  26. Response response = handleRequest(exchangeChannel, request);
  27. // 将调用结果返回给服务消费端
  28. channel.send(response);
  29. }
  30. // 如果是单向通信,仅向后调用指定服务即可,无需返回调用结果
  31. else {
  32. handler.received(exchangeChannel, request.getData());
  33. }
  34. }
  35. }
  36. // 处理响应对象,服务消费方会执行此处逻辑,后面分析
  37. else if (message instanceof Response) {
  38. handleResponse(channel, (Response) message);
  39. } else if (message instanceof String) {
  40. // telnet 相关,忽略
  41. } else {
  42. handler.received(exchangeChannel, message);
  43. }
  44. } finally {
  45. HeaderExchangeChannel.removeChannelIfDisconnected(channel);
  46. }
  47. }
  48. Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
  49. Response res = new Response(req.getId(), req.getVersion());
  50. // 检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应
  51. if (req.isBroken()) {
  52. Object data = req.getData();
  53. String msg;
  54. if (data == null)
  55. msg = null;
  56. else if
  57. (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
  58. else
  59. msg = data.toString();
  60. res.setErrorMessage("Fail to decode request due to: " + msg);
  61. // 设置 BAD_REQUEST 状态
  62. res.setStatus(Response.BAD_REQUEST);
  63. return res;
  64. }
  65. // 获取 data 字段值,也就是 RpcInvocation 对象
  66. Object msg = req.getData();
  67. try {
  68. // 继续向下调用
  69. Object result = handler.reply(channel, msg);
  70. // 设置 OK 状态码
  71. res.setStatus(Response.OK);
  72. // 设置调用结果
  73. res.setResult(result);
  74. } catch (Throwable e) {
  75. // 若调用过程出现异常,则设置 SERVICE_ERROR,表示服务端异常
  76. res.setStatus(Response.SERVICE_ERROR);
  77. res.setErrorMessage(StringUtils.toString(e));
  78. }
  79. return res;
  80. }
  81. }

到这里,我们看到了比较清晰的请求和响应逻辑。对于双向通信,HeaderExchangeHandler 首先向后进行调用,得到调用结果。然后将调用结果封装到 Response 对象中,最后再将该对象返回给服务消费方。如果请求不合法,或者调用失败,则将错误信息封装到 Response 对象中,并返回给服务消费方。接下来我们继续向后分析,把剩余的调用过程分析完。下面分析定义在 DubboProtocol 类中的匿名类对象逻辑,如下:

  1. public class DubboProtocol extends AbstractProtocol {
  2. public static final String NAME = "dubbo";
  3. private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
  4. @Override
  5. public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
  6. if (message instanceof Invocation) {
  7. Invocation inv = (Invocation) message;
  8. // 获取 Invoker 实例
  9. Invoker<?> invoker = getInvoker(channel, inv);
  10. if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
  11. // 回调相关,忽略
  12. }
  13. RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
  14. // 通过 Invoker 调用具体的服务
  15. return invoker.invoke(inv);
  16. }
  17. throw new RemotingException(channel, "Unsupported request: ...");
  18. }
  19. // 忽略其他方法
  20. }
  21. Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
  22. // 忽略回调和本地存根相关逻辑
  23. // ...
  24. int port = channel.getLocalAddress().getPort();
  25. // 计算 service key,格式为 groupName/serviceName:serviceVersion:port。比如:
  26. // dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
  27. String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
  28. // 从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象,
  29. // 服务导出过程中会将 <serviceKey, DubboExporter> 映射关系存储到 exporterMap 集合中
  30. DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
  31. if (exporter == null)
  32. throw new RemotingException(channel, "Not found exported service ...");
  33. // 获取 Invoker 对象,并返回
  34. return exporter.getInvoker();
  35. }
  36. // 忽略其他方法
  37. }

以上逻辑用于获取与指定服务对应的 Invoker 实例,并通过 Invoker 的 invoke 方法调用服务逻辑。invoke 方法定义在 AbstractProxyInvoker 中,代码如下。

  1. public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
  2. @Override
  3. public Result invoke(Invocation invocation) throws RpcException {
  4. try {
  5. // 调用 doInvoke 执行后续的调用,并将调用结果封装到 RpcResult 中,并
  6. return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
  7. } catch (InvocationTargetException e) {
  8. return new RpcResult(e.getTargetException());
  9. } catch (Throwable e) {
  10. throw new RpcException("Failed to invoke remote proxy method ...");
  11. }
  12. }
  13. protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
  14. }

如上,doInvoke 是一个抽象方法,这个需要由具体的 Invoker 实例实现。Invoker 实例是在运行时通过 JavassistProxyFactory 创建的,创建逻辑如下:

  1. public class JavassistProxyFactory extends AbstractProxyFactory {
  2. // 省略其他方法
  3. @Override
  4. public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
  5. final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
  6. // 创建匿名类对象
  7. return new AbstractProxyInvoker<T>(proxy, type, url) {
  8. @Override
  9. protected Object doInvoke(T proxy, String methodName,
  10. Class<?>[] parameterTypes,
  11. Object[] arguments) throws Throwable {
  12. // 调用 invokeMethod 方法进行后续的调用
  13. return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
  14. }
  15. };
  16. }
  17. }

Wrapper 是一个抽象类,其中 invokeMethod 是一个抽象方法。Dubbo 会在运行时通过 Javassist 框架为 Wrapper 生成实现类,并实现 invokeMethod 方法,该方法最终会根据调用信息调用具体的服务。以 DemoServiceImpl 为例,Javassist 为其生成的代理类如下。

  1. /** Wrapper0 是在运行时生成的,大家可使用 Arthas 进行反编译 */
  2. public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
  3. public static String[] pns;
  4. public static Map pts;
  5. public static String[] mns;
  6. public static String[] dmns;
  7. public static Class[] mts0;
  8. // 省略其他方法
  9. public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
  10. DemoService demoService;
  11. try {
  12. // 类型转换
  13. demoService = (DemoService)object;
  14. }
  15. catch (Throwable throwable) {
  16. throw new IllegalArgumentException(throwable);
  17. }
  18. try {
  19. // 根据方法名调用指定的方法
  20. if ("sayHello".equals(string) && arrclass.length == 1) {
  21. return demoService.sayHello((String)arrobject[0]);
  22. }
  23. }
  24. catch (Throwable throwable) {
  25. throw new InvocationTargetException(throwable);
  26. }
  27. throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
  28. }
  29. }

到这里,整个服务调用过程就分析完了。最后把调用过程贴出来,如下:

  1. ChannelEventRunnable#run()
  2. —> DecodeHandler#received(Channel, Object)
  3. —> HeaderExchangeHandler#received(Channel, Object)
  4. —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
  5. —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
  6. —> Filter#invoke(Invoker, Invocation)
  7. —> AbstractProxyInvoker#invoke(Invocation)
  8. —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
  9. —> DemoServiceImpl#sayHello(String)