Eureka服务注册源码

发布于:2024-12-22 ⋅ 阅读:(14) ⋅ 点赞:(0)

spring-cloud-starter-netflix-eureka-client 版本是3.0.3
核心装备类:
EurekaClientAutoConfiguration
EurekaDiscoveryClientConfiguration
核心类,以及引用的关系如下

EurekaRegistration 
 - EurekaInstanceConfigBean 实例配置bean
 - ApplicationInfoManager 应用信息管理器
 - CloudEurekaClient 客户端
 - EurekaHealthCheckHandler 健康检查处理器
 
EurekaAutoServiceRegistration
 - EurekaServiceRegistry 服务注册
 - EurekaRegistration 

EurekaAutoServiceRegistration 实现了SmartLifecycle,在容器启动会调用它的start方法
start方法里面调用了 EurekaServiceRegistry.register(EurekaRegistration reg)
stop方法里面调用了 EurekaServiceRegistry.deregister(EurekaRegistration reg)

public void start() {
   // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
   if (this.port.get() != 0) {
      if (this.registration.getNonSecurePort() == 0) {
         this.registration.setNonSecurePort(this.port.get());
      }

      if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
         this.registration.setSecurePort(this.port.get());
      }
   }

   // only initialize if nonSecurePort is greater than 0 and it isn't already running
   // because of containerPortInitializer below
   if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
      // 服务注册
      this.serviceRegistry.register(this.registration);

      this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
      this.running.set(true);
   }
}

public void stop() {
   // 下线 
   this.serviceRegistry.deregister(this.registration);
   this.running.set(false);
}

EurekaServiceRegistry.register(EurekaRegistration reg)注册方法里面有两块逻辑

// 修改状态,会发送StatusChangeEvent事件,通知相关listener
reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
// 注册健康检查处理器
reg.getHealthCheckHandler()
        .ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));

这里setInstanceStatus方法,假如状态变更时,会通知监听器,执行notify方法

public synchronized void setInstanceStatus(InstanceStatus status) {
    InstanceStatus next = instanceStatusMapper.map(status);
    if (next == null) {
        return;
    }

    InstanceStatus prev = instanceInfo.setStatus(next);
    if (prev != null) {
        for (StatusChangeListener listener : listeners.values()) {
            try {
                listener.notify(new StatusChangeEvent(prev, next));
            } catch (Exception e) {
                logger.warn("failed to notify listener: {}", listener.getId(), e);
            }
        }
    }
}

继续追踪listener是如何放入ApplicationInfoManager的
1.ApplicationInfoManager.registerStatusChangeListener方法注册监听器
2.registerStatusChangeListener方法又是在initScheduledTasks方法里面执行的
3.initScheduledTasks方法在DiscoveryClient构造方法中执行的
4.CloudEurekaClient又是DiscoveryClient的子类,所以这块是在自动装配DiscoveryClient执行的
从DiscoveryClient构造器开始分析,由于代码太多,只保留核心逻辑

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    ......
    try {
        // 调度线程池
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        // 心跳线程池
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        // 缓存刷新线程池
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        // eureka跟服务器交互的客户端
        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);

        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    if (clientConfig.shouldFetchRegistry()) {
        try {
            // 拉取服务信息
            boolean primaryFetchRegistryResult = fetchRegistry(false);
            if (!primaryFetchRegistryResult) {
                logger.info("Initial registry fetch from primary servers failed");
            }
            boolean backupFetchRegistryResult = true;
            if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                backupFetchRegistryResult = false;
                logger.info("Initial registry fetch from backup servers failed");
            }
            if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
            }
        } catch (Throwable th) {
            logger.error("Fetch registry error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    ...

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    initScheduledTasks();

    ...
}

着重看下initScheduledTasks方法

  1. 开启缓存刷新定时调度
  2. 开启心跳续约定时调度
  3. 创建instanceInfoReplicator实例信息复制任务
  4. 创建匿名内部实现类ApplicationInfoManager$StatusChangeListener,notify方法调用了instanceInfoReplicator.onDemandUpdate();
  5. 将statusChangeListener实例注册到applicationInfoManager
  6. 调用instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());开启instanceInfoReplicator定时调度
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        );
        scheduler.schedule(
                cacheRefreshTask,
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // Heartbeat timer
        heartbeatTask = new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        );
        scheduler.schedule(
                heartbeatTask,
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                logger.info("Saw local status change event {}", statusChangeEvent);
                // 状态变更,如果满足条件则执行注册服务实例信息
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }
        
        // 开启定时调度推送最新服务实例信息
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

所以当状态变更时,applicationInfoManager会执行listener的notify方法,也就是执行了instanceInfoReplicator.onDemandUpdate()方法
通过调度器提交了一个任务,任务里面执行了当前InstanceInfoReplicator任务的run方法,最终完成服务注册。

public void run() {
    try {
    // 刷新实例信息 
    discoveryClient.refreshInstanceInfo();
    
    Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
    if (dirtyTimestamp != null) {
    // 服务注册
    discoveryClient.register();
    instanceInfo.unsetIsDirty(dirtyTimestamp);
    }
    } catch (Throwable t) {
    logger.warn("There was a problem with the instance info replicator", t);
    } finally {
    // 固定间隔时间继续执行当前任务
    Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
    scheduledPeriodicRef.set(next);
    }
}

所以eureka是基于ScheduledExecutorService来定时刷新服务缓存,心跳续约,以及服务重新注册(如果服务实例信息和eureka服务器不一致的时候)


网站公告

今日签到

点亮在社区的每一天
去签到