Spring 和 Lettuce 源码分析 Redis 节点状态检查与失败重连的工作原理

发布于:2025-08-17 ⋅ 阅读:(15) ⋅ 点赞:(0)

关键步骤:

  1. Spring Boot 启动时创建 LettuceConnectionFactory

  2. 根据配置类型(集群/哨兵/单机)初始化客户端

  3. 对于集群模式:

    • 创建 RedisClusterClient

    • 调用 setOptions(getClusterClientOptions(configuration)) 应用配置

2. 节点状态检查机制

拓扑刷新选项加载:

java

private ClusterTopologyRefreshOptions getClusterTopologyRefreshOptions() {
    ClusterClientOptions clusterClientOptions = redisClusterClient.getClusterClientOptions();
    return clusterClientOptions != null ? 
           clusterClientOptions.getTopologyRefreshOptions() : 
           FALLBACK_OPTIONS;
}
状态检查触发条件:

java

private boolean isEnabled(RefreshTrigger refreshTrigger) {
    return getClusterTopologyRefreshOptions()
           .getAdaptiveRefreshTriggers()
           .contains(refreshTrigger);
}

3. 失败重连原理

重连事件处理:

java

@Override
public void onReconnectAttempt(int attempt) {
    if (isEnabled(RefreshTrigger.PERSISTENT_RECONNECTS) &&
        attempt >= getRefreshTriggersReconnectAttempts()) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}
工作流程:
  1. 当发生网络断开时,Lettuce 自动尝试重连

  2. 每次重连尝试都会调用 onReconnectAttempt 方法

  3. 检查是否启用了 PERSISTENT_RECONNECTS 触发器

  4. 检查重连次数是否达到阈值(refreshTriggersReconnectAttempts

  5. 如果满足条件,触发拓扑刷新事件

4. 拓扑刷新执行过程

刷新激活机制:

java

private void activateTopologyRefreshIfNeeded() {
    if (getOptions() instanceof ClusterClientOptions) {
        ClusterClientOptions options = (ClusterClientOptions) getOptions();
        ClusterTopologyRefreshOptions topologyRefreshOptions = 
            options.getTopologyRefreshOptions();
        
        if (!topologyRefreshOptions.isPeriodicRefreshEnabled() || 
            clusterTopologyRefreshActivated.get()) {
            return;
        }
        
        if (clusterTopologyRefreshActivated.compareAndSet(false, true)) {
            // 创建定时刷新任务
            ScheduledFuture<?> scheduledFuture = genericWorkerPool.scheduleAtFixedRate(
                clusterTopologyRefreshScheduler,
                options.getRefreshPeriod().toNanos(),
                options.getRefreshPeriod().toNanos(),
                TimeUnit.NANOSECONDS
            );
            clusterTopologyRefreshFuture.set(scheduledFuture);
        }
    }
}
刷新过程:
  1. 检查周期性刷新是否启用

  2. 确保只有一个刷新任务被激活(原子操作)

  3. 创建定时任务,按配置的时间间隔执行刷新

  4. 刷新时重新获取集群拓扑信息

  5. 更新客户端内部节点路由表

5. 其他触发机制

除了重连触发外,还有多种触发条件:

java

public enum RefreshTrigger {
    MOVED_REDIRECT,    // MOVED 重定向
    ASK_REDIRECT,      // ASK 重定向
    PERSISTENT_RECONNECTS, // 持续重连
    UNCOVERED_SLOT,    // 未覆盖的槽位
    UNKNOWN_NODE       // 未知节点
}

对应的事件处理方法:

java

@Override
public void onMovedRedirection() {
    if (isEnabled(RefreshTrigger.MOVED_REDIRECT)) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}

6. 配置生效关键点

在 Spring Boot 中确保配置生效的关键:

  1. 集群模式必须启用:仅当使用集群配置时相关逻辑才会执行

  2. 正确设置 ClientOptions:通过 LettuceClientConfigurationBuilderCustomizer 注入

  3. 配置拓扑刷新选项:必须包含 PERSISTENT_RECONNECTS 触发器

  4. 设置合理的重连阈值adaptiveRefreshTriggersReconnectAttempts

7. 完整工作流程

图表

代码

最佳实践建议:

  1. 多触发器组合配置:同时启用多种触发器提高集群感知能力

    java

    .enableAdaptiveRefreshTrigger(
        RefreshTrigger.PERSISTENT_RECONNECTS,
        RefreshTrigger.MOVED_REDIRECT,
        RefreshTrigger.UNKNOWN_NODE
    )
  2. 合理设置重连阈值:根据网络环境调整

    java

    .adaptiveRefreshTriggersReconnectAttempts(3) // 生产环境建议 3-5
  3. 配置超时保护:防止拓扑刷新阻塞

    java

    .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(10))
  4. 启用周期性刷新:作为故障恢复的补充

    java

    .enablePeriodicRefresh(Duration.ofMinutes(10))

通过以上机制,Lettuce 能够在集群节点故障或网络问题时自动更新拓扑信息,保持客户端与集群的同步,实现高可用连接。

##配置重连

import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Configuration;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;

@Configuration
public class RedisClusterConfig {

    @Bean
    public LettuceClientConfiguration lettuceClientConfiguration() {

        // 👇 配置集群拓扑刷新选项
        ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                .enablePeriodicRefresh(true)  // 开启周期刷新
                .refreshPeriod(Duration.ofSeconds(30))  // 每30秒刷新一次
                .enableAllAdaptiveRefreshTriggers()
//                .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.ASK_REDIRECT)
//                .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT)
//                .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.UNCOVERED_SLOT)
//                .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.UNKNOWN_NODE)
//                .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)
                .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(10))
                .refreshTriggersReconnectAttempts(5)
                .build();

        return LettuceClientConfiguration.builder()
                .clientOptions(ClusterClientOptions.builder()
                        .topologyRefreshOptions(topologyRefreshOptions)
                        .build())
                .build();
    }

    @Bean
    public LettuceConnectionFactory redisConnectionFactory(RedisProperties redisProperties, LettuceClientConfiguration lettuceClientConfiguration) {
        RedisProperties.Cluster clusterProperties = redisProperties.getCluster();
        RedisClusterConfiguration config = new RedisClusterConfiguration(clusterProperties.getNodes());
        if (clusterProperties.getMaxRedirects() != null) {
            config.setMaxRedirects(clusterProperties.getMaxRedirects());
        }
        if (redisProperties.getPassword() != null) {
            config.setPassword(RedisPassword.of(redisProperties.getPassword()));
        }
        return new LettuceConnectionFactory(config, lettuceClientConfiguration);
    }

    // 可选:配置 RedisTemplate
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }

}

##源码

@Bean
	@ConditionalOnMissingBean(RedisConnectionFactory.class)
	LettuceConnectionFactory redisConnectionFactory(
			ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
			ClientResources clientResources) throws UnknownHostException {
		LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(builderCustomizers, clientResources,
				getProperties().getLettuce().getPool());
		return createLettuceConnectionFactory(clientConfig);
	}
	
private LettuceConnectionFactory createLettuceConnectionFactory(LettuceClientConfiguration clientConfiguration) {
		if (getSentinelConfig() != null) {
			return new LettuceConnectionFactory(getSentinelConfig(), clientConfiguration);
		}
		if (getClusterConfiguration() != null) {
			return new LettuceConnectionFactory(getClusterConfiguration(), clientConfiguration);
		}
		return new LettuceConnectionFactory(getStandaloneConfig(), clientConfiguration);
	}

public LettuceConnectionFactory(RedisClusterConfiguration clusterConfiguration,
			LettuceClientConfiguration clientConfig) {

		this(clientConfig);

		Assert.notNull(clusterConfiguration, "RedisClusterConfiguration must not be null!");

		this.configuration = clusterConfiguration;
	}

//createClient创建client 
public void afterPropertiesSet() {

		this.client = createClient();

		this.connectionProvider = createConnectionProvider(client, CODEC);
		this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC);

		if (isClusterAware()) {

			this.clusterCommandExecutor = new ClusterCommandExecutor(
					new LettuceClusterTopologyProvider((RedisClusterClient) client),
					new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
					EXCEPTION_TRANSLATION);
		}

		if (getEagerInitialization() && getShareNativeConnection()) {
			initConnection();
		}
	}

//clusterClient设置configuration配置  clusterClient.setOptions(getClusterClientOptions(configuration));
protected AbstractRedisClient createClient() {

		if (isStaticMasterReplicaAware()) {

			RedisClient redisClient = clientConfiguration.getClientResources() //
					.map(RedisClient::create) //
					.orElseGet(RedisClient::create);

			clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);

			return redisClient;
		}

		if (isRedisSentinelAware()) {

			RedisURI redisURI = getSentinelRedisURI();
			RedisClient redisClient = clientConfiguration.getClientResources() //
					.map(clientResources -> RedisClient.create(clientResources, redisURI)) //
					.orElseGet(() -> RedisClient.create(redisURI));

			clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
			return redisClient;
		}

		if (isClusterAware()) {

			List<RedisURI> initialUris = new ArrayList<>();
			ClusterConfiguration configuration = (ClusterConfiguration) this.configuration;
			for (RedisNode node : configuration.getClusterNodes()) {
				initialUris.add(createRedisURIAndApplySettings(node.getHost(), node.getPort()));
			}

			RedisClusterClient clusterClient = clientConfiguration.getClientResources() //
					.map(clientResources -> RedisClusterClient.create(clientResources, initialUris)) //
					.orElseGet(() -> RedisClusterClient.create(initialUris));

			clusterClient.setOptions(getClusterClientOptions(configuration));

			return clusterClient;
		}

		RedisURI uri = isDomainSocketAware()
				? createRedisSocketURIAndApplySettings(((DomainSocketConfiguration) configuration).getSocket())
				: createRedisURIAndApplySettings(getHostName(), getPort());

		RedisClient redisClient = clientConfiguration.getClientResources() //
				.map(clientResources -> RedisClient.create(clientResources, uri)) //
				.orElseGet(() -> RedisClient.create(uri));
		clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);

		return redisClient;
	}


private ClusterTopologyRefreshOptions getClusterTopologyRefreshOptions() {

        ClusterClientOptions clusterClientOptions = redisClusterClient.getClusterClientOptions();

        if (clusterClientOptions != null) {
            return clusterClientOptions.getTopologyRefreshOptions();
        }

        return FALLBACK_OPTIONS;
    }

public ClusterTopologyRefreshOptions getTopologyRefreshOptions() {
        return topologyRefreshOptions;
    }



private boolean isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger refreshTrigger) {
        return getClusterTopologyRefreshOptions().getAdaptiveRefreshTriggers().contains(refreshTrigger);
    }




@Override
    public void onReconnectAttempt(int attempt) {

        if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)
                && attempt >= getClusterTopologyRefreshOptions().getRefreshTriggersReconnectAttempts()) {
            if (indicateTopologyRefreshSignal()) {
                emitAdaptiveRefreshScheduledEvent();
            }
        }
    }

@Override
    public void onMovedRedirection() {

        if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT)) {
            if (indicateTopologyRefreshSignal()) {
                emitAdaptiveRefreshScheduledEvent();
            }
        }
    }

private void activateTopologyRefreshIfNeeded() {

        if (getOptions() instanceof ClusterClientOptions) {
            ClusterClientOptions options = (ClusterClientOptions) getOptions();
            ClusterTopologyRefreshOptions topologyRefreshOptions = options.getTopologyRefreshOptions();

            if (!topologyRefreshOptions.isPeriodicRefreshEnabled() || clusterTopologyRefreshActivated.get()) {
                return;
            }

            if (clusterTopologyRefreshActivated.compareAndSet(false, true)) {
                ScheduledFuture<?> scheduledFuture = genericWorkerPool.scheduleAtFixedRate(clusterTopologyRefreshScheduler,
                        options.getRefreshPeriod().toNanos(), options.getRefreshPeriod().toNanos(), TimeUnit.NANOSECONDS);
                clusterTopologyRefreshFuture.set(scheduledFuture);
            }
        }
    }




    
    

		


网站公告

今日签到

点亮在社区的每一天
去签到