🪁🍁 希望本文能给您带来帮助,如果有任何问题,欢迎批评指正!🐅🐾🍁🐥
导航参见:
Docker:超详细的Docker基本概念介绍、命令总结与实战
Nacos:Nacos和MySQL在Docker上超详细的安装与部署步骤
Nacos:Nacos服务注册与服务发现超详细的源码解析(二)
一、背景
前面有一篇文章介绍了Nacos配置中心的源码内容,那么本篇文章来进入到其服务注册与发现的源码介绍,这几大功能也是它被用在现在微服务开发中的核心原因。为了学习其中很多优秀的设计思想以及应对使用过程中可能遇到的问题,本篇文章来深入解析nacos作为服务注册与发现中心背后的实现原理。
二、环境与依赖
本篇Nacos服务发现与注册中心源码分析文章的服务端环境基于1.4.3版本,Github地址:https://github.com/alibaba/nacos/releases
而客户端环境基于2021.1版:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2021.1</version>
</dependency>
三、服务注册与服务发现总流程图
服务注册与服务发现的原理大纲:
1. 服务注册:
当微服务实例启动时,它会将自己的信息(如 IP 地址、端口号、服务名称等)注册到注册中心。这通常需要发送一个注册请求到注册中心来完成,会调用 Nacos Server POST /nacos/v1/ns/instance
接口请求来完成注册。注册中心接收到服务实例的注册信息后,会将其存储在注册表中,注册表是注册中心的核心组件,用于保存所有微服务实例的信息。在 Nacos 服务端是存储在下面这样一个数据结构中:
// Map(namespace, Map(group::serviceName, Service))
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
3. 服务发现:
微服务在调用某个服务时,会优先从本地缓存中获取它的信息,如果缓存中没有相关信息,则会发起请求调用 Nacos Server GET /nacos/v1/ns/instance/list
接口完成服务发现。注册中心拿到请求中的服务名称等信息后,第一步会先注册一个监听器,即标识该A服务对B服务数据感兴趣,如果后续B服务数据变更,会利用UDP连接主动推送变更后的信息;第二步是从注册表中查找对应的服务实例信息,并返回给请求方。在客户端微服务侧服务缓存数据会存储在下面这样一个数据结构中:
// Map(group@@serviceName@@Clusters, ServiceInfo)
private final Map<String, ServiceInfo> serviceInfoMap = new ConcurrentHashMap<>();
4. 心跳检测:
为了确保注册表中的服务实例信息的准确性,注册中心会定期向各个服务实例发送心跳检测请求。服务实例在接收到心跳检测请求后,会返回一个响应,表明它仍然在线。如果注册中心在一段时间内没有收到某个服务实例的响应,就会将其从注册表中移除。Nacos Client 会启动一个定时任务每 5 秒
发送一次心跳,最终是调用 Nacos PUT /nacos/v1/ns/instance/beat
接口请求完成心跳发送。Nacos Server 会开启一个定时任务来检测注册服务的健康情况,对于超过 15 秒
没收到客户端心跳的实例,会设置为不健康状态
,即 healthy=false,超过 30 秒
没收到心跳,则会剔除该实例
。Nacos Client 可以通过再次发送心跳恢复。
5. 服务下线:
当微服务实例停止运行时,它会向注册中心发送一个下线请求。注册中心在接收到下线请求后,会将该服务实例从注册表中移除。最终是调用 Nacos Server DELETE /nacos/v1/ns/instance 接口请求来完成下线。
6. 服务变更通知:
如果注册表中的服务实例信息发生变化(如新增、下线、IP地址变更等),首先Nacos Server 会更新本地的注册表数据,然后会利用进行服务变更通知的核心类PushService
去发布ServiceChangeEvent事件
,然后Nacos Server会利用与各微服务建立的UDP连接进行主动的变更信息推送。这样一来既保证了微服务做远程调用时从Nacos远程拉取的是最新的服务实例信息,也保证了服务实例变更时的,微服务能快速的感知到变化。
四、服务注册源码
4.1 客户端
4.1.1 分析注册源码入口
我们应该怎么找源码的入口嘞?
还是把握好SpringBoot引入第三方组件的核心原理:自动装配或SPI机制。
我们知道SpringCloud是基于SpringBoot的,我们这里引入了这个nacos客户端nacos-discovery的依赖,我们就直接去找这个jar包下的spring.factories文件:
进入到这个NacosServiceRegistryAutoConfiguration自动配置类后会发现这里会往容器中注入三个bean,看bean的名字好像都是和nacos注册相关的,按照经验来说,带有AutoXXXX这类方法一般都比较重要,再加上第三个方法还用到了上面两个bean,所以核心方法很大概率就是这个第三个方法。
NacosServiceRegistryAutoConfiguration注入了3个Bean:
- NacosServiceRegistry:负责具体的服务注册。
- NacosRegistration:收集当前实例的信息,如端口、IP、服务名等。
- NacosAutoServiceRegistration:管理服务自动注册的时机。
进入到该类的构造方法之后,我们一般可以查看这个类的继承与实现结构,这样可以让我们更充分的了解该类
从上图中可以发现,NacosAutoServiceRegistration这个类还实现了ApplicationListener
接口,这个接口我们都知道是Spring发布事件相关的接口,我们再找这个方法的具体实现,会发现NacosAutoServiceRegistration
这个类它没有重写该方法,那么就要在它的父类中找该方法的实现:
public void onApplicationEvent(WebServerInitializedEvent event) {
this.bind(event);
}
// 方法调用,进入到bind(...)方法
/** @deprecated */
@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
}
public void start() {
if (!this.isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
} else {
if (!this.running.get()) {
this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
// 核心方法,可以发现在调用这个方法之前和调用方法之后都发布了一个事件
// 正好我们要找的也就是注册相关的方法
this.register();
if (this.shouldRegisterManagement()) {
this.registerManagement();
}
this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
this.running.compareAndSet(false, true);
}
}
}
接下来在点进register()
方法,跳转几次之后就会进入到NamingService
这个关键接口的registerInstance(...)
方法中去,而在调用之前是先通过反射和双重检验锁单例模式初始化了NacosNamingService
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
} else {
// 反射+双重检验锁单例构造出NacosNamingService
NamingService namingService = this.namingService();
String serviceId = registration.getServiceId();
String group = this.nacosDiscoveryProperties.getGroup();
Instance instance = this.getNacosInstanceFromRegistration(registration);
try {
// 开启服务注册
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
} catch (Exception var7) {
log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});
ReflectionUtils.rethrowRuntimeException(var7);
}
}
}
反射初始化NacosNamingService,NacosNamingService是NamingService的实现类
4.1.1 发起服务注册请求
关键接口NamingService#registerInstance方法就是服务注册。程序刚开始会进入到该接口的实现类NacosNamingService的registerInstance(String serviceName, String groupName, Instance instance)
方法中
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 判断是否是临时实例
if (instance.isEphemeral()) {
// 开启心跳机制
// 这一步是构建心跳请求
BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
// 开启心跳任务,会在5秒后执行,也就是会让注册操作先执行
this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// 调用该方法进行微服务注册,instance对象中保存着微服务的各种信息,比如ip、端口、访问权重、健康状态、是否上线等等
this.serverProxy.registerService(groupedServiceName, groupName, instance);
}
我们这里先跳过心跳检测内容,下文会有专门的章节去详细介绍。先看服务注册部分,注册的instance对象保存的内容如下图所示
把instance对象中的数据取出来,封装成一个HashMap,然后发送一个post的http请求发送给NacosServer,进行服务注册:
调用API请求注册 NamingProxy#reqApi
- 单机注册中心,失败重试(
默认3次,前提是nacos异常
) - 集群注册中心,
随机挑选一个注册,失败则轮询其他注册中心
- 最终调用callServer方法 (API_URL:
IP:PORT/nacos/v1/ns/instance
)
4.2 服务端
Nacos配置中心其实就是一个SpringBoot应用,它的启动就是SpringBoot启动,因此重要组件加载时机就是容器的refresh时期,而我们要分析服务端源码就可以直接从controller层着手分析。
4.2.1 注册表
我们先了解一下服务端保存实例信息的结构,文中都简称注册表:
上图对应源码中的ServiceManager#serviceMap
字段
点进去Servce类中,可以看到集群定义:
点进集群Cluster类中,是用Set存的实例,分为两种实例,持久实例和临时实例:
4.2.2 注册请求处理入口
根据请求URL/nacos/v1/ns/instance
可以很容易找到能处理它的handler
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
// 1.获取namespace 为空 则默认 public
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// 2.获取服务名 为空抛出异常("Param '" + key + "' is required.")
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
// 3.根据"@@"分割服务名,不符合规则则抛出异常
NamingUtils.checkServiceNameFormat(serviceName);
// 4.将request转化为实例
final Instance instance = parseInstance(request);
// 5.服务注册
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
4.2.3 注册流程(以临时实例为例)
ServiceManager#registerInstance
- 创建一个空的service放入注册表,为其 开启一个心跳检测,并将这个service加入监听列表
- 拿到创建好的service
- 完成实例的注册表更新,并完成nacos集群同步
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 创建一个空的service(如果时第一次来注册实例,要先创建一个空service出来,放入注册表)
// 此时不包含实例信息
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 拿到建好的service
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// 添加要注册的实例到service中
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
4.2.4 创建空的service加入注册表
ServiceManager#createEmptyService
会先从服务注册表获取服务,如果能获取到直接返回,否则创建一个空的service,加入到注册表中,并开启心跳检测
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
// 1.尝试从注册表中获取服务 第一次一般都是空
Service service = getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
// 2.实例化新服务
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
// 根据@@来划分,取第一个数为groupName
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
// 3.将服务放入新的注册表 以及 加入监听列表,并开启心跳检测
putServiceAndInit(service);
if (!local) {
// 不是临时实例的话,将服务放入 raft 集群中
addOrReplaceService(service);
}
}
}
ServiceManager#putServiceAndInit
将一个空的服务加入到注册表,并初始化服务,此时服务中并没有实例。
private void putServiceAndInit(Service service) throws NacosException {
// 1.利用双重校验锁把空的服务放入注册表
putService(service);
// 2.从注册表中获取服务
service = getService(service.getNamespaceId(), service.getName());
// 3.服务开启心跳检测
service.init();
// 4.把服务加入监听列表
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
public void putService(Service service) {
// 第一次检查是在同步块外部,如果 serviceMap 已经包含了指定的命名空间 ID,则不需要进入同步块,从而减少了获取锁的开销。
if (!serviceMap.containsKey(service.getNamespaceId())) {
// synchronized (putServiceLock) 确保在同一时刻只有一个线程可以执行该同步块内的代码。
// 这里的 putServiceLock 是一个对象,通常是一个私有的、静态的最终字段
// (private static final Object putServiceLock = new Object();),用来作为同步锁的对象。
synchronized (putServiceLock) {
// 双重检查锁定(Double-Checked Locking)模式
// 这是为了防止多个线程同时通过了第一次检查后导致的竞争条件,即只有第一个进入同步块的线程会真正执行 put 操作。
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
// 这行代码的作用是在 serviceMap 中根据命名空间 ID 获取对应的 Map,
// 然后尝试将服务名(service.getName())和服务实例(service)作为键值对插入到该 Map 中,但仅当该键(服务名)不存在时才进行插入。
serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}
public void init() {
// NacosClient客户端的心跳检测任务
// 这里就会开启一个线程去执行任务,clientBeatCheckTask属性就是一个task,run()方法中会进行实例的健康检查,后面会详细介绍
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
4.2.5 添加实例
将要注册的实例添加到建好的service中:
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// 根据ephemeral的值来决定生成上面key,默认情况下NacosClient传递过来的都是true,一般微服务的实例都是临时实例,不是持久化实例
// 如果是持久化实例就没有下面的ephemeral这个字符串拼接
// key = 一些字符串常量 + “ephemeral” + namespaceId + “##” + serviceName
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
// 同步锁,避免并发修改的安全问题
synchronized (service) {
// 1) 添加微服务的实例,并返回当前服务所有的实例
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
// 2) 封装实例列表到Instances对象
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 3) 完成 注册表更新 以及 Nacos集群的数据同步
consistencyService.put(key, instances);
}
}
里面最重要的就是consistencyService.put(key, instances)
方法
consistencyService有很多种实现,根据实例的类型来判断具体走哪种实现方式,默认是AP协议,使用的是临时实例,临时实例实现主要看DistroConsistencyServiceImpl
临时实例的注册方法
DistroConsistencyServiceImpl#put
@Override
public void put(String key, Record value) throws NacosException {
// 1.临时实例 本地注册表更新
onPut(key, value);
// 2. 将实例变更同步给其他节点
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
4.2.6 临时实例本地注册表的更新
DistroConsistencyServiceImpl#onPut
我们先来看临时实例本地更新走了哪些流程:
public void onPut(String key, Record value) {
// 放入缓存
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// 实例变更数据存dataStore
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
// 添加实例变更的任务
notifier.addTask(key, DataOperation.CHANGE);
}
Notifier#addTask
实例变更的任务最后添加到一个阻塞队列中,那么这个任务什么时候从阻塞队列中取出并执行呢?
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
// 加入services,防止重复添加任务
services.put(datumKey, StringUtils.EMPTY);
}
// 哪里取这些任务?
tasks.offer(Pair.with(datumKey, action));
}
到这里整个实例的注册过程已完成,但是并没有看到instance放入到service中的过程,这个过程在下面的异步处理中。
Notifier#run
Notifier是DistroConsistencyServiceImpl的一个成员变量,在DistroConsistencyServiceImpl#init
方法使用线程池来执行Notifier,所以Notifier肯定实现了Runable接口。
private volatile Notifier notifier = new Notifier();
@PostConstruct
public void init() {
GlobalExecutor.submitDistroNotifyTask(notifier);
}
那么我们来看看Notifier#run
怎么执行的?从阻塞队列中一个一个的取出并处理。
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
// 阻塞队伍中获取
Pair<String, DataOperation> pair = tasks.take();
// 处理实例变更
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
private void handle(Pair<String, DataOperation> pair) {
try {
// datumKey的构成: com.alibaba.nacos.naming.domains.meta. + namespaceId + ## + serviceName
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
......
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == DataOperation.CHANGE) {
// 处理变更
// 根据datumKey从dataStore获取实例变更的数据
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
// 处理删除
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
......
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
Service#onChange
最后会回到Service.onChange
方法更新实例,内部调用updateIPs方法,这里面需要注意更新后会触发一个服务变更事件(后面有用)
public void onChange(String key, Instances value) throws Exception {
.......
// 真正的更新注册表
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
Service#updateIPs
将缓存中获取的实例列表按clusterName进行分组,最后以cluster为维度进行更新注册表。
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
// 准备一个Map,key是clusterName,值是集群下的Instance集合
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
// 获取服务的所有cluster名称
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
for (Instance instance : instances) {
try {
.......
// 判断实例是否包含clusterName,没有的话用默认cluster
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
// 判断cluster是否存在,不存在则创建新的cluster
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
// 获取当前cluster实例的集合,不存在则创建新的
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
// 添加新的实例到 Instance 集合
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
// 将实例集合更新到 clusterMap(注册表)
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
// 触发服务变更事件
getPushService().serviceChanged(this);
.......
}
Cluster#updateIps
最终完成实例的注册更新发生在Cluster#updateIps
方法里,该方法采取了写时复制
的思想,读写分离的方式来保证这个并发的问题。
/**
* Update instance list.
* 下面的方法中有CopyOnWrite的实现思想,真正存储实例的集合是ephemeralInstances,
* 但是这里面基本上都是在围绕oldIpMap这个复制出来的副本集合进行相应的操作,最后拿最新的集合复制给ephemeralInstances
*
* @param ips instance list
* @param ephemeral whether these instances are ephemeral
*/
public void updateIps(List<Instance> ips, boolean ephemeral) {
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
// 先保存一份现有的实例列表
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
// 各个服务实例的ip和端口不一样,所以这里生成的key也不一样
oldIpMap.put(ip.getDatumKey(), ip);
}
// 接下来的很多操作就是拿老的实例集合和新的实例集合做一些数据比对,如果是新产生的实例那么就进行添加操作,如果是已经存在了的实例则是修改操作
// 最后将最终的结果赋值给ephemeralInstances或persistentInstances
List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
if (updatedIPs.size() > 0) {
for (Instance ip : updatedIPs) {
Instance oldIP = oldIpMap.get(ip.getDatumKey());
if (!ip.isMarked()) {
ip.setHealthy(oldIP.isHealthy());
}
.......
}
}
// 将现在的和原来的注册表实例进行一个对比,其就是一个更新和添加操作,如修改了配置文件等
List<Instance> newIPs = subtract(ips, oldIpMap.values());
if (newIPs.size() > 0) {
.......
for (Instance ip : newIPs) {
HealthCheckStatus.reset(ip);
}
}
// 将某些服务进行一个剔除的操作,如某些实例超时,不健康等
List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
if (deadIPs.size() > 0) {
.......
for (Instance ip : deadIPs) {
HealthCheckStatus.remv(ip);
}
}
toUpdateInstances = new HashSet<>(ips);
// 最后将这个副本作为新的注册表
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
4.2.7 临时实例的集群节点数据同步
Nacos针对临时实例数据在集群之间的同步开发了Distro一致性协议,Distro一致性协议是弱一致性协议,用来保证Nacos注册中心的可用性,当临时实例注册到Nacos注册中心时,集群的实例数据并不是一致的,当通过Distro协议同步之后才最终达到一致性,所以Distro协议保证了Nacos注册中心的AP(可用性)。
当有新的客户端注册到Nacos集群中的一个节点时,本节点已经完成了数据更新,但这个节点就需要将新的实例数据同步给其他节点。
distroProtocol#sync临时实例集群同步更新:
- 遍历集群中其他节点
- 定义一个
DistroDelayTask
异步任务放入一个ConcurrentHashMap中,会有一个ScheduledExecutorService线程池定时从这个map中取任务执行
public void sync(DistroKey distroKey, DataOperation action, long delay) {
// 遍历nacos集群中除自己以外的其他节点
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
// 定义一个Distor的同步任务
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
// 交给线程池去执行
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
线程池的定义在NacosDelayTaskExecuteEngine中:
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(logger);
tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
private class ProcessRunnable implements Runnable {
@Override
public void run() {
try {
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
上诉线程池执行的任务就是NacosDelayTaskExecuteEngine#processTasks:
/**
* process tasks in execute engine.
* 执行引擎中的流程任务。
*/
protected void processTasks() {
// 获取任务map中所有的key
Collection<Object> keys = getAllTaskKeys();
//遍历key 并执行任务
for (Object taskKey : keys) {
// 取一个任务便从map中移除一个任务
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// ReAdd task if process failed
// 尝试执行同步任务,如果失败会重试
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
// 如果失败会重试
retryFailedTask(taskKey, task);
}
}
}
DistroDelayTaskProcessor#process
会发现DistroDelayTask异步任务会在DistroDelayTaskProcessor
中process方法里被执行:
@Override
public boolean process(NacosTask task) {
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
// 又被塞到一个不知名封装好的地方(是一个阻塞队列,同样有地方取出来执行,我们直接看这个任务的执行)
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
}
return false;
}
DistroSyncChangeTask#run
syncData
方法最终会到NamingProxy.syncData
方法执行HTTP请求同步数据,请求映射路径为/nacos/v1/ns/distro/datum
- 如果失败了,则又会调用NacosDelayTaskExecuteEngine#addTask方法重新将DistroDelayTask任务放进ConcurrentHashMap中,重复上述的processTasks方法
@Override
public void run() {
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
try {
String type = getDistroKey().getResourceType();
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(DataOperation.CHANGE);
// 1.syncData方法最终会到NamingProxy.syncData方法,执行HTTP请求,同步数据
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
if (!result) {
// 2.如果失败了,则又会调用NacosDelayTaskExecuteEngine.addTask()方法重新将DistroDelayTask任务放进ConcurrentHashMap中,重复上述的processTasks方法
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
// 失败处理
handleFailedTask();
}
}
private void handleFailedTask() {
String type = getDistroKey().getResourceType();
DistroFailedTaskHandler failedTaskHandler = distroComponentHolder.findFailedTaskHandler(type);
if (null == failedTaskHandler) {
Loggers.DISTRO.warn("[DISTRO] Can't find failed task for type {}, so discarded", type);
return;
}
failedTaskHandler.retry(getDistroKey(), DataOperation.CHANGE);
}
DistroController#onSyncDatum
下面看看远程服务器的接口/nacos/v1/ns/distro/datum
收到请求时怎么处理:
@PutMapping("/datum")
public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
// 同步数据的入口
if (dataMap.isEmpty()) {
Loggers.DISTRO.error("[onSync] receive empty entity!");
throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
}
for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
String serviceName = KeyBuilder.getServiceName(entry.getKey());
if (!serviceManager.containService(namespaceId, serviceName) && switchDomain
.isDefaultInstanceEphemeral()) {
serviceManager.createEmptyService(namespaceId, serviceName, true);
}
DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
// 收到数据进行处理
distroProtocol.onReceive(distroHttpData);
}
}
return ResponseEntity.ok("ok");
}
public boolean onReceive(DistroData distroData) {
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
return false;
}
return dataProcessor.processData(distroData);
}
DistroConsistencyServiceImpl#processData
processData()方法将同步过来的数据进行反序列化,然后调用onPut()方法进行临时数据缓存并添加实例变更的任务,后续逻辑与实例注册后的处理逻辑一致。
public boolean processData(DistroData distroData) {
DistroHttpData distroHttpData = (DistroHttpData) distroData;
Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
// 异步处理数据
onPut(datum.key, datum.value);
return true;
}
4.3 总结
- 客户端:启动则获取自身配置信息,发起http请求注册,临时实例同时会开启心跳机制(下面会说),服务端是单机的情况下请求失败会重试三次,服务端是单机的集群的情况下请求失败会轮询请求
- 服务端:
- 本地通过一个Map保存所有服务信息,注册的实质就是往map里面添加信息
- 会先创建空的服务,后更新服务中的实例信息
- 服务创建后会初始化服务,启动心跳检测
- 往服务中添加实例的时候会判断实例是永久实例还是临时实例,不同类型的实例有不同的处理方式
- 注册后同时会发布服务变更事件(后面说,先记着这个事件)
设计点1: 客户端注册会先开启心跳后发起注册请求
1.防止注册后服务不可用(避免空窗期)
问题场景:如果先注册后开启心跳,服务注册成功后到第一次心跳之间会存在一个时间窗口(空窗期)。此时服务虽然注册成功,但可能因未及时心跳而被服务端误判为不健康/下线状态。
解决方案:先启动心跳线程,确保服务端在收到注册请求时立即具备健康检测能力,避免健康状态误判。
2.降低注册失败风险
网络抖动处理:在弱网环境下,注册请求可能失败。如果心跳已启动:
心跳线程会持续尝试注册(如Nacos客户端的BeatReactor)。
服务端可通过心跳包反向补全注册信息(类似TCP的Keepalive机制)。
设计点2: 高效数据结构
内存优化:
使用 ConcurrentHashMap<String, Map<String, Service>> 存储服务列表。
实例数据采用 CopyOnWriteArrayList 保证读写并发安全。
五、Nacos服务端AP架构集群节点数据的同步
5.1 新nacos节点同步实例数据
DistroProtocol#startDistroTask
如果nacos集群中有新的节点加入,新节点启动时就会从其他节点进行全量拉取数据。当DistroProtocol初始化时,调用startDistroTask
方法进行全量拉取数据:
private void startDistroTask() {
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
// 开启数据校验任务
startVerifyTask();
// 开启加载数据任务
startLoadTask();
}
private void startLoadTask() {
DistroCallback loadCallback = new DistroCallback() {
@Override
public void onSuccess() {
isInitialized = true;
}
@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
// 立即执行
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}
DistroLoadDataTask#run
下面来看DistroLoadDataTask的run()
方法。run方法使用load方法加载从远程加载全量数据,如果检测到加载数据没有完成,则继续提交全量拉取数据的任务,否则进行任务的成功回调。如果加载数据发生了异常,则进行任务的失败回调。
public void run() {
try {
// 加载数据
load();
if (!checkCompleted()) {
// 加载不成功,延迟加载数据
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
......
}
} catch (Exception e) {
......
}
}
private void load() throws Exception {
// 在服务启动的时候,是没有其他远程服务的地址的,如果服务地址都是空的,则进行等待,直到服务地址不为空。
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
// 接着判断数据存储类型是否为空,如果为空,则进行等待,直到服务地址不为空。
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
// 遍历所有的数据存储类型,判断loadCompletedMap是否存在数据存储类型和该类型的数据是否已经加载完成,
// 如果没有则调用loadAllDataSnapshotFromRemote进行全量数据的加载:
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
// 没完成的继续加载数据
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
DistroLoadDataTask#loadAllDataSnapshotFromRemote
loadAllDataSnapshotFromRemote()负责从远程服务器拉取数据
- 通过http请求拉取远程服务的所有全量数据:拉取数据的接口为:
/distro/v1/ns/distro/datums
- 处理拉取回来的全量数据
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
......
// 遍历所有的远程服务地址,除了自己
for (Member each : memberManager.allMembersWithoutSelf()) {
try {
// 从远程获取所有的数据
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
// 处理数据
boolean result = dataProcessor.processSnapshot(distroData);
if (result) {
return true;
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
}
}
return false;
}
DistroConsistencyServiceImpl#processData
处理全量数据的方法为processData():
private boolean processData(byte[] data) throws Exception {
if (data.length > 0) {
// 反序列化数据
Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
if (!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) {
// create empty service
Loggers.DISTRO.info("creating service {}", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
// The Listener corresponding to the key value must not be empty
RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
if (Objects.isNull(listener)) {
return false;
}
listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
}
}
}
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
if (!listeners.containsKey(entry.getKey())) {
// Should not happen:
Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
continue;
}
try {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
}
} catch (Exception e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
continue;
}
// Update data store if listener executed successfully:
dataStore.put(entry.getKey(), entry.getValue());
}
}
return true;
}
最后会调用到Service.onChange()
方法,与实例的注册一样调用此方法更新注册表。
DistroController#getAllDatums
再来看看远程服务是如何处理全量拉取数据的请求的:
@GetMapping("/datums")
public ResponseEntity getAllDatums() {
// 服务端查询所有实例数据的入口
DistroData distroData = distroProtocol.onSnapshot(KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
return ResponseEntity.ok(distroData.getContent());
}
public DistroData onSnapshot(String type) {
DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
if (null == distroDataStorage) {
Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
return new DistroData(new DistroKey("snapshot", type), new byte[0]);
}
// 查询所有的实例数据
return distroDataStorage.getDatumSnapshot();
}
public DistroData getDatumSnapshot() {
// 从缓存中获取所有的实例数据
Map<String, Datum> result = dataStore.getDataMap();
byte[] dataContent = ApplicationUtils.getBean(Serializer.class).serialize(result);
DistroKey distroKey = new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
return new DistroData(distroKey, dataContent);
}
全量数据拉取无非就是从内存dataStore中获取所有临时实例的数据,并且对数据进行序列化,然后返回给客户端。
5.2 数据校验任务
DistroProtocol#startDistroTask
Nacos AP集群为保证数据的最终一致性会开启一个数据校验的定时任务来检查各个节点之间的数据是否一致,不一致就会进行数据的同步更新。数据校验任务DistroVerifyTask与同步全量数据任务DistroLoadDataTask同样是在DistroProtocol实例化是创建的。
private void startDistroTask() {
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
// 开启数据校验任务
startVerifyTask();
// 开启加载数据任务
startLoadTask();
}
private void startVerifyTask() {
// 5s执行一次数据校验任务
GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTask(memberManager, distroComponentHolder),
distroConfig.getVerifyIntervalMillis());
}
DistroVerifyTask#run
数据校验任务默认5s执行一次,会将缓存中所有的key调用接口/nacos/v1/ns/distro/checksum
发送给远程服务器。
public void run() {
try {
// 获取集群中的其他节点
List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("server list is: {}", targetServer);
}
for (String each : distroComponentHolder.getDataStorageTypes()) {
// 校验数据
verifyForDataStorage(each, targetServer);
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
}
}
private void verifyForDataStorage(String type, List<Member> targetServer) {
DistroData distroData = distroComponentHolder.findDataStorage(type).getVerifyData();
if (null == distroData) {
return;
}
distroData.setType(DataOperation.VERIFY);
// 遍历集群中的其他节点
for (Member member : targetServer) {
try {
// 发送数据校验的请求
distroComponentHolder.findTransportAgent(type).syncVerifyData(distroData, member.getAddress());
} catch (Exception e) {
Loggers.DISTRO.error(String
.format("[DISTRO-FAILED] verify data for type %s to %s failed.", type, member.getAddress()), e);
}
}
}
DistroController#syncChecksum
接下来看远程服务器端接受到数据校验任务的请求时是怎么处理的:
@PutMapping("/checksum")
public ResponseEntity syncChecksum(@RequestParam String source, @RequestBody Map<String, String> dataMap) {
// 收到校验数据的请求
DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(source), dataMap);
// 开始校验
distroProtocol.onVerify(distroHttpData);
return ResponseEntity.ok("ok");
}
public boolean onVerify(DistroData distroData) {
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
return false;
}
/**
* 处理校验数据
*/
return dataProcessor.processVerifyData(distroData);
}
public boolean processVerifyData(DistroData distroData) {
DistroHttpData distroHttpData = (DistroHttpData) distroData;
String sourceServer = distroData.getDistroKey().getResourceKey();
Map<String, String> verifyData = (Map<String, String>) distroHttpData.getDeserializedContent();
// 校验数据
onReceiveChecksums(verifyData, sourceServer);
return true;
}
DistroConsistencyServiceImpl#onReceiveChecksums
处理逻辑主要是拿到请求中的所有key与本地缓存中的key进行对比,如果有不相同的key就删除,如果有新增或要更新的key就根据key去发送请求的服务器端查询然后更新本地缓存和注册表。
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
......
try {
// 保存要更新的key
List<String> toUpdateKeys = new ArrayList<>();
// 保存要删除的key
List<String> toRemoveKeys = new ArrayList<>();
for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
......
if (!dataStore.contains(entry.getKey()) || dataStore.get(entry.getKey()).value == null || !dataStore
.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
toUpdateKeys.add(entry.getKey());
}
}
for (String key : dataStore.keys()) {
if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
continue;
}
if (!checksumMap.containsKey(key)) {
toRemoveKeys.add(key);
}
}
Loggers.DISTRO
.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server);
for (String key : toRemoveKeys) {
// 删除实例
onRemove(key);
}
if (toUpdateKeys.isEmpty()) {
// 没有要更新的key就返回了
// 说明两个服务之间的实例数据一致
return;
}
try {
DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
server);
distroKey.getActualResourceTypes().addAll(toUpdateKeys);
// 从其他节点获取要更新的key对应的数据
// 调用/nacos/v1/ns/distro/datum
DistroData remoteData = distroProtocol.queryFromRemote(distroKey);
if (null != remoteData) {
// 将数据放入缓存
processData(remoteData.getContent());
}
} catch (Exception e) {
Loggers.DISTRO.error("get data from " + server + " failed!", e);
}
} finally {
// Remove this 'in process' flag:
syncChecksumTasks.remove(server);
}
}
5.3 同步实例数据给集群其他节点
当有新的客户端注册到Nacos集群中的一个节点时,这个节点就需要将新的实例数据同步给其他节点。这一部分内容就是前文4.2.7的章节的内容,因此这里不做赘述。
六、心跳机制源码
临时实例和持久实例的存储位置和健康检测机制是不同的:
临时实例:默认情况下,仅会注册在Nacos内存,不会持久化到Nacos磁盘,其健康检测机制为Client模式,即Client主动向Server上报健康状态。默认心跳间隔为
5秒
,在15秒
内Server未收到Client心跳,就会将其标记为不健康
状态;30
秒内收到了Client心跳,则重新恢复到 健康 状态,否则将Client从Server端内存清除
。持久实例:服务实例不仅会注册到Nacos内存,也会持久化到Nacos磁盘。健康检测机制为Server模式,Server主动检测Client的健康状态,默认
20s
检测一次,检测失败后标记为 不健康 状态,但不会被清除,因为这是持久化到磁盘的。
大多数情况下都是临时实例,因为互联网项目存在突发流量暴增的情况,Alibaba是云原生的,可以充分利用云端的弹性,流量下降就清除临时实例。持久实例销毁是很麻烦的,因此,本篇文章主要以临时实例的心跳机制做介绍。
6.1 客户端
6.1.1 发起心跳请求
1.心跳开启入口
NacosNamingService#registerInstance
buildBeatInfo就是心跳信息的封装,我们主要看addBeatInfo方法
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 判断是否是临时实例
if (instance.isEphemeral()) {
// 开启心跳机制
// 这一步是构建心跳请求
BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
// 开启心跳任务,会在5秒后执行,也就是会让注册操作先执行
this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// 服务注册
this.serverProxy.registerService(groupedServiceName, groupName, instance);
}
2.心跳开启
BeatReactor#addBeatInfo
向定时调度线程池里面提交BeatTask执行,所以主要执行逻辑在BeatTask中
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
// serviceName#ip#port,例如:order-service#111.25.2.1#10000
String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
this.dom2Beat.put(key, beatInfo);
// 利用定时调度线程池去异步定时执行BeatTask,即每隔一段时间执行一次心跳检查或更新操作
this.executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}
3.心跳具体执行逻辑
我们直接来看BeatTask#run方法
- 发送HTTP心跳请求 URL地址为:
/nacos/v1/ns/instance/beat
- 如果
当前实例在注册中心未找到就重新注册
- 继续将任务丢到线程池里面执行,定时发起心跳(
因为这里是schedule方法,是单次执行的,所以不断提交任务
)
public void run() {
if (!this.beatInfo.isStopped()) {
long nextTime = this.beatInfo.getPeriod();
try {
// 发送心跳请求并处理响应
JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has("lightBeatEnabled")) {
// 更新轻量级心跳启用状态
lightBeatEnabled = result.get("lightBeatEnabled").asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0L) {
nextTime = interval;
}
int code = 10200;
if (result.has("code")) {
code = result.get("code").asInt();
}
// 如果实例不存在就重新注册
if (code == 20404) {
Instance instance = new Instance();
instance.setPort(this.beatInfo.getPort());
instance.setIp(this.beatInfo.getIp());
instance.setWeight(this.beatInfo.getWeight());
instance.setMetadata(this.beatInfo.getMetadata());
instance.setClusterName(this.beatInfo.getCluster());
instance.setServiceName(this.beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);
} catch (Exception var10) {
}
}
} catch (NacosException var11) {
LogUtils.NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", new Object[]{JacksonUtils.toJson(this.beatInfo), var11.getErrCode(), var11.getErrMsg()});
}
// 执行下次心跳任务
BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
NamingProxy#sendBeat方法里会具体执行过程就是会向Nacos服务端发起PUT请求
,请求路径尾缀为"/nacos/v1/ns/instance/beat"
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
......
String result = this.reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, "PUT");
return JacksonUtils.toObj(result);
}
继续往里看的话它就是会发起请求,然后里面会有失败重试的机制:
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
params.put("namespaceId", this.getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(this.nacosDomain)) {
throw new NacosException(400, "no server available");
} else {
NacosException exception = new NacosException();
if (StringUtils.isNotBlank(this.nacosDomain)) {
int i = 0;
// 如果有异常会发起重试
while(i < this.maxRetry) {
try {
// 发起http请求
return this.callServer(api, params, body, this.nacosDomain, method);
} catch (NacosException var12) {
exception = var12;
if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
LogUtils.NAMING_LOGGER.debug("request {} failed.", this.nacosDomain, var12);
}
++i;
}
}
} else {
......
}
......
}
}
两种请求模式:
- 域名模式
- 使用配置的
nacosDomain
作为服务器地址 - 最多重试
maxRetry
次,默认3次 - 每次失败后记录日志并重试
- 使用配置的
- 服务器列表模式
- 随机选择一个服务器开始尝试(使用当前事件作为随机种子)
- 采用轮询方式尝试所有服务器
- 刚当前服务器失败,尝试下一个(使用取模运算实现循环)
6.2 服务端
6.2.1 处理心跳请求,实例续约
每个客户端都会向这个web服务端发送心跳,因此服务端这边需要去记录这个心跳发送的时间等,从而来判断实例是否健康,是否需要删除等操作。前文介绍到心跳请求URL为/nacos/v1/ns/instance/beat
,根据请求可以很容易找到能处理它的handler。
// com.alibaba.nacos.naming.controllers.beat
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
......
// 首先会去解析这个beat心跳,并且可以解析是哪个集群,哪个组过来的服务
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
// 如果实例不存在,就是第一次服务端给客户端发送心跳
if(instance == null){
// 那么就会将这个实例注册到这个注册中心里面
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
// 再去注册表中获取这个实例
Service service = serviceManager.getService(namespaceId, serviceName);
// 心跳健康检查
service.processClientBeat(clientBeat);
......
}
心跳健康检查processClientBeat方法,主要是查看这个实例是否处于这个健康状态
// com.alibaba.nacos.naming.core.service
public void processClientBeat(final RsInfo rsInfo) {
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
//健康检查的定时任务
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
接下来看看这个ClientBeatProcessor这个线程类,主要是看里面的run方法。主要是会对这些实例进行一个续约的操作,如果一个实例不健康,则会设置成健康状态
// com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor
@Override
public void run(){
......
// 获取全部实例
List<Instance> instances = cluster.allIPs(true);
// 遍历这些实例
for (Instance instance : instances) {
// 设置这个实例上一次心跳的发送时间,进行一个续约的操作
instance.setLastBeat(System.currentTimeMillis());
// 如果实例不健康,则设置为健康状态
if (!instance.isMarked() && !instance.isHealthy()) {
instance.setHealthy(true);
// 发布服务改变事件
getPushService().serviceChanged(service);
}
}
}
6.2.2 心跳主动检测
实例如果因为宕机或者其他什么情况无法发送心跳,那么服务端自然也要对这个实例进行处理,就在服务初始化的时候,会开启会实例的心跳检测任务。
// com.alibaba.nacos.naming.core.ServiceManager
private void putServiceAndInit(Service service) throws NacosException {
// 1.把空的服务放入注册表
putService(service);
// 2.从注册表中获取服务
service = getService(service.getNamespaceId(), service.getName());
// 3.服务开启心跳检测
service.init();
// 4.把服务加入监听列表
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
在这个服务注册的时候,会有一个init的初始化方法。在服务进行初始化的时候,会去开启这个客户端的心跳健康检查的这个线程。
// com.alibaba.nacos.naming.core.Service
public void init() {
// 1.对临时实例开启 心跳超时检测
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
/**
* Schedule client beat check task with a delay.
* 将客户端心跳检查任务安排为延迟。
*
* 假设你有一个服务需要定期检查客户端的心跳状态,可以使用 ClientBeatCheckTask 来封装具体的检查逻辑,并通过上述方法将其调度为周期性任务。
* 这样可以确保每个检查任务只被调度一次,并且能够按照设定的时间间隔自动执行。
* @param task client beat check task
*/
public static void scheduleCheck(ClientBeatCheckTask task) {
// 该方法的作用是根据任务的唯一键(taskKey)来决定是否调度一个新的周期性任务。
// 如果任务尚未调度,则通过 GlobalExecutor.scheduleNamingHealth 方法进行调度,并将生成的 ScheduledFuture 对象存入 futureMap 中。
// 这种设计避免了重复调度相同任务的问题,同时利用 ConcurrentHashMap 确保了线程安全。
futureMap.computeIfAbsent(task.taskKey(),
k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
接下来进入这个ClientBeatCheckTask的这个线程类里面,主要看他的run方法:
- 找到心跳超时的实例,改变其健康状态,并发布
serviceChange
事件(后面说),还有InstanceHeartbeatTimeoutEvent
事件 - 找到满足删除条件的实例,从注册表中删除该实例信息(HTTP请求调用自己的API:/nacos/v1/ns/instance,异步删除)
- 默认
15s超时
,30s剔除
@Override
public void run() {
try {
......
// 1.找到所有的临时实例
List<Instance> instances = service.allIPs(true);
// 2.遍历所有的临时实例,找到所有心跳超时的实例(当前时间-上一次心跳时间 > 超时时间 默认15s),健康状态置为false
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
// 此时实例状态改变了,则发布serviceChange事件
getPushService().serviceChanged(service);
// 发布实例心跳超时事件
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
// 3.遍历所有的临时实例,找到所有满足删除条件的的实例(当前时间-上一次心跳时间 > 删除条件时间 默认30s),在注册表中删除实例
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
根据请求映射路径/nacos/v1/ns/instance以及put请求方式可以找到能处理请求的handler:
// com.alibaba.nacos.naming.controllers.deregister
@CanDistro
@DeleteMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String deregister(HttpServletRequest request) throws Exception {
......
// 查询出Service
Service service = serviceManager.getService(namespaceId, serviceName);
......
// 删除实例
serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
return "ok";
}
这里只是将注册表的实例列表取出然后删除,并放入缓存中,并没有删除注册表中的实例。
public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
Service service = getService(namespaceId, serviceName);
synchronized (service) {
removeInstance(namespaceId, serviceName, ephemeral, service, ips);
}
}
private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service,
Instance... ips) throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 将缓存中的实例列表删除实例
List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 更新缓存
consistencyService.put(key, instances);
}
private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips)
throws NacosException {
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips);
}
6.3 总结
- 临时实例:
- 采用
客户端心跳检测
模式,心跳检测周期5秒
- 心跳间隔超过
15秒(默认)则标记为不健康
- 心跳间隔超过
30秒(默认)则从服务列表删除
- 采用
七、服务发现源码
服务之间互相调用,它是怎么知道要调用其他服务的哪个实例呢
两种方式:1.客户端主动获取(定时更新)
、2.服务端主动推送(长连接推送变更信息)
7.1 客户端主动获取
7.1.1 客户端
@EnableDiscoveryClient注解也是在spring-cloud-commons项目,我们本节注意下DiscoveryClient
接口以及其中声明的接口方法。SpringCloud是由几个关键项目组成的,spring-cloud-commons项目是其中这一,SpringCloud Alibaba也不是完全替代SpringCloud的,一些基本规范还是继承下来做扩展等。
Nacos是通过自己的spring-cloud-alibaba-nacos-discovery项目去集成到SpringCloud的以及基于SpringBoot的自动装配机制集成到SpringBoot项目。而服务发现是由NacosDiscoveryClient
实现了spring-cloud-commons项目的DiscoveryClient接口,即Nacos中服务发现入口是NacosDiscoveryClient类。
NacosServiceDiscovery#getInstances
NacosServiceDiscovery主要负责调用,并对结果进行转换
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
String group = this.discoveryProperties.getGroup();
List<Instance> instances = this.namingService().selectInstances(serviceId, group, true);
return hostToServiceInstanceList(instances, serviceId);
}
NacosNamingService#selectInstances
NacosNamingService会对实例列表进行简单的选择,实际开发过程中实例的选择是由Ribbon或者LoadBalancer实现。
Nacos服务端会返回所有的实例列表,包含不健康状态的实例,具体要不要选择不健康的实例由客户端决定。
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
// 默认订阅模式
if (subscribe) {
// 委托hostReactor处理
serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
// 选取健康实例
return this.selectInstances(serviceInfo, healthy);
}
HostReactor#getServiceInfo
主要逻辑:
1.从本地缓存获取
2.如果本地缓存没有,则发生HTTP请求从Nacos服务端获取
3.开启一个定时任务,每隔1秒从Nacos服务端获取最新的服务实例信息,更新 到本地缓存serviceInfoMap中
4.从本地缓存serviceInfoMap中获取服务实例信息
public ServiceInfo getServiceInfo(String serviceName, String clusters) {
LogUtils.NAMING_LOGGER.debug("failover-mode: " + this.failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
// 这里是故障转移机制,该机制会在本地生成服务信息的文件
if (this.failoverReactor.isFailoverSwitch()) {
return this.failoverReactor.getService(key);
} else {
// 从本地缓存serviceInfoMap中查
ServiceInfo serviceObj = this.getServiceInfo0(serviceName, clusters);
// 判断缓存是否存在
if (null == serviceObj) {
// 不存在则创建一个ServiceInfo
serviceObj = new ServiceInfo(serviceName, clusters);
// 空ServiceInfo放入缓存
this.serviceInfoMap.put(serviceObj.getKey(), serviceObj);
// 放入待更新的服务列表中
this.updatingMap.put(serviceName, new Object());
// 去nacos-server查询
this.updateServiceNow(serviceName, clusters);
// 从待更新列表中移除
this.updatingMap.remove(serviceName);
} else if (this.updatingMap.containsKey(serviceName)) {
synchronized(serviceObj) {
try {
serviceObj.wait(5000L);
} catch (InterruptedException var8) {
LogUtils.NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, var8);
}
}
}
// 开启定时更新服务列表的功能,定时去nacos-server查询
this.scheduleUpdateIfAbsent(serviceName, clusters);
// 返回缓存中的服务信息
return (ServiceInfo)this.serviceInfoMap.get(serviceObj.getKey());
}
}
1.即时获取
HostReactor#updateServiceNow
private void updateServiceNow(String serviceName, String clusters) {
try {
// 立即查询服务列表
updateService(serviceName, clusters);
} catch (NacosException e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
}
}
public void updateService(String serviceName, String clusters) throws NacosException {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// 发起http调用,这里有个参数给了一个udp端口,当有服务更新时,nacos服务端会通过这个udp端口来通知客户端
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
if (StringUtils.isNotEmpty(result)) {
processServiceJson(result);
}
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
2.定时延迟任务
HostReactor#scheduleUpdateIfAbsent
这里先判断定时任务是否已经在异步任务列表中了,不在才会添加一个UpdateTask任务延迟执行
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
// 服务已经存在列表中就不再添加
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
// 加双重锁校验
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
// 加入UpdateTask任务,延迟执行
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
UpdateTask#run
主要逻辑:
1.尝试从本地缓存获取
2.本地还是没有则发送Http请求从服务端获取,并缓存到本地
3.过期服务 ,从注册中心查询;否则如果serviceName已经通过push更新,不应该覆盖它,因为push数据可能与pull through force push不同
4.刷新更新时间、重置失败数量为0等
5.下次调度刷新时间,下次执行的时间与failcount有关,failcount=0,则下次调度时间为6秒,最长为1分钟,即当无异常情况下缓存实例的刷新时间是6秒
public void run() {
// 延迟时间,默认1s
long delayTime = 1000L;
try {
// 尝试从本地获取
ServiceInfo serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));
if (serviceObj == null) {
// 本地没有,则发送http从服务端获取,并缓存到本地
HostReactor.this.updateService(this.serviceName, this.clusters);
return;
}
// 过期服务(服务的最新更新时间小于等于缓存刷新时间),从注册中心重新查询
if (serviceObj.getLastRefTime() <= this.lastRefTime) {
HostReactor.this.updateService(this.serviceName, this.clusters);
serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));
} else {
HostReactor.this.refreshOnly(this.serviceName, this.clusters);
}
// 刷新更新时间
this.lastRefTime = serviceObj.getLastRefTime();
if (!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters) && !HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters))) {
LogUtils.NAMING_LOGGER.info("update task is stopped, service:" + this.serviceName + ", clusters:" + this.clusters);
return;
}
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
this.incFailCount();
return;
}
delayTime = serviceObj.getCacheMillis();
// 重置失败数量为0
this.resetFailCount();
} catch (Throwable var7) {
this.incFailCount();
LogUtils.NAMING_LOGGER.warn("[NA] failed to update serviceName: " + this.serviceName, var7);
} finally {
// 下次调度刷新时间,下次执行的时间与failcount有关
// failcount = 0,则下次调度时间为6秒,最长为1分钟,当无异常情况下缓存实例的刷新时间是6秒
HostReactor.this.executor.schedule(this, Math.min(delayTime << this.failCount, 60000L), TimeUnit.MILLISECONDS);
}
}
无论是updateService方法、refreshOnly方法,还是刚开始的直接去nacos拉取信息的方法(getServiceInfoDirectlyFromServer)都会调用NamingProxy#queryList方法,这个方法就是HTTP请求获取信息:
NamingProxy#queryList
直接发起http请求Nacos服务端,请求地址为/nacos/v1/ns/service/list
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}
7.1.2 服务端
InstanceController#list
请求URL为/nacos/v1/ns/instance/list
,根据请求可以很容易找到能处理它的handler,逻辑就是做参数获取校验,然后获取服务列表
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
// 获取相关参数并校验
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
// 获取客户端 UDP端口
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
// 获取服务列表
return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
}
InstanceController#doSrvIpxt
7.2 服务端主动推送
既然是主动推送那么就需要两个条件:1.建立长连接2.触发推送的事件
7.2.1 服务端
上面
InstanceController.doSrvIpxt
中的pushService.addClient
就是把客户端UDP、IP等信息封装成PushClient对象
存储在PushService
类中,方便以后服务变更后推送消息
PushService类实现ApplicationListener接口,监听ServiceChangeEvent(服务变更事件),上文中服务注册内容里有介绍到客户端调用Nacos服务注册时会触发ServiceChangeEvent事件的发布。
由于PushService监听了ServiceChangeEvent事件,因此ServiceChangeEvent事件的发布会触发回调执行onEvent方法,而onEvent方法里最终执行的内容就是让服务端通过与客户端建立的UDP链接主动向客户端推送服务变更信息。
7.2.2 客户端
服务端主动推送时,客户端接收逻辑是在PushReceiver类里面,这个类是个Runnable会在HostReactor中被实例化
PushReceiver#run
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
// 开启一个UDP端口,接收服务器端的服务实例信息推送
this.udpSocket = new DatagramSocket();
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
this.executorService.execute(this);
} catch (Exception var3) {
LogUtils.NAMING_LOGGER.error("[NA] init udp socket failed", var3);
}
}
public void run() {
while(!this.closed) {
try {
byte[] buffer = new byte[65536];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// 死循环,一直接受UDP请求
this.udpSocket.receive(packet);
String json = (new String(IoUtils.tryDecompress(packet.getData()), UTF_8)).trim();
LogUtils.NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushPacket pushPacket = (PushPacket)JacksonUtils.toObj(json, PushPacket.class);
String ack;
if (!"dom".equals(pushPacket.type) && !"service".equals(pushPacket.type)) {
if ("dump".equals(pushPacket.type)) {
ack = "{\"type\": \"dump-ack\", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(this.hostReactor.getServiceInfoMap())) + "\"}";
} else {
ack = "{\"type\": \"unknown-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";
}
} else {
// 更新本地缓存的信息,上述客户端主动拉取的时候也会调用这个方法更新
this.hostReactor.processServiceJson(pushPacket.data);
ack = "{\"type\": \"push-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";
}
this.udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress()));
} catch (Exception var6) {
if (this.closed) {
return;
}
LogUtils.NAMING_LOGGER.error("[NA] error while receiving push data", var6);
}
}
}
HostReactor#processServiceJson
中间一大段省略了哈,最重要的就是那几步:
- 更新本地缓存
- 发布实例变更事件
- 写入磁盘(故障转移机制)
public ServiceInfo processServiceJson(String json) {
ServiceInfo serviceInfo = (ServiceInfo)JacksonUtils.toObj(json, ServiceInfo.class);
ServiceInfo oldService = (ServiceInfo)this.serviceInfoMap.get(serviceInfo.getKey());
if (this.pushEmptyProtection && !serviceInfo.validate()) {
return oldService;
} else {
boolean changed = false;
if (oldService != null) {
......
} else {
changed = true;
LogUtils.NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts()));
this.serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, this.cacheDir);
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set((double)this.serviceInfoMap.size());
if (changed) {
LogUtils.NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts()));
}
return serviceInfo;
}
}
7.3 总结
服务的发现有两种方式
客户端主动获取:
- 会先读取缓存,缓存内读取不到则会去服务端获取,同时开启一个定时任务定时更新
- 定时任务1s一次,异常时会延长时间最长60s
- 拉取URL:
/nacos/v1/ns/instance/list
服务端主动推送:
- 服务端和客户端在启动后会建立一个长连接
- 服务端服务变更后会发布服务变更事件ServiceChangeEvent,会通过长连接将变更后的信息发送给客户端
- 客户端更新的方式是hostReactor#processServiceJson方法,会写入缓存、发布实例变更事件、写入磁盘
八、总结
通过详细分析Nacos的基本使用及源码,希望大家对Nacos有更深刻的理解,其中文中不仅在代码注释中解析了Nacos作为服务注册与发现中心的源码,还专门制作了流程图,这样更加容易梳理出其调用过程与设计思想。本篇文章主要解析的是Nacos 1.x的源码,而现在已经有了Nacos 2.x,而Nacos 2.x版本核心差异是推出了GRPC作调用,有兴趣的同学可以研究一下。