[spring-cloud: 负载均衡]-源码分析

发布于:2025-08-05 ⋅ 阅读:(16) ⋅ 点赞:(0)

获取服务列表

ServiceInstanceListSupplier

ServiceInstanceListSupplier 接口是一个提供 ServiceInstance 列表的供应者,返回一个响应式流 Flux<List<ServiceInstance>>,用于服务发现。

public interface ServiceInstanceListSupplier extends Supplier<Flux<List<ServiceInstance>>> {

	String getServiceId();

	default Flux<List<ServiceInstance>> get(Request request) {
		return get();
	}

	static ServiceInstanceListSupplierBuilder builder() {
		return new ServiceInstanceListSupplierBuilder();
	}

}

DelegatingServiceInstanceListSupplier

DelegatingServiceInstanceListSupplier 是一个抽象类,继承自 ServiceInstanceListSupplier,它通过委托给另一个 ServiceInstanceListSupplier 实例来实现其功能,同时支持选定服务实例的回调、初始化和销毁操作。

public abstract class DelegatingServiceInstanceListSupplier implements ServiceInstanceListSupplier, SelectedInstanceCallback, InitializingBean, DisposableBean {

	protected final ServiceInstanceListSupplier delegate;

	public DelegatingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate) {
		Assert.notNull(delegate, "delegate may not be null");
		this.delegate = delegate;
	}

	public ServiceInstanceListSupplier getDelegate() {
		return delegate;
	}

	@Override
	public String getServiceId() {
		return delegate.getServiceId();
	}

	@Override
	public void selectedServiceInstance(ServiceInstance serviceInstance) {
		if (delegate instanceof SelectedInstanceCallback selectedInstanceCallbackDelegate) {
			selectedInstanceCallbackDelegate.selectedServiceInstance(serviceInstance);
		}
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		if (delegate instanceof InitializingBean) {
			((InitializingBean) delegate).afterPropertiesSet();
		}
	}

	@Override
	public void destroy() throws Exception {
		if (delegate instanceof DisposableBean) {
			((DisposableBean) delegate).destroy();
		}
	}

}

负载均衡实现

ReactorLoadBalancer

ReactorLoadBalancer 是基于 Reactor 实现的响应式负载均衡器,通过 Mono<Response<T>> 异步选择服务实例。

public interface ReactorLoadBalancer<T> extends ReactiveLoadBalancer<T> {

	@SuppressWarnings("rawtypes")
	Mono<Response<T>> choose(Request request);

	default Mono<Response<T>> choose() {
		return choose(REQUEST);
	}

}

ReactorServiceInstanceLoadBalancer

ReactorServiceInstanceLoadBalancer 是一个标记接口,继承自 ReactorLoadBalancer,专门用于选择 ServiceInstance 对象的负载均衡器。

// RandomLoadBalancer, RoundRobinLoadBalancer
public interface ReactorServiceInstanceLoadBalancer extends ReactorLoadBalancer<ServiceInstance> {}

核心代码逻辑

推荐阅读:[spring-cloud: @LoadBalanced & @LoadBalancerClient]-源码分析

1. BlockingLoadBalancerInterceptor

// LoadBalancerInterceptor, RetryLoadBalancerInterceptor 
public interface BlockingLoadBalancerInterceptor extends ClientHttpRequestInterceptor {}

LoadBalancerInterceptor

public class LoadBalancerInterceptor implements BlockingLoadBalancerInterceptor {

	private final LoadBalancerClient loadBalancer;

	private final LoadBalancerRequestFactory requestFactory;

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
		this.loadBalancer = loadBalancer;
		this.requestFactory = requestFactory;
	}

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
		// for backwards compatibility
		this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
	}

	// 重点!
	@Override
	public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)
			throws IOException {
		URI originalUri = request.getURI();
		String serviceName = originalUri.getHost();
		Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
		return loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
	}

}

2. BlockingLoadBalancerClient

ServiceInstanceChooser

ServiceInstanceChooser 接口用于通过负载均衡器选择与指定服务ID对应的服务实例,支持带请求上下文的选择。

public interface ServiceInstanceChooser {

	ServiceInstance choose(String serviceId);

	<T> ServiceInstance choose(String serviceId, Request<T> request);

}

LoadBalancerClient

LoadBalancerClient 接口用于客户端负载均衡,选择服务实例并执行请求,同时提供将逻辑服务名重构为实际服务实例的 URI 的功能。

public interface LoadBalancerClient extends ServiceInstanceChooser {

	<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

	<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

	URI reconstructURI(ServiceInstance instance, URI original);

}
// BlockingLoadBalancerClientAutoConfiguration
@SuppressWarnings({ "unchecked", "rawtypes" })
public class BlockingLoadBalancerClient implements LoadBalancerClient {
	// org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration
	// LoadBalancerClientFactory
	private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;

	public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
		this.loadBalancerClientFactory = loadBalancerClientFactory;
	}

	// 重点!
	@Override
	public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
		String hint = getHint(serviceId);
		LoadBalancerRequestAdapter<T, TimedRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request, buildRequestContext(request, hint));
		Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
		supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
		// 通过 choose 方法来选择一个合适的 ServiceInstance
		ServiceInstance serviceInstance = choose(serviceId, lbRequest);
		if (serviceInstance == null) {
			supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		return execute(serviceId, serviceInstance, lbRequest);
	}

	private <T> TimedRequestContext buildRequestContext(LoadBalancerRequest<T> delegate, String hint) {
		if (delegate instanceof HttpRequestLoadBalancerRequest) {
			HttpRequest request = ((HttpRequestLoadBalancerRequest) delegate).getHttpRequest();
			if (request != null) {
				RequestData requestData = new RequestData(request);
				return new RequestDataContext(requestData, hint);
			}
		}
		return new DefaultRequestContext(delegate, hint);
	}

	// 通过生命周期钩子函数来管理负载均衡请求的开始与结束,并处理可能的异常,确保负载均衡的执行过程有序
	@Override
	public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
		if (serviceInstance == null) {
			throw new IllegalArgumentException("Service Instance cannot be null, serviceId: " + serviceId);
		}
		
		DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);
		Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
		
		Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>();
		supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance)));
		try {
			T response = request.apply(serviceInstance);
			Object clientResponse = getClientResponse(response);
			supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS, lbRequest, defaultResponse, clientResponse)));
			return response;
		}
		catch (IOException iOException) {
			supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.FAILED, iOException, lbRequest, defaultResponse)));
			throw iOException;
		}
		catch (Exception exception) {
			supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse)));
			ReflectionUtils.rethrowRuntimeException(exception);
		}
		return null;
	}

	private <T> Object getClientResponse(T response) {
		ClientHttpResponse clientHttpResponse = null;
		if (response instanceof ClientHttpResponse) {
			clientHttpResponse = (ClientHttpResponse) response;
		}
		if (clientHttpResponse != null) {
			try {
				return new ResponseData(clientHttpResponse, null);
			}
			catch (IOException ignored) {
			}
		}
		return response;
	}

	private Set<LoadBalancerLifecycle> getSupportedLifecycleProcessors(String serviceId) {
		return LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(
				loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
				DefaultRequestContext.class, Object.class, ServiceInstance.class);
	}

	@Override
	public URI reconstructURI(ServiceInstance serviceInstance, URI original) {
		return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
	}

	@Override
	public ServiceInstance choose(String serviceId) {
		return choose(serviceId, REQUEST);
	}

	// 重点!通过负载均衡器同步选择一个服务实例并返回
	@Override
	public <T> ServiceInstance choose(String serviceId, Request<T> request) {
		ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
		if (loadBalancer == null) {
			return null;
		}
		Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
		if (loadBalancerResponse == null) {
			return null;
		}
		return loadBalancerResponse.getServer();
	}

	private String getHint(String serviceId) {
		LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
		String defaultHint = properties.getHint().getOrDefault("default", "default");
		String hintPropertyValue = properties.getHint().get(serviceId);
		return hintPropertyValue != null ? hintPropertyValue : defaultHint;
	}

}

3. LoadBalancerRequestFactory

LoadBalancerRequestFactory 类用于创建封装负载均衡请求的 LoadBalancerRequest 实例,支持请求转换器和负载均衡客户端的配置。

public class LoadBalancerRequestFactory {

	private final LoadBalancerClient loadBalancer;

	private final List<LoadBalancerRequestTransformer> transformers;

	public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,
			List<LoadBalancerRequestTransformer> transformers) {
		this.loadBalancer = loadBalancer;
		this.transformers = transformers;
	}

	public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
		this.loadBalancer = loadBalancer;
		transformers = new ArrayList<>();
	}

	public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) {
		return new BlockingLoadBalancerRequest(loadBalancer, transformers,
				new BlockingLoadBalancerRequest.ClientHttpRequestData(request, body, execution));
	}

}
LoadBalancerRequestTransformer

LoadBalancerRequestTransformer 接口允许在负载均衡过程中根据不同的服务实例自定义转换 HttpRequest,如修改请求头、URL 等,同时通过 @Order 注解控制其执行顺序。

@Order(LoadBalancerRequestTransformer.DEFAULT_ORDER)
public interface LoadBalancerRequestTransformer {

	int DEFAULT_ORDER = 0;

	HttpRequest transformRequest(HttpRequest request, ServiceInstance instance);
}

4. BlockingLoadBalancerRequest

BlockingLoadBalancerRequest 类实现了负载均衡请求接口,负责将原始 HTTP 请求封装为负载均衡请求,并支持应用请求转换器和执行负载均衡操作。

class BlockingLoadBalancerRequest implements HttpRequestLoadBalancerRequest<ClientHttpResponse> {

	private final LoadBalancerClient loadBalancer;

	private final List<LoadBalancerRequestTransformer> transformers;

	private final ClientHttpRequestData clientHttpRequestData;

	BlockingLoadBalancerRequest(LoadBalancerClient loadBalancer, List<LoadBalancerRequestTransformer> transformers,
			ClientHttpRequestData clientHttpRequestData) {
		this.loadBalancer = loadBalancer;
		this.transformers = transformers;
		this.clientHttpRequestData = clientHttpRequestData;
	}

	@Override
	public ClientHttpResponse apply(ServiceInstance instance) throws Exception {
		HttpRequest serviceRequest = new ServiceRequestWrapper(clientHttpRequestData.request, instance, loadBalancer);
		if (this.transformers != null) {
			for (LoadBalancerRequestTransformer transformer : this.transformers) {
				serviceRequest = transformer.transformRequest(serviceRequest, instance);
			}
		}
		return clientHttpRequestData.execution.execute(serviceRequest, clientHttpRequestData.body);
	}

	@Override
	public HttpRequest getHttpRequest() {
		return clientHttpRequestData.request;
	}

	static class ClientHttpRequestData {

		private final HttpRequest request;

		private final byte[] body;

		private final ClientHttpRequestExecution execution;

		ClientHttpRequestData(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) {
			this.request = request;
			this.body = body;
			this.execution = execution;
		}

	}

}

网站公告

今日签到

点亮在社区的每一天
去签到