2.2.3 导出服务到远程

与导出服务到本地相比,导出服务到远程的过程要复杂不少,其包含了服务导出与服务注册两个过程。这两个过程涉及到了大量的调用,比较复杂。按照代码执行顺序,本节先来分析服务导出逻辑,服务注册逻辑将在下一节进行分析。下面开始分析,我们把目光移动到 RegistryProtocol 的 export 方法上。

  1. public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
  2. // 导出服务
  3. final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
  4. // 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
  5. // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
  6. URL registryUrl = getRegistryUrl(originInvoker);
  7. // 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
  8. final Registry registry = getRegistry(originInvoker);
  9. // 获取已注册的服务提供者 URL,比如:
  10. // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
  11. final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
  12. // 获取 register 参数
  13. boolean register = registeredProviderUrl.getParameter("register", true);
  14. // 向服务提供者与消费者注册表中注册服务提供者
  15. ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
  16. // 根据 register 的值决定是否注册服务
  17. if (register) {
  18. // 向注册中心注册服务
  19. register(registryUrl, registeredProviderUrl);
  20. ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
  21. }
  22. // 获取订阅 URL,比如:
  23. // provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
  24. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
  25. // 创建监听器
  26. final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
  27. overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
  28. // 向注册中心进行订阅 override 数据
  29. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
  30. // 创建并返回 DestroyableExporter
  31. return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
  32. }

上面代码看起来比较复杂,主要做如下一些操作:

  1. 调用 doLocalExport 导出服务
  2. 向注册中心注册服务
  3. 向注册中心进行订阅 override 数据
  4. 创建并返回 DestroyableExporter

在以上操作中,除了创建并返回 DestroyableExporter 没什么难度外,其他几步操作都不是很简单。这其中,导出服务和注册服务是本章要重点分析的逻辑。 订阅 override 数据并非本文重点内容,后面会简单介绍一下。下面先来分析 doLocalExport 方法的逻辑,如下:

  1. private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
  2. String key = getCacheKey(originInvoker);
  3. // 访问缓存
  4. ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
  5. if (exporter == null) {
  6. synchronized (bounds) {
  7. exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
  8. if (exporter == null) {
  9. // 创建 Invoker 为委托类对象
  10. final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
  11. // 调用 protocol 的 export 方法导出服务
  12. exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
  13. // 写缓存
  14. bounds.put(key, exporter);
  15. }
  16. }
  17. }
  18. return exporter;
  19. }

上面的代码是典型的双重检查锁,大家在阅读 Dubbo 的源码中,会多次见到。接下来,我们把重点放在 Protocol 的 export 方法上。假设运行时协议为 dubbo,此处的 protocol 变量会在运行时加载 DubboProtocol,并调用 DubboProtocol 的 export 方法。所以,接下来我们目光转移到 DubboProtocol 的 export 方法上,相关分析如下:

  1. public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  2. URL url = invoker.getUrl();
  3. // 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:
  4. // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
  5. String key = serviceKey(url);
  6. // 创建 DubboExporter
  7. DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
  8. // 将 <key, exporter> 键值对放入缓存中
  9. exporterMap.put(key, exporter);
  10. // 本地存根相关代码
  11. Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
  12. Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
  13. if (isStubSupportEvent && !isCallbackservice) {
  14. String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
  15. if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
  16. // 省略日志打印代码
  17. } else {
  18. stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
  19. }
  20. }
  21. // 启动服务器
  22. openServer(url);
  23. // 优化序列化
  24. optimizeSerialization(url);
  25. return exporter;
  26. }

如上,我们重点关注 DubboExporter 的创建以及 openServer 方法,其他逻辑看不懂也没关系,不影响理解服务导出过程。另外,DubboExporter 的代码比较简单,就不分析了。下面分析 openServer 方法。

  1. private void openServer(URL url) {
  2. // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
  3. String key = url.getAddress();
  4. boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
  5. if (isServer) {
  6. // 访问缓存
  7. ExchangeServer server = serverMap.get(key);
  8. if (server == null) {
  9. // 创建服务器实例
  10. serverMap.put(key, createServer(url));
  11. } else {
  12. // 服务器已创建,则根据 url 中的配置重置服务器
  13. server.reset(url);
  14. }
  15. }
  16. }

如上,在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。若某个端口上已有服务器实例,此时则调用 reset 方法重置服务器的一些配置。考虑到篇幅问题,关于服务器实例重置的代码就不分析了。接下来分析服务器实例的创建过程。如下:

  1. private ExchangeServer createServer(URL url) {
  2. url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,
  3. // 添加心跳检测配置到 url 中
  4. url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
  5. // 获取 server 参数,默认为 netty
  6. String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
  7. // 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
  8. if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
  9. throw new RpcException("Unsupported server type: " + str + ", url: " + url);
  10. // 添加编码解码器参数
  11. url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
  12. ExchangeServer server;
  13. try {
  14. // 创建 ExchangeServer
  15. server = Exchangers.bind(url, requestHandler);
  16. } catch (RemotingException e) {
  17. throw new RpcException("Fail to start server...");
  18. }
  19. // 获取 client 参数,可指定 netty,mina
  20. str = url.getParameter(Constants.CLIENT_KEY);
  21. if (str != null && str.length() > 0) {
  22. // 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
  23. Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
  24. // 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
  25. // 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
  26. if (!supportedTypes.contains(str)) {
  27. throw new RpcException("Unsupported client type...");
  28. }
  29. }
  30. return server;
  31. }

如上,createServer 包含三个核心的逻辑。第一是检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常。第二是创建服务器实例。第三是检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常。两次检测操作所对应的代码比较直白了,无需多说。但创建服务器的操作目前还不是很清晰,我们继续往下看。

  1. public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
  2. if (url == null) {
  3. throw new IllegalArgumentException("url == null");
  4. }
  5. if (handler == null) {
  6. throw new IllegalArgumentException("handler == null");
  7. }
  8. url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
  9. // 获取 Exchanger,默认为 HeaderExchanger。
  10. // 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
  11. return getExchanger(url).bind(url, handler);
  12. }

上面代码比较简单,就不多说了。下面看一下 HeaderExchanger 的 bind 方法。

  1. public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
  2. // 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下:
  3. // 1. new HeaderExchangeHandler(handler)
  4. // 2. new DecodeHandler(new HeaderExchangeHandler(handler))
  5. // 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
  6. return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
  7. }

HeaderExchanger 的 bind 方法包含的逻辑比较多,但目前我们仅需关心 Transporters 的 bind 方法逻辑即可。该方法的代码如下:

  1. public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
  2. if (url == null) {
  3. throw new IllegalArgumentException("url == null");
  4. }
  5. if (handlers == null || handlers.length == 0) {
  6. throw new IllegalArgumentException("handlers == null");
  7. }
  8. ChannelHandler handler;
  9. if (handlers.length == 1) {
  10. handler = handlers[0];
  11. } else {
  12. // 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
  13. handler = new ChannelHandlerDispatcher(handlers);
  14. }
  15. // 获取自适应 Transporter 实例,并调用实例方法
  16. return getTransporter().bind(url, handler);
  17. }

如上,getTransporter() 方法获取的 Transporter 是在运行时动态创建的,类名为 TransporterAdaptive,也就是自适应拓展类。TransporterAdaptive 会在运行时根据传入的 URL 参数决定加载什么类型的 Transporter,默认为 NettyTransporter。下面我们继续跟下去,这次分析的是 NettyTransporter 的 bind 方法。

  1. public Server bind(URL url, ChannelHandler listener) throws RemotingException {
  2. // 创建 NettyServer
  3. return new NettyServer(url, listener);
  4. }

这里仅有一句创建 NettyServer 的代码,无需多说,我们继续向下看。

  1. public class NettyServer extends AbstractServer implements Server {
  2. public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
  3. // 调用父类构造方法
  4. super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
  5. }
  6. }
  7. public abstract class AbstractServer extends AbstractEndpoint implements Server {
  8. public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
  9. // 调用父类构造方法,这里就不用跟进去了,没什么复杂逻辑
  10. super(url, handler);
  11. localAddress = getUrl().toInetSocketAddress();
  12. // 获取 ip 和端口
  13. String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
  14. int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
  15. if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
  16. // 设置 ip 为 0.0.0.0
  17. bindIp = NetUtils.ANYHOST;
  18. }
  19. bindAddress = new InetSocketAddress(bindIp, bindPort);
  20. // 获取最大可接受连接数
  21. this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
  22. this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
  23. try {
  24. // 调用模板方法 doOpen 启动服务器
  25. doOpen();
  26. } catch (Throwable t) {
  27. throw new RemotingException("Failed to bind ");
  28. }
  29. DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
  30. executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
  31. }
  32. protected abstract void doOpen() throws Throwable;
  33. protected abstract void doClose() throws Throwable;
  34. }

上面代码多为赋值代码,不需要多讲。我们重点关注 doOpen 抽象方法,该方法需要子类实现。下面回到 NettyServer 中。

  1. protected void doOpen() throws Throwable {
  2. NettyHelper.setNettyLoggerFactory();
  3. // 创建 boss 和 worker 线程池
  4. ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
  5. ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
  6. ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
  7. // 创建 ServerBootstrap
  8. bootstrap = new ServerBootstrap(channelFactory);
  9. final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
  10. channels = nettyHandler.getChannels();
  11. bootstrap.setOption("child.tcpNoDelay", true);
  12. // 设置 PipelineFactory
  13. bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
  14. @Override
  15. public ChannelPipeline getPipeline() {
  16. NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
  17. ChannelPipeline pipeline = Channels.pipeline();
  18. pipeline.addLast("decoder", adapter.getDecoder());
  19. pipeline.addLast("encoder", adapter.getEncoder());
  20. pipeline.addLast("handler", nettyHandler);
  21. return pipeline;
  22. }
  23. });
  24. // 绑定到指定的 ip 和端口上
  25. channel = bootstrap.bind(getBindAddress());
  26. }

以上就是 NettyServer 创建的过程,dubbo 默认使用的 NettyServer 是基于 netty 3.x 版本实现的,比较老了。因此 Dubbo 另外提供了 netty 4.x 版本的 NettyServer,大家可在使用 Dubbo 的过程中按需进行配置。

到此,关于服务导出的过程就分析完了。整个过程比较复杂,大家在分析的过程中耐心一些。并且多写 Demo 进行调试,以便能够更好的理解代码逻辑。

本节内容先到这里,接下来分析服务导出的另一块逻辑 — 服务注册。