Spring Cloud LoadBalancer 核心原理

发布于:2025-09-06 ⋅ 阅读:(12) ⋅ 点赞:(0)

Spring Cloud 从 2020.0.0 版本(Ilford 版本)开始正式移除了对 Ribbon 的支持,在这个版本及之后的版本中,Ribbon 不再作为默认的客户端负载均衡器包含在 Spring Cloud 的发布版中,取而代之的是 Spring Cloud LoadBalancer 。

主要特点

• 轻量级:作为一个较新的项目,Spring Cloud Load Balancer 在设计上更加现代化且轻量,旨在提供一个更简单、更易于使用的替代方案来取代 Netflix Ribbon。
• 内置多种负载均衡策略:除了默认提供的轮询、随机等策略外,还允许用户自定义策略。
• 无缝集成 Spring 生态系统:作为 Spring Cloud 的一部分,它与 Spring Boot 和其他 Spring 项目有着天然的整合优势,可以方便地与其他 Spring 组件( Eureka 等)协同工作。

简单使用示例

pom.xml 添加依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

在 application.yml 中配置注册到 Eureka:

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/

在 Spring Boot 应用中创建一个配置类,用于定义带 @LoadBalanced 注解的 RestTemplate,使 RestTemplate 支持服务发现和负载均衡。

import org.springframework.cloud.client.loadbalanced.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestTemplateConfig {

    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

通过服务名来调用微服务:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class MyRestService {

    private final RestTemplate restTemplate;

    @Autowired
    public MyRestService(RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
    }

    public String getDataFromService(String serviceId) {
        // serviceId 是在服务注册中心注册的服务名称,而不是实际的主机名或IP地址
        // LoadBalancer 将自动解析服务ID到具体的服务实例,并执行负载均衡策略。
        return restTemplate.getForObject("http://" + serviceId + "/your-endpoint", String.class);
    }
}

实现原理

以下源码基于 spring-cloud-starter-loadbalancer:2.2.9.RELEASE

核心组件
• LoadBalancerInterceptor:
• 作用:拦截 RestTemplate 请求,实现客户端负载均衡。
• 初始化:由 LoadBalancerAutoConfiguration 自动配置并绑定到 RestTemplate进行。
• 拦截逻辑:
• 检查请求 URI 是否为服务名(如 http://service-name)。
• 通过 LoadBalancerClient 选择实例并重构 URI。
• LoadBalancerClient:
• 作用:封装负载均衡逻辑,负责重构 URL 并执行请求。
• 实现类:
• BlockingLoadBalancerClient:同步请求的负载均衡客户端(默认)。
• ReactorLoadBalancerClient:响应式请求的负载均衡客户端。
• ReactiveLoadBalancer:
• 作用:定义负载均衡的抽象接口,负责选择服务实例。
• 实现类:
• RoundRobinLoadBalancer:轮询策略。
• RandomLoadBalancer:随机策略。
• ReactorServiceInstanceLoadBalancer:服务实例的负载均衡接口,支持自定义策略。
• ServiceInstanceListSupplier:
• 作用:提供服务实例列表,通常是通过服务发现组件(如 Eureka、Nacos)获取。
• 默认实现:DiscoveryClientServiceInstanceListSupplier,从注册中心动态获取服务实例。
• 生命周期管理:通过 refreshInterval 配置定期刷新实例列表。

工作流程
• 初始化阶段:
• 自动配置:Spring Boot 启动时加载 LoadBalancerAutoConfiguration,创建 LoadBalancerInterceptor 和 LoadBalancerClientFactory。LoadBalancerInterceptor 被注入到 RestTemplate 的拦截器链中。
• 服务发现集成:ServiceInstanceListSupplier 从注册中心(如 Eureka、Nacos)获取服务实例列表,并缓存到本地,并通过 DiscoveryClient 动态监听服务实例的变化(如实例上线 / 下线)。
• 负载均衡策略初始化:默认使用 RoundRobinLoadBalancer,可通过配置切换为 RandomLoadBalancer 或自定义策略。
• 请求处理阶段:
• 拦截请求:LoadBalancerInterceptor 拦截 RestTemplate 的请求,检查 URI 是否为服务名(如 http://service-name)。如果是服务名,则调用 LoadBalancerClient 进行负载均衡。
• 选择服务实例:LoadBalancerClient 调用负载均衡器 ReactiveLoadBalancer 的 choose 方法选择服务实例。负载均衡器 ReactiveLoadBalancer 的具体负载均衡策略实现有轮询(RoundRobinLoadBalancer)、随机(RandomLoadBalancer)等。实际上,ReactiveLoadBalancer 内部会基于服务实例列表提供者 ServiceInstanceListSupplier 去获取所有可用服务实例列表,比如 DiscoveryClientServiceInstanceListSupplier 会通过服务发现( eureka 等)获取服务实例列表。
• 执行服务请求:获取到服务实例后,就可以将服务名替换为实际的 IP 和端口,然后通过 HTTP 发送请求到目标服务实例。
• 故障处理与重试:
• 失败重试:如果请求失败(如超时、5xx 错误),可配置重试策略,选择另一个实例重新尝试。
• 健康检查:结合服务发现组件的健康检查机制,确保只选择健康的实例。

// 拦截 RestTemplate 的请求并将其路由到适当的服务实例
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;

    // (省略其他)...

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, 
final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        // 获取服务名称
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null,
                "Request URI does not contain a valid hostname: " + originalUri);
        // 通过 LoadBalancerClient,基于服务名称选择合适的服务实例,并执行实际的 HTTP 请求
        return this.loadBalancer.execute(serviceName,
                this.requestFactory.createRequest(request, body, execution));
    }

}

public class BlockingLoadBalancerClient implements LoadBalancerClient {
    private final LoadBalancerClientFactory loadBalancerClientFactory;

    @Override
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
            throws IOException {
        // 根据服务名称,选取一个服务实例
        ServiceInstance serviceInstance = choose(serviceId);
        if (serviceInstance == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        // 将请求转发给指定的服务实例
        return execute(serviceId, serviceInstance, request);
    }

    @Override
    public ServiceInstance choose(String serviceId) {
        // 获取负载均衡器
        ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory
                .getInstance(serviceId);
        if (loadBalancer == null) {
            returnnull;
        }
        // 通过负载均衡器选择服务实例,将响应式逻辑转换为同步调用
        Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose())
                .block();
        if (loadBalancerResponse == null) {
            returnnull;
        }
        // 返回服务实例
        return loadBalancerResponse.getServer();
    }

    @Override
    public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
        try {
            // 执行实际的 HTTP 请求
            return request.apply(serviceInstance);
        }
        catch (IOException iOException) {
            throw iOException;
        }
        catch (Exception exception) {
            ReflectionUtils.rethrowRuntimeException(exception);
        }
        return null;
    }


}

// 负载均衡器 以轮询策略为例
public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    // (省略其他)...

    // 提供服务实例列表的供应者。
    private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        // 获取一个可用的 ServiceInstanceListSupplier 实例(如 DiscoveryClientServiceInstanceListSupplier)
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
        // 通过 supplier 获取所有可用服务实例列表,然后通过轮询选取一个实例
        return supplier.get().next()
                .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
    }

    private Response<ServiceInstance> processInstanceResponse(
            ServiceInstanceListSupplier supplier,
            List<ServiceInstance> serviceInstances) {
        // 从服务实例列表中选择一个实例
        Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
        // 如果 supplier 实现了 SelectedInstanceCallback 接口,并且当前选择了有效的服务实例,则调用回调通知所选实例
        if (supplier instanceof SelectedInstanceCallback
                && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier)
                    .selectedServiceInstance(serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }

    // 从服务实例列表中选择一个实例
    private Response<ServiceInstance> getInstanceResponse(
            List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        // 计算下一个位置索引,以实现轮询选择机制
        int pos= Math.abs(this.position.incrementAndGet());
        // 根据计算出的位置索引从服务实例列表中选取一个实例,并返回包含该实例的响应
        ServiceInstance instance = instances.get(pos % instances.size());
        return new DefaultResponse(instance);
    }
}

// 通过服务发现提供服务实例列表的Supplier
public class DiscoveryClientServiceInstanceListSupplier
         implements ServiceInstanceListSupplier {

    // (省略其他)...

    // 响应式流,提供服务实例列表
    private final Flux<List<ServiceInstance>> serviceInstances;

    // 服务发现客户端 DiscoveryClient 是通过 bean 自动注入的,比如 eureka 包中的 EurekaDiscoveryClient
    public DiscoveryClientServiceInstanceListSupplier (DiscoveryClient delegate, Environment environment) {
        // (省略其他)...

        // 设置服务实例列表
        this.serviceInstances = Flux
                // 使用 Flux.defer 每次订阅都重新执行 getInstances(),保证每次都是最新的实例列表
                // delegate.getInstances(serviceId):调用底层服务发现客户端(如 EurekaDiscoveryClient)获取服务实例列表
                .defer(() -> Mono.fromCallable(() -> delegate.getInstances(serviceId)))
                // 如果超过 timeout 时间未返回结果,则触发超时逻辑,记录日志并返回空列表
                .timeout(timeout, Flux.defer(() -> {
                    logTimeout();
                    return Flux.just(new ArrayList<>());
                }), Schedulers.boundedElastic())
                // 捕获所有异常,记录错误日志,并返回空列表,防止链式中断
                .onErrorResume(error -> {
                    logException(error);
                    return Flux.just(new ArrayList<>());
                });
    }

    // 获取服务实例列表
    @Override
    public Flux<List<ServiceInstance>> get() {
        return serviceInstances;
    }

}

扩展性设计
• 服务发现集成:支持多种服务发现组件(如 Eureka、Consul、Nacos),只需实现 ServiceInstanceListSupplier。示例:NacosServiceInstanceListSupplier 集成 Nacos 注册中心。
• 自定义负载均衡策略:通过实现 ReactorServiceInstanceLoadBalancer 接口,可定义基于权重、响应时间等复杂策略。
• 动态配置:通过 LoadBalancerProperties 动态调整负载均衡器的全局和特定服务的配置参数。它支持开发者通过 application.yml 或 application.properties 文件灵活地配置负载均衡行为,例如重试机制、超时时间、负载均衡策略等。


网站公告

今日签到

点亮在社区的每一天
去签到