2.2.4 服务注册

本节我们来分析服务注册过程,服务注册操作对于 Dubbo 来说不是必需的,通过服务直连的方式就可以绕过注册中心。但通常我们不会这么做,直连方式不利于服务治理,仅推荐在测试服务时使用。对于 Dubbo 来说,注册中心虽不是必需,但却是必要的。因此,关于注册中心以及服务注册相关逻辑,我们也需要搞懂。

本节内容以 Zookeeper 注册中心作为分析目标,其他类型注册中心大家可自行分析。下面从服务注册的入口方法开始分析,我们把目光再次移到 RegistryProtocol 的 export 方法上。如下:

  1. public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
  2. // ${导出服务}
  3. // 省略其他代码
  4. boolean register = registeredProviderUrl.getParameter("register", true);
  5. if (register) {
  6. // 注册服务
  7. register(registryUrl, registeredProviderUrl);
  8. ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
  9. }
  10. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
  11. final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
  12. overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
  13. // 订阅 override 数据
  14. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
  15. // 省略部分代码
  16. }

RegistryProtocol 的 export 方法包含了服务导出,注册,以及数据订阅等逻辑。其中服务导出逻辑上一节已经分析过了,本节将分析服务注册逻辑,相关代码如下:

  1. public void register(URL registryUrl, URL registedProviderUrl) {
  2. // 获取 Registry
  3. Registry registry = registryFactory.getRegistry(registryUrl);
  4. // 注册服务
  5. registry.register(registedProviderUrl);
  6. }

register 方法包含两步操作,第一步是获取注册中心实例,第二步是向注册中心注册服务。接下来分两节内容对这两步操作进行分析。

2.2.4.1 创建注册中心

本节内容以 Zookeeper 注册中心为例进行分析。下面先来看一下 getRegistry 方法的源码,这个方法由 AbstractRegistryFactory 实现。如下:

  1. public Registry getRegistry(URL url) {
  2. url = url.setPath(RegistryService.class.getName())
  3. .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
  4. .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
  5. String key = url.toServiceString();
  6. LOCK.lock();
  7. try {
  8. // 访问缓存
  9. Registry registry = REGISTRIES.get(key);
  10. if (registry != null) {
  11. return registry;
  12. }
  13. // 缓存未命中,创建 Registry 实例
  14. registry = createRegistry(url);
  15. if (registry == null) {
  16. throw new IllegalStateException("Can not create registry...");
  17. }
  18. // 写入缓存
  19. REGISTRIES.put(key, registry);
  20. return registry;
  21. } finally {
  22. LOCK.unlock();
  23. }
  24. }
  25. protected abstract Registry createRegistry(URL url);

如上,getRegistry 方法先访问缓存,缓存未命中则调用 createRegistry 创建 Registry,然后写入缓存。这里的 createRegistry 是一个模板方法,由具体的子类实现。因此,下面我们到 ZookeeperRegistryFactory 中探究一番。

  1. public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
  2. // zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptive
  3. private ZookeeperTransporter zookeeperTransporter;
  4. public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
  5. this.zookeeperTransporter = zookeeperTransporter;
  6. }
  7. @Override
  8. public Registry createRegistry(URL url) {
  9. // 创建 ZookeeperRegistry
  10. return new ZookeeperRegistry(url, zookeeperTransporter);
  11. }
  12. }

ZookeeperRegistryFactory 的 createRegistry 方法仅包含一句代码,无需解释,继续跟下去。

  1. public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
  2. super(url);
  3. if (url.isAnyHost()) {
  4. throw new IllegalStateException("registry address == null");
  5. }
  6. // 获取组名,默认为 dubbo
  7. String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
  8. if (!group.startsWith(Constants.PATH_SEPARATOR)) {
  9. // group = "/" + group
  10. group = Constants.PATH_SEPARATOR + group;
  11. }
  12. this.root = group;
  13. // 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
  14. zkClient = zookeeperTransporter.connect(url);
  15. // 添加状态监听器
  16. zkClient.addStateListener(new StateListener() {
  17. @Override
  18. public void stateChanged(int state) {
  19. if (state == RECONNECTED) {
  20. try {
  21. recover();
  22. } catch (Exception e) {
  23. logger.error(e.getMessage(), e);
  24. }
  25. }
  26. }
  27. });
  28. }

在上面的代码代码中,我们重点关注 ZookeeperTransporter 的 connect 方法调用,这个方法用于创建 Zookeeper 客户端。创建好 Zookeeper 客户端,意味着注册中心的创建过程就结束了。接下来,再来分析一下 Zookeeper 客户端的创建过程。

前面说过,这里的 zookeeperTransporter 类型为自适应拓展类,因此 connect 方法会在被调用时决定加载什么类型的 ZookeeperTransporter 拓展,默认为 CuratorZookeeperTransporter。下面我们到 CuratorZookeeperTransporter 中看一看。

  1. public ZookeeperClient connect(URL url) {
  2. // 创建 CuratorZookeeperClient
  3. return new CuratorZookeeperClient(url);
  4. }

继续向下看。

  1. public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {
  2. private final CuratorFramework client;
  3. public CuratorZookeeperClient(URL url) {
  4. super(url);
  5. try {
  6. // 创建 CuratorFramework 构造器
  7. CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
  8. .connectString(url.getBackupAddress())
  9. .retryPolicy(new RetryNTimes(1, 1000))
  10. .connectionTimeoutMs(5000);
  11. String authority = url.getAuthority();
  12. if (authority != null && authority.length() > 0) {
  13. builder = builder.authorization("digest", authority.getBytes());
  14. }
  15. // 构建 CuratorFramework 实例
  16. client = builder.build();
  17. // 添加监听器
  18. client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
  19. @Override
  20. public void stateChanged(CuratorFramework client, ConnectionState state) {
  21. if (state == ConnectionState.LOST) {
  22. CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
  23. } else if (state == ConnectionState.CONNECTED) {
  24. CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
  25. } else if (state == ConnectionState.RECONNECTED) {
  26. CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
  27. }
  28. }
  29. });
  30. // 启动客户端
  31. client.start();
  32. } catch (Exception e) {
  33. throw new IllegalStateException(e.getMessage(), e);
  34. }
  35. }
  36. }

CuratorZookeeperClient 构造方法主要用于创建和启动 CuratorFramework 实例。以上基本上都是 Curator 框架的代码,大家如果对 Curator 框架不是很了解,可以参考 Curator 官方文档。

本节分析了 ZookeeperRegistry 实例的创建过程,整个过程并不是很复杂。大家在看完分析后,可以自行调试,以加深理解。现在注册中心实例创建好了,接下来要做的事情是向注册中心注册服务,我们继续往下看。

2.2.4.2 节点创建

以 Zookeeper 为例,所谓的服务注册,本质上是将服务配置数据写入到 Zookeeper 的某个路径的节点下。为了让大家有一个直观的了解,下面我们将 Dubbo 的 demo 跑起来,然后通过 Zookeeper 可视化客户端 ZooInspector 查看节点数据。如下:

2.2.4 服务注册 - 图1

从上图中可以看到 com.alibaba.dubbo.demo.DemoService 这个服务对应的配置信息(存储在 URL 中)最终被注册到了 /dubbo/com.alibaba.dubbo.demo.DemoService/providers/ 节点下。搞懂了服务注册的本质,那么接下来我们就可以去阅读服务注册的代码了。服务注册的接口为 register(URL),这个方法定义在 FailbackRegistry 抽象类中。代码如下:

  1. public void register(URL url) {
  2. super.register(url);
  3. failedRegistered.remove(url);
  4. failedUnregistered.remove(url);
  5. try {
  6. // 模板方法,由子类实现
  7. doRegister(url);
  8. } catch (Exception e) {
  9. Throwable t = e;
  10. // 获取 check 参数,若 check = true 将会直接抛出异常
  11. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
  12. && url.getParameter(Constants.CHECK_KEY, true)
  13. && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
  14. boolean skipFailback = t instanceof SkipFailbackWrapperException;
  15. if (check || skipFailback) {
  16. if (skipFailback) {
  17. t = t.getCause();
  18. }
  19. throw new IllegalStateException("Failed to register");
  20. } else {
  21. logger.error("Failed to register");
  22. }
  23. // 记录注册失败的链接
  24. failedRegistered.add(url);
  25. }
  26. }
  27. protected abstract void doRegister(URL url);

如上,我们重点关注 doRegister 方法调用即可,其他的代码先忽略。doRegister 方法是一个模板方法,因此我们到 FailbackRegistry 子类 ZookeeperRegistry 中进行分析。如下:

  1. protected void doRegister(URL url) {
  2. try {
  3. // 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
  4. // /${group}/${serviceInterface}/providers/${url}
  5. // 比如
  6. // /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
  7. zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
  8. } catch (Throwable e) {
  9. throw new RpcException("Failed to register...");
  10. }
  11. }

如上,ZookeeperRegistry 在 doRegister 中调用了 Zookeeper 客户端创建服务节点。节点路径由 toUrlPath 方法生成,该方法逻辑不难理解,就不分析了。接下来分析 create 方法,如下:

  1. public void create(String path, boolean ephemeral) {
  2. if (!ephemeral) {
  3. // 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在
  4. if (checkExists(path)) {
  5. return;
  6. }
  7. }
  8. int i = path.lastIndexOf('/');
  9. if (i > 0) {
  10. // 递归创建上一级路径
  11. create(path.substring(0, i), false);
  12. }
  13. // 根据 ephemeral 的值创建临时或持久节点
  14. if (ephemeral) {
  15. createEphemeral(path);
  16. } else {
  17. createPersistent(path);
  18. }
  19. }

上面方法先是通过递归创建当前节点的上一级路径,然后再根据 ephemeral 的值决定创建临时还是持久节点。createEphemeral 和 createPersistent 这两个方法都比较简单,这里简单分析其中的一个。如下:

  1. public void createEphemeral(String path) {
  2. try {
  3. // 通过 Curator 框架创建节点
  4. client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
  5. } catch (NodeExistsException e) {
  6. } catch (Exception e) {
  7. throw new IllegalStateException(e.getMessage(), e);
  8. }
  9. }

好了,到此关于服务注册的过程就分析完了。整个过程可简单总结为:先创建注册中心实例,之后再通过注册中心实例注册服务。本节先到这,接下来分析数据订阅过程。