12.【.NET 8 实战--孢子记账--从单体到微服务--转向微服务】--微服务基础工具与技术--Ocelot 网关--熔断与限流

发布于:2025-03-24 ⋅ 阅读:(21) ⋅ 点赞:(0)

在微服务架构中,各服务之间相互依赖,一旦某个服务发生异常或延迟,会迅速影响到整体系统的稳定性。熔断机制通过提前监测服务的健康状态,在发现故障时自动中断请求传递,防止故障蔓延,起到保护整个系统的作用。另一方面,限流机制能够有效控制瞬时流量,避免因访问量骤增导致系统资源耗尽,从而引发性能下降或服务崩溃。通过设置访问阈值,限流不仅保障了正常请求的处理,还能限制恶意攻击或异常请求对系统的冲击。两者结合使用时,既能在服务出现问题时及时隔离故障,又能在高并发场景下平衡负载,提升系统的健壮性和响应能力。因此,在微服务环境下,熔断与限流成为确保系统高可用性和稳定性的必要防护手段。

一、熔断机制

1.1 熔断的概念
  1. 什么是熔断机制
    熔断机制是一种用于提升系统稳定性和容错能力的重要技术,常应用于微服务架构中。当某个服务因故障、超时或异常错误导致调用失败率急剧上升时,继续调用该服务可能引发整个系统的连锁故障,甚至导致雪崩式崩溃。熔断机制的核心思想是提前检测服务的异常情况,并在错误达到一定阈值时自动中断对故障服务的调用,从而防止错误扩散到其他服务。

    在实际应用中,熔断器通常有三种状态:

    • 关闭状态(Closed): 正常情况下,所有请求都会被转发到下游服务;
    • 打开状态(Open): 当错误率超过预设阈值时,熔断器进入打开状态,此时对故障服务的调用将被直接拒绝或返回默认降级响应;
    • 半开状态(Half-Open): 经过一定时间后,系统会尝试对服务进行部分调用,如果调用恢复正常,则熔断器将重新闭合,否则继续保持打开状态。

    这种机制不仅能有效隔离故障,还能为下游服务提供恢复的时间和机会,从而保障整个系统的高可用性和稳定性。

  2. 熔断的触发条件
    熔断机制的触发条件主要依赖于在一段监控时间窗口内服务请求的错误情况。具体来说,常见的触发条件包括:

    • 失败次数超过阈值
      当在设定的时间窗口内连续或累计的请求失败次数达到预先设定的上限时,就会触发熔断。比如,配置中设定在允许一定数量(如10次)失败后,再出现错误就立即打开熔断器。

    • 错误率达到预设比例
      除了绝对的失败次数,部分实现会监控整个请求流量中错误请求所占的比例。当错误率(例如超过50%)达到预定的临界值时,熔断器便会介入,避免继续调用可能故障的服务。

    • 请求超时或响应延迟
      如果下游服务响应时间过长或发生超时,也会被视为一次失败。这类情况累计到一定数量后,同样会触发熔断机制,从而保护系统不受长时间等待的影响。

    • 请求量的最低阈值
      为了防止在请求量不足时误判错误率,一些系统会先判断请求总量是否达到一定标准,再进行错误率的统计。只有在请求量达到最低阈值的前提下,错误率才会被用于判断是否触发熔断。

这些触发条件共同作用,确保当服务性能出现异常或故障迹象时,系统能及时采取保护措施,防止错误继续扩散,从而保障整个系统的高可用性和稳定性。

1.2 Ocelot 熔断配置
  1. 熔断配置项介绍
    在 Ocelot 中,熔断配置主要通过在路由中配置QoSOptions 节点来实现,该机制依赖于 Polly 库。通过这种配置,可以在下游服务出现故障或响应超时等异常情况时,自动切断后续请求,从而防止故障扩散,保护整个系统的稳定性。具体配置项包括:

    • ExceptionsAllowedBeforeBreaking
      指定在熔断器触发前允许连续出现的异常次数。若达到该次数,熔断器便会打开,后续请求直接返回错误或降级响应。例如,设置为 3 时,连续发生 3 次异常即触发熔断。

    • DurationOfBreak
      定义熔断器打开状态的持续时间。在此期间,所有请求都会被短路拒绝,待休眠期结束后,系统会进入半开状态,尝试恢复服务调用。

    • TimeoutValue
      设置请求的最大响应时间,超出此时间的请求将被视为失败,计入异常次数。

  2. 配置示例
    以下是一个示例配置,展示了如何为每个路由设置熔断器和超时策略:

    {
        "Routes": [
            {
            "DownstreamPathTemplate": "/api/{everything}",
            "DownstreamScheme": "https",
            "DownstreamHostAndPorts": [
                {
                "Host": "abc.com",
                "Port": 443
                }
            ],
            "UpstreamPathTemplate": "/gateway/{everything}",
            "UpstreamHttpMethod": [ "Get", "Post" ],
            "QoSOptions": {
                "ExceptionsAllowedBeforeBreaking": 3,
                "DurationOfBreak": 1000,
                "TimeoutValue": 5000
            }
            }
        ]
    }
    

    在这个配置中,ExceptionsAllowedBeforeBreaking设置为 3,表明在熔断器触发前允许3次异常,DurationOfBreak设置为 1000 毫秒,表示熔断器触发后将保持打开状态的时间,TimeoutValue设置为 5000 毫秒,表示如果请求超过5秒将会自动超时。

1.3 熔断状态及状态转换
  1. 熔断器的三种状态

    • Closed(关闭):正常状态,所有请求都能正常通过熔断器。
    • Open(打开):熔断器处于打开状态,所有请求都会被快速失败,不会调用实际的业务逻辑。
    • Half-Open(半开):熔断器处于半开状态,部分请求会被允许通过,以检测下游服务是否恢复。
  2. 状态切换流程

    • Closed -> Open: 当连续发生一定数量的失败(例如ExceptionsAllowedBeforeBreaking次异常)时,熔断器从关闭状态切换到打开状态。
    • Open -> Half-Open: 在熔断器保持打开状态经过一段时间(例如DurationOfBreak)后,熔断器会进入半开状态。
    • Half-Open -> Closed: 如果在半开状态下,允许通过的请求成功,则熔断器切换回关闭状态。
    • Half-Open -> Open: 如果在半开状态下,允许通过的请求失败,则熔断器重新切换到打开状态。

    以下是代码展示了熔断器的状态及其转换逻辑:

    public class AdvancedCircuitController<TResult> : CircuitStateController<TResult>
    {
        // 状态转换逻辑
        public override void OnActionSuccess(Context context)
        {
            // 锁定当前状态,确保线程安全
            using var _ = TimedLock.Lock(Lock);
    
            switch (InternalCircuitState)
            {
                // 如果当前状态是半开,则重置熔断器
                case CircuitState.HalfOpen:
                    OnCircuitReset(context);
                    break;
    
                // 如果当前状态是关闭,则不做任何处理
                case CircuitState.Closed:
                    break;
    
                // 如果当前状态是打开或隔离,则不做任何处理
                case CircuitState.Open:
                case CircuitState.Isolated:
                    break;
    
                // 处理未定义的状态
                default:
                    throw new InvalidOperationException("Unhandled CircuitState.");
            }
    
            // 增加成功次数的计数
            _metrics.IncrementSuccess_NeedsLock();
        }
    
        public override void OnActionFailure(DelegateResult<TResult> outcome, Context context)
        {
            // 锁定当前状态,确保线程安全
            using var _ = TimedLock.Lock(Lock);
    
            // 记录最后一次失败结果
            LastOutcome = outcome;
    
            switch (InternalCircuitState)
            {
                // 如果当前状态是半开,则触发熔断
                case CircuitState.HalfOpen:
                    Break_NeedsLock(context);
                    return;
    
                // 如果当前状态是关闭,增加失败次数的计数,并检查是否需要触发熔断
                case CircuitState.Closed:
                    _metrics.IncrementFailure_NeedsLock();
                    var healthCount = _metrics.GetHealthCount_NeedsLock();
    
                    int throughput = healthCount.Total;
                    if (throughput >= _minimumThroughput && (double)healthCount.Failures / throughput >= _failureThreshold)
                    {
                        Break_NeedsLock(context);
                    }
    
                    break;
    
                // 如果当前状态是打开或隔离,增加失败次数的计数
                case CircuitState.Open:
                case CircuitState.Isolated:
                    _metrics.IncrementFailure_NeedsLock();
                    break;
    
                // 处理未定义的状态
                default:
                    throw new InvalidOperationException("Unhandled CircuitState.");
            }
        }
    }
    

    在前面代码,来自 Polly 库中的 CircuitBreaker 组件,用于管理高级熔断器的状态和操作,处理熔断器的状态转换逻辑,包括成功和失败时的状态变化。AdvancedCircuitController<TResult> 类继承自 CircuitStateController<TResult>,用于管理熔断器的状态和操作。

    方法 OnActionSuccess 处理操作成功时的状态转换逻辑。首先使用 TimedLock 锁定当前状态,确保线程安全。然后根据 InternalCircuitState 判断当前熔断器的状态,并进行相应处理。如果当前状态是半开状态,则调用 OnCircuitReset 方法重置熔断器;如果是关闭状态,则不做任何处理;如果是打开或隔离状态,也不做处理;如果是未定义状态,则抛出 InvalidOperationException 异常。最后,调用 _metrics.IncrementSuccess_NeedsLock 方法增加成功次数的计数。

    方法 OnActionFailure 处理操作失败时的状态转换逻辑。首先使用 TimedLock 锁定当前状态,确保线程安全。然后记录最后一次失败结果 LastOutcome。根据 InternalCircuitState 判断当前熔断器的状态,并进行相应处理。如果当前状态是半开状态,则调用 Break_NeedsLock 方法触发熔断并返回;如果是关闭状态,则增加失败次数的计数,并检查是否需要触发熔断。如果达到失败阈值,则触发熔断;如果是打开或隔离状态,则增加失败次数的计数,不做其他处理;如果是未定义状态,则抛出 InvalidOperationException 异常。

二、限流机制

2.1 限流的概念

限流机制是指在系统中通过控制单位时间内处理请求的数量,以防止因请求过多而导致系统资源耗尽或过载的一种保护性策略。简单来说,它设定了一个阈值,当请求数量超过这个预设上限时,多余的请求将被拒绝或延迟处理,从而保护后端服务不被瞬时大量流量压垮。

常见的限流算法包括:

  • 令牌桶算法:系统按固定速率生成令牌,只有持有令牌的请求才能被处理,多余的请求则被暂存或拒绝;
  • 漏桶算法:将请求按固定速率处理,超出部分则排队等待或直接丢弃;
  • 固定窗口和滑动窗口计数器:在一个固定或滑动的时间窗口内计数请求数,超过预定限制后拒绝新的请求。

通过使用限流机制,系统可以在面对突发流量或恶意攻击时保持稳定性和高可用性,同时也有助于平滑流量,防止短时过载引起的连锁反应,从而保障整体服务的响应速度和质量。

2.2 Ocelot 限流配置
  1. 限流配置项介绍
    Ocelot 的限流功能可以在每个路由的基础上进行配置,在路由配置中添加以下部分:
    "RateLimitOptions": {
        "EnableRateLimiting": true,
        "Period": "1m",
        "PeriodTimespan": 1,
        "Limit": 100
    }
    
    在上面的配置中,EnableRateLimiting 表示是否启用限流功能。Period 表示限流周期的单位,可以是秒(s)、分钟(m)、小时(h)、天(d)等。PeriodTimespan 表示周期的时间长度,例如,Period1mPeriodTimespan1 表示限流周期为 1 分钟。Limit 表示在一个周期内允许的最大请求数。
2.3 限流的实现过程

我们来看一下Ocelot实现限流的核心类 RateLimitingMiddleware,它主要包括以下几个部分:

  1. Invoke方法
    Invoke 方法是中间件的核心逻辑,用于处理每个HTTP请求。

    public async Task Invoke(HttpContext httpContext)
    {
        var downstreamRoute = httpContext.Items.DownstreamRoute();
        var options = downstreamRoute.RateLimitOptions;
    
        // 检查是否启用限流
        if (!downstreamRoute.EnableEndpointEndpointRateLimiting)
        {
            Logger.LogInformation(() => $"{nameof(DownstreamRoute.EnableEndpointEndpointRateLimiting)} is not enabled for downstream path: {downstreamRoute.DownstreamPathTemplate.Value}");
            await _next.Invoke(httpContext);
            return;
        }
    
        // 计算身份
        var identity = SetIdentity(httpContext, options);
    
        // 检查白名单
        if (IsWhitelisted(identity, options))
        {
            Logger.LogInformation(() => $"{downstreamRoute.DownstreamPathTemplate.Value} is white listed from rate limiting");
            await _next.Invoke(httpContext);
            return;
        }
    
        var rule = options.RateLimitRule;
        if (rule.Limit > 0)
        {
            // 增加计数器
            var counter = _limiter.ProcessRequest(identity, options);
    
            // 检查是否超过限制
            if (counter.TotalRequests > rule.Limit)
            {
                var retryAfter = _limiter.RetryAfter(counter, rule);
                LogBlockedRequest(httpContext, identity, counter, rule, downstreamRoute);
    
                // 返回配额超限响应
                var ds = ReturnQuotaExceededResponse(httpContext, options, retryAfter.ToString(CultureInfo.InvariantCulture));
                httpContext.Items.UpsertDownstreamResponse(ds);
                httpContext.Items.SetError(new QuotaExceededError(GetResponseMessage(options), options.HttpStatusCode));
                return;
            }
        }
    
        // 设置X-Rate-Limit头
        if (!options.DisableRateLimitHeaders)
        {
            var originalContext = _contextAccessor?.HttpContext;
            if (originalContext != null)
            {
                var headers = _limiter.GetHeaders(originalContext, identity, options);
                originalContext.Response.OnStarting(SetRateLimitHeaders, state: headers);
            }
        }
    
        await _next.Invoke(httpContext);
    }
    

    前面这段代码实现了一个限流中间件,用于控制每个客户端对下游服务的请求频率。

    首先通过 httpContext.Items.DownstreamRoute() 获取当前请求的下游路由信息,并从中提取限流配置 options。接着,检查当前路由是否启用了限流功能 EnableEndpointEndpointRateLimiting,如果未启用,则记录日志并继续执行下一个中间件。

    然后通过 SetIdentity 方法计算请求的身份信息,通常是根据请求头中的客户端标识符来确定。如果该身份在白名单中,则记录日志并继续执行下一个中间件。

    接下来从限流配置中获取限流规则 rule,检查其限制值 Limit 是否大于0。如果大于0,则调用 _limiter.ProcessRequest 方法增加请求计数器,并返回当前计数信息 counter。如果请求总数 TotalRequests 超过了限制值 rule.Limit,则计算重试时间 retryAfter,记录被阻止的请求日志,并返回配额超限响应 ReturnQuotaExceededResponse。同时,设置错误信息并终止执行。

    最后如果未禁用限流头信息 DisableRateLimitHeaders,则获取当前上下文 originalContext,并通过 _limiter.GetHeaders 方法设置限流相关的响应头信息 X-Rate-Limit。最后,执行下一个中间件。

  2. 辅助方法:

    • SetIdentity:计算请求的身份信息。

      public virtual ClientRequestIdentity SetIdentity(HttpContext httpContext, RateLimitOptions option)
      {
          var clientId = "client";
          if (httpContext.Request.Headers.Keys.Contains(option.ClientIdHeader))
          {
              clientId = httpContext.Request.Headers[option.ClientIdHeader].First();
          }
      
          return new ClientRequestIdentity(
              clientId,
              httpContext.Request.Path.ToString().ToLowerInvariant(),
              httpContext.Request.Method.ToLowerInvariant()
              );
      }
      

      这段代码的作用是从 HTTP 请求中提取客户端标识信息,并将其封装成 ClientRequestIdentity 对象。首先,定义一个默认的客户端标识符 clientId,初始值为 "client"。然后,检查请求头中是否包含指定的客户端标识头 option.ClientIdHeader,如果包含,则将 clientId 更新为请求头中的第一个值。最后,创建并返回一个 ClientRequestIdentity 对象,包含客户端标识符、请求路径和请求方法,所有这些信息都被转换为小写形式。

    • IsWhitelisted:检查请求是否在白名单中。

      public static bool IsWhitelisted(ClientRequestIdentity requestIdentity, RateLimitOptions option)
          => option.ClientWhitelist.Contains(requestIdentity.ClientId);
      

      前面的代码用于检查一个 ClientRequestIdentity 对象是否在限流选项的白名单中。方法接受两个参数:requestIdentityoption。其中,requestIdentity 是一个包含客户端身份信息的对象,而 option 是包含限流配置的对象。方法的实现非常简洁,直接使用 option.ClientWhitelist.Contains 方法检查 requestIdentity.ClientId 是否在白名单集合 ClientWhitelist 中。如果存在,则返回 true,否则返回 false

    • LogBlockedRequest:记录被限流的请求。

      public virtual void LogBlockedRequest(HttpContext httpContext, ClientRequestIdentity identity, RateLimitCounter counter, RateLimitRule rule, DownstreamRoute downstreamRoute)
      {
          Logger.LogInformation(
              () => $"Request {identity.HttpVerb}:{identity.Path} from ClientId {identity.ClientId} has been blocked, quota {rule.Limit}/{rule.Period} exceeded by {counter.TotalRequests}. Blocked by rule[{rule.Name}]"
          );
      }
      

      这段代码很简单,这段代码定义了一个虚方法 LogBlockedRequest,它用于记录被限流规则阻止的请求信息。方法接受五个参数:httpContextidentitycounterruledownstreamRoute。一般来说,在实际项目中我们需要重写这个方法,实现我们自己的记录限流请求的逻辑。

    • ReturnQuotaExceededResponse:返回配额超限的响应。

      public virtual DownstreamResponse ReturnQuotaExceededResponse(HttpContext httpContext, RateLimitOptions option, string retryAfter)
      {
          var message = GetResponseMessage(option);
          var http = new HttpResponseMessage((HttpStatusCode)option.HttpStatusCode)
          {
              Content = new StringContent(message),
          };
      
          if (!option.DisableRateLimitHeaders)
          {
              http.Headers.TryAddWithoutValidation(HeaderNames.RetryAfter, retryAfter);
              httpContext.Response.Headers[HeaderNames.RetryAfter] = retryAfter;
          }
      
          return new DownstreamResponse(http);
      }
      

      这段代码定义了一个虚方法ReturnQuotaExceededResponse,用于生成配额超限的响应信息,并返回一个 DownstreamResponse 对象。方法接受三个参数:httpContextoptionretryAfter。在方法内部,首先通过 GetResponseMessage(option) 方法获取响应消息内容,并创建一个 HttpResponseMessage 对象 http,其状态码由 option.HttpStatusCode 指定,并将响应消息内容设置为 message。接着,检查 option.DisableRateLimitHeaders 是否为 false。如果为 false,则向响应头中添加 Retry-After 头信息,值为 retryAfter。同时,将该头信息也添加到 httpContext.Response.Headers 中。最后,创建并返回一个 DownstreamResponse 对象,包含生成的 HttpResponseMessage。通常在没有特殊需求的情况下,这个方法是不需要重写的,保持原状即可。

    • GetResponseMessage:获取配额超限的响应消息。

      private static string GetResponseMessage(RateLimitOptions option)
      {
          var message = string.IsNullOrEmpty(option.QuotaExceededMessage)
              ? $"API calls quota exceeded! maximum admitted {option.RateLimitRule.Limit} per {option.RateLimitRule.Period}."
              : option.QuotaExceededMessage;
          return message;
      }
      

      上面这段代码定义了一个静态方法 GetResponseMessage,用于生成当请求配额超限时的响应消息,在这里就不多讲了。

    • SetRateLimitHeaders:设置限流的响应头。

      private static Task SetRateLimitHeaders(object state)
      {
          var limitHeaders = (RateLimitHeaders)state;
          var headers = limitHeaders.Context.Response.Headers;
          headers[RateLimitingHeaders.X_Rate_Limit_Limit] = limitHeaders.Limit;
          headers[RateLimitingHeaders.X_Rate_Limit_Remaining] = limitHeaders.Remaining;
          headers[RateLimitingHeaders.X_Rate_Limit_Reset] = limitHeaders.Reset;
          return Task.CompletedTask;
      }
      

      在上面代码中定义了一个静态方法 SetRateLimitHeaders,用于设置响应中的限流头信息。方法接受一个参数 state,并返回一个 Task 对象。

      首先,将 state 转换为 RateLimitHeaders 类型的对象 limitHeadersRateLimitHeaders 类包含了限流信息,包括 HTTP 上下文、限流的最大请求数、剩余请求数和重置时间。

      然后,通过 limitHeaders.Context.Response.Headers 获取响应头的集合 headers。接着,使用 headers 设置限流相关的头信息:

      • X-Rate-Limit-Limit:表示限流的最大请求数,设置为 limitHeaders.Limit
      • X-Rate-Limit-Remaining:表示剩余的可用请求数,设置为 limitHeaders.Remaining
      • X-Rate-Limit-Reset:表示限流重置的时间,设置为 limitHeaders.Reset

三、总结

在微服务架构中,熔断与限流机制是保障系统稳定性和高可用性的关键手段。熔断机制通过监控服务健康状态,在请求失败率或超时时间超过阈值时,自动中断对故障服务的调用,防止错误扩散。熔断器有关闭(正常)、打开(拒绝请求)和半开(尝试恢复)三种状态,确保系统在故障发生时能迅速隔离问题。限流机制通过控制单位时间内的请求数量,防止因流量激增导致系统资源耗尽或服务崩溃。常见限流算法包括令牌桶、漏桶和滑动窗口。Ocelot网关通过QoSOptions配置熔断(如异常次数、熔断持续时间、超时时间)和RateLimitOptions配置限流(如周期和最大请求数)。熔断与限流结合使用,可在服务异常时快速恢复,并在高并发场景下平衡负载,提升系统的响应能力和健壮性。