
23.1 简介


服务发现,即消费端自动发现服务地址列表的能力,是微服务框架需要具备的关键能力,借助于自动化的服务发现,微服务之间可以在无需感知对端部署位置与 IP 地址的情况下实现通信。




  // Migrates to the application-first step: refreshes both the interface-level invoker and the
  // service-discovery invoker, then computes the preferred invoker for the new rule.
  // The latch is created with a count of 0, so nothing can block on it in this path.
  1. @Override
  2. public void migrateToApplicationFirstInvoker(MigrationRule newRule) {
  3. CountDownLatch latch = new CountDownLatch(0);
  4. refreshInterfaceInvoker(latch);
  5. refreshServiceDiscoveryInvoker(latch);
  6. // directly calculate the preferred invoker; do not wait for the address notification
  7. // the calculation will run again when the address notification arrives later
  8. calcPreferredInvoker(newRule);
  9. }


23.2 关于refreshInterfaceInvoker方法


  // Rebuilds the interface-level invoker when needRefresh says so, then installs a change listener
  // that counts down the latch, reports consumption status and, in the APPLICATION_FIRST step,
  // recalculates the preferred invoker.
  1. protected void refreshInterfaceInvoker(CountDownLatch latch) {
  2. // If the invoker exists, clear its InvokersChangedListener
  3. clearListener(invoker);
  4. // Run the creation logic when the invoker needs a refresh (original note: it is null or already destroyed)
  5. if (needRefresh(invoker)) {
  6. if (logger.isDebugEnabled()) {
  7. logger.debug("Re-subscribing interface addresses for interface " + type.getName());
  8. }
  9. if (invoker != null) {
  10. invoker.destroy();
  11. }
  12. // Key step: obtain the invoker from the registry protocol
  13. invoker = registryProtocol.getInvoker(cluster, registry, type, url);
  14. }
  15. setListener(invoker, () -> {
  16. latch.countDown();
  17. if (reportService.hasReporter()) {
  18. reportService.reportConsumptionStatus(
  19. reportService.createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "interface"));
  20. }
  21. if (step == APPLICATION_FIRST) {
  22. calcPreferredInvoker(rule);
  23. }
  24. });
  25. }

23.2 从注册中心协议中获取调用器对象ClusterInvoker


  1. // Key step: obtain the invoker object
  2. // Original note: registryProtocol is an InterfaceCompatibleRegistryProtocol here
  3. invoker = registryProtocol.getInvoker(cluster, registry, type, url);

23.2.1 InterfaceCompatibleRegistryProtocol的getInvoker方法


  // Creates a RegistryDirectory for the service type and URL and delegates invoker creation to doCreateInvoker.
  1. @Override
  2. public <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
  3. // Create the registry-backed service directory
  4. DynamicDirectory<T> directory = new RegistryDirectory<>(type, url);
  5. // Create the Invoker object
  6. return doCreateInvoker(directory, cluster, registry, type);
  7. }

23.3 服务目录Directory

Directory 即服务目录,服务目录中存储了一些和服务提供者有关的信息。通过服务目录,服务消费者可获取到服务提供者的信息。

23.3.1 RegistryDirectory对象的初始化



  // Constructor: delegates to the superclass, then resolves the module model from the URL's scope
  // model and fetches the consumer configuration listener for that module.
  1. public RegistryDirectory(Class<T> serviceType, URL url) {
  2. super(serviceType, url);
  3. moduleModel = getModuleModel(url.getScopeModel());
  4. // Original note: the concrete type here is ConsumerConfigurationListener
  5. consumerConfigurationListener = getConsumerConfigurationListener(moduleModel);
  6. }


23.3.2 DynamicDirectory对象的初始化


  // Constructor: initialises the directory from the registry URL. Loads the adaptive Cluster and
  // RouterFactory extensions, validates the service type and service key, and derives the
  // register / simplified / multi-group / fail-fast flags.
  // Throws IllegalArgumentException when serviceType is null or the URL's service key is empty.
  1. public DynamicDirectory(Class<T> serviceType, URL url) {
  2. super(url, true);
  3. ModuleModel moduleModel = url.getOrDefaultModuleModel();
  4. // Adaptive Cluster extension (Cluster$Adaptive); original note: the default fault-tolerance strategy is failover
  5. this.cluster = moduleModel.getExtensionLoader(Cluster.class).getAdaptiveExtension();
  6. // Adaptive RouterFactory extension (RouterFactory$Adaptive)
  7. this.routerFactory = moduleModel.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();
  8. if (serviceType == null) {
  9. throw new IllegalArgumentException("service type is null.");
  10. }
  11. if (StringUtils.isEmpty(url.getServiceKey())) {
  12. throw new IllegalArgumentException("registry serviceKey is null.");
  13. }
  14. this.shouldRegister = !ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true);
  15. this.shouldSimplified = url.getParameter(SIMPLIFIED_KEY, false);
  16. // In the article's example the service type is link.elastic.dubbo.entity.DemoService
  17. this.serviceType = serviceType;
  18. // With no group or version, the service key is just the interface name: link.elastic.dubbo.entity.DemoService
  19. this.serviceKey = super.getConsumerUrl().getServiceKey();
  20. this.directoryUrl = consumerUrl;
  21. // Look up the group
  22. String group = directoryUrl.getGroup("");
  23. this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));
  24. // Whether to fail fast when the directory has no providers; "true" is passed as the default value
  25. this.shouldFailFast = Boolean.parseBoolean(ConfigurationUtils.getProperty(moduleModel, Constants.SHOULD_FAIL_FAST_KEY, "true"));
  26. }

23.3.3 AbstractDirectory抽象服务目录的构造器


  // Convenience constructor: delegates to the three-argument constructor with a null router chain.
  1. public AbstractDirectory(URL url, boolean isUrlFromRegistry) {
  2. this(url, null, isUrlFromRegistry);
  3. }


  // Main constructor: stores the URL without its REFER_KEY / MONITOR_KEY attributes, extracts the
  // reference parameters (queryMap), derives the consumer URL when none is attached to the URL,
  // and reads the reconnect settings before installing the router chain.
  // Throws IllegalArgumentException when url is null.
  1. public AbstractDirectory(URL url, RouterChain<T> routerChain, boolean isUrlFromRegistry) {
  2. if (url == null) {
  3. throw new IllegalArgumentException("url == null");
  4. }
  5. this.url = url.removeAttribute(REFER_KEY).removeAttribute(MONITOR_KEY);
  6. Map<String, String> queryMap;
  7. // REFER_KEY attribute of the registry URL; it carries the service-reference configuration
  8. Object referParams = url.getAttribute(REFER_KEY);
  9. if (referParams instanceof Map) {
  10. queryMap = (Map<String, String>) referParams;
  11. // Look up the consumer URL attached to the URL
  12. this.consumerUrl = (URL) url.getAttribute(CONSUMER_URL_KEY);
  13. } else {
  14. queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
  15. }
  16. // remove some local only parameters
  17. ApplicationModel applicationModel = url.getOrDefaultApplicationModel();
  18. this.queryMap = applicationModel.getBeanFactory().getBean(ClusterUtils.class).mergeLocalParams(queryMap);
  19. if (consumerUrl == null) {
  20. String host = isNotEmpty(queryMap.get(REGISTER_IP_KEY)) ? queryMap.get(REGISTER_IP_KEY) : this.url.getHost();
  21. String path = isNotEmpty(queryMap.get(PATH_KEY)) ? queryMap.get(PATH_KEY) : queryMap.get(INTERFACE_KEY);
  22. String consumedProtocol = isNotEmpty(queryMap.get(PROTOCOL_KEY)) ? queryMap.get(PROTOCOL_KEY) : CONSUMER;
  23. URL consumerUrlFrom = this.url
  24. .setHost(host)
  25. .setPort(0)
  26. .setProtocol(consumedProtocol)
  27. .setPath(path);
  28. if (isUrlFromRegistry) {
  29. // reserve parameters if url is already a consumer url
  30. consumerUrlFrom = consumerUrlFrom.clearParameters();
  31. }
  32. this.consumerUrl = consumerUrlFrom.addParameters(queryMap);
  33. }
  34. // Scheduled executor used for connectivity checks. Original note: its core pool size equals the CPU core count and its threads are named Dubbo-framework-connectivity-scheduler, which helps when reading thread dumps
  35. this.connectivityExecutor = applicationModel.getFrameworkModel().getBeanFactory()
  36. .getBean(FrameworkExecutorRepository.class).getConnectivityScheduledExecutor();
  37. // Load the global configuration
  38. Configuration configuration = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultModuleModel());
  39. // Maximum number of invokers each reconnect task tries (original note: defaults to 10)
  40. // Limits how many invokers one task picks from invokersToReconnect so that it cannot occupy the connectivity executor for a long time
  41. this.reconnectTaskTryCount = configuration.getInt(RECONNECT_TASK_TRY_COUNT, DEFAULT_RECONNECT_TASK_TRY_COUNT);
  42. // Period between two runs of the reconnect task, in milliseconds (original note: defaults to 1000)
  43. this.reconnectTaskPeriod = configuration.getInt(RECONNECT_TASK_PERIOD, DEFAULT_RECONNECT_TASK_PERIOD);
  44. // Install the router chain
  45. setRouterChain(routerChain);
  46. }

23.4 调用对象ClusterInvoker的创建过程doCreateInvoker


  // Wires the directory to the registry and protocol, builds the consumer URL that is registered
  // (when the directory should register), builds the router chain, subscribes for addresses and
  // finally joins the directory into a ClusterInvoker.
  1. protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
  2. // Initialise the directory with the registry and the protocol
  3. directory.setRegistry(registry);
  4. directory.setProtocol(protocol);
  5. // all attributes of REFER_KEY
  6. Map<String, String> parameters = new HashMap<>(directory.getConsumerUrl().getParameters());
  7. // Convert the consumer configuration into a ServiceConfigURL
  8. URL urlToRegistry = new ServiceConfigURL(
  9. parameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY),
  10. parameters.remove(REGISTER_IP_KEY),
  11. 0,
  12. getPath(parameters, type),
  13. parameters
  14. );
  15. urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());
  16. urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());
  17. if (directory.isShouldRegister()) {
  18. directory.setRegisteredConsumerUrl(urlToRegistry);
  19. // Register the consumer's configuration URL with the registry
  20. registry.register(directory.getRegisteredConsumerUrl());
  21. }
  22. // Build the router chain
  23. directory.buildRouterChain(urlToRegistry);
  24. // Service discovery and subscription
  25. directory.subscribe(toSubscribeUrl(urlToRegistry));
  26. // Original note: cluster is a MockClusterWrapper wrapping FailoverCluster
  27. // Builds the invocation chain: fault tolerance comes first, then failover, filters, load balancing and so on, executed in order when the invoker runs
  28. return (ClusterInvoker<T>) cluster.join(directory, true);
  29. }

ListenerRegistryWrapper 的register方法

  // Delegates registration to the wrapped registry, then (in finally, so also after a failure)
  // calls onRegister on every listener unless the URL is a consumer URL.
  // A RuntimeException from a listener is logged; the last one caught is rethrown after all listeners ran.
  1. @Override
  2. public void register(URL url) {
  3. try {
  4. // Original note: registry is a ZookeeperRegistry in this example
  5. if (registry != null) {
  6. registry.register(url);
  7. }
  8. } finally {
  9. if (CollectionUtils.isNotEmpty(listeners) && !UrlUtils.isConsumer(url)) {
  10. RuntimeException exception = null;
  11. for (RegistryServiceListener listener : listeners) {
  12. if (listener != null) {
  13. try {
  14. listener.onRegister(url, registry);
  15. } catch (RuntimeException t) {
  16. logger.error(t.getMessage(), t);
  17. exception = t;
  18. }
  19. }
  20. }
  21. if (exception != null) {
  22. throw exception;
  23. }
  24. }
  25. }
  26. }


  // Registers the URL with retry support: skips URLs the registry does not accept, records the
  // URL via the superclass, clears earlier failure records and calls doRegister.
  // On failure it throws IllegalStateException when the check flags are on (and the port is non-zero)
  // or the error is a SkipFailbackWrapperException; otherwise it logs and records the URL for retry.
  1. @Override
  2. public void register(URL url) {
  3. if (!acceptable(url)) {
  4. logger.info("URL " + url + " will not be registered to Registry. Registry " + this.getUrl() + " does not accept service of this protocol type.");
  5. return;
  6. }
  7. super.register(url);
  8. removeFailedRegistered(url);
  9. removeFailedUnregistered(url);
  10. try {
  11. // Sending a registration request to the server side
  12. doRegister(url);
  13. } catch (Exception e) {
  14. Throwable t = e;
  15. // If the startup detection is opened, the Exception is thrown directly.
  16. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
  17. && url.getParameter(Constants.CHECK_KEY, true)
  18. && (url.getPort() != 0);
  19. boolean skipFailback = t instanceof SkipFailbackWrapperException;
  20. if (check || skipFailback) {
  21. if (skipFailback) {
  22. t = t.getCause();
  23. }
  24. throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
  25. } else {
  26. logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  27. }
  28. // Record a failed registration request to a failed list, retry regularly
  29. addFailedRegistered(url);
  30. }
  31. }


  // Bookkeeping part of registration: validates the URL and adds it to the registered collection.
  // The "Register:" info log is only written for URLs whose port is non-zero.
  // Throws IllegalArgumentException when url is null.
  1. @Override
  2. public void register(URL url) {
  3. if (url == null) {
  4. throw new IllegalArgumentException("register url == null");
  5. }
  6. if (url.getPort() != 0) {
  7. if (logger.isInfoEnabled()) {
  8. logger.info("Register: " + url);
  9. }
  10. }
  11. registered.add(url);
  12. }


  // Creates the ZooKeeper node for the URL; any failure is wrapped in an RpcException.
  1. @Override
  2. public void doRegister(URL url) {
  3. try {
  4. checkDestroyed();
  5. // Writes the consumer path /dubbo/<service interface>/consumers/<consumer URL>. Original note: the second argument says whether the node is ephemeral (the dynamic parameter defaults to true); with dynamic=false the node is persistent
  6. zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
  7. } catch (Throwable e) {
  8. throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
  9. }
  10. }


  1. /dubbo/link.elastic.dubbo.entity.DemoService/consumers/consumer%3A%2F%2F192.168.1.169%2Flink.elastic.dubbo.entity.DemoService%3Fapplication%3Ddubbo-demo-api-consumer%26background%3Dfalse%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dlink.elastic.dubbo.entity.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D52237%26qos.enable%3Dfalse%26qos.port%3D-1%26release%3D3.0.10%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1659862505044


  // Subscribes via the superclass and, when configuration listening is enabled (the lookup defaults
  // to true), registers this directory with the consumer configuration listener and creates a
  // ReferenceConfigurationListener for the URL.
  1. @Override
  2. public void subscribe(URL url) {
  3. super.subscribe(url);
  4. if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, org.apache.dubbo.registry.Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
  5. consumerConfigurationListener.addNotifyListener(this);
  6. referenceConfigurationListener = new ReferenceConfigurationListener(moduleModel, this, url);
  7. }
  8. }


  // Remembers the subscribe URL and subscribes this directory to the registry as the notify listener.
  1. public void subscribe(URL url) {
  2. setSubscribeUrl(url);
  3. registry.subscribe(url, this);
  4. }


  // Delegates the subscription to the wrapped registry, then (in finally, so also after a failure)
  // calls onSubscribe on every registered service listener.
  // A RuntimeException from a listener is logged; the last one caught is rethrown after all listeners ran.
  1. @Override
  2. public void subscribe(URL url, NotifyListener listener) {
  3. try {
  4. if (registry != null) {
  5. registry.subscribe(url, listener);
  6. }
  7. } finally {
  8. if (CollectionUtils.isNotEmpty(listeners)) {
  9. RuntimeException exception = null;
  10. for (RegistryServiceListener registryListener : listeners) {
  11. if (registryListener != null) {
  12. try {
  13. registryListener.onSubscribe(url, registry);
  14. } catch (RuntimeException t) {
  15. logger.error(t.getMessage(), t);
  16. exception = t;
  17. }
  18. }
  19. }
  20. if (exception != null) {
  21. throw exception;
  22. }
  23. }
  24. }
  25. }


  // Subscribes with retry support: records the subscription via the superclass, clears earlier
  // failure records and calls doSubscribe. On failure it notifies the listener with cached URLs
  // when available; otherwise it throws IllegalStateException if the check flags are on or the
  // error is a SkipFailbackWrapperException. When no exception is thrown, the subscription is
  // recorded for retry.
  1. @Override
  2. public void subscribe(URL url, NotifyListener listener) {
  3. super.subscribe(url, listener);
  4. removeFailedSubscribed(url, listener);
  5. try {
  6. // Sending a subscription request to the server side
  7. doSubscribe(url, listener);
  8. } catch (Exception e) {
  9. Throwable t = e;
  10. List<URL> urls = getCacheUrls(url);
  11. if (CollectionUtils.isNotEmpty(urls)) {
  12. notify(url, listener, urls);
  13. logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getCacheFile().getName() + ", cause: " + t.getMessage(), t);
  14. } else {
  15. // If the startup detection is opened, the Exception is thrown directly.
  16. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
  17. && url.getParameter(Constants.CHECK_KEY, true);
  18. boolean skipFailback = t instanceof SkipFailbackWrapperException;
  19. if (check || skipFailback) {
  20. if (skipFailback) {
  21. t = t.getCause();
  22. }
  23. throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
  24. } else {
  25. logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  26. }
  27. }
  28. // Record a failed subscription request to a failed list, retry regularly
  29. addFailedSubscribed(url, listener);
  30. }
  31. }


  // Bookkeeping part of subscription: validates the arguments and records the listener under the URL.
  // Throws IllegalArgumentException when url or listener is null.
  1. @Override
  2. public void subscribe(URL url, NotifyListener listener) {
  3. if (url == null) {
  4. throw new IllegalArgumentException("subscribe url == null");
  5. }
  6. if (listener == null) {
  7. throw new IllegalArgumentException("subscribe listener == null");
  8. }
  9. if (logger.isInfoEnabled()) {
  10. logger.info("Subscribe: " + url);
  11. }
  12. // ConcurrentMap<URL, Set<NotifyListener>> subscribed: records the notify listeners registered for each consumer URL
  13. Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
  14. listeners.add(listener);
  15. }


  // Performs the ZooKeeper subscription. For the wildcard interface it watches the root path and
  // subscribes to every service found there; otherwise it creates each category path, registers a
  // child listener on it, collects the current children as URLs and notifies the listener once.
  // Any failure is wrapped in an RpcException.
  1. @Override
  2. public void doSubscribe(final URL url, final NotifyListener listener) {
  3. try {
  4. checkDestroyed();
  5. if (ANY_VALUE.equals(url.getServiceInterface())) {
  6. String root = toRootPath();
  7. boolean check = url.getParameter(CHECK_KEY, false);
  8. ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
  9. ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChildren) -> {
  10. for (String child : currentChildren) {
  11. child = URL.decode(child);
  12. if (!anyServices.contains(child)) {
  13. anyServices.add(child);
  14. subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
  15. Constants.CHECK_KEY, String.valueOf(check)), k);
  16. }
  17. }
  18. });
  19. zkClient.create(root, false);
  20. List<String> services = zkClient.addChildListener(root, zkListener);
  21. if (CollectionUtils.isNotEmpty(services)) {
  22. for (String service : services) {
  23. service = URL.decode(service);
  24. anyServices.add(service);
  25. subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
  26. Constants.CHECK_KEY, String.valueOf(check)), listener);
  27. }
  28. }
  29. } else {
  30. CountDownLatch latch = new CountDownLatch(1);
  31. try {
  32. List<URL> urls = new ArrayList<>();
  33. // For interface-level subscription there are three category paths we care about for now:
  34. // providers: dubbo/link.elastic.dubbo.entity.DemoService/providers
  35. // configurators: dubbo/link.elastic.dubbo.entity.DemoService/configurators
  36. // routers: /dubbo/link.elastic.dubbo.entity.DemoService/routers
  37. for (String path : toCategoriesPath(url)) {
  38. ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
  39. // The child listener used here is a RegistryChildListenerImpl
  40. ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, k, latch));
  41. if (zkListener instanceof RegistryChildListenerImpl) {
  42. ((RegistryChildListenerImpl) zkListener).setLatch(latch);
  43. }
  44. // Create a persistent (non-ephemeral) node if it does not exist yet, e.g. /dubbo/link.elastic.dubbo.entity.DemoService/providers
  45. // or /dubbo/link.elastic.dubbo.entity.DemoService/configurators
  46. zkClient.create(path, false);
  47. // Once the category node exists, register a listener on its children; the call also returns the current child nodes of path (e.g. the provider nodes under providers). Several may be returned; the example below shows a single one:
  48. //dubbo%3A%2F%2F192.168.1.169%3A20880%2Flink.elastic.dubbo.entity.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26background%3Dfalse%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dlink.elastic.dubbo.entity.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D51534%26release%3D3.0.10%26service-name-mapping%3Dtrue%26side%3Dprovider%26timestamp%3D1659860685159
  49. List<String> children = zkClient.addChildListener(path, zkListener);
  50. if (children != null) {
  51. // urls collects the URLs found under each category path of this service (e.g. the providers)
  52. urls.addAll(toUrlsWithEmpty(url, path, children));
  53. }
  54. }
  55. // Notify the listener (service subscription)
  56. notify(url, listener, urls);
  57. } finally {
  58. // tells the listener to run only after the sync notification of main thread finishes.
  59. latch.countDown();
  60. }
  61. }
  62. } catch (Throwable e) {
  63. throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
  64. }
  65. }



  1. dubbo://,configurators,routers&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=52816&qos.enable=false&qos.port=-1&release=3.0.10&service-name-mapping=true&side=provider&sticky=false


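  1. empty://192.168.1.169/link.elastic.dubbo.entity.DemoService?application=dubbo-demo-api-consumer&background=false&category=configurators&check=false&dubbo=2.0.2&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=52816&qos.enable=false&qos.port=-1&release=3.0.10&side=consumer&sticky=false&timestamp=1659863678563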


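  1. empty://192.168.1.169/link.elastic.dubbo.entity.DemoService?application=dubbo-demo-api-consumer&background=false&category=routers&check=false&dubbo=2.0.2&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=52816&qos.enable=false&qos.port=-1&release=3.0.10&side=consumer&sticky=false&timestamp=1659863678563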



  // Validates the arguments and calls doNotify; an Exception raised while notifying is logged
  // and not propagated to the caller.
  // Throws IllegalArgumentException when url or listener is null.
  1. @Override
  2. protected void notify(URL url, NotifyListener listener, List<URL> urls) {
  3. if (url == null) {
  4. throw new IllegalArgumentException("notify url == null");
  5. }
  6. if (listener == null) {
  7. throw new IllegalArgumentException("notify listener == null");
  8. }
  9. try {
  10. doNotify(url, listener, urls);
  11. } catch (Exception t) {
  12. // The failure is only logged here
  13. logger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);
  14. }
  15. }


  // Forwards the notification to the superclass implementation of notify.
  1. protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
  2. super.notify(url, listener, urls);
  3. }

AbstractRegistry类型的notify 服务通知模版方法

  // Groups the URLs that match the subscribe URL by category, records each group in the notified
  // map, hands each group to the listener and, when the local cache is enabled, saves properties
  // after every category. An empty URL list is ignored unless the subscription is for the wildcard interface.
  // Throws IllegalArgumentException when url or listener is null.
  1. protected void notify(URL url, NotifyListener listener, List<URL> urls) {
  2. if (url == null) {
  3. throw new IllegalArgumentException("notify url == null");
  4. }
  5. if (listener == null) {
  6. throw new IllegalArgumentException("notify listener == null");
  7. }
  8. if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) {
  9. logger.warn("Ignore empty notify urls for subscribe url " + url);
  10. return;
  11. }
  12. if (logger.isInfoEnabled()) {
  13. logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size());
  14. }
  15. // keep every provider's category.
  16. Map<String, List<URL>> result = new HashMap<>();
  17. // In this example urls holds three entries, as described earlier in the article
  18. for (URL u : urls) {
  19. // Pay close attention to isMatch here
  20. // Original note: isMatch checks that consumer and provider agree on group + service interface, or that one side is configured with the wildcard *
  21. if (UrlUtils.isMatch(url, u)) {
  22. // Category of the current URL
  23. String category = u.getCategory(DEFAULT_CATEGORY);
  24. List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
  25. categoryList.add(u);
  26. }
  27. }
  28. if (result.size() == 0) {
  29. return;
  30. }
  31. Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
  32. // Important loop: pushes every category of data from the registry (configurators, routers, providers, ...) to the listener's notify method
  33. for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
  34. String category = entry.getKey();
  35. List<URL> categoryList = entry.getValue();
  36. categoryNotified.put(category, categoryList);
  37. // Original note: this is RegistryDirectory's notify
  38. listener.notify(categoryList);
  39. // We will update our cache file after each notification.
  40. // When our Registry has a subscribed failure due to network jitter, we can return at least the existing cache URL.
  41. if (localCacheEnabled) {
  42. // Cache the current service info: consumer://,configurators,routers&dubbo=2.0.2&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=36046&qos.enable=false&qos.port=-1&release=3.0.10&side=consumer&sticky=false&timestamp=1660987214249
  43. saveProperties(url);
  44. }
  45. }
  46. }


  // Handles a registry notification: does nothing once destroyed; otherwise groups the URLs by
  // category, updates configurators and routers, passes the provider URLs through the activated
  // AddressListener extensions and finally refreshes the invokers.
  1. @Override
  2. public synchronized void notify(List<URL> urls) {
  3. if (isDestroyed()) {
  4. return;
  5. }
  6. Map<String, List<URL>> categoryUrls = urls.stream()
  7. .filter(Objects::nonNull)
  8. .filter(this::isValidCategory)
  9. .filter(this::isNotCompatibleFor26x)
  10. .collect(Collectors.groupingBy(this::judgeCategory));
  11. List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
  12. this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
  13. List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
  14. toRouters(routerURLs).ifPresent(this::addRouters);
  15. // providers
  16. List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
  17. // 3.x added for extend URL address
  18. ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);
  19. List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
  20. if (supportedListeners != null && !supportedListeners.isEmpty()) {
  21. for (AddressListener addressListener : supportedListeners) {
  22. providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);
  23. }
  24. }
  25. refreshOverrideAndInvoker(providerURLs);
  26. }


  // Synchronized entry point that forwards the provider URLs to refreshInvoker.
  1. private synchronized void refreshOverrideAndInvoker(List<URL> urls) {
  2. // mock zookeeper://xxx?mock=return null
  3. refreshInvoker(urls);
  4. }


  // Rebuilds the invoker list from the notified provider URLs. A single URL with the empty
  // protocol forbids access and destroys all invokers. Otherwise the URLs are converted to
  // invokers (falling back to the cached URLs when the list is empty), the new list is published
  // to the router chain, unused invokers are destroyed and invokersChanged() is fired.
  // NOTE(review): the "Service" + serviceKey warn message has no space after "Service".
  1. private void refreshInvoker(List<URL> invokerUrls) {
  2. Assert.notNull(invokerUrls, "invokerUrls should not be null");
  3. if (invokerUrls.size() == 1
  4. && invokerUrls.get(0) != null
  5. && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
  6. this.forbidden = true; // Forbid to access
  7. routerChain.setInvokers(BitList.emptyList());
  8. destroyAllInvokers(); // Close all invokers
  9. } else {
  10. this.forbidden = false; // Allow to access
  11. if (invokerUrls == Collections.<URL>emptyList()) {
  12. invokerUrls = new ArrayList<>();
  13. }
  14. // Take a local reference so a concurrent reset of the field cannot cause an NPE below.
  15. // use local reference to avoid NPE as this.cachedInvokerUrls will be set null by destroyAllInvokers().
  16. Set<URL> localCachedInvokerUrls = this.cachedInvokerUrls;
  17. if (invokerUrls.isEmpty() && localCachedInvokerUrls != null) {
  18. logger.warn("Service" + serviceKey + " received empty address list with no EMPTY protocol set, trigger empty protection.");
  19. invokerUrls.addAll(localCachedInvokerUrls);
  20. } else {
  21. localCachedInvokerUrls = new HashSet<>();
  22. // Cache the invoker URLs for later comparison
  23. localCachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
  24. this.cachedInvokerUrls = localCachedInvokerUrls;
  25. }
  26. if (invokerUrls.isEmpty()) {
  27. return;
  28. }
  29. // Same precaution for the URL-to-invoker map.
  30. // use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().
  31. Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
  32. // A copy is needed here rather than a plain local reference.
  33. // can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().
  34. Map<URL, Invoker<T>> oldUrlInvokerMap = null;
  35. if (localUrlInvokerMap != null) {
  36. // the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
  37. oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
  38. localUrlInvokerMap.forEach(oldUrlInvokerMap::put);
  39. }
  40. // Convert the URLs into invokers; protocol filtering happens inside toInvokers
  41. Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map
  42. /**
  43. * If the calculation is wrong, it is not processed.
  44. *
  45. * 1. The protocol configured by the client is inconsistent with the protocol of the server.
  46. * eg: consumer protocol = dubbo, provider only has other protocol services(rest).
  47. * 2. The registration center is not robust and pushes illegal specification data.
  48. *
  49. */
  50. if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
  51. logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
  52. .toString()));
  53. return;
  54. }
  55. List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
  56. this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers));
  57. // pre-route and build cache
  58. routerChain.setInvokers(this.getInvokers());
  59. this.urlInvokerMap = newUrlInvokerMap;
  60. try {
  61. destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
  62. } catch (Exception e) {
  63. logger.warn("destroyUnusedInvokers error. ", e);
  64. }
  65. // notify invokers refreshed
  66. this.invokersChanged();
  67. }
  68. }


  • 如果URL已转换为invoker,它将不再被重新引用并直接从缓存中获取,请注意,URL中的任何参数更改都将被重新引用。
  • 如果传入调用器列表不为空,则表示它是最新的调用器列表。
  • 如果传入invokerUrl的列表为空,则意味着该规则只是一个覆盖规则或路由规则,需要重新对比以决定是否重新引用。




  // Converts provider URLs into a URL-to-Invoker map. URLs are skipped when their protocol is not
  // among the protocols configured on the reference, is the empty protocol, or has no Protocol
  // extension. An invoker already present in oldUrlInvokerMap is reused (and removed from that
  // map); otherwise a new one is created through protocol.refer unless the URL is disabled.
  1. private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {
  2. Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));
  3. if (urls == null || urls.isEmpty()) {
  4. return newUrlInvokerMap;
  5. }
  6. // Where this value comes from: it is the list of protocols to accept. If a protocol is configured for the reference (e.g. dubbo.reference.<interface>.protocol), only provider URLs with a matching protocol pass the filter below
  7. String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
  8. // Walk every provider URL and convert it
  9. for (URL providerUrl : urls) {
  10. // If protocol is configured at the reference side, only the matching protocol is selected
  11. if (queryProtocols != null && queryProtocols.length() > 0) {
  12. boolean accept = false;
  13. String[] acceptProtocols = queryProtocols.split(",");
  14. for (String acceptProtocol : acceptProtocols) {
  15. if (providerUrl.getProtocol().equals(acceptProtocol)) {
  16. accept = true;
  17. break;
  18. }
  19. }
  20. if (!accept) {
  21. continue;
  22. }
  23. }
  24. // Skip URLs with the empty protocol
  25. if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
  26. continue;
  27. }
  28. // Check that the protocol extension exists; custom extensions, or ones such as webservice, usually need an extra dependency to be added
  29. if (!getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
  30. logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
  31. " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
  32. " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
  33. getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).getSupportedExtensions()));
  34. continue;
  35. }
  36. // This merge matters: it determines the configuration precedence for a service
  37. // Merge URL parameters. Original note on precedence: override configuration (e.g. used to disable a service) > -D (JVM arguments) > consumer > provider
  38. URL url = mergeUrl(providerUrl);
  39. // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
  40. Invoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);
  41. if (invoker == null) { // Not in the cache, refer again
  42. try {
  43. boolean enabled = true;
  44. if (url.hasParameter(DISABLED_KEY)) {
  45. enabled = !url.getParameter(DISABLED_KEY, false);
  46. } else {
  47. enabled = url.getParameter(ENABLED_KEY, true);
  48. }
  49. if (enabled) {
  50. // The consumer refers to the provider
  51. invoker = protocol.refer(serviceType, url);
  52. }
  53. } catch (Throwable t) {
  54. logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
  55. }
  56. if (invoker != null) { // Put new invoker in cache
  57. newUrlInvokerMap.put(url, invoker);
  58. }
  59. } else {
  60. newUrlInvokerMap.put(url, invoker);
  61. }
  62. }
  63. return newUrlInvokerMap;
  64. }


  1. if (enabled) {
  2. // The consumer refers to the provider
  3. invoker = protocol.refer(serviceType, url);
  4. }


  // Checks that the protocol has not been destroyed, then delegates to protocolBindingRefer.
  1. @Override
  2. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  3. checkDestroyed();
  4. return protocolBindingRefer(type, url);
  5. }


  // Creates a DubboInvoker for the service type and URL, backed by the clients from getClients,
  // adds it to the invokers collection and returns it.
  1. @Override
  2. public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
  3. checkDestroyed();
  4. // Apply the optimizer (serialization optimisation) configuration
  5. optimizeSerialization(url);
  6. // create rpc invoker
  7. // Create the DubboInvoker; the method to focus on here is getClients.
  8. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
  9. invokers.add(invoker);
  10. return invoker;
  11. }

注意这里分了两步,1)获取网络客户端 2)封装为DubboInvoker

  // Returns the exchange clients for the URL. When the connections parameter is 0 (not configured)
  // shared clients are used, their number taken from the share-connections URL parameter or, if
  // blank, from the application configuration; otherwise one dedicated client is initialised per
  // configured connection.
  1. private ExchangeClient[] getClients(URL url) {
  2. // whether to share connection
  3. boolean useShareConnect = false;
  4. int connections = url.getParameter(CONNECTIONS_KEY, 0);
  5. List<ReferenceCountExchangeClient> shareClients = null;
  6. // if not configured, connection is shared, otherwise, one connection for one service
  7. if (connections == 0) {
  8. useShareConnect = true;
  9. /*
  10. * The xml configuration should have a higher priority than properties.
  11. */
  12. String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
  13. connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), SHARE_CONNECTIONS_KEY,
  14. DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
  15. shareClients = getSharedClient(url, connections);
  16. }
  17. ExchangeClient[] clients = new ExchangeClient[connections];
  18. for (int i = 0; i < clients.length; i++) {
  19. if (useShareConnect) {
  20. clients[i] = shareClients.get(i);
  21. } else {
  22. clients[i] = initClient(url);
  23. }
  24. }
  25. return clients;
  26. }


  // Returns the shared client list cached under the URL's address, building or repairing it when
  // it is missing or unusable. While one thread builds the list it stores PENDING_OBJECT under the
  // key; other threads wait on referenceClientMap until the finally block publishes the result
  // (or removes the key) and calls notifyAll().
  // NOTE(review): the InterruptedException caught while waiting is ignored and the interrupt
  // flag is not restored.
  1. @SuppressWarnings("unchecked")
  2. private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
  3. String key = url.getAddress();
  4. Object clients = referenceClientMap.get(key);
  5. if (clients instanceof List) {
  6. List<ReferenceCountExchangeClient> typedClients = (List<ReferenceCountExchangeClient>) clients;
  7. if (checkClientCanUse(typedClients)) {
  8. batchClientRefIncr(typedClients);
  9. return typedClients;
  10. }
  11. }
  12. List<ReferenceCountExchangeClient> typedClients = null;
  13. synchronized (referenceClientMap) {
  14. for (; ; ) {
  15. clients = referenceClientMap.get(key);
  16. if (clients instanceof List) {
  17. typedClients = (List<ReferenceCountExchangeClient>) clients;
  18. if (checkClientCanUse(typedClients)) {
  19. batchClientRefIncr(typedClients);
  20. return typedClients;
  21. } else {
  22. referenceClientMap.put(key, PENDING_OBJECT);
  23. break;
  24. }
  25. } else if (clients == PENDING_OBJECT) {
  26. try {
  27. referenceClientMap.wait();
  28. } catch (InterruptedException ignored) {
  29. }
  30. } else {
  31. referenceClientMap.put(key, PENDING_OBJECT);
  32. break;
  33. }
  34. }
  35. }
  36. try {
  37. // connectNum must be greater than or equal to 1
  38. // Number of long-lived connections; forced to at least 1
  39. connectNum = Math.max(connectNum, 1);
  40. // If the clients is empty, then the first initialization is
  41. if (CollectionUtils.isEmpty(typedClients)) {
  42. // !!!! The key line: build the clients
  43. typedClients = buildReferenceCountExchangeClientList(url, connectNum);
  44. } else {
  45. for (int i = 0; i < typedClients.size(); i++) {
  46. ReferenceCountExchangeClient referenceCountExchangeClient = typedClients.get(i);
  47. // If there is a client in the list that is no longer available, create a new one to replace him.
  48. if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
  49. typedClients.set(i, buildReferenceCountExchangeClient(url));
  50. continue;
  51. }
  52. referenceCountExchangeClient.incrementAndGetCount();
  53. }
  54. }
  55. } finally {
  56. synchronized (referenceClientMap) {
  57. if (typedClients == null) {
  58. referenceClientMap.remove(key);
  59. } else {
  60. // The key is ip:port and the value is the client list: one connection list is cached per provider ip:port
  61. referenceClientMap.put(key, typedClients);
  62. }
  63. referenceClientMap.notifyAll();
  64. }
  65. }
  66. return typedClients;
  67. }

DubboProtocol 类的 buildReferenceCountExchangeClientList 方法:创建网络交换器客户端

  1. private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) { // Builds a new list holding connectNum clients, each created by buildReferenceCountExchangeClient(url).
  2. List<ReferenceCountExchangeClient> clients = new ArrayList<>();
  3. for (int i = 0; i < connectNum; i++) {
  4. clients.add(buildReferenceCountExchangeClient(url));
  5. }
  6. return clients;
  7. }


  1. private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) { // Creates one client for url and wraps it in a ReferenceCountExchangeClient.
  2. // Initialize the client object
  3. ExchangeClient exchangeClient = initClient(url);
  4. // Wrap the exchange client object
  5. ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, DubboCodec.NAME);
  6. // read configs
  7. int shutdownTimeout = ConfigurationUtils.getServerShutdownTimeout(url.getScopeModel());
  8. client.setShutdownWaitTime(shutdownTimeout); // the configured server shutdown timeout becomes this client's shutdown wait time
  9. return client;
  10. }


  1. /**
  2. * Create new connection: checks the requested client (transporter) type, then returns either a LazyConnectExchangeClient or the result of Exchangers.connect.
  3. *
  4. * @param url the URL to connect to; the CLIENT_KEY, SERVER_KEY and LAZY_CONNECT_KEY parameters are read from it
  5. */
  6. private ExchangeClient initClient(URL url) {
  7. /**
  8. * Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,
  9. * which means params are shared among different services. Since client is shared among services this is currently not a problem.
  10. */
  11. String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT)); // client type: CLIENT_KEY, else SERVER_KEY, else DEFAULT_REMOTING_CLIENT
  12. // BIO is not allowed since it has severe performance issue.
  13. if (StringUtils.isNotEmpty(str) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {
  14. throw new RpcException("Unsupported client type: " + str + "," +
  15. " supported client type is " + StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
  16. }
  17. ExchangeClient client;
  18. try {
  19. // Replace InstanceAddressURL with ServiceConfigURL.
  20. url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getAllParameters());
  21. url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
  22. // enable heartbeat by default
  23. url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
  24. // connection should be lazy
  25. if (url.getParameter(LAZY_CONNECT_KEY, false)) {
  26. client = new LazyConnectExchangeClient(url, requestHandler);
  27. } else {
  28. client = Exchangers.connect(url, requestHandler); // LAZY_CONNECT_KEY is false or absent: connect through Exchangers
  29. }
  30. } catch (RemotingException e) {
  31. throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); // rethrown as RpcException with the RemotingException kept as the cause
  32. }
  33. return client;
  34. }


  1. public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // Validates both arguments, then delegates to the Exchanger selected for url.
  2. if (url == null) {
  3. throw new IllegalArgumentException("url == null");
  4. }
  5. if (handler == null) {
  6. throw new IllegalArgumentException("handler == null");
  7. }
  8. // Obtain the Exchanger extension object; the default type is HeaderExchanger
  9. return getExchanger(url).connect(url, handler);
  10. }


  1. @Override
  2. public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
  3. return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); // handler is wrapped as DecodeHandler(HeaderExchangeHandler(handler)) and passed to Transporters.connect; the resulting Client is wrapped in a HeaderExchangeClient
  4. }


  1. public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { // Reduces the given handlers to a single ChannelHandler, then delegates to the Transporter selected for url.
  2. if (url == null) {
  3. throw new IllegalArgumentException("url == null");
  4. }
  5. ChannelHandler handler;
  6. if (handlers == null || handlers.length == 0) {
  7. handler = new ChannelHandlerAdapter(); // no handler supplied: use a ChannelHandlerAdapter
  8. } else if (handlers.length == 1) {
  9. handler = handlers[0];
  10. } else {
  11. handler = new ChannelHandlerDispatcher(handlers); // several handlers: combine them in one ChannelHandlerDispatcher
  12. }
  13. // The default transporter is netty4
  14. return getTransporter(url).connect(url, handler);
  15. }


  1. @Override
  2. public Client connect(URL url, ChannelHandler handler) throws RemotingException {
  3. return new NettyClient(url, handler); // every call constructs a new NettyClient for url
  4. }


  1. public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
  2. // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
  3. // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
  4. super(url, wrapChannelHandler(url, handler)); // all remaining setup is done by the superclass constructor, which receives the wrapped handler
  5. }



  1. public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { // Initializes the executor, opens the client (doOpen) and then connects; failures close the client and surface as RemotingException unless the lazy-connect or check == false branches below apply.
  2. super(url, handler);
  3. // set default needReconnect true when channel is not connected
  4. needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);
  5. // Initialize the thread pool; the default is FixedThreadPool
  6. initExecutor(url);
  7. try {
  8. // Core Netty startup code: initializes the Bootstrap
  9. // NioSocketChannel is used by default
  10. // The default connect timeout is initialized to 3 seconds
  11. doOpen();
  12. } catch (Throwable t) {
  13. close();
  14. throw new RemotingException(url.toInetSocketAddress(), null,
  15. "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
  16. + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
  17. }
  18. try {
  19. // connect.
  20. // The step above initialized Netty's client startup class Bootstrap; this call runs the actual connection code: bootstrap.connect(getConnectAddress());
  21. // Waits up to 3 seconds; if the connection fails, an exception is thrown
  22. connect();
  23. if (logger.isInfoEnabled()) {
  24. logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
  25. }
  26. } catch (RemotingException t) {
  27. // If lazy connect client fails to establish a connection, the client instance will still be created,
  28. // and the reconnection will be initiated by ReconnectTask, so there is no need to throw an exception
  29. if (url.getParameter(LAZY_CONNECT_KEY, false)) {
  30. logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() +
  31. " connect to the server " + getRemoteAddress() +
  32. " (the connection request is initiated by lazy connect client, ignore and retry later!), cause: " +
  33. t.getMessage(), t);
  34. return;
  35. }
  36. // By default a connection is required (CHECK_KEY defaults to true); if it cannot be established, the exception is rethrown
  37. if (url.getParameter(Constants.CHECK_KEY, true)) {
  38. close();
  39. throw t;
  40. } else {
  41. logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
  42. + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
  43. }
  44. } catch (Throwable t) { // any non-RemotingException failure: close and wrap it in a RemotingException
  45. close();
  46. throw new RemotingException(url.toInetSocketAddress(), null,
  47. "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
  48. + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
  49. }
  50. }


  1. public AbstractEndpoint(URL url, ChannelHandler handler) { // Sets up the codec and the connect timeout from url after delegating to the superclass constructor.
  2. super(url, handler);
  3. // DubboCodec is the class that encodes and decodes the Request and Response objects of an RPC call; these two objects are the core of what an RPC call transmits.
  4. // What is obtained here is a DubboCountCodec that wraps a DubboCodec
  5. this.codec = getChannelCodec(url);
  6. // The default connect timeout is 3000 milliseconds
  7. this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
  8. }


  1. public AbstractPeer(URL url, ChannelHandler handler) { // Stores url and handler on this instance; throws IllegalArgumentException when either is null.
  2. if (url == null) {
  3. throw new IllegalArgumentException("url == null");
  4. }
  5. if (handler == null) {
  6. throw new IllegalArgumentException("handler == null");
  7. }
  8. this.url = url;
  9. this.handler = handler;
  10. }