Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker

发布于:2024-12-22 ⋅ 阅读:(9) ⋅ 点赞:(0)

基于Dubbo 3.1,详细介绍了Dubbo服务的发布与引用的源码。

此前我们学习了MigrationRuleHandler这个处理器,它用于通过动态更改规则来控制迁移行为。MigrationRuleListener的onrefer方法是Dubbo2.x 接口级服务发现与Dubbo3.x应用级服务发现之间迁移的关键。

我们最后讲到了MigrationRuleHandler的refreshInvoker方法,该方法除了刷新invoker迁移新规则之外,还负责远程服务发现订阅的逻辑,即消费者能发现远程服务提供方的地址列表,而应用级的服务引入订阅则是通过refreshServiceDiscoveryInvoker方法实现的。

我们此前学习了接口级服务引入的方法refreshInterfaceInvoker的源码,应用级的服务引入和此方法有很多相似之处,对于相同的方法,我们不再赘述。

Dubbo 3.x服务引用源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
  3. Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
  4. Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
  5. Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
  6. Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
  7. Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
  8. Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
  9. Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
  10. Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
  11. Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url

Dubbo 3.x服务发布源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
  3. Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
  4. Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
  5. Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
  6. Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
  7. Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)

1 refreshServiceDiscoveryInvoker刷新应用级inovker

该方法具有应用级别的远程服务发现、引入、订阅能力,大概逻辑为:

  1. 首先判断是否需要刷新serviceDiscoveryInvoker,即重新创建真实的服务级invoker:如果真实serviceDiscoveryInvoker不存在,或者已被销毁,或者内部没有Directory,则需要刷新。
  2. 一般情况下,当启动消费者并首次执行refer的时候,真实serviceDiscoveryInvoker为null,需要创建serviceDiscoveryInvoker。
  3. 通过注册中心操作类registryProtocol#getServiceDiscoveryInvoker方法来引入服务提供者serviceDiscoveryInvoker,这是消费者进行应用级别服务发现订阅的核心逻辑。
/**
 * MigrationInvoker的方法
 *
 * 刷新应用invoker
 * @param latch 倒计数器
 */
protected void refreshServiceDiscoveryInvoker(CountDownLatch latch) {
    /*
     * 1 如果MigrationInvoker内部的真实serviceDiscoveryInvoker存在,那么清空真实serviceDiscoveryInvoker的directory的
     */
    clearListener(serviceDiscoveryInvoker);
    /*
     * 2 判断是否需要刷新服务级serviceDiscoveryInvoker
     * 如果真实invoker不存在,或者已被销毁,或者内部没有Directory
     * 一般情况下,当启动消费者并首次执行refer的时候,真实invoker为null,需要创建
     */
    if (needRefresh(serviceDiscoveryInvoker)) {
        if (logger.isDebugEnabled()) {
            logger.debug("Re-subscribing instance addresses, current interface " + type.getName());
        }
        //如果不为null,则销毁
        if (serviceDiscoveryInvoker != null) {
            serviceDiscoveryInvoker.destroy();
        }
        /*
         * 3 通过注册中心操作类registryProtocol获取真实serviceDiscoveryInvoker
         *
         * 这是消费者进行应用级服务发现订阅的核心逻辑,设这里的registryProtocol类型为InterfaceCompatibleRegistryProtocol
         */
        serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url);
    }
    /*
     * 设置监听器
     */
    setListener(serviceDiscoveryInvoker, () -> {
        latch.countDown();
        if (reportService.hasReporter()) {
            reportService.reportConsumptionStatus(
                reportService.createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "app"));
        }
        if (step == APPLICATION_FIRST) {
            calcPreferredInvoker(rule);
        }
    });
}

2 getServiceDiscoveryInvoker获取invoker

这是默认消费者进行应用级服务发现订阅的核心逻辑,这里的registryProtocol类型为InterfaceCompatibleRegistryProtocol。

  1. 调用父类RegistryProtocol#getRegistryUrl方法,将注册中心协议url转换为应用级服务发现协议url,即service-discovery-registry协议。
  2. 随后调用getRegistry方法,根据应用级服务发现协议url获取注册中心操作类Registry,service-discovery-registry协议对应着ServiceDiscoveryRegistry。
  3. 创建应用级动态注册心中目录ServiceDiscoveryRegistryDirectory,随后调用doCreateInvoker方法创建服务引入invoker。
/**
 * InterfaceCompatibleRegistryProtocol的方法
 * <p>
 * 获取应用级别invoker
 *
 * @param cluster  集群操作对象
 * @param registry 注册中心对象,例如ListenerRegistryWrapper(ZookeeperRegistry)
 * @param type     接口类型
 * @param url      注册中心协议url,协议是真实注册中心协议,例如zookeeper
 * @return 真实invoker
 */
@Override
public <T> ClusterInvoker<T> getServiceDiscoveryInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
    //调用父类RegistryProtocol的getRegistryUrl方法,将注册中心协议url转换为应用级服务发现协议url,即service-discovery-registry协议
    //根据应用级服务发现协议url获取注册中心操作类Registry,service-discovery-registry对应着ServiceDiscoveryRegistry
    registry = getRegistry(super.getRegistryUrl(url));
    /*
     * 创建应用级动态注册心中目录ServiceDiscoveryRegistryDirectory
     */
    DynamicDirectory<T> directory = new ServiceDiscoveryRegistryDirectory<>(type, url);
    /*
     * 创建invoker
     */
    return doCreateInvoker(directory, cluster, registry, type);
}

3 ServiceDiscoveryRegistryDirectory注册中心目录

ServiceDiscoveryRegistryDirectory是基于应用级注册中心的服务发现使用的服务目录,我们在接口级服务发现订阅refreshInterfaceInvoker部分已经讲过Directory的作用了,在此不再赘述。

public ServiceDiscoveryRegistryDirectory(Class<T> serviceType, URL url) {
    //父类DynamicDirectory的构造器
    super(serviceType, url);
    moduleModel = getModuleModel(url.getScopeModel());
    //服务提供者优先的属性
    Set<ProviderFirstParams> providerFirstParams = url.getOrDefaultApplicationModel().getExtensionLoader(ProviderFirstParams.class).getSupportedExtensionInstances();
    if (CollectionUtils.isEmpty(providerFirstParams)) {
        this.providerFirstParams = null;
    } else {
        if (providerFirstParams.size() == 1) {
            this.providerFirstParams = Collections.unmodifiableSet(providerFirstParams.iterator().next().params());
        } else {
            Set<String> params = new HashSet<>();
            for (ProviderFirstParams paramsFilter : providerFirstParams) {
                if (paramsFilter.params() == null) {
                    break;
                }
                params.addAll(paramsFilter.params());
            }
            this.providerFirstParams = Collections.unmodifiableSet(params);
        }
    }
    //获取消费者需要查询过滤的协议
    String protocol = consumerUrl.getParameter(PROTOCOL_KEY, consumerUrl.getProtocol());
    //消费者协议服务key
    consumerProtocolServiceKey = new ProtocolServiceKey(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(),
        !CommonConstants.CONSUMER.equals(protocol) ? protocol : null);
}

4 doCreateInvoker创建invoker

该方法由InterfaceCompatibleRegistryProtocol的父类RegistryProtocol实现。大概步骤为:

  1. 首先根据消费者信息转换为消费者注册信息url,内部包括消费者ip、指定引用的protocol(默认consumer协议)、指定引用的服务接口、指定引用的方法以及其他消费者信息。
  2. 调用registry.register方法将消费者注册信息url注册到注册中心。
  3. 调用directory.buildRouterChain方法构建服务调用路由链。
  4. 调用directory.subscribe方法进行服务发现、引入并订阅服务。
  5. 调用cluster.join方法进行集群容错能力包装。

接口级的服务发现同样是调用该方法,区别是应用级调用的方法中registry参数底层对象为ServiceDiscoveryRegistry类型,directory参数为ServiceDiscoveryRegistryDirectory类型,接口级调用的方法中registry参数底层对象为ZookeeperRegistry类型,directory参数为RegistryDirectory类型。


/**
 * RegistryProtocol的方法
 * 创建ClusterInvoker
 *
 * @param directory 动态目录
 * @param cluster   集群
 * @param registry  注册中心
 * @param type      服务接口类型
 * @return ClusterInvoker
 */
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
    //注册中心操作类
    directory.setRegistry(registry);
    //设置协议,Protocol$Adaptive
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY 消费者服务引用参数
    Map<String, String> parameters = new HashMap<>(directory.getConsumerUrl().getParameters());
    //消费者信息转消费者注册信息url
    URL urlToRegistry = new ServiceConfigURL(
        //获取protocol属性,只调用指定协议的服务提供方,其它协议忽略,默认值consumer
        parameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY),
        //消费者ip
        parameters.remove(REGISTER_IP_KEY),
        //端口
        0,
        //服务接口路径
        getPath(parameters, type),
        //服务引用参数
        parameters
    );
    urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());
    urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());
    //是否应该注册,默认true
    if (directory.isShouldRegister()) {
        //设置注册的消费者url
        directory.setRegisteredConsumerUrl(urlToRegistry);
        /*
         * 1 消费者注册信息url注册到注册中心
         */
        registry.register(directory.getRegisteredConsumerUrl());
    }
    /*
     * 2 构建服务路由器链
     */
    directory.buildRouterChain(urlToRegistry);
    /*
     * 3 服务发现并订阅服务
     */
    directory.subscribe(toSubscribeUrl(urlToRegistry));
    /*
     * 4 集群容错包装
     */
    return (ClusterInvoker<T>) cluster.join(directory, true);
}

4.1 register注册应用级消费者信息

该方法的源码我们在此前学习provider****导出服务并且应用级服务注册到注册中心的时候就讲过了,即注册应用级别服务消费者和提供者信息是同一个方法。

与之前讲的接口级服务引入的注册不同的是,应用级服务引入的服务消费者url将不会注册到注册中心,这样减轻了注册中心的压力。

/**
 * ServiceDiscoveryRegistry的方法
 * 
 * @param url  Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
 */
@Override
public final void register(URL url) {
    //只注册提供者,如果是消费者url直接返回
    if (!shouldRegister(url)) { // Should Not Register
        return;
    }
    //执行注册
    doRegister(url);
}

4.2 subscribe应用级服务发现和订阅

该方法首先将当前RegistryDirectory实例加入到节点目录变化的回调通知监听器集合中,用以接收通知。随后调用父类DynamicDirectory的subscribe方法订阅服务。

/**
 * ServiceDiscoveryRegistryDirectory的方法
 *
 * 应用级服务发现并订阅服务
 *
 * @param url 服务消费者url
 */
@Override
public void subscribe(URL url) {
    //获取enable-configuration-listen属性,即是否支持配置监听,默认true
    if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
        //设置为true
        enableConfigurationListen = true;
        //将当前ServiceDiscoveryRegistryDirectory加入到节点目录变化的回调通知监听器集合中
        getConsumerConfigurationListener(moduleModel).addNotifyListener(this);
        //引用配置监听器
        referenceConfigurationListener = new ReferenceConfigurationListener(this.moduleModel, this, url);
    } else {
        enableConfigurationListen = false;
    }
    //调用父类DynamicDirectory的subscribe方法
    super.subscribe(url);
}

DynamicDirectory的subscribe方法如下,可以看到最终还是依靠ServiceDiscoveryRegistry#subscribe方法实现应用级服务订阅的。

/**
 * DynamicDirectory的方法
 * 订阅服务
 *
 * @param url 服务消费者url
 */
public void subscribe(URL url) {
    //设置subscribeUrl属性
    setSubscribeUrl(url);
    //调用registry注册中心的subscribe方法实现服务订阅
    registry.subscribe(url, this);
}

4.2.1 ServiceDiscoveryRegistry#subscribe应用级服务订阅

ServiceDiscoveryRegistry实现了该方法,而ZookeeperRegistry没有实现改方法。

  1. 首先调用!shouldSubscribe方法判断是否不应该订阅,内部调用的!shouldRegister方法。也就是说应用级服务提供者只会注册不会订阅,而应用级的服务消费者只会订阅不会注册。
  2. 然后调用doSubscribe方法对应用级的服务消费者执行服务订阅。
/**
 * ServiceDiscoveryRegistry的方法
 *
 * @param url      订阅者url
 * @param listener 通知监听器
 */
@Override
public final void subscribe(URL url, NotifyListener listener) {
    //是否不应该订阅,内部调用的shouldRegister方法
    //也就是说应用级服务提供者只会注册不会订阅,而应用级的服务消费者只会订阅不会注册
    if (!shouldSubscribe(url)) { // Should Not Subscribe
        return;
    }
    //应用级的服务消费者执行服务订阅
    doSubscribe(url, listener);
}

4.3 ServiceDiscoveryRegistry#doSubscribe执行服务发现订阅

doSubscribe方法执行应用级服务发现订阅,大概步骤为:

  1. 通过ZookeeperServiceDiscovery#subscribe方法执行应用级服务发现订阅。这里仅仅是将订阅者url加入到metadataInfo的subscribedServiceURLs缓存中,没有真正的订阅操作。
  2. 通serviceNameMapping#getAndListen从元数据中心获取当前服务接口映射的服务提供者服务名列表。在应用级服务提供者启动过程中,会将服务接口到服务名的映射关系发布到远程元数据中心。
  3. 调用subscribeURLs方法,继续执行服务url订阅,这里才会执行真正的订阅逻辑,将会根据服务名去注册中心找到服务实例,然后对服务实例分组并通过rpc调用服务实例的MetaDataService服务获取服务元数据。最后构建服务实例url,notify通知ServiceDiscoveryRegistryDirectory,进一步的创建invoker。
/**
 * ServiceDiscoveryRegistry的方法
     * <p>
     * 执行服务发现订阅
     *
     * @param url      订阅者url
     * @param listener 通知监听器,ServiceDiscoveryRegistryDirectory
     */
    @Override
    public void doSubscribe(URL url, NotifyListener listener) {
        url = addRegistryClusterKey(url);
        /*
         * 1 通过ZookeeperServiceDiscoveryy#subscribe方法执行应用级服务发现订阅
         * 这里仅仅是将订阅者url加入到metadataInfo的subscribedServiceURLs缓存中,没有真正的订阅操作。
         */
        serviceDiscovery.subscribe(url, listener);

        boolean check = url.getParameter(CHECK_KEY, false);
        /*
         * 2 从元数据中心获取当前服务接口映射的服务提供者服务名列表
         *
         * 在应用级服务提供者启动过程中,会将服务接口到服务名的映射关系发布到远程元数据中心
         */
        //构建服务接口到服务名的映射关系key,就是serviceInterface,即服务接口全路径名
        String key = ServiceNameMapping.buildMappingKey(url);
        //获取对应的锁,每一个key对应一把锁
        Lock mappingLock = serviceNameMapping.getMappingLock(key);
        try {
            //加锁
            mappingLock.lock();
            /*
             * 尝试从缓存获取需要订阅的服务名列表,作为初始化服务名列表,默认null
             * 这是一个Dubbo实现的LUR缓存,默认最多10000个mapping缓存,原理是很简单的继承LinkedHashMap的方式
             */
            Set<String> subscribedServices = serviceNameMapping.getCachedMapping(url);
            try {
                //服务映射监听器
                MappingListener mappingListener = new DefaultMappingListener(url, subscribedServices, listener);
                /*
                 * 核心方法
                 * 根据注册中心协议url,服务消费者url,服务映射监听器,从元数据中心获取当前服务接口对应的服务提供者服务名列表
                 */
                subscribedServices = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
                //存入监听器缓存
                mappingListeners.put(url.getProtocolServiceKey(), mappingListener);
            } catch (Exception e) {
                logger.warn("Cannot find app mapping for service " + url.getServiceInterface() + ", will not migrate.", e);
            }

            if (CollectionUtils.isEmpty(subscribedServices)) {
                logger.info("No interface-apps mapping found in local cache, stop subscribing, will automatically wait for mapping listener callback: " + url);
//                if (check) {
//                    throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + url);
//                }
                return;
            }
            /*
             * 3 应用级别的服务url订阅
             *
             * 这里才会执行真正的订阅
             */
            subscribeURLs(url, listener, subscribedServices);
        } finally {
            mappingLock.unlock();
        }
    }

4.3.1 ZookeeperServiceDiscovery#subscribe基于zk的应用级服务订阅

该方法是父类AbstractServiceDiscovery实现的,内部调用metadataInfo的addService方法。

我们此前学习应用级服务注册的时候,ServiceDiscoveryRegistry#register方法同样是依靠父类AbstractServiceDiscovery实现,其内部也是调用的,内部调用metadataInfo的addService方法。

/**
 * AbstractServiceDiscovery的方法
 *
 * 执行应用级订阅
 *
 * @param url      订阅者url
 * @param listener 通知监听器,ServiceDiscoveryRegistryDirectory
 */
@Override
public void subscribe(URL url, NotifyListener listener) {
    //添加订阅url
    metadataInfo.addSubscribedURL(url);
}

addSubscribedURL方法实际上仅仅将订阅者url加入到subscribedServiceURLs缓存map中就结束了,key为serviceKey,规则为{group}/{interfaceName}:{version},除此之外没有其他的操作。

/**
 * MetadataInfo的方法
 * 添加订阅url
 * @param url 订阅者url
 */
public synchronized void addSubscribedURL(URL url) {
    //第一次添加时初始化subscribedServiceURLs集合,这里使用的是一个跳表
    if (subscribedServiceURLs == null) {
        subscribedServiceURLs = new ConcurrentSkipListMap<>();
    }
    //将url加入到subscribedServiceURLs集合
    addURL(subscribedServiceURLs, url);
}

private boolean addURL(Map<String, SortedSet<URL>> serviceURLs, URL url) {
    //加入到serviceURLs集合, key为{group}/{interfaceName}:{version}
    SortedSet<URL> urls = serviceURLs.computeIfAbsent(url.getServiceKey(), this::newSortedURLs);
    // make sure the parameters of tmpUrl is variable
    return urls.add(url);
}

4.3.2 MetadataServiceNameMapping#getAndListen获取并订阅服务映射信息

我们此前在应用级服务提供者启动过程中讲过,在最后会将服务接口到服务名的映射关系发布到远程元数据中心。

而在应用级别消费者启动过程中,在引用服务的时候也会根据接口(服务接口到服务名的映射关系key,就是serviceInterface,即服务接口全路径名)去元数据中心查找当前要引入的服务接口对应的服务提供者服务名列表,并且还会进行订阅,当服务映射数据变更时会更新内存数据。

该方法的大概步骤为:

  1. 首先尝试从本地LUR缓存中获取mapping,如果没有获取到,那么将会创建一个异步监听器AsyncMappingTask,主动调用call方法同步拉取元数据中心的mapping映射信息,获取的数据不会通知监听器DefaultMappingListener立即更新缓存。
  2. 如果缓存已存在,那么将会创建一个异步监听器AsyncMappingTask提交到线程池,异步的获取的数据,并且会通知监听器DefaultMappingListener更新缓存。
/**
 * AbstractServiceNameMapping的方法
 *
 * 获取并监听服务映射
 *
 * @param registryURL   注册中心协议url
 * @param subscribedURL 服务消费者url
 * @param listener      通知监听器DefaultMappingListener,内部含有ServiceDiscoveryRegistryDirectory
 * @return 构建服务接口到服务名的映射关系key,就是serviceInterface,即服务接口全路径名
 */
@Override
public Set<String> getAndListen(URL registryURL, URL subscribedURL, MappingListener listener) {
    //构建服务接口到服务名的映射关系key,就是serviceInterface,即服务接口全路径名
    String key = ServiceNameMapping.buildMappingKey(subscribedURL);
    //首先从本地LUR缓存中获取mapping
    Set<String> mappingServices = this.getCachedMapping(key);

    // Asynchronously register listener in case previous cache does not exist or cache expired.
    //如果缓存不存在
    if (CollectionUtils.isEmpty(mappingServices)) {
        try {
            logger.info("Local cache mapping is empty");
            /*
             * 创建一个异步监听器,主动调用call方法同步拉取元数据中心的mapping映射信息,获取的数据不会通知监听器DefaultMappingListener
             */
            mappingServices = (new AsyncMappingTask(listener, subscribedURL, false)).call();
        } catch (Exception e) {
            // ignore
        }
        //如果注册中心数据不存在
        if (CollectionUtils.isEmpty(mappingServices)) {
            //从url获取subscribed-services参数,表示手动指定的需要订阅的服务名,以,分割多个值
            String registryServices = registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY);
            if (StringUtils.isNotEmpty(registryServices)) {
                logger.info(subscribedURL.getServiceInterface() + " mapping to " + registryServices + " instructed by registry subscribed-services.");
                //以,分割多个值
                mappingServices = parseServices(registryServices);
            }
        }
        //如果找到了mapping数据,那么存入LRU缓存中
        if (CollectionUtils.isNotEmpty(mappingServices)) {
            this.putCachedMapping(key, mappingServices);
        }
    }
    //如果存在本地缓存
    else {
        //获取异步mapping任务执行器mappingRefreshingExecutor
        ExecutorService executorService = applicationModel.getFrameworkModel().getBeanFactory()
            .getBean(FrameworkExecutorRepository.class).getMappingRefreshingExecutor();
        //创建一个异步监听器提交到线程池,获取的数据将会通知监听器DefaultMappingListener更新缓存
        executorService.submit(new AsyncMappingTask(listener, subscribedURL, true));
    }

    return mappingServices;
}
4.3.2.1 AsyncMappingTask#call从元数据中心获取mapping

AsyncMappingTask#call方法将会从元数据中心远程拉取服务映射信息。实际上内部仍然是调用MetadataServiceNameMapping的另一个重载的getAndListen方法从元数据中心获取服务接口到服务名的映射信息。

/**
 * AsyncMappingTask的方法
 *
 * 从元数据中心远程拉取服务映射信息
 */
@Override
public Set<String> call() throws Exception {
    synchronized (mappingListeners) {
        //构建空集合
        Set<String> mappedServices = emptySet();
        try {
            //key,目前是仅仅是服务接口全路径名
            String mappingKey = ServiceNameMapping.buildMappingKey(subscribedURL);
            //如果监听器不为空
            if (listener != null) {
                //调用getAndListen方法元数据中心远程拉取服务映射信息并且注册监听
                mappedServices = toTreeSet(getAndListen(subscribedURL, listener));
                //讲key和对应的listener加入到外部类的mappingListeners映射中
                Set<MappingListener> listeners = mappingListeners.computeIfAbsent(mappingKey, _k -> new HashSet<>());
                listeners.add(listener);
                //是否立即通知监听器,
                if (CollectionUtils.isNotEmpty(mappedServices)) {
                    if (notifyAtFirstTime) {
                        // guarantee at-least-once notification no matter what kind of underlying meta server is used.
                        // listener notification will also cause updating of mapping cache.
                        //通知更新缓存
                        listener.onEvent(new MappingChangedEvent(mappingKey, mappedServices));
                    }
                }
            } else {
                //没有监听器的情况
                mappedServices = get(subscribedURL);
                //直接存入缓存
                if (CollectionUtils.isNotEmpty(mappedServices)) {
                    AbstractServiceNameMapping.this.putCachedMapping(mappingKey, mappedServices);
                }
            }
        } catch (Exception e) {
            logger.error("Failed getting mapping info from remote center. ", e);
        }
        return mappedServices;
    }
}
4.3.2.2 MetadataServiceNameMapping#getAndListen获取并监听服务映射信息

metadataReport.getServiceAppMapping方法从元数据中心远程拉取获取服务接口对应的服务名映射集合并且注册监听。

/**
 * MetadataServiceNameMapping的方法
 *
 * 从元数据中心远程拉取服务映射信息并且注册监听
 *
 * @param url 消费者url
 * @param mappingListener 监听器MappingListener
 * @return 服务映射信息
 */
@Override
public Set<String> getAndListen(URL url, MappingListener mappingListener) {
    //服务接口
    String serviceInterface = url.getServiceInterface();
    // randomly pick one metadata report is ok for it's guaranteed all metadata report will have the same mapping data. 
    //随机获取一个配置的注册中心id
    String registryCluster = getRegistryCluster(url);
    //首先获取注册中心id对应的元数据中心实例,如果没有则从metadataReports列表获取第一个元数据中心实例
    MetadataReport metadataReport = metadataReportInstance.getMetadataReport(registryCluster);
    if (metadataReport == null) {
        return Collections.emptySet();
    }
    //从元数据中心获取服务接口对应的服务名映射集合
    return metadataReport.getServiceAppMapping(serviceInterface, mappingListener, url);
}

获取元数据中实例时,首先获取随机注册中心id对应的元数据中心实例,如果没有则从metadataReports列表获取第一个元数据中心实例。

/**
 * MetadataReportInstance的方法
 *
 * @param registryKey 注册中心id
 * @return 元数据中心实例
 */
public MetadataReport getMetadataReport(String registryKey) {
    //首先获取注册中心id对应的元数据中心实例
    MetadataReport metadataReport = metadataReports.get(registryKey);
    //如果没有则从metadataReports列表获取第一个元数据中心实例
    if (metadataReport == null && metadataReports.size() > 0) {
        metadataReport = metadataReports.values().iterator().next();
    }
    return metadataReport;
}
4.3.2.3 ZookeeperMetadataReport#getServiceAppMapping获取服务映射

如果元数据中心是zookeeper,那么对应ZookeeperMetadataReport。他的getServiceAppMapping方法很简单,构建节点路径默认 dubbo/mapping/{serviceKey},例如/dubbo/mapping/org.apache.dubbo.demo.DemoService,然后注册监听器监听该节点的变化,最后获取获取节点的内容,也就是服务名映射字符串,并且通过,拆分为set集合。

/**
 * ZookeeperMetadataReport的方法
 * <p>
 * 从元数据中心远程拉取获取服务接口对应的服务名映射集合并且注册监听
 *
 * @param serviceKey 服务接口
 * @param listener   监听器MappingListener
 * @param url        消费者url
 * @return 服务接口对应的服务名映射集合
 */
@Override
public Set<String> getServiceAppMapping(String serviceKey, MappingListener listener, URL url) {
    //构建节点路径,默认 dubbo/mapping/{serviceKey},例如
    String path = buildPathKey(DEFAULT_MAPPING_GROUP, serviceKey);
    //监听节点变化
    MappingDataListener mappingDataListener = casListenerMap.computeIfAbsent(path, _k -> {
        MappingDataListener newMappingListener = new MappingDataListener(serviceKey, path);
        //添加监听器
        zkClient.addDataListener(path, newMappingListener);
        return newMappingListener;
    });
    mappingDataListener.addListener(listener);
    //获取节点的内容,也就是服务名映射字符串,通过,拆分为set集合
    return getAppNames(zkClient.getContent(path));
}

我们在应用级服务提供者注册的时候就讲过了服务映射数据在元数据中心的样子,现在再来看看。


可以很明显的看出来所谓的服务映射是什么意思,也就是一个服务接口到对应的服务名的关系节点。有了服务映射,那么应用级消费者就能通过服务接口来查询对应的服务应用名了,这在consumer应用级服务发现的时候很有用。

5 总结

本次我们学习了Dubbo3的应用级服务发现订阅refreshServiceDiscoveryInvoker方法的源码,下文我们将会学习应用级服务发现订阅后半部分的源码,即在获取到服务应用名之后,通过subscribeURLs方法进行应用级别的服务url订阅的源码。