2.2 服务消费方发送请求

2.2.1 发送请求

本节我们来看一下同步调用模式下,服务消费方是如何发送调用请求的。在深入分析源码前,我们先来看一张图。

2.2 服务消费方发送请求 - 图1

这张图展示了服务消费方发送请求过程的部分调用栈,略为复杂。从上图可以看出,经过多次调用后,才将请求数据送至 Netty NioClientSocketChannel。这样做的原因是通过 Exchange 层为框架引入 Request 和 Response 语义,这一点会在接下来的源码分析过程中会看到。其他的就不多说了,下面开始进行分析。首先分析 ReferenceCountExchangeClient 的源码。

  1. final class ReferenceCountExchangeClient implements ExchangeClient {
  2. private final URL url;
  3. private final AtomicInteger referenceCount = new AtomicInteger(0);
  4. public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
  5. this.client = client;
  6. // 引用计数自增
  7. referenceCount.incrementAndGet();
  8. this.url = client.getUrl();
  9. // ...
  10. }
  11. @Override
  12. public ResponseFuture request(Object request) throws RemotingException {
  13. // 直接调用被装饰对象的同签名方法
  14. return client.request(request);
  15. }
  16. @Override
  17. public ResponseFuture request(Object request, int timeout) throws RemotingException {
  18. // 直接调用被装饰对象的同签名方法
  19. return client.request(request, timeout);
  20. }
  21. /** 引用计数自增,该方法由外部调用 */
  22. public void incrementAndGetCount() {
  23. // referenceCount 自增
  24. referenceCount.incrementAndGet();
  25. }
  26. @Override
  27. public void close(int timeout) {
  28. // referenceCount 自减
  29. if (referenceCount.decrementAndGet() <= 0) {
  30. if (timeout == 0) {
  31. client.close();
  32. } else {
  33. client.close(timeout);
  34. }
  35. client = replaceWithLazyClient();
  36. }
  37. }
  38. // 省略部分方法
  39. }

ReferenceCountExchangeClient 内部定义了一个引用计数变量 referenceCount,每当该对象被引用一次 referenceCount 都会进行自增。每当 close 方法被调用时,referenceCount 进行自减。ReferenceCountExchangeClient 内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象的相关方法。所以这里就不多说了,继续向下分析,这次是 HeaderExchangeClient。

  1. public class HeaderExchangeClient implements ExchangeClient {
  2. private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
  3. private final Client client;
  4. private final ExchangeChannel channel;
  5. private ScheduledFuture<?> heartbeatTimer;
  6. private int heartbeat;
  7. private int heartbeatTimeout;
  8. public HeaderExchangeClient(Client client, boolean needHeartbeat) {
  9. if (client == null) {
  10. throw new IllegalArgumentException("client == null");
  11. }
  12. this.client = client;
  13. // 创建 HeaderExchangeChannel 对象
  14. this.channel = new HeaderExchangeChannel(client);
  15. // 以下代码均与心跳检测逻辑有关
  16. String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
  17. this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
  18. this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
  19. if (heartbeatTimeout < heartbeat * 2) {
  20. throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
  21. }
  22. if (needHeartbeat) {
  23. // 开启心跳检测定时器
  24. startHeartbeatTimer();
  25. }
  26. }
  27. @Override
  28. public ResponseFuture request(Object request) throws RemotingException {
  29. // 直接 HeaderExchangeChannel 对象的同签名方法
  30. return channel.request(request);
  31. }
  32. @Override
  33. public ResponseFuture request(Object request, int timeout) throws RemotingException {
  34. // 直接 HeaderExchangeChannel 对象的同签名方法
  35. return channel.request(request, timeout);
  36. }
  37. @Override
  38. public void close() {
  39. doClose();
  40. channel.close();
  41. }
  42. private void doClose() {
  43. // 停止心跳检测定时器
  44. stopHeartbeatTimer();
  45. }
  46. private void startHeartbeatTimer() {
  47. stopHeartbeatTimer();
  48. if (heartbeat > 0) {
  49. heartbeatTimer = scheduled.scheduleWithFixedDelay(
  50. new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
  51. @Override
  52. public Collection<Channel> getChannels() {
  53. return Collections.<Channel>singletonList(HeaderExchangeClient.this);
  54. }
  55. }, heartbeat, heartbeatTimeout),
  56. heartbeat, heartbeat, TimeUnit.MILLISECONDS);
  57. }
  58. }
  59. private void stopHeartbeatTimer() {
  60. if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
  61. try {
  62. heartbeatTimer.cancel(true);
  63. scheduled.purge();
  64. } catch (Throwable e) {
  65. if (logger.isWarnEnabled()) {
  66. logger.warn(e.getMessage(), e);
  67. }
  68. }
  69. }
  70. heartbeatTimer = null;
  71. }
  72. // 省略部分方法
  73. }

HeaderExchangeClient 中很多方法只有一行代码,即调用 HeaderExchangeChannel 对象的同签名方法。那 HeaderExchangeClient 有什么用处呢?答案是封装了一些关于心跳检测的逻辑。心跳检测并非本文所关注的点,因此就不多说了,继续向下看。

  1. final class HeaderExchangeChannel implements ExchangeChannel {
  2. private final Channel channel;
  3. HeaderExchangeChannel(Channel channel) {
  4. if (channel == null) {
  5. throw new IllegalArgumentException("channel == null");
  6. }
  7. // 这里的 channel 指向的是 NettyClient
  8. this.channel = channel;
  9. }
  10. @Override
  11. public ResponseFuture request(Object request) throws RemotingException {
  12. return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
  13. }
  14. @Override
  15. public ResponseFuture request(Object request, int timeout) throws RemotingException {
  16. if (closed) {
  17. throw new RemotingException(..., "Failed to send request ...);
  18. }
  19. // 创建 Request 对象
  20. Request req = new Request();
  21. req.setVersion(Version.getProtocolVersion());
  22. // 设置双向通信标志为 true
  23. req.setTwoWay(true);
  24. // 这里的 request 变量类型为 RpcInvocation
  25. req.setData(request);
  26. // 创建 DefaultFuture 对象
  27. DefaultFuture future = new DefaultFuture(channel, req, timeout);
  28. try {
  29. // 调用 NettyClient 的 send 方法发送请求
  30. channel.send(req);
  31. } catch (RemotingException e) {
  32. future.cancel();
  33. throw e;
  34. }
  35. // 返回 DefaultFuture 对象
  36. return future;
  37. }
  38. }

到这里大家终于看到了 Request 语义了,上面的方法首先定义了一个 Request 对象,然后再将该对象传给 NettyClient 的 send 方法,进行后续的调用。需要说明的是,NettyClient 中并未实现 send 方法,该方法继承自父类 AbstractPeer,下面直接分析 AbstractPeer 的代码。

  1. public abstract class AbstractPeer implements Endpoint, ChannelHandler {
  2. @Override
  3. public void send(Object message) throws RemotingException {
  4. // 该方法由 AbstractClient 类实现
  5. send(message, url.getParameter(Constants.SENT_KEY, false));
  6. }
  7. // 省略其他方法
  8. }
  9. public abstract class AbstractClient extends AbstractEndpoint implements Client {
  10. @Override
  11. public void send(Object message, boolean sent) throws RemotingException {
  12. if (send_reconnect && !isConnected()) {
  13. connect();
  14. }
  15. // 获取 Channel,getChannel 是一个抽象方法,具体由子类实现
  16. Channel channel = getChannel();
  17. if (channel == null || !channel.isConnected()) {
  18. throw new RemotingException(this, "message can not send ...");
  19. }
  20. // 继续向下调用
  21. channel.send(message, sent);
  22. }
  23. protected abstract Channel getChannel();
  24. // 省略其他方法
  25. }

默认情况下,Dubbo 使用 Netty 作为底层的通信框架,因此下面我们到 NettyClient 类中看一下 getChannel 方法的实现逻辑。

  1. public class NettyClient extends AbstractClient {
  2. // 这里的 Channel 全限定名称为 org.jboss.netty.channel.Channel
  3. private volatile Channel channel;
  4. @Override
  5. protected com.alibaba.dubbo.remoting.Channel getChannel() {
  6. Channel c = channel;
  7. if (c == null || !c.isConnected())
  8. return null;
  9. // 获取一个 NettyChannel 类型对象
  10. return NettyChannel.getOrAddChannel(c, getUrl(), this);
  11. }
  12. }
  13. final class NettyChannel extends AbstractChannel {
  14. private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap =
  15. new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
  16. private final org.jboss.netty.channel.Channel channel;
  17. /** 私有构造方法 */
  18. private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
  19. super(url, handler);
  20. if (channel == null) {
  21. throw new IllegalArgumentException("netty channel == null;");
  22. }
  23. this.channel = channel;
  24. }
  25. static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
  26. if (ch == null) {
  27. return null;
  28. }
  29. // 尝试从集合中获取 NettyChannel 实例
  30. NettyChannel ret = channelMap.get(ch);
  31. if (ret == null) {
  32. // 如果 ret = null,则创建一个新的 NettyChannel 实例
  33. NettyChannel nc = new NettyChannel(ch, url, handler);
  34. if (ch.isConnected()) {
  35. // 将 <Channel, NettyChannel> 键值对存入 channelMap 集合中
  36. ret = channelMap.putIfAbsent(ch, nc);
  37. }
  38. if (ret == null) {
  39. ret = nc;
  40. }
  41. }
  42. return ret;
  43. }
  44. }

获取到 NettyChannel 实例后,即可进行后续的调用。下面看一下 NettyChannel 的 send 方法。

  1. public void send(Object message, boolean sent) throws RemotingException {
  2. super.send(message, sent);
  3. boolean success = true;
  4. int timeout = 0;
  5. try {
  6. // 发送消息(包含请求和响应消息)
  7. ChannelFuture future = channel.write(message);
  8. // sent 的值源于 &lt;dubbo:method sent="true/false" /> 中 sent 的配置值,有两种配置值:
  9. // 1. true: 等待消息发出,消息发送失败将抛出异常
  10. // 2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回
  11. // 默认情况下 sent = false;
  12. if (sent) {
  13. timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
  14. // 等待消息发出,若在规定时间没能发出,success 会被置为 false
  15. success = future.await(timeout);
  16. }
  17. Throwable cause = future.getCause();
  18. if (cause != null) {
  19. throw cause;
  20. }
  21. } catch (Throwable e) {
  22. throw new RemotingException(this, "Failed to send message ...");
  23. }
  24. // 若 success 为 false,这里抛出异常
  25. if (!success) {
  26. throw new RemotingException(this, "Failed to send message ...");
  27. }
  28. }

经历多次调用,到这里请求数据的发送过程就结束了,过程漫长。为了便于大家阅读代码,这里以 DemoService 为例,将 sayHello 方法的整个调用路径贴出来。

  1. proxy0#sayHello(String)
  2. —> InvokerInvocationHandler#invoke(Object, Method, Object[])
  3. —> MockClusterInvoker#invoke(Invocation)
  4. —> AbstractClusterInvoker#invoke(Invocation)
  5. —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
  6. —> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用
  7. —> ListenerInvokerWrapper#invoke(Invocation)
  8. —> AbstractInvoker#invoke(Invocation)
  9. —> DubboInvoker#doInvoke(Invocation)
  10. —> ReferenceCountExchangeClient#request(Object, int)
  11. —> HeaderExchangeClient#request(Object, int)
  12. —> HeaderExchangeChannel#request(Object, int)
  13. —> AbstractPeer#send(Object)
  14. —> AbstractClient#send(Object, boolean)
  15. —> NettyChannel#send(Object, boolean)
  16. —> NioClientSocketChannel#write(Object)

在 Netty 中,出站数据在发出之前还需要进行编码操作,接下来我们来分析一下请求数据的编码逻辑。

2.2.2 请求编码

在分析请求编码逻辑之前,我们先来看一下 Dubbo 数据包结构。

2.2 服务消费方发送请求 - 图2

Dubbo 数据包分为消息头和消息体,消息头用于存储一些元信息,比如魔数(Magic),数据包类型(Request/Response),消息体长度(Data Length)等。消息体中用于存储具体的调用消息,比如方法名称,参数列表等。下面简单列举一下消息头的内容。

偏移量(Bit)字段取值
0 ~ 7魔数高位0xda00
8 ~ 15魔数低位0xbb
16数据包类型0 - Response, 1 - Request
17调用方式仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用
18事件标识0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包
19 ~ 23序列化器编号2 - Hessian2Serialization
3 - JavaSerialization
4 - CompactedJavaSerialization
6 - FastJsonSerialization
7 - NativeJavaSerialization
8 - KryoSerialization
9 - FstSerialization
24 ~ 31状态20 - OK
30 - CLIENT_TIMEOUT
31 - SERVER_TIMEOUT
40 - BAD_REQUEST
50 - BAD_RESPONSE
……
32 ~ 95请求编号共8字节,运行时生成
96 ~ 127消息体长度运行时计算

了解了 Dubbo 数据包格式,接下来我们就可以探索编码过程了。这次我们开门见山,直接分析编码逻辑所在类。如下:

  1. public class ExchangeCodec extends TelnetCodec {
  2. // 消息头长度
  3. protected static final int HEADER_LENGTH = 16;
  4. // 魔数内容
  5. protected static final short MAGIC = (short) 0xdabb;
  6. protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
  7. protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
  8. protected static final byte FLAG_REQUEST = (byte) 0x80;
  9. protected static final byte FLAG_TWOWAY = (byte) 0x40;
  10. protected static final byte FLAG_EVENT = (byte) 0x20;
  11. protected static final int SERIALIZATION_MASK = 0x1f;
  12. private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);
  13. public Short getMagicCode() {
  14. return MAGIC;
  15. }
  16. @Override
  17. public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
  18. if (msg instanceof Request) {
  19. // 对 Request 对象进行编码
  20. encodeRequest(channel, buffer, (Request) msg);
  21. } else if (msg instanceof Response) {
  22. // 对 Response 对象进行编码,后面分析
  23. encodeResponse(channel, buffer, (Response) msg);
  24. } else {
  25. super.encode(channel, buffer, msg);
  26. }
  27. }
  28. protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
  29. Serialization serialization = getSerialization(channel);
  30. // 创建消息头字节数组,长度为 16
  31. byte[] header = new byte[HEADER_LENGTH];
  32. // 设置魔数
  33. Bytes.short2bytes(MAGIC, header);
  34. // 设置数据包类型(Request/Response)和序列化器编号
  35. header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
  36. // 设置通信方式(单向/双向)
  37. if (req.isTwoWay()) {
  38. header[2] |= FLAG_TWOWAY;
  39. }
  40. // 设置事件标识
  41. if (req.isEvent()) {
  42. header[2] |= FLAG_EVENT;
  43. }
  44. // 设置请求编号,8个字节,从第4个字节开始设置
  45. Bytes.long2bytes(req.getId(), header, 4);
  46. // 获取 buffer 当前的写位置
  47. int savedWriteIndex = buffer.writerIndex();
  48. // 更新 writerIndex,为消息头预留 16 个字节的空间
  49. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
  50. ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
  51. // 创建序列化器,比如 Hessian2ObjectOutput
  52. ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
  53. if (req.isEvent()) {
  54. // 对事件数据进行序列化操作
  55. encodeEventData(channel, out, req.getData());
  56. } else {
  57. // 对请求数据进行序列化操作
  58. encodeRequestData(channel, out, req.getData(), req.getVersion());
  59. }
  60. out.flushBuffer();
  61. if (out instanceof Cleanable) {
  62. ((Cleanable) out).cleanup();
  63. }
  64. bos.flush();
  65. bos.close();
  66. // 获取写入的字节数,也就是消息体长度
  67. int len = bos.writtenBytes();
  68. checkPayload(channel, len);
  69. // 将消息体长度写入到消息头中
  70. Bytes.int2bytes(len, header, 12);
  71. // 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
  72. buffer.writerIndex(savedWriteIndex);
  73. // 从 savedWriteIndex 下标处写入消息头
  74. buffer.writeBytes(header);
  75. // 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
  76. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
  77. }
  78. // 省略其他方法
  79. }

以上就是请求对象的编码过程,该过程首先会通过位运算将消息头写入到 header 数组中。然后对 Request 对象的 data 字段执行序列化操作,序列化后的数据最终会存储到 ChannelBuffer 中。序列化操作执行完后,可得到数据序列化后的长度 len,紧接着将 len 写入到 header 指定位置处。最后再将消息头字节数组 header 写入到 ChannelBuffer 中,整个编码过程就结束了。本节的最后,我们再来看一下 Request 对象的 data 字段序列化过程,也就是 encodeRequestData 方法的逻辑,如下:

  1. public class DubboCodec extends ExchangeCodec implements Codec2 {
  2. protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
  3. RpcInvocation inv = (RpcInvocation) data;
  4. // 依次序列化 dubbo version、path、version
  5. out.writeUTF(version);
  6. out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
  7. out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
  8. // 序列化调用方法名
  9. out.writeUTF(inv.getMethodName());
  10. // 将参数类型转换为字符串,并进行序列化
  11. out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
  12. Object[] args = inv.getArguments();
  13. if (args != null)
  14. for (int i = 0; i < args.length; i++) {
  15. // 对运行时参数进行序列化
  16. out.writeObject(encodeInvocationArgument(channel, inv, i));
  17. }
  18. // 序列化 attachments
  19. out.writeObject(inv.getAttachments());
  20. }
  21. }

至此,关于服务消费方发送请求的过程就分析完了,接下来我们来看一下服务提供方是如何接收请求的。