Ignite启动流程深度解析

发布于:2025-08-13 ⋅ 阅读:(38) ⋅ 点赞:(0)
    /**
     * @param cfg Ignite configuration to use.
     * @param errHnd Error handler to use for notification about startup problems.
     * @param workerRegistry Worker registry.
     * @param hnd Default uncaught exception handler used by thread pools.
     * @throws IgniteCheckedException Thrown in case of any errors.
     */
    public void start(
        final IgniteConfiguration cfg,
        GridAbsClosure errHnd,
        WorkersRegistry workerRegistry,
        Thread.UncaughtExceptionHandler hnd,
        TimeBag startTimer
    )
        throws IgniteCheckedException {
        gw.compareAndSet(null, new GridKernalGatewayImpl(cfg.getIgniteInstanceName()));

        GridKernalGateway gw = this.gw.get();

        gw.writeLock();

        try {
            switch (gw.getState()) {
                case STARTED: {
                    U.warn(log, "Grid has already been started (ignored).");

                    return;
                }

                case STARTING: {
                    U.warn(log, "Grid is already in process of being started (ignored).");

                    return;
                }

                case STOPPING: {
                    throw new IgniteCheckedException("Grid is in process of being stopped");
                }

                case STOPPED: {
                    break;
                }
            }

            gw.setState(STARTING);
        }
        finally {
            gw.writeUnlock();
        }

        assert cfg != null;

        // Make sure we got proper configuration.
        validateCommon(cfg);

        igniteInstanceName = cfg.getIgniteInstanceName();

        this.cfg = cfg;

        log = (GridLoggerProxy)cfg.getGridLogger().getLogger(
            getClass().getName() + (igniteInstanceName != null ? '%' + igniteInstanceName : ""));

        longJVMPauseDetector = new LongJVMPauseDetector(log);

        longJVMPauseDetector.start();

        RuntimeMXBean rtBean = ManagementFactory.getRuntimeMXBean();

        // Ack various information.
        ackAsciiLogo();
        ackConfigUrl();
        ackConfiguration(cfg);
        ackDaemon();
        ackOsInfo();
        ackLanguageRuntime();
        ackRemoteManagement();
        ackLogger();
        ackVmArguments(rtBean);
        ackClassPaths(rtBean);
        ackSystemProperties();
        ackEnvironmentVariables();
        ackMemoryConfiguration();
        ackCacheConfiguration();
        ackP2pConfiguration();
        ackRebalanceConfiguration();
        ackIPv4StackFlagIsSet();

        // Ack 3-rd party licenses location.
        if (log.isInfoEnabled() && cfg.getIgniteHome() != null)
            log.info("3-rd party licenses can be found at: " + cfg.getIgniteHome() + File.separatorChar + "libs" +
                File.separatorChar + "licenses");

        // Check that user attributes are not conflicting
        // with internally reserved names.
        for (String name : cfg.getUserAttributes().keySet())
            if (name.startsWith(ATTR_PREFIX))
                throw new IgniteCheckedException("User attribute has illegal name: '" + name + "'. Note that all names " +
                    "starting with '" + ATTR_PREFIX + "' are reserved for internal use.");

        // Ack local node user attributes.
        logNodeUserAttributes();

        // Ack configuration.
        ackSpis();

        List<PluginProvider> plugins = cfg.getPluginProviders() != null && cfg.getPluginProviders().length > 0 ?
           Arrays.asList(cfg.getPluginProviders()) : U.allPluginProviders();

        // Spin out SPIs & managers.
        try {
            ctx = new GridKernalContextImpl(log,
                this,
                cfg,
                gw,
                plugins,
                MarshallerUtils.classNameFilter(this.getClass().getClassLoader()),
                workerRegistry,
                hnd,
                longJVMPauseDetector
            );

            startProcessor(new DiagnosticProcessor(ctx));

            mBeansMgr = new IgniteMBeansManager(this);

            cfg.getMarshaller().setContext(ctx.marshallerContext());

            startProcessor(new GridInternalSubscriptionProcessor(ctx));

            ClusterProcessor clusterProc = new ClusterProcessor(ctx);

            startProcessor(clusterProc);

            U.onGridStart();

            // Start and configure resource processor first as it contains resources used
            // by all other managers and processors.
            GridResourceProcessor rsrcProc = new GridResourceProcessor(ctx);

            rsrcProc.setSpringContext(rsrcCtx);

            scheduler = new IgniteSchedulerImpl(ctx);

            startProcessor(rsrcProc);

            // Inject resources into lifecycle beans.
            if (!cfg.isDaemon() && cfg.getLifecycleBeans() != null) {
                for (LifecycleBean bean : cfg.getLifecycleBeans()) {
                    if (bean != null)
                        rsrcProc.inject(bean);
                }
            }

            // Lifecycle notification.
            notifyLifecycleBeans(BEFORE_NODE_START);

            // Starts lifecycle aware components.
            U.startLifecycleAware(lifecycleAwares(cfg));

            startProcessor(new IgnitePluginProcessor(ctx, cfg, plugins));

            startProcessor(new FailureProcessor(ctx));

            // Start security processors.
            startProcessor(securityProcessor());

            startProcessor(new PoolProcessor(ctx));

            // Run background network diagnostics.
            GridDiagnostic.runBackgroundCheck(igniteInstanceName, ctx.pools().getExecutorService(), log);

            // Closure processor should be started before all others
            // (except for resource processor), as many components can depend on it.
            startProcessor(new GridClosureProcessor(ctx));

            // Start some other processors (order & place is important).
            startProcessor(new GridPortProcessor(ctx));
            startProcessor(new GridJobMetricsProcessor(ctx));

            // Timeout processor needs to be started before managers,
            // as managers may depend on it.
            startProcessor(new GridTimeoutProcessor(ctx));

            // Start SPI managers.
            // NOTE: that order matters as there are dependencies between managers.
            try {
                startManager(new GridTracingManager(ctx, false));
            }
            catch (IgniteCheckedException e) {
                startManager(new GridTracingManager(ctx, true));
            }
            startManager(new GridMetricManager(ctx));
            startManager(new GridSystemViewManager(ctx));
            startManager(new GridIoManager(ctx));
            startManager(new GridCheckpointManager(ctx));

            startManager(new GridEventStorageManager(ctx));
            startManager(new GridDeploymentManager(ctx));
            startManager(new GridLoadBalancerManager(ctx));
            startManager(new GridFailoverManager(ctx));
            startManager(new GridCollisionManager(ctx));
            startManager(new GridIndexingManager(ctx));

            ackSecurity();

            // Assign discovery manager to context before other processors start so they
            // are able to register custom event listener.
            GridDiscoveryManager discoMgr = new GridDiscoveryManager(ctx);

            ctx.add(discoMgr, false);

            // Start the encryption manager after assigning the discovery manager to context, so it will be
            // able to register custom event listener.
            startManager(new GridEncryptionManager(ctx));

            startProcessor(new PdsConsistentIdProcessor(ctx));

            MaintenanceProcessor mntcProcessor = new MaintenanceProcessor(ctx);

            startProcessor(mntcProcessor);

            if (mntcProcessor.isMaintenanceMode()) {
                if (log.isInfoEnabled()) {
                    log.info(
                        "Node is being started in maintenance mode. " +
                        "Starting IsolatedDiscoverySpi instead of configured discovery SPI."
                    );
                }

                cfg.setClusterStateOnStart(ClusterState.INACTIVE);

                if (log.isInfoEnabled())
                    log.info("Overriding 'clusterStateOnStart' configuration to 'INACTIVE'.");

                ctx.config().setDiscoverySpi(new IsolatedDiscoverySpi());

                discoMgr = new GridDiscoveryManager(ctx);

                // Reinitialized discovery manager won't have a valid consistentId on creation.
                discoMgr.consistentId(ctx.pdsFolderResolver().resolveFolders().consistentId());

                ctx.add(discoMgr, false);
            }

            // Start processors before discovery manager, so they will
            // be able to start receiving messages once discovery completes.
            try {
                startProcessor(COMPRESSION.createOptional(ctx));
                startProcessor(new GridMarshallerMappingProcessor(ctx));
                startProcessor(new MvccProcessorImpl(ctx));
                startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx));
                startProcessor(new GridAffinityProcessor(ctx));
                startProcessor(createComponent(GridSegmentationProcessor.class, ctx));

                startTimer.finishGlobalStage("Start managers");

                startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));

                startTimer.finishGlobalStage("Configure binary metadata");

                startProcessor(createComponent(IGridClusterStateProcessor.class, ctx));
                startProcessor(new PerformanceStatisticsProcessor(ctx));
                startProcessor(new GridCacheProcessor(ctx));

                if (cfg.isAuthenticationEnabled()) {
                    IgniteSecurityProcessor sec = (IgniteSecurityProcessor)ctx.security();

                    ((IgniteAuthenticationProcessor)sec.securityProcessor()).startProcessor();
                }

                startProcessor(new IndexProcessor(ctx));
                startProcessor(new GridQueryProcessor(ctx));
                startProcessor(new ClientListenerProcessor(ctx));
                startProcessor(createServiceProcessor());
                startProcessor(new GridTaskSessionProcessor(ctx));
                startProcessor(new GridJobProcessor(ctx));
                startProcessor(new GridTaskProcessor(ctx));
                startProcessor((GridProcessor)SCHEDULE.createOptional(ctx));
                startProcessor(createComponent(IgniteRestProcessor.class, ctx));
                startProcessor(new DataStreamProcessor(ctx));
                startProcessor(new GridContinuousProcessor(ctx));
                startProcessor(new DataStructuresProcessor(ctx));
                startProcessor(createComponent(PlatformProcessor.class, ctx));
                startProcessor(new DistributedMetaStorageImpl(ctx));
                startProcessor(new DistributedConfigurationProcessor(ctx));
                startProcessor(new DurableBackgroundTasksProcessor(ctx));

                startTimer.finishGlobalStage("Start processors");

                // Start plugins.
                for (PluginProvider provider : ctx.plugins().allProviders()) {
                    ctx.add(new GridPluginComponent(provider));

                    provider.start(ctx.plugins().pluginContextForProvider(provider));

                    startTimer.finishGlobalStage("Start '" + provider.name() + "' plugin");
                }

                // Start platform plugins.
                if (ctx.config().getPlatformConfiguration() != null)
                    startProcessor(new PlatformPluginProcessor(ctx));

                mBeansMgr.registerMBeansDuringInitPhase();

                ctx.cluster().initDiagnosticListeners();

                fillNodeAttributes(clusterProc.updateNotifierEnabled());

                ctx.cache().context().database().notifyMetaStorageSubscribersOnReadyForRead();

                ((DistributedMetaStorageImpl)ctx.distributedMetastorage()).inMemoryReadyForRead();

                startTimer.finishGlobalStage("Init metastore");

                ctx.cache().context().database().startMemoryRestore(ctx, startTimer);

                ctx.recoveryMode(false);

                startTimer.finishGlobalStage("Finish recovery");
            }
            catch (Throwable e) {
                U.error(
                    log, "Exception during start processors, node will be stopped and close connections", e);

                // Stop discovery spi to close tcp socket.
                ctx.discovery().stop(true);

                throw e;
            }

            // All components exept Discovery are started, time to check if maintenance is still needed.
            mntcProcessor.prepareAndExecuteMaintenance();

            gw.writeLock();

            try {
                gw.setState(STARTED);

                // Start discovery manager last to make sure that grid is fully initialized.
                startManager(discoMgr);
            }
            finally {
                gw.writeUnlock();
            }

            startTimer.finishGlobalStage("Join topology");

            // Check whether UTF-8 is the default character encoding.
            checkFileEncoding();

            // Check whether physical RAM is not exceeded.
            checkPhysicalRam();

            // Suggest configuration optimizations.
            suggestOptimizations(cfg);

            // Suggest JVM optimizations.
            ctx.performance().addAll(JvmConfigurationSuggestions.getSuggestions());

            // Suggest Operation System optimizations.
            ctx.performance().addAll(OsConfigurationSuggestions.getSuggestions());

            DiscoveryLocalJoinData joinData = ctx.discovery().localJoin();

            IgniteInternalFuture<Boolean> transitionWaitFut = joinData.transitionWaitFuture();

            // Notify discovery manager the first to make sure that topology is discovered.
            // Active flag is not used in managers, so it is safe to pass true.
            ctx.discovery().onKernalStart(true);

            // Notify IO manager the second so further components can send and receive messages.
            // Must notify the IO manager before transition state await to make sure IO connection can be established.
            ctx.io().onKernalStart(true);

            boolean active;

            if (transitionWaitFut != null) {
                if (log.isInfoEnabled()) {
                    log.info("Join cluster while cluster state transition is in progress, " +
                        "waiting when transition finish.");
                }

                active = transitionWaitFut.get();
            }
            else
                active = joinData.active();

            startTimer.finishGlobalStage("Await transition");

            ctx.pools().registerMetrics();

            registerMetrics();

            ctx.cluster().registerMetrics();

            // Register MBeans.
            mBeansMgr.registerMBeansAfterNodeStarted();

            boolean recon = false;

            // Callbacks.
            for (GridComponent comp : ctx) {
                // Skip discovery manager.
                if (comp instanceof GridDiscoveryManager)
                    continue;

                // Skip IO manager.
                if (comp instanceof GridIoManager)
                    continue;

                if (comp instanceof GridPluginComponent)
                    continue;

                if (!skipDaemon(comp)) {
                    try {
                        comp.onKernalStart(active);
                    }
                    catch (IgniteNeedReconnectException e) {
                        ClusterNode locNode = ctx.discovery().localNode();

                        assert locNode.isClient();

                        if (!ctx.discovery().reconnectSupported())
                            throw new IgniteCheckedException("Client node in forceServerMode " +
                                "is not allowed to reconnect to the cluster and will be stopped.");

                        if (log.isDebugEnabled())
                            log.debug("Failed to start node components on node start, will wait for reconnect: " + e);

                        recon = true;
                    }
                }
            }

            // Start plugins.
            for (PluginProvider provider : ctx.plugins().allProviders())
                provider.onIgniteStart();

            if (recon)
                reconnectState.waitFirstReconnect();

            // Lifecycle bean notifications.
            notifyLifecycleBeans(AFTER_NODE_START);
        }
        catch (Throwable e) {
            IgniteSpiVersionCheckException verCheckErr = X.cause(e, IgniteSpiVersionCheckException.class);

            if (verCheckErr != null)
                U.error(log, verCheckErr.getMessage());
            else if (X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class))
                U.warn(log, "Grid startup routine has been interrupted (will rollback).");
            else
                U.error(log, "Got exception while starting (will rollback startup routine).", e);

            errHnd.apply();

            stop(true);

            if (e instanceof Error)
                throw e;
            else if (e instanceof IgniteCheckedException)
                throw (IgniteCheckedException)e;
            else
                throw new IgniteCheckedException(e);
        }

        // Mark start timestamp.
        startTime = U.currentTimeMillis();

        String intervalStr = IgniteSystemProperties.getString(IGNITE_STARVATION_CHECK_INTERVAL);

        // Start starvation checker if enabled.
        boolean starveCheck = !isDaemon() && !"0".equals(intervalStr);

        if (starveCheck) {
            final long interval = F.isEmpty(intervalStr) ? DFLT_PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);

            starveTask = ctx.timeout().schedule(new Runnable() {
                /** Last completed task count. */
                private long lastCompletedCntPub;

                /** Last completed task count. */
                private long lastCompletedCntSys;

                /** Last completed task count. */
                private long lastCompletedCntQry;

                @Override public void run() {
                    if (ctx.pools().getExecutorService() instanceof ThreadPoolExecutor) {
                        ThreadPoolExecutor exec = (ThreadPoolExecutor)ctx.pools().getExecutorService();

                        lastCompletedCntPub = checkPoolStarvation(exec, lastCompletedCntPub, "public");
                    }

                    if (ctx.pools().getSystemExecutorService() instanceof ThreadPoolExecutor) {
                        ThreadPoolExecutor exec = (ThreadPoolExecutor)ctx.pools().getSystemExecutorService();

                        lastCompletedCntSys = checkPoolStarvation(exec, lastCompletedCntSys, "system");
                    }

                    if (ctx.pools().getQueryExecutorService() instanceof ThreadPoolExecutor) {
                        ThreadPoolExecutor exec = (ThreadPoolExecutor)ctx.pools().getQueryExecutorService();

                        lastCompletedCntQry = checkPoolStarvation(exec, lastCompletedCntQry, "query");
                    }

                    if (ctx.pools().getStripedExecutorService() != null)
                        ctx.pools().getStripedExecutorService().detectStarvation();
                }

                /**
                 * @param exec Thread pool executor to check.
                 * @param lastCompletedCnt Last completed tasks count.
                 * @param pool Pool name for message.
                 * @return Current completed tasks count.
                 */
                private long checkPoolStarvation(
                    ThreadPoolExecutor exec,
                    long lastCompletedCnt,
                    String pool
                ) {
                    long completedCnt = exec.getCompletedTaskCount();

                    // If all threads are active and no task has completed since last time and there is
                    // at least one waiting request, then it is possible starvation.
                    if (exec.getPoolSize() == exec.getActiveCount() && completedCnt == lastCompletedCnt &&
                        !exec.getQueue().isEmpty())
                        LT.warn(
                            log,
                            "Possible thread pool starvation detected (no task completed in last " +
                                interval + "ms, is " + pool + " thread pool size large enough?)");

                    return completedCnt;
                }
            }, interval, interval);
        }

        long metricsLogFreq = cfg.getMetricsLogFrequency();

        if (metricsLogFreq > 0) {
            metricsLogTask = ctx.timeout().schedule(new Runnable() {
                private final DecimalFormat dblFmt = doubleFormat();

                @Override public void run() {
                    ackNodeMetrics(
                        dblFmt,
                        ctx.pools().getExecutorService(),
                        ctx.pools().getSystemExecutorService(),
                        ctx.pools().getStripedExecutorService(),
                        ctx.pools().customExecutors()
                    );
                }
            }, metricsLogFreq, metricsLogFreq);
        }

        ctx.performance().add("Disable assertions (remove '-ea' from JVM options)", !U.assertionsEnabled());

        ctx.performance().logSuggestions(log, igniteInstanceName);

        U.quietAndInfo(log, "To start Console Management & Monitoring run ignitevisorcmd.{sh|bat}");

        if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_QUIET, true))
            ackClassPathContent();

        ackStart(rtBean);

        if (!isDaemon())
            ctx.discovery().ackTopology(ctx.discovery().localJoin().joinTopologyVersion().topologyVersion(),
                EventType.EVT_NODE_JOINED, localNode());

        startTimer.finishGlobalStage("Await exchange");
    }

这是一个 Apache Ignite 节点核心启动方法,非常关键。我们来逐步、深入地理解它。


🎯 一句话总结

这个 start(...) 方法是 Ignite 节点的“心脏起搏器” —— 它负责:

  1. 防止重复启动
  2. 验证配置
  3. 初始化日志、诊断、资源注入
  4. 依次启动各种处理器(Processor)和管理器(Manager)
  5. 触发生命周期事件(如 BEFORE_NODE_START)
  6. 最终让节点加入集群并开始提供服务

我们可以把这个方法看作是 Ignite 启动流程的“主控台”,它协调了所有子系统的初始化顺序。


🧱 一、方法签名解析

public void start(
    final IgniteConfiguration cfg,
    GridAbsClosure errHnd,
    WorkersRegistry workerRegistry,
    Thread.UncaughtExceptionHandler hnd,
    TimeBag startTimer
) throws IgniteCheckedException
参数 说明
cfg Ignite 配置对象,包含所有设置(缓存、网络、安全等)
errHnd 错误处理器,用于处理启动过程中的问题
workerRegistry 线程工作注册表,管理后台线程
hnd 默认未捕获异常处理器(给线程池用)
startTimer 启动耗时统计器,用于性能分析

🔁 二、整体流程概览(简化版)

1. 检查状态:是否已启动/正在启动?
2. 设置为 STARTING 状态
3. 验证配置
4. 初始化日志、JVM 暂停检测器
5. 打印各种启动日志(Logo、配置、JVM 参数等)
6. 检查用户属性命名合法性
7. 创建核心上下文 GridKernalContextImpl
8. 启动关键处理器(Diagnostic, Cluster, Resource 等)
9. 注入资源到 LifecycleBean
10. 触发 BEFORE_NODE_START 事件
11. 启动所有 LifecycleAware 组件(SSL、序列化器等)
12. 依次启动 SPI Managers(IO、部署、事件存储等)
13. 创建 DiscoveryManager(发现其他节点)
14. 启动缓存、MVCC、压缩、分片等核心处理器
15. 如果启用认证,启动安全模块
...
→ 最终节点加入集群

🔐 三、关键步骤详解

✅ 1. 状态检查与防重入(线程安全)

gw.writeLock();
try {
    switch (gw.getState()) {
        case STARTED: 
            U.warn(log, "Grid has already been started (ignored).");
            return;
        case STARTING: 
            U.warn(log, "Grid is already in process of being started (ignored).");
            return;
        case STOPPING: 
            throw new IgniteCheckedException("Grid is in process of being stopped");
        case STOPPED: 
            break;
    }
    gw.setState(STARTING);
} finally {
    gw.writeUnlock();
}
  • 使用 GridKernalGateway 的读写锁保证线程安全。
  • 防止重复启动或在停止过程中启动。
  • 这是典型的 状态机模式(State Machine)。

✅ 2. 配置验证与日志初始化

validateCommon(cfg); // 基本配置校验

log = cfg.getGridLogger().getLogger(...); // 创建专用日志实例
  • 确保配置合法。
  • 为当前节点创建独立的日志输出。

✅ 3. JVM 暂停检测器启动

longJVMPauseDetector.start();
  • 监控 JVM 是否出现长时间 GC 或暂停,影响集群稳定性。

✅ 4. 打印大量“ack”信息(用于诊断)

ackAsciiLogo();           // 打印 Ignite 的 ASCII 艺术 Logo
ackConfigUrl();           // 配置文件路径
ackConfiguration(cfg);    // 主要配置项摘要
ackVmArguments();         // JVM 启动参数
ackSystemProperties();    // 系统属性
ackCacheConfiguration();  // 缓存配置摘要
...

💡 这些不是“功能”,而是 运维诊断信息,帮助你确认:

  • 用了哪个配置文件?
  • JVM 参数对不对?
  • 缓存配置是否正确?
  • 是否启用了 P2P 类加载?

✅ 5. 用户属性命名检查

if (name.startsWith(ATTR_PREFIX))
    throw new IgniteCheckedException("User attribute has illegal name...");
  • 防止用户使用以 ATTR_PREFIX(如 org.apache.ignite.)开头的属性名,避免与内部属性冲突。

✅ 6. 创建核心上下文:GridKernalContextImpl

ctx = new GridKernalContextImpl(...);
  • 这是整个 Ignite 节点的“大脑”。
  • 后续所有组件都依赖这个 ctx
  • 包含日志、配置、插件、类加载器、线程池等。

✅ 7. 启动关键处理器(Processor)

Ignite 使用“Processor 模式”组织功能模块:

处理器 作用
DiagnosticProcessor 启动诊断功能
ClusterProcessor 集群拓扑管理
GridResourceProcessor 资源注入(@IgniteInstanceResource 等)
GridClosureProcessor 闭包/任务执行
GridPortProcessor 端口管理
GridTimeoutProcessor 超时调度
GridMarshallerMappingProcessor 序列化类映射
MvccProcessorImpl 多版本并发控制(事务支持)
GridAffinityProcessor 数据分片(Affinity Function)
GridCacheProcessor 缓存系统核心

⚠️ 顺序很重要! 比如:

  • ResourceProcessor 必须最先启动,因为其他组件需要依赖它做注入。
  • TimeoutProcessor 要在 Managers 之前启动,因为 Managers 可能依赖它做定时任务。

✅ 8. 资源注入到 LifecycleBean

if (!cfg.isDaemon() && cfg.getLifecycleBeans() != null) {
    for (LifecycleBean bean : cfg.getLifecycleBeans()) {
        if (bean != null)
            rsrcProc.inject(bean); // 注入 @IgniteInstanceResource 等
    }
}
  • 使用 GridResourceProcessor 对用户自定义的 LifecycleBean 进行依赖注入。
  • 注入完成后,这些 Bean 就可以使用 Ignite 实例了。

✅ 9. 触发生命周期事件:BEFORE_NODE_START

notifyLifecycleBeans(BEFORE_NODE_START);
  • 调用所有 LifecycleBean.onLifecycleEvent(BEFORE_NODE_START)
  • 此时 Ignite 实例尚未完全启动,不能使用 Ignite API。

✅ 10. 启动所有 LifecycleAware 组件

U.startLifecycleAware(lifecycleAwares(cfg));
  • 调用所有实现了 LifecycleAware 接口的组件的 start() 方法。
  • 包括:序列化器、SSL 工厂、日志、消息拦截器等。
  • 这是 组件级初始化

✅ 11. 启动 SPI Managers

startManager(new GridIoManager(ctx));
startManager(new GridDeploymentManager(ctx));
startManager(new GridEventStorageManager(ctx));
...
  • Managers 负责底层服务:
    • GridIoManager: 网络通信
    • GridDeploymentManager: 类加载与部署
    • GridEventStorageManager: 事件存储
    • GridCollisionManager: 任务冲突解决
    • GridIndexingManager: SQL 索引

⚠️ 顺序很重要!比如 IoManager 要早于其他 Manager,因为通信是基础。


✅ 12. DiscoveryManager:加入集群的关键

GridDiscoveryManager discoMgr = new GridDiscoveryManager(ctx);
ctx.add(discoMgr, false);
  • 这是 节点发现其他节点、形成集群的核心组件
  • 使用配置的 Discovery SPI(如 TcpDiscoverySpi、ZooKeeper 等)。
  • 只有 Discovery 启动后,节点才能“看到”集群。

✅ 13. 维护模式(Maintenance Mode)支持

if (mntcProcessor.isMaintenanceMode()) {
    cfg.setDiscoverySpi(new IsolatedDiscoverySpi()); // 孤立模式
    ...
}
  • 允许节点在“维护模式”下启动,不加入集群,用于数据修复、备份等。

✅ 14. 启动缓存系统

startProcessor(new GridCacheProcessor(ctx));
  • 这是 缓存功能的核心
  • 负责创建缓存、管理分区、处理读写请求。
  • 在 Discovery 之后启动,确保能获取集群拓扑。

✅ 15. 安全模块(如果启用)

if (cfg.isAuthenticationEnabled()) {
    startProcessor(new IgniteSecurityProcessor(ctx));
}
  • 启用用户名/密码认证、权限控制等。

🔄 四、启动顺序的重要性

Ignite 的启动是 严格有序的,因为组件之间有依赖关系:

1. ResourceProcessor     ← 提供依赖注入
   ↓
2. ClosureProcessor      ← 任务执行
   ↓
3. TimeoutProcessor      ← 定时任务
   ↓
4. IoManager             ← 网络通信
   ↓
5. DiscoveryManager      ← 发现集群
   ↓
6. CacheProcessor        ← 缓存系统
   ↓
7. SecurityProcessor     ← 安全认证
   ↓
✅ 节点启动完成,可提供服务

如果顺序错乱,可能导致 NPE 或功能异常。


📊 五、startTimer 的作用

startTimer.finishGlobalStage("Start managers");
startTimer.finishGlobalStage("Configure binary metadata");
  • 用于 性能分析和启动耗时监控
  • 记录每个阶段花了多长时间,帮助优化启动速度。

✅ 六、总结:这个方法到底干了什么?

阶段 做了什么
准备阶段 状态检查、加锁、配置验证
日志与诊断 初始化日志、JVM 暂停检测、打印各种 ack 信息
上下文创建 构建 GridKernalContext,作为“大脑”
核心组件启动 Processor 和 Manager 按顺序启动
生命周期管理 触发 BEFORE_NODE_START,启动 LifecycleAware 组件
集群接入 启动 Discovery,加入集群
功能启用 缓存、安全、SQL、事务等

🎨 类比理解

类比 说明
汽车启动 检查档位 → 打火 → 仪表盘自检 → 发动机启动 → 变速箱工作 → 上路
操作系统启动 BIOS → Bootloader → 内核初始化 → 加载驱动 → 启动服务 → 用户登录
Spring Boot 启动 SpringApplication → ApplicationContext → Bean 初始化 → 启动嵌入式容器

💡 最后建议

这个方法虽然很长,但结构清晰:

  • 防御 → 初始化 → 顺序启动 → 集群接入

你可以:

  1. 先关注 状态机控制(gw 锁)
  2. 再看 Processor/Manager 的启动顺序
  3. 最后理解 LifecycleBean 和 LifecycleAware 的调用时机

这是理解 Ignite 内部架构的“钥匙”。

需要我画一个完整的启动流程图,或者解释某个具体 Processor 的作用吗?


网站公告

今日签到

点亮在社区的每一天
去签到