基于Dubbo 3.1,详细介绍了Dubbo服务的发布与引用的源码。
上文我们学习了Dubbo3的应用级服务发现订阅refreshServiceDiscoveryInvoker方法的源码,下面我们将会学习应用级服务发现订阅后半部分的源码,即在获取到服务应用名之后,通过subscribeURLs方法进行应用级别的服务url订阅的源码。
Dubbo 3.x服务引用源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
- Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
- Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
- Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
- Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
- Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
- Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
- Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
- Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
- Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url
Dubbo 3.x服务发布源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
- Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
- Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
- Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
- Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
- Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
1subscribeURLs订阅应用级服务url
在获取到服务应用名之后,该方法进行应用级别的服务url订阅。
- 从缓存serviceListeners中根据服务名获取对应ServiceInstancesChangedListener服务实例变更监听器,如果没有则创建。每个服务应用下可以有多个服务接口,以及多个服务实例。
- 从注册中心获取服务应用名下面的所有服务实例的信息ServiceInstance,例如服务名、ip、端口等基本信息。即/services/{serviceName}下面的所有节点。
- 如果找到了服务实例,那么立即执行ServiceInstancesChangedListener#onEvent触发监听器通知事件,将会继续执行应用级服务发现订阅,对服务实例按照元数据版本revision分组并获取服务实例的元数据信息。
- 如果已创建或者创建ServiceInstancesChangedListener成功,那么直接调用serviceInstancesChangedListener#addListenerAndNotify方法添加监听器并且通知ServiceDiscoveryRegistryDirectory。
每个服务应用下可能有多个服务实例,每个服务实例下面可能有多个服务接口。而由于serviceListeners缓存key为服务名,所以说,对于同服务应用名下的多个实例的多个服务接口的引用,将无需再次创建ServiceInstancesChangedListener,也就无需再次获取该服务名的实例信息以及获取服务的元数据信息,提升了服务发现速度。
/**
* ServiceDiscoveryRegistry的方法
* <p>
* 应用级别的服务url订阅
*
* @param url 订阅者url
* @param listener 通知监听器,ServiceDiscoveryRegistryDirectory
* @param serviceNames 订阅服务名集合
*/
protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
serviceNames = toTreeSet(serviceNames);
//将服务名通过,拼接
String serviceNamesKey = toStringKeys(serviceNames);
//服务key, {group}/{serviceInterface}:{version}
String serviceKey = url.getServiceKey();
logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, serviceKey));
// register ServiceInstancesChangedListener 注册ServiceInstancesChangedListener
//服务订阅锁
Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
try {
//加锁
appSubscriptionLock.lock();
//从缓存获取服务实例变更监听器,每个服务名对应同一个serviceInstancesChangedListener
ServiceInstancesChangedListener serviceInstancesChangedListener = serviceListeners.get(serviceNamesKey);
/*
* 创建一个ServiceInstancesChangedListener,根据服务应用名拉取服务实例信息,并且触发服务实例改变事件
*/
if (serviceInstancesChangedListener == null) {
//创建当前服务名的监听器ServiceInstancesChangedListener
serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
serviceInstancesChangedListener.setUrl(url);
//遍历服务提供者应用名
for (String serviceName : serviceNames) {
/*
* 从注册中心获取服务应用名下面的所有服务实例的信息,例如服务名、ip、端口等基本信息
*/
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
//如果找到了服务实例
if (CollectionUtils.isNotEmpty(serviceInstances)) {
/*
* 手动触发监听器通知事件,将会继续执行应用级服务发现订阅
*/
serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
}
}
//存入缓存,每个服务名对应同一个serviceInstancesChangedListener
serviceListeners.put(serviceNamesKey, serviceInstancesChangedListener);
}
/*
* serviceInstancesChangedListener已创建完毕或者已存在的情况
*/
if (!serviceInstancesChangedListener.isDestroyed()) {
serviceInstancesChangedListener.setUrl(url);
//添加为服务监听器
listener.addServiceListener(serviceInstancesChangedListener);
/*
* 添加监听器并且通知ServiceDiscoveryRegistryDirectory
*/
serviceInstancesChangedListener.addListenerAndNotify(url, listener);
//为注册中心对应节点注册监听器
serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
} else {
logger.info(String.format("Listener of %s has been destroyed by another thread.", serviceNamesKey));
serviceListeners.remove(serviceNamesKey);
}
} finally {
appSubscriptionLock.unlock();
}
}
2 serviceDiscovery.getInstances获取服务实例信息
对于新创建的ServiceInstancesChangedListener,将会通过该方法根据查询到的服务实例名字去注册中心获取对应的服务名下面的全部实例的基本信息,例如服务名、ip、端口、服务元数据等基本信息,具体路径为/services/{serviceName}。
以zookeeper作为注册中心为例子,服务实例在注册中心的节点和基本信息结构如下。请注意,到现在,我们已经获取到了全部的服务实例的信息,但是仍然没有获取到服务接口的信息。
3 ServiceInstancesChangedListener#onEvent服务实例改变事件
当服务实例发生改变时,例如发现新实例时,将会触发该通知, 继续执行应用级服务发现订阅。该方法的代码非常的多,重要的步骤为:
- 调用refreshInstance方法,刷新内存中的服务实例数据,将实例信息存入ServiceInstancesChangedListener内部的allInstances缓存。
- 遍历所有服务名对应的实例列表,基于revision,将实例进行分组。
- 对于同一组甚至不同组的应用服务实例,很多情况下暴露的接口其实都是一样的,在进行服务(Instance)与接口(Interface)映射的时候会有许多重复的冗余数据。因此可以使用类似对元数据信息进行 MD5 计算的方式来对实例本身加上版本号revision,如果多个实例的版本号revision一致可以认为它们的元数据信息也一致。那么只需要随机选择一台实例来获取服务接口的元数据信息即可,这样可以减少元数据中心的数据冗余量,减轻压力。
- 获取所有不同revision的服务实例的MetadataInfo元数据信息,MetadataInfo内部包含应用级服务导出的所有服务接口信息。这里将的获取方式默认local类型,会先远程引入MatadataService服务并发起远程rpc调用getMetadataInfo方法获取服务元数据信息。随后初始化服务元数据信息。
- 最终构建 服务接口全路径名 到 ProtocolServiceKeyWithUrls的映射map并设置为serviceUrls属性,ProtocolServiceKeyWithUrls中包含protocol、interfaceName、group、version,以及urls集合,一般而言多少个服务实例,就有多少个url用于。serviceUrls后续继续应用级服务发现。
/**
* ServiceInstancesChangedListener的方法
* <p>
* 服务实例改变事件
*/
public void onEvent(ServiceInstancesChangedEvent event) {
if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
return;
}
//调用doOnEvent
doOnEvent(event);
}
private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {
if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
return;
}
/*
* 1 刷新内存中的服务实例数据
* 将实例信息存入ServiceInstancesChangedListener内部的allInstances缓存
*/
refreshInstance(event);
if (logger.isDebugEnabled()) {
logger.debug(event.getServiceInstances().toString());
}
//实例版本号对应的实例
Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
//本地服务版本号
Map<ServiceInfo, Set<String>> localServiceToRevisions = new HashMap<>();
// grouping all instances of this app(service name) by revision
/*
* 2 遍历所有服务名对应的实例列表,基于revision,将实例进行分组
*
* 对于同一组甚至不同组的应用服务实例,很多情况下暴露的接口其实都是一样的,在进行服务(Instance)与接口(Interface)映射的时候会有许多重复的冗余数据。
* 因此可以使用类似对元数据信息进行 MD5 计算的方式来对实例本身加上版本号revision,如果多个实例的版本号revision一致可以认为它们的元数据信息也一致
* 那么只需要随机选择一台实例来获取服务接口的元数据信息即可,这样可以减少元数据中心的数据冗余量,减轻压力
*/
for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
List<ServiceInstance> instances = entry.getValue();
for (ServiceInstance instance : instances) {
//获取该实例的元数据版本号
String revision = getExportedServicesRevision(instance);
if (revision == null || EMPTY_REVISION.equals(revision)) {
if (logger.isDebugEnabled()) {
logger.debug("Find instance without valid service metadata: " + instance.getAddress());
}
continue;
}
//基于revision,将实例进行分组
List<ServiceInstance> subInstances = revisionToInstances.computeIfAbsent(revision, r -> new LinkedList<>());
subInstances.add(instance);
}
}
/*
* 3 获取所有不同revision的服务实例的MetadataInfo元数据信息
*/
// get MetadataInfo with revision
for (Map.Entry<String, List<ServiceInstance>> entry : revisionToInstances.entrySet()) {
//revision
String revision = entry.getKey();
//同一个revision的多个服务实例
List<ServiceInstance> subInstances = entry.getValue();
//多个实例的版本号revision一致可以认为它们的元数据信息也一致,选择第一台实例来获取服务接口的元数据信息即可
MetadataInfo metadata = subInstances.stream()
//尝试获取服务元数据信息
.map(ServiceInstance::getServiceMetadata)
.filter(Objects::nonNull)
.filter(m -> revision.equals(m.getRevision()))
.findFirst()
//如果没有服务元数据信息serviceMetadata,那么通过ServiceDiscovery#getRemoteMetadata方法获取远程元数据信息
.orElseGet(() -> serviceDiscovery.getRemoteMetadata(revision, subInstances));
/*
* 解析元数据
* 将解析后的结果存入localServiceToRevisions,key为MetadataInfo的所有ServiceInfo,value为目前的revision的set集合
*/
parseMetadata(revision, metadata, localServiceToRevisions);
// update metadata into each instance, in case new instance created.
//如果创建了新实例,则将元数据更新到每个服务实例中
for (ServiceInstance tmpInstance : subInstances) {
//获取ServiceMetadata
MetadataInfo originMetadata = tmpInstance.getServiceMetadata();
if (originMetadata == null || !Objects.equals(originMetadata.getRevision(), metadata.getRevision())) {
//将获取的metadata设置到服务实例的serviceMetadata属性中
tmpInstance.setServiceMetadata(metadata);
}
}
}
//没有元数据信息的处理
int emptyNum = hasEmptyMetadata(revisionToInstances);
if (emptyNum != 0) {// retry every 10 seconds
hasEmptyMetadata = true;
if (retryPermission.tryAcquire()) {
if (retryFuture != null && !retryFuture.isDone()) {
// cancel last retryFuture because only one retryFuture will be canceled at destroy().
retryFuture.cancel(true);
}
try {
retryFuture = scheduler.schedule(new AddressRefreshRetryTask(retryPermission, event.getServiceName()), 10_000L, TimeUnit.MILLISECONDS);
} catch (Exception e) {
logger.error("Error submitting async retry task.");
}
logger.warn("Address refresh try task submitted");
}
// return if all metadata is empty, this notification will not take effect.
if (emptyNum == revisionToInstances.size()) {
// 1-17 - Address refresh failed.
logger.error("1-17", "metadata Server failure", "",
"Address refresh failed because of Metadata Server failure, wait for retry or new address refresh event.");
return;
}
}
hasEmptyMetadata = false;
///协议到 端口号到 版本号集合 到 服务接口url的映射
Map<String, Map<Integer, Map<Set<String>, Object>>> protocolRevisionsToUrls = new HashMap<>();
//服务接口到 ProtocolServiceKeyWithUrls的映射
Map<String, List<ProtocolServiceKeyWithUrls>> newServiceUrls = new HashMap<>();
//遍历服务接口信息到版本号的映射map
for (Map.Entry<ServiceInfo, Set<String>> entry : localServiceToRevisions.entrySet()) {
//服务接口信息
ServiceInfo serviceInfo = entry.getKey();
//版本号集合
Set<String> revisions = entry.getValue();
//协议到 版本号集合到服务接口url的映射
Map<Integer, Map<Set<String>, Object>> portToRevisions = protocolRevisionsToUrls.computeIfAbsent(serviceInfo.getProtocol(), k -> new HashMap<>());
//版本号到服务接口url的映射
Map<Set<String>, Object> revisionsToUrls = portToRevisions.computeIfAbsent(serviceInfo.getPort(), k -> new HashMap<>());
Object urls = revisionsToUrls.get(revisions);
if (urls == null) {
//获取服务接口对应的url,一般而言多少个服务实例,就有多少个url
urls = getServiceUrlsCache(revisionToInstances, revisions, serviceInfo.getProtocol(), serviceInfo.getPort());
revisionsToUrls.put(revisions, urls);
}
//
List<ProtocolServiceKeyWithUrls> list = newServiceUrls.computeIfAbsent(serviceInfo.getPath(), k -> new LinkedList<>());
list.add(new ProtocolServiceKeyWithUrls(serviceInfo.getProtocolServiceKey(), (List<URL>) urls));
}
//设置为新的服务url映射
this.serviceUrls = newServiceUrls;
this.notifyAddressChanged();
}
最终,从provider获取的服务元数据解析成的serviceUrls的结构如下(两个服务提供者实例的情况下):
3.1 refreshInstance刷新内存服务实例
该方法将appName及其实例列表存入监听器ServiceInstancesChangedListener内部的allInstances缓存映射中。
/**
* ServiceInstancesChangedListener的方法
*
* 刷新内存中的服务实例数据
* @param event
*/
private void refreshInstance(ServiceInstancesChangedEvent event) {
//失败时的重试任务忽略
if (event instanceof RetryServiceInstancesChangedEvent) {
return;
}
//获取服务名
String appName = event.getServiceName();
//获取全部服务实例
List<ServiceInstance> appInstances = event.getServiceInstances();
logger.info("Received instance notification, serviceName: " + appName + ", instances: " + appInstances.size());
//通过ServiceInstanceNotificationCustomizer自定义服务实例,默认实现为SpringCloudServiceInstanceNotificationCustomizer,一般用于SPRING_CLOUD
for (ServiceInstanceNotificationCustomizer serviceInstanceNotificationCustomizer : serviceInstanceNotificationCustomizers) {
serviceInstanceNotificationCustomizer.customize(appInstances);
}
//将appName及其实例列表存入监听器内部的allInstances缓存映射中
allInstances.put(appName, appInstances);
//更新最新刷新时间戳
lastRefreshTime = System.currentTimeMillis();
}
3.2 AbstractServiceDiscovery#getRemoteMetadata获取远程服务接口配置元数据信息
如果服务实例中带有服务元数据信息serviceMetadata,那么获取第一条serviceMetadata,但通常都没有。如果没有服务元数据信息serviceMetadata,那么通过ServiceDiscovery#getRemoteMetadata方法获取远程元数据信息。
假设注册中心为zookeeper,那么对应的ServiceDiscovery为ZookeeperServiceDiscovery, getRemoteMetadata方法是通过父类AbstractServiceDiscovery方法实现的。
/**
* AbstractServiceDiscovery的方法
*
* 获取服务元数据信息
*
* @param revision 元数据版本
* @param instances 服务实例列表
* @return
*/
@Override
public MetadataInfo getRemoteMetadata(String revision, List<ServiceInstance> instances) {
//首先尝试从本地缓存获取元数据信息
MetadataInfo metadata = metaCacheManager.get(revision);
//返回缓存的数据
if (metadata != null && metadata != MetadataInfo.EMPTY) {
metadata.init();
// metadata loaded from cache
if (logger.isDebugEnabled()) {
logger.debug("MetadataInfo for revision=" + revision + ", " + metadata);
}
return metadata;
}
//加锁
synchronized (metaCacheManager) {
// try to load metadata from remote.
int triedTimes = 0;
//最多拉取3次
while (triedTimes < 3) {
/*
* 从远程获取服务元数据信息
*/
metadata = MetadataUtils.getRemoteMetadata(revision, instances, metadataReport);
//获取成功
if (metadata != MetadataInfo.EMPTY) {// succeeded
//初始化
metadata.init();
break;
} else {// failed
//获取失败
if (triedTimes > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Retry the " + triedTimes + " times to get metadata for revision=" + revision);
}
}
//尝试次数自增
triedTimes++;
try {
//睡眠1s之后继续尝试
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
if (metadata == MetadataInfo.EMPTY) {
logger.error("Failed to get metadata for revision after 3 retries, revision=" + revision);
} else {
metaCacheManager.put(revision, metadata);
}
}
return metadata;
}
3.2.1 MetadataUtils#getRemoteMetadata获取远程接口配置元数据
该方法用于获取远程服务的元数据信息,根据服务类型的不同会有不同的获取逻辑,大概步骤为:
- 由于服务的列表的元数据版本revision相同,那么通过selectInstance方法随机选取一个服务实例。
- 获取服务元数据类型metadataType,这是应用级服务发现接口配置元数据传递方式,是以 Provider 视角而言的,Consumer 侧配置无效,默认值local。可选值有:
- 如果是remote类型,表示Provider 把 metadata 放到远端注册中心,Consumer 从注册中心获取。
- 那么通过MetadataUtils#getMetadata方法从远程元数据中心获取。
- 如果是local类型,表示Provider 把 metadata 放在本地,Consumer 从 Provider 处直接获取,相当于远程调用一次服务提供者的rpc方法获取元数据;
- 那么首先调用MetadataUtils.referProxy方法,基于服务实例引入给定服务提供者的MetadataService服务。
- 随后通过MetadataService服务代理对象调用远程服务getMetadataInfo方法获取服务元数据信息。
- 如果是remote类型,表示Provider 把 metadata 放到远端注册中心,Consumer 从注册中心获取。
/**
* MetadataUtils的方法
* <p>
* 从远程获取服务元数据信息
*
* @param revision 元数据版本
* @param instances 服务实例列表
* @param metadataReport 元数据中心
* @return
*/
public static MetadataInfo getRemoteMetadata(String revision, List<ServiceInstance> instances, MetadataReport metadataReport) {
//随机选取一个服务实例
ServiceInstance instance = selectInstance(instances);
//元数据类型,local 或 remote,默认local
//local表示元数据存放在本地,consumer将从Provider获取
//remote表示元数据存放在元数据中心,consumer将从元数据中心获取
String metadataType = ServiceInstanceMetadataUtils.getMetadataStorageType(instance);
MetadataInfo metadataInfo;
try {
if (logger.isDebugEnabled()) {
logger.debug("Instance " + instance.getAddress() + " is using metadata type " + metadataType);
}
//如果是remote
if (REMOTE_METADATA_STORAGE_TYPE.equals(metadataType)) {
metadataInfo = MetadataUtils.getMetadata(revision, instance, metadataReport);
}
//如果是local,这是默认类型
else {
// change the instance used to communicate to avoid all requests route to the same instance
//更改用于通信的实例,以避免所有请求路由到同一实例
ProxyHolder proxyHolder = null;
try {
/*
* 引入给定服务提供者的MetadataService服务
*/
proxyHolder = MetadataUtils.referProxy(instance);
/*
* 通过MetadataService服务代理对象调用远程服务获取服务元数据信息
*/
metadataInfo = proxyHolder.getProxy().getMetadataInfo(ServiceInstanceMetadataUtils.getExportedServicesRevision(instance));
} finally {
//销毁代理
MetadataUtils.destroyProxy(proxyHolder);
}
}
} catch (Exception e) {
logger.error("Failed to get app metadata for revision " + revision + " for type " + metadataType + " from instance " + instance.getAddress(), e);
metadataInfo = null;
}
if (metadataInfo == null) {
metadataInfo = MetadataInfo.EMPTY;
}
return metadataInfo;
}
3.2.2 MetadataUtils.referProxy引入MetadataService服务
dubbo服务提供者在启动的时候,除了导出我们手动编写的业务服务之后,还会在内部自动导出一个MetadataService服务,提供了专门用于获取对应服务的元数据信息的接口,这个服务我们一般是用不到的,一般在内部服务之间调用。
该方法将会对给定服务提供者实例的MetadataService服务进行引入。
- 通过protocol.refer方法根据url获取MetadataService的invoker。内部是调用DubboProtocol#refer方法,即进行远程服务引用,创建DubboInvoker。DubboInvoker将包含一个nettyClient用于发起远程rpc请求。
- 随后通过proxyFactory.getProxy方法根据invoker创建服务接口代理对象,这样通过代理对象就能发起rpc请求了。
可以看到,MetadataService也是走的通用服务引入逻辑,但是没有了directory和cluster多余的封装。
/**
* MetadataUtils的方法
* <p>
* 引入给定服务提供者的MetadataService服务
*
* @param instance 服务提供者实例
* @return MetadataService服务代理持有者
*/
public static ProxyHolder referProxy(ServiceInstance instance) {
MetadataServiceURLBuilder builder;
ExtensionLoader<MetadataServiceURLBuilder> loader = instance.getApplicationModel()
.getExtensionLoader(MetadataServiceURLBuilder.class);
//获取元数据
Map<String, String> metadata = instance.getMetadata();
// METADATA_SERVICE_URLS_PROPERTY_NAME is a unique key exists only on instances of spring-cloud-alibaba.
//Dubbo Spring Cloud支持
String dubboUrlsForJson = metadata.get(METADATA_SERVICE_URLS_PROPERTY_NAME);
if (metadata.isEmpty() || StringUtils.isEmpty(dubboUrlsForJson)) {
//默认逻辑,获取StandardMetadataServiceURLBuilder
builder = loader.getExtension(StandardMetadataServiceURLBuilder.NAME);
} else {
builder = loader.getExtension(SpringCloudMetadataServiceURLBuilder.NAME);
}
//基于服务提供者实例,构建MetadataService的url信息
List<URL> urls = builder.build(instance);
if (CollectionUtils.isEmpty(urls)) {
throw new IllegalStateException("Introspection service discovery mode is enabled "
+ instance + ", but no metadata service can build from it.");
}
//获取第一个url即可
//url格式例如:dubbo://10.253.45.126:20881/org.apache.dubbo.metadata.MetadataService?connections=1&corethreads=2&dubbo=2.0.2&group=demo-provider&port=20881&protocol=dubbo&retries=0&side=provider&threadpool=cached&threads=100&timeout=5000&version=1.0.0
URL url = urls.get(0);
// Simply rely on the first metadata url, as stated in MetadataServiceURLBuilder.
ApplicationModel applicationModel = instance.getApplicationModel();
ModuleModel internalModel = applicationModel.getInternalModule();
//消费者模型
ConsumerModel consumerModel = applicationModel.getInternalModule().registerInternalConsumer(MetadataService.class, url);
//协议
Protocol protocol = applicationModel.getExtensionLoader(Protocol.class).getAdaptiveExtension();
url.setServiceModel(consumerModel);
/*
* 根据url获取MetadataService的invoker,内部是调用DubboProtocolrefer方法,即进行远程服务引用
* invoker将包含nettyClient用于发起远程rpc请求
*/
Invoker<MetadataService> invoker = protocol.refer(MetadataService.class, url);
//代理工厂
ProxyFactory proxyFactory = applicationModel.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
/*
* 通过invoker生成服务接口代理对象
*/
MetadataService metadataService = proxyFactory.getProxy(invoker);
consumerModel.getServiceMetadata().setTarget(metadataService);
consumerModel.getServiceMetadata().addAttribute(PROXY_CLASS_REF, metadataService);
consumerModel.setProxyObject(metadataService);
consumerModel.initMethodModels();
//返回ProxyHolder,内部持有引入的服务接口代理对象
return new ProxyHolder(consumerModel, metadataService, internalModel);
}
3.2.3 provider接收getMetadataInfo请求
上面consumer通过远程rpc请求调用provider的MetadataService#getMetadataInfo方法,真正的调用链路现在不讲后面会讲,现在我们来简单看看provider的MetadataService#getMetadataInfo方法的具体实现。
provider的MetadataService接口的实现类为MetadataServiceDelegation,下面是它的getMetadataInfo方法实现。
从该方法的源码可知,Consumer 从 Provider 处直接获取的MetadataInfo实际上就是从Provider端中的所有应用级服务注册表ServiceDiscoveryRegistry内部的ServiceDiscovery,例如ZookeeperServiceDiscovery中找到revision版本号一致的那个ServiceDiscovery,然后返回内部的MetadataInfo元数据对象。
在此前学习provider服务导出时,在讲到生产者register服务注册的时候,我们就讲过对于应用级服务注册ServiceDiscoveryRegistry的注册,内部最终是调用ServiceDiscovery的register方法完成注册。假设注册中心协议为zookeeper,那么serviceDiscovery就是ZookeeperServiceDiscovery。
而ZookeeperServiceDiscovery的register方法实际上是AbstractServiceDiscovery的register方法实现,内部调用metadataInfo#addService方法。
metadataInfo#addService方法会根据服务提供者url创建ServiceInfo并添加到services集合中,随后将url加入到metadataInfo内部的exportedServiceURLs缓存中,最后将更新标识位updated改为true。
到这里,我们的服务引入就和此前学习的服务导出对应上了!
/**
* MetadataServiceDelegation的方法
* <p>
* 根据版本获取服务元数据信息
*
* @param revision 版本号
* @return 服务元数据信息
*/
@Override
public MetadataInfo getMetadataInfo(String revision) {
if (StringUtils.isEmpty(revision)) {
return null;
}
//获取当前provider内存中的所有应用级服务注册表ServiceDiscoveryRegistry内部的ServiceDiscovery,例如ZookeeperServiceDiscovery
for (ServiceDiscovery sd : registryManager.getServiceDiscoveries()) {
//获取ServiceDiscovery内部的metadataInfo
MetadataInfo metadataInfo = sd.getLocalMetadata();
//如果版本号一致,那么返回该metadataInfo
if (revision.equals(metadataInfo.getRevision())) {
return metadataInfo;
}
}
if (logger.isWarnEnabled()) {
logger.warn("metadata not found for revision: " + revision);
}
return null;
}
3.2.4 Metadata#init初始化元数据信息
当MetadataInfo在用于RPC调用之前从消费者端的反序列化创建时,需要初始化。主要是初始化metadata内部的services中的serviceInfo,serviceInfo是在provider端根据服务提供者url创建的。
/**
* MetadataInfo的方法
*
* Initialize is needed when MetadataInfo is created from deserialization on the consumer side before being used for RPC call.
* 当MetadataInfo在用于RPC调用之前从消费者端的反序列化创建时,需要初始化。
*/
public void init() {
if (!initiated.compareAndSet(false, true)) {
return;
}
if (CollectionUtils.isNotEmptyMap(services)) {
//serviceInfo是在provider端根据服务提供者url创建的
services.forEach((_k, serviceInfo) -> {
//初始化serviceInfo
serviceInfo.init();
// create duplicate serviceKey(without protocol)->serviceInfo mapping to support metadata search when protocol is not specified on consumer side.
//创建重复的serviceKey(不带协议)->serviceInfo映射,以支持在消费者端未指定协议时的元数据搜索。
if (subscribedServices == null) {
subscribedServices = new HashMap<>();
}
//key为ServiceKey,,value为ServiceInfo的set集合
Set<ServiceInfo> serviceInfos = subscribedServices.computeIfAbsent(serviceInfo.getServiceKey(), _key -> new HashSet<>());
serviceInfos.add(serviceInfo);
});
}
}
ServiceInfo#init方法在消费者端反序列化之后立即初始化必要的缓存。
/**
* ServiceInfo的方法
* 在消费者端反序列化之后立即初始化必要的缓存
* Initialize necessary caches right after deserialization on the consumer side
*/
protected void init() {
//构建matchKey,格式为:{group}/{serviceInterface}:{version}:{protocol}
buildMatchKey();
//构建serviceKey,格式为:{group}/{serviceInterface}:{version}
buildServiceKey(name, group, version);
// init method params
//初始化服务接口内部的方法的参数,参数从params中获取,因此不一定准确
this.methodParams = URLParam.initMethodParameters(params);
// Actually, consumer params is empty after deserialized on the consumer side, so no need to initialize.
// Check how InstanceAddressURL operates on consumer url for more detail.
// this.consumerMethodParams = URLParam.initMethodParameters(consumerParams);
// no need to init numbers for it's only for cache purpose
}
3.3 parseMetadata解析服务元数据
在获取到服务元数据之后,将会解析服务元数据,将解析后的结果存入localServiceToRevisions,key为MetadataInfo的所有ServiceInfo,value为目前的revision的set集合。
/**
* ServiceInstancesChangedListener的方法
*
* 解析服务元数据
*
* @param revision 元数据版本号
* @param metadata 服务元数据实例
* @param localServiceToRevisions 本地服务到版本号的映射
* @return
*/
protected Map<ServiceInfo, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<ServiceInfo, Set<String>> localServiceToRevisions) {
//获取元数据内部的全部服务接口信息
Map<String, ServiceInfo> serviceInfos = metadata.getServices();
//遍历服务接口
for (Map.Entry<String, ServiceInfo> entry : serviceInfos.entrySet()) {
//key为每个服务接口信息,value是当前的revision的set集合
Set<String> set = localServiceToRevisions.computeIfAbsent(entry.getValue(), _k -> new TreeSet<>());
set.add(revision);
}
//返回localServiceToRevisions
return localServiceToRevisions;
}
3.4 getServiceUrlsCache获取服务提供者url缓存
在构建serviceUrls的时候,ProtocolServiceKeyWithUrls内部的urls通过getServiceUrlsCache方法获取。
获取服务接口对应的服务提供者url,一般而言多少个服务实例,就有多少个url。注意:最终获取到的url的port都会被设置为当前服务接口信息的port,但是内部的MetadataInfo是服务实例自身的数据,每个url内部还持有对应的服务实例。即如果两个服务端口分别为20880和20881,但是选择20880的服务获取元数据,那么最终获取的url的port都会被设置为20881,这将会影响后面将url转换为invoker的时候的nettyClient的选择或者创建。
最底层的urls集合中的url如下(两个服务提供者的情况下):
[DefaultServiceInstance{serviceName='demo-provider', host='10.253.45.126', port=20881, enabled=true, healthy=true, metadata={dubbo.endpoints=[{"port":81,"protocol":"rest"},{"port":20881,"protocol":"dubbo"},{"port":50052,"protocol":"tri"}], dubbo.metadata-service.url-params={"connections":"1","version":"1.0.0","dubbo":"2.0.2","side":"provider","port":"20881","protocol":"dubbo"}, dubbo.metadata.revision=4133cb1337c40592501d4c0ee6523637, dubbo.metadata.storage-type=local, timestamp=1667889947956}}, null, DefaultServiceInstance{serviceName='demo-provider', host='10.253.45.126', port=20881, enabled=true, healthy=true, metadata={dubbo.metadata-service.url-params={"connections":"1","version":"1.0.0","dubbo":"2.0.2","side":"provider","port":"20880","protocol":"dubbo"}, dubbo.endpoints=[{"port":80,"protocol":"rest"},{"port":20880,"protocol":"dubbo"},{"port":50051,"protocol":"tri"}], dubbo.metadata.revision=4133cb1337c40592501d4c0ee6523637, dubbo.metadata.storage-type=local, timestamp=1667895079773}}, null]
/**
* ServiceInstancesChangedListener的方法
*
* 获取服务url缓存
*
* @param revisionToInstances 版本号到服务实例的映射
* @param revisions 版本号
* @param protocol 服务接口信息的协议
* @param port 服务接口信息的端口
* @return
*/
protected Object getServiceUrlsCache(Map<String, List<ServiceInstance>> revisionToInstances, Set<String> revisions, String protocol, int port) {
List<URL> urls = new ArrayList<>();
for (String r : revisions) {
//遍历服务实例
for (ServiceInstance i : revisionToInstances.get(r)) {
//如果端口大于0
if (port > 0) {
if (i.getPort() == port) {
//如果当前服务实例的端口和服务接口信息的端口一致,说明服务接口信息就是从当前服务实例获取的
//格式例如:DefaultServiceInstance{serviceName='demo-provider', host='10.253.45.126', port=20880, enabled=true, healthy=true, metadata={dubbo.endpoints=[{"port":80,"protocol":"rest"},{"port":20880,"protocol":"dubbo"},{"port":50051,"protocol":"tri"}], dubbo.metadata-service.url-params={"connections":"1","version":"1.0.0","dubbo":"2.0.2","side":"provider","port":"20880","protocol":"dubbo"}, dubbo.metadata.revision=4133cb1337c40592501d4c0ee6523637, dubbo.metadata.storage-type=local, timestamp=1667895079773}}, null
urls.add(i.toURL(protocol).setScopeModel(i.getApplicationModel()));
} else {
//端口不一致,说明是统一饿服务集群的其他服务实例,将会替换port为当前port
//格式例如:DefaultServiceInstance{serviceName='demo-provider', host='10.253.45.126', port=20880, enabled=true, healthy=true, metadata={dubbo.metadata-service.url-params={"connections":"1","version":"1.0.0","dubbo":"2.0.2","side":"provider","port":"20881","protocol":"dubbo"}, dubbo.endpoints=[{"port":81,"protocol":"rest"},{"port":20881,"protocol":"dubbo"},{"port":50052,"protocol":"tri"}], dubbo.metadata.revision=4133cb1337c40592501d4c0ee6523637, dubbo.metadata.storage-type=local, timestamp=1667889947956}}, null
urls.add(((DefaultServiceInstance) i).copyFrom(port).toURL(protocol).setScopeModel(i.getApplicationModel()));
}
continue;
}
// different protocols may have ports specified in meta
if (ServiceInstanceMetadataUtils.hasEndpoints(i)) {
DefaultServiceInstance.Endpoint endpoint = ServiceInstanceMetadataUtils.getEndpoint(i, protocol);
if (endpoint != null && endpoint.getPort() != i.getPort()) {
urls.add(((DefaultServiceInstance) i).copyFrom(endpoint).toURL(endpoint.getProtocol()));
continue;
}
}
urls.add(i.toURL(protocol).setScopeModel(i.getApplicationModel()));
}
}
return urls;
}
4 addListenerAndNotify通知监听器
已创建或者创建ServiceInstancesChangedListener成功,那么调用serviceInstancesChangedListener.addListenerAndNotify方法添加监听器并且通知ServiceDiscoveryRegistryDirectory。
- 该方法首先为当前服务接口添加监听器。
- 然后调用getAddresses方法获取当前服务消费者与服务提供者匹配的服务提供者url集合。
- 最后调用ServiceDiscoveryRegistryDirectory#notify方法进行通知。
/**
* ServiceInstancesChangedListener的方法
* <p>
* 添加监听器并进行通知
*
* @param url 订阅者url
* @param listener 通知监听器,ServiceDiscoveryRegistryDirectory
*/
public synchronized void addListenerAndNotify(URL url, NotifyListener listener) {
if (destroyed.get()) {
return;
}
//存入通知监听器,key为当前服务接口
Set<NotifyListenerWithKey> notifyListeners = this.listeners.computeIfAbsent(url.getServiceKey(), _k -> new ConcurrentHashSet<>());
//获取消费者url协议,默认consumer
String protocol = listener.getConsumerUrl().getParameter(PROTOCOL_KEY, url.getProtocol());
//构建ProtocolServiceKey
ProtocolServiceKey protocolServiceKey = new ProtocolServiceKey(url.getServiceInterface(), url.getVersion(), url.getGroup(),
!CommonConstants.CONSUMER.equals(protocol) ? protocol : null); //consumer协议设置为null
//创建新监听器加入集合
NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(protocolServiceKey, listener);
notifyListeners.add(listenerWithKey);
// Aggregate address and notify on subscription.
/*
* 获取服务消费者与服务提供者匹配的服务提供者url集合
*/
List<URL> urls = getAddresses(protocolServiceKey, listener.getConsumerUrl());
if (CollectionUtils.isNotEmpty(urls)) {
logger.info(String.format("Notify serviceKey: %s, listener: %s with %s urls on subscription", protocolServiceKey, listener, urls.size()));
/*
* 进行通知,ServiceDiscoveryRegistryDirectory#notify
*/
listener.notify(urls);
}
}
4.1 getAddresses获取匹配的地址
该方法获取服务消费者与服务提供者匹配的服务提供者url集合,此前我们讲过的获取服务元数据的时候构建的serviceUrls在这里就派上了用场。
只有服务消费者的ProtocolServiceKey和服务提供者的ProtocolServiceKey的interface、version、group、protocol都匹配,才算作匹配。
/**
* ServiceInstancesChangedListener的方法
* <p>
* 获取服务消费者与服务提供者匹配的服务提供者url集合
*
* @param protocolServiceKey 消费者url协议构建的protocolServiceKey
* @param consumerURL 消费者url协议
* @return 服务消费者与服务提供者匹配的服务提供者url集合
*/
protected List<URL> getAddresses(ProtocolServiceKey protocolServiceKey, URL consumerURL) {
//根据接口名从serviceUrls获取全部版本的ProtocolServiceKeyWithUrls
List<ProtocolServiceKeyWithUrls> protocolServiceKeyWithUrlsList = serviceUrls.get(protocolServiceKey.getInterfaceName());
List<URL> urls = new ArrayList<>();
if (protocolServiceKeyWithUrlsList != null) {
//遍历
for (ProtocolServiceKeyWithUrls protocolServiceKeyWithUrls : protocolServiceKeyWithUrlsList) {
//如果服务消费者的ProtocolServiceKey和服务提供者的ProtocolServiceKey匹配
//要求interface、version、group、protocol都必须匹配
if (ProtocolServiceKey.Matcher.isMatch(protocolServiceKey, protocolServiceKeyWithUrls.getProtocolServiceKey())) {
//如果匹配成功,那么获取服务提供者urls加入到urls集合汇总
urls.addAll(protocolServiceKeyWithUrls.getUrls());
}
}
}
//如果包含通配符,那么也加入到urls集合
if (serviceUrls.containsKey(CommonConstants.ANY_VALUE)) {
for (ProtocolServiceKeyWithUrls protocolServiceKeyWithUrls : serviceUrls.get(CommonConstants.ANY_VALUE)) {
urls.addAll(protocolServiceKeyWithUrls.getUrls());
}
}
return urls;
}
5 ServiceDiscoveryRegistryDirectory#notify通知服务更新
当第一次订阅服务节点或者服务节点更新时,例如新的producer上下线,将会调用notify服务通知更新的方法,会更新本地缓存的数据。这里和接口级别服务发现是同样的逻辑,这也代表着我们的应用级的服务发现终于走到了最底层,他们的方法代码都差不多,这里我们简单介绍下。
可以看到,该方法内部同样是调用refreshInvoker方法刷新invoker。
/**
* ServiceDiscoveryRegistryDirectory的方法
* <p>
* 服务变更通知
*
* @param instanceUrls 服务提供者实例列表
*/
@Override
public synchronized void notify(List<URL> instanceUrls) {
if (isDestroyed()) {
return;
}
// Set the context of the address notification thread.
RpcServiceContext.getServiceContext().setConsumerUrl(getConsumerUrl());
// 3.x added for extend URL address 3.x 添加扩展URL地址
ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
//获取AddressListener,默认空集合
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
instanceUrls = addressListener.notify(instanceUrls, getConsumerUrl(), this);
}
}
/*
* 将服务提供者url转换为invoker,进行服务提供者的更新
*/
refreshOverrideAndInvoker(instanceUrls);
}
/**
* ServiceDiscoveryRegistryDirectory的方法
* <p>
* 将服务提供者url转换为invoker,进行服务提供者的更新
*
* @param instanceUrls 服务提供者实例列表
*/
// RefreshOverrideAndInvoker will be executed by registryCenter and configCenter, so it should be synchronized.
private synchronized void refreshOverrideAndInvoker(List<URL> instanceUrls) {
// mock zookeeper://xxx?mock=return null
refreshInvoker(instanceUrls);
}
5.1 refreshInvoker刷新invoker
该方法将服务提供者实例url转换为invoker,进行服务提供者的更新,这在consumer对producer的信息更新部分是非常重要的一个方法。
该方法的源码和接口级别的服务发现RegistryDirectory#refreshInvoker方法源码差不多,在此不再赘述。
/**
* ServiceDiscoveryRegistryDirectory的方法
* <p>
* 将服务提供者实例url转换为invoker,进行服务提供者的更新
*
* @param invokerUrls 服务提供者实例列表
*/
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null, use EMPTY url to clear current addresses.");
//保留原始url
this.originalUrls = invokerUrls;
//如果只有一个协议为empty的url,表示没有找到任何该服务提供者url信息
if (invokerUrls.size() == 1 && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
logger.warn("Received url with EMPTY protocol, will clear all available addresses.");
//设置为禁止访问
this.forbidden = true; // Forbid to access
//设置routerChain的服务提供者invoker集合为一个空集合
routerChain.setInvokers(BitList.emptyList());
//关闭urlInvokerMap中的所有服务提供者invoker
destroyAllInvokers(); // Close all invokers
}
//表明存在服务提供者实例url
else {
//允许访问
this.forbidden = false; // Allow accessing
if (CollectionUtils.isEmpty(invokerUrls)) {
logger.warn("Received empty url list, will ignore for protection purpose.");
return;
}
// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().
//使用本地引用来避免NPE。urlInvokerMap将在destroyAllInvokers()方法设置为空。
Map<ProtocolServiceKeyWithAddress, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
// can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().
//不能使用本地引用,因为oldUrlInvokerMap的映射可能会直接在toInvokers()中删除。
Map<ProtocolServiceKeyWithAddress, Invoker<T>> oldUrlInvokerMap = null;
if (localUrlInvokerMap != null) {
// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
localUrlInvokerMap.forEach(oldUrlInvokerMap::put);
}
/*
* 将URL转换为Invoker
*/
Map<ProtocolServiceKeyWithAddress, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map
logger.info("Refreshed invoker size " + newUrlInvokerMap.size());
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")"));
return;
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers));
// pre-route and build cache
//invoker集合存入routerChain的invokers属性
routerChain.setInvokers(this.getInvokers());
//设置urlInvokerMap为新的urlInvokerMap
this.urlInvokerMap = newUrlInvokerMap;
if (oldUrlInvokerMap != null) {
try {
//销毁无用 Invoker
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
// 通知invoker刷新
this.invokersChanged();
}
5.2 toInvokers将URL转换为Invoker
将url转换为Invoker,如果url已被引用,将不会重新引用。将放入newUrlInvokeMap的项将从oldUrlInvokerMap中删除。该方法的大概流程和接口级别服务发现RegistryDirectory#toInvokers方法流程差不多,在此不再赘述,但是还有有些区别。
例如,UrlInvokerMap的key变成了ProtocolServiceKeyWithAddress对象,其存储着服务提供者的ProtocolServiceKey和服务提供者实例address(即ip:port)。
而在将新的invoker存入newUrlInvokerMap缓存的时候,会调用ProtocolServiceKeyWithAddress的equals方法比较key是否重复,该方法仅仅会比较address。也就是说,如果存在相同地址但不同端口的服务实例,newUrlInvokerMap将最终只会存储一个条目(因为不同服务实例的url在此前的getServiceUrlsCache方法中被设置为了相同的url)。
/**
* ServiceDiscoveryRegistryDirectory的方法
* <p>
* 将url转换为Invoker,如果url已被引用,将不会重新引用。将放入newUrlInvokeMap的项将从oldUrlInvokerMap中删除。
*
* @param oldUrlInvokerMap 此前的url到invoker的映射
* @param urls 最新服务提供者url集合
* @return invokers 最新的url到invoker的映射
*/
private Map<ProtocolServiceKeyWithAddress, Invoker<T>> toInvokers(Map<ProtocolServiceKeyWithAddress, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {
//新的映射map
Map<ProtocolServiceKeyWithAddress, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
//遍历最新服务提供者url集合,和接口级别的url携带着服务接口信息不同,该URL是指的服务提供者实例url,没有携带服务接口信息
//应用级别的url例如:DefaultServiceInstance{serviceName='demo-provider', host='10.253.45.126', port=20880, enabled=true, healthy=true, metadata={dubbo.metadata-service.url-params={"connections":"1","version":"1.0.0","dubbo":"2.0.2","side":"provider","port":"20881","protocol":"dubbo"}, dubbo.endpoints=[{"port":81,"protocol":"rest"},{"port":20881,"protocol":"dubbo"},{"port":50052,"protocol":"tri"}], dubbo.metadata.revision=4133cb1337c40592501d4c0ee6523637, dubbo.metadata.storage-type=local, timestamp=1667980457881}}, service{name='org.apache.dubbo.demo.DemoService',group='null',version='null',protocol='dubbo',port='20880',params={side=provider, application.version=1, methods=sayHello,sayHelloAsync, deprecated=false, dubbo=2.0.2, interface=org.apache.dubbo.demo.DemoService, service-name-mapping=true, timeout=3000, generic=false, delay=5000, application=demo-provider, background=false, dynamic=true, REGISTRY_CLUSTER=registry1, anyhost=true},}
//接口级别的url例如:dubbo://10.253.45.126:20881/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&application.version=1&background=false&category=providers,configurators,routers&check=false&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=10832&service-name-mapping=true&side=provider&sticky=false&timeout=3000&unloadClusterRelated=false
for (URL url : urls) {
InstanceAddressURL instanceAddressURL = (InstanceAddressURL) url;
if (EMPTY_PROTOCOL.equals(instanceAddressURL.getProtocol())) {
continue;
}
//校验是否支持服务提供者实例url的协议,不支持则跳过
//服务消费者可以手动指定消费某些协议的服务提供者,其他的服务提供者将被丢弃
if (!getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).hasExtension(instanceAddressURL.getProtocol())) {
// 4-1 - Unsupported protocol
logger.error("4-1", "protocol extension does not installed", "", "Unsupported protocol.",
new IllegalStateException("Unsupported protocol " + instanceAddressURL.getProtocol() +
" in notified url: " + instanceAddressURL + " from registry " + getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
//设置服务提供者优先的属性:release、methods、dubbo、timestamp、dubbo.tag
instanceAddressURL.setProviderFirstParams(providerFirstParams);
// 如果支持配置信息监听,那么尝试用远程配置目录的配置覆盖服务提供者实例url的配置
if (enableConfigurationListen) {
instanceAddressURL = overrideWithConfigurator(instanceAddressURL);
}
//用当前消费者协议服务key,去服务提供者url内部的services中筛选所有可用的服务的ProtocolServiceKey列表(版本通配符、组通配符、协议通配符)
int port = instanceAddressURL.getPort();
List<ProtocolServiceKey> matchedProtocolServiceKeys = instanceAddressURL.getMetadataInfo()
.getMatchedServiceInfos(consumerProtocolServiceKey)
.stream()
.filter(serviceInfo -> serviceInfo.getPort() <= 0 || serviceInfo.getPort() == port)
.map(MetadataInfo.ServiceInfo::getProtocolServiceKey)
.collect(Collectors.toList());
// see org.apache.dubbo.common.ProtocolServiceKey.isSameWith
// check if needed to override the consumer url
//是否包装,默认false
boolean shouldWrap = matchedProtocolServiceKeys.size() != 1 || !consumerProtocolServiceKey.isSameWith(matchedProtocolServiceKeys.get(0));
for (ProtocolServiceKey matchedProtocolServiceKey : matchedProtocolServiceKeys) {
//根据服务提供者的ProtocolServiceKey和服务提供者实例address(即ip:port)构建ProtocolServiceKeyWithAddress
ProtocolServiceKeyWithAddress protocolServiceKeyWithAddress = new ProtocolServiceKeyWithAddress(matchedProtocolServiceKey, instanceAddressURL.getAddress());
//从原来的缓存中获取该protocolServiceKeyWithAddress对应的invoker
Invoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.get(protocolServiceKeyWithAddress);
//如果缓存没有该url对应的invoker,或者rul发生了改变,那么将会重新引用该invoker
if (invoker == null || urlChanged(invoker, instanceAddressURL, matchedProtocolServiceKey)) { // Not in the cache, refer again
try {
boolean enabled;
if (instanceAddressURL.hasParameter(DISABLED_KEY)) {
enabled = !instanceAddressURL.getParameter(DISABLED_KEY, false);
} else {
enabled = instanceAddressURL.getParameter(ENABLED_KEY, true);
}
//如果启动服务
if (enabled) {
if (shouldWrap) {
URL newConsumerUrl = customizedConsumerUrlMap.computeIfAbsent(matchedProtocolServiceKey,
k -> consumerUrl.setProtocol(k.getProtocol())
.addParameter(CommonConstants.GROUP_KEY, k.getGroup())
.addParameter(CommonConstants.VERSION_KEY, k.getVersion()));
RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl);
invoker = new InstanceWrappedInvoker<>(protocol.refer(serviceType, instanceAddressURL), newConsumerUrl, matchedProtocolServiceKey);
} else {
//再次通过Protocol$Adaptive的refer方法引用该服务提供者
//在最开始我们就是通过refer方法引用服务的,在再次见到这个方法,只不过这里的url已经变成了某个服务提供者的url了
invoker = protocol.refer(serviceType, instanceAddressURL);
}
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + instanceAddressURL + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
//在存入newUrlInvokerMap缓存的时候,会调用protocolServiceKeyWithAddress的equals方法比较,该方法仅仅会比较address。
//也就是说,如果存在相同地址但不同端口的服务实例,newUrlInvokerMap将最终只会存储一个条目(因为不同服务实例的url在此前的getServiceUrlsCache方法中被设置为了相同的url)。
newUrlInvokerMap.put(protocolServiceKeyWithAddress, invoker);
}
} else {
newUrlInvokerMap.put(protocolServiceKeyWithAddress, invoker);
oldUrlInvokerMap.remove(protocolServiceKeyWithAddress, invoker);
}
}
}
return newUrlInvokerMap;
}
6 总结
对于应用级别的服务引入,总体的服务引入过程和接口级别的服务引入是一致的,核心流程都是RegistryProtocol#doCreateInvoker方法实现的,区别是应用级调用的方法中registry参数底层对象为ServiceDiscoveryRegistry类型,directory参数为ServiceDiscoveryRegistryDirectory类型,接口级调用的方法中registry参数底层对象为ZookeeperRegistry类型,directory参数为RegistryDirectory类型。
应用级服务引入的过程中,没有使用注册中心的接口级别的服务注册信息,也就是并没有引用dubbo/{serviceInterface}/providers目录下面的服务提供者接口url,而是通过另一种方式实现的服务引入。
Dubbo2接口级别服务引入和Dubbo3应用级别服务引入构建Invoker的整体流程:
- 接口级别的服务引入:
- 根据consumer引入的服务接口名去注册中心获取该接口下的服务提供者信息url列表(dubbo/{serviceInterface}/providers目录下)。
- 然后根据服务接口提供者url构建invoker,继续后续处理。
- 应用级别的服务引入:
- 根据consumer引入的服务接口名去元数据中心服务映射目录下获取发布了该服务接口的服务名(dubbo/mapping/{serviceInterface}目录下)。
- 然后根据服务名去注册中心获取该服务名下的全部服务实例信息(services/{serviceInterface}目录下)。
- 随后将服务实例根据元数据版本revision分组,revision一致可以认为它们的元数据信息也一致。那么只需要随机选择一台服务实例来获取服务接口元数据信息,这里有两种获取方式,根据服务端的metadataType配置来选择:
- 默认metadataType=local,那么获取元数据信息的时候Consumer 从 Provider 处直接获取,即引入选择的服务实例的MetadataService服务并且发起远程rpc调用getMetadataInfo方法获取服务元数据信息。服务元数据信息内部包含服务的全部接口列表、接口定义、接口级参数配置等信息。
- 如果配置metadataType=remote,那么服务端启动时会将接口配置元数据发布到元数据中心,此时客户端会直接从元数据中心获取。
- 根据服务元数据信息以及服务实例信息构建服务提供者实例urls列表,然后根据服务提供者实例url构建invoker,继续后续处理。
可以看到,Dubbo2接口级别服务引入和Dubbo3应用级别服务引入构建Invoker的整体流程区别还是很大的,至于Dubbo3为什么要使用应用级别服务引入?实际上就是为了减轻注册中心的IO通信和数据量的压力,更够更好的应对超大型的百万级应用服务发现,我们Dubbo3源码学习完毕的最后进行一个最终的总结,到时候会详细说明!