Redis + ABP vNext 构建分布式高可用缓存架构

发布于:2025-05-17 ⋅ 阅读:(19) ⋅ 点赞:(0)

🚀 Redis + ABP vNext 构建分布式高可用缓存架构


🔧 环境准备

  • 开发环境

    • .NET 8.0 SDK
    • Visual Studio 2022 / VS Code
    • Docker & Docker Compose
  • NuGet 包

    • Volo.Abp.Caching.StackExchangeRedis v8.1.5
    • Volo.Abp.DistributedLocking.StackExchangeRedis v8.1.5
    • Volo.Abp.EventBus.Distributed.Redis v8.1.5
    • Polly v7.2.3
  • Global using

    using System;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Caching.Distributed;
    using Microsoft.Extensions.Caching.Memory;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;
    using Polly;
    using StackExchange.Redis;
    using Volo.Abp;
    using Volo.Abp.Caching;
    using Volo.Abp.Caching.StackExchangeRedis;
    using Volo.Abp.DistributedLocking;
    using Volo.Abp.DistributedLocking.StackExchangeRedis;
    using Volo.Abp.EventBus.Distributed;
    using Volo.Abp.EventBus.Distributed.Redis;
    

项目结构示例

src/
├─ MyApp/
│  ├─ MyAppModule.cs
│  ├─ Program.cs
│  └─ Services/
│     └─ ProductService.cs
└─ test/
   └─ MyApp.Tests/
      └─ CacheTests.cs


1️⃣ 背景与需求 🎯

分布式微服务场景下,缓存可显著提升性能和降低数据库压力,但需解决:

  • 🔄 多节点更新 导致数据不一致
  • 💥 热点缓存失效 引发击穿
  • 🔄 节点扩缩容 后本地缓存未命中

量化场景

  • QPS:10,000 req/s
  • 读写比:95% 读 / 5% 写
  • 缓存命中率目标:>95%

需求:构建 高可用高性能可复现 的缓存系统。


2️⃣ 架构设计:ABP vNext + Redis 分布式缓存

ABP Node1
Redis
ABP Node2
Pub/Sub / EventBus
  • 🗝️ Redis 统一缓存源
  • 📣 Pub/Sub / 分布式事件广播失效
  • 🔁 本地 IMemoryCache + 分布式 IDistributedCache 二级缓存

3️⃣ 搭建 Redis 集群(Sentinel & Cluster)

🔹 Sentinel HA 示例(docker-compose.yml)

version: '3'
services:
  redis-master:
    image: redis:6.2
    volumes:
      - ./redis-master.conf:/usr/local/etc/redis/redis.conf
    command: redis-server /usr/local/etc/redis/redis.conf
    ports: ["6379:6379"]

  redis-slave:
    image: redis:6.2
    command: redis-server --slaveof redis-master 6379
    ports: ["6380:6379"]

  sentinel-1:
    image: redis:6.2
    command: redis-sentinel /usr/local/etc/redis/sentinel.conf
    ports: ["26379:26379"]
# 同理配置 sentinel-2、sentinel-3 …

sentinel.conf:

sentinel monitor mymaster redis-master 6379 2
sentinel auth-pass mymaster <password>

🔹 Cluster 分片示例

version: '3'
services:
  redis-node-1:
    image: bitnami/redis-cluster:6
    environment:
      - REDIS_PASSWORD=yourpassword
    ports: ["7000:7000","7001:7001"]
  # 至少 6 个节点...

集群创建:

docker exec -it <container> \
  redis-cli --cluster create \
  127.0.0.1:7000 127.0.0.1:7001 ... \
  --cluster-replicas 1 --cluster-yes

4️⃣ Program.cs 最小示例

🚀 启动流程示意图

启动 ASP.NET Core
AddApplication
Build & InitializeApplication
Run 服务
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddApplication<MyAppModule>();

var app = builder.Build();
app.InitializeApplication();
app.Run();

5️⃣ ABP vNext 分布式部署与配置 🛠️

1. 模块依赖与预配置

[DependsOn(
  typeof(AbpCachingStackExchangeRedisModule),
  typeof(AbpDistributedLockingStackExchangeRedisModule),
  typeof(AbpEventBusDistributedRedisModule)
)]
public class MyAppModule : AbpModule
{
  public override void PreConfigureServices(ServiceConfigurationContext ctx)
  {
    PreConfigure<AbpDistributedCacheOptions>(o => o.KeyPrefix = "MyApp:");
    PreConfigure<AbpDistributedEventBusOptions>(o =>
    {
      o.IsEnabled = true;
      o.BusConfigurationOptions = ConfigurationOptions.Parse(
        ctx.Services.GetConfiguration()["Redis:Configuration"]);
    });
  }
}

2. 服务注册

public override void ConfigureServices(ServiceConfigurationContext ctx)
{
  var config = ctx.Services.GetConfiguration();

  ctx.Services.AddStackExchangeRedisCache(opts =>
  {
    opts.Configuration = config["Redis:Configuration"];
    opts.InstanceName  = config["Redis:InstanceName"];
  });

  ctx.Services.AddAbpDistributedLocking(o =>
    o.UseRedis(config["Redis:Configuration"]));

  ctx.Services.AddAbpDistributedEventBusRedis();
  ctx.Services.AddMemoryCache();
}

6️⃣ 缓存设计与二级缓存策略 🔑

🔄 服务缓存调用流程

Client 请求 GetProductAsync
IMemoryCache 有值?
直接返回本地缓存
调用 IDistributedCache.GetOrAddAsync
到数据库查询
设置分布式缓存
写入本地缓存
返回结果

缓存 Key 常量

public static class CacheKeyConsts
{
  public const string Product = "product:";
}

GetOrAddAsync + 空值占位

public async Task<ProductDto> GetProductAsync(Guid id)
{
  var key = CacheKeyConsts.Product + id;
  var dto = await _distributedCache.GetOrAddAsync(
    key,
    async () =>
    {
      var r = await _repository.FindAsync(id);
      if (r == null)
      {
        await _distributedCache.SetAsync(key, NullObject.Value,
          new DistributedCacheEntryOptions
            { AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(30) });
        return NullObject.Value;
      }
      return r;
    },
    opts => opts.SetAbsoluteExpiration(TimeSpan.FromMinutes(5))
  );

  _memoryCache.Set(key, dto, TimeSpan.FromSeconds(30));
  return dto;
}

7️⃣ 分布式锁与缓存击穿保护 🔒

🔒 分布式锁获取流程

成功
失败
调用 GetWithLockAsync
TryAcquireAsync 请求锁
执行业务逻辑 & 更新缓存
降级:直接数据库查询
释放锁
结束流程
public class ProductService
{
  private readonly IDistributedLockProvider _lock;
  private readonly IDistributedCache _cache;
  private readonly IRepository<Product, Guid> _repo;
  private readonly ILogger<ProductService> _logger;

  public ProductService(
    IDistributedLockProvider lockProvider,
    IDistributedCache cache,
    IRepository<Product, Guid> repo,
    ILogger<ProductService> logger)
  {
    _lock   = lockProvider;
    _cache  = cache;
    _repo   = repo;
    _logger = logger;
  }

  public async Task<ProductDto> GetWithLockAsync(Guid id)
  {
    var key = CacheKeyConsts.Product + id;
    using var scope = _logger.BeginScope("Lock:{Key}", key);
    await using var handle = await _lock.TryAcquireAsync(
      "lock:" + key,
      TimeSpan.FromSeconds(3),
      TimeSpan.FromSeconds(5));

    if (handle == null)
    {
      _logger.LogWarning("Lock failed for {Key}", key);
      return await _repo.FindAsync(id);
    }

    var result = await _repo.FindAsync(id);
    await Policy.Handle<RedisException>()
      .RetryAsync(3)
      .ExecuteAsync(() => _cache.SetAsync(key, result));

    return result;
  }
}

8️⃣ 缓存失效同步:Redis Pub/Sub / 分布式事件总线 📣

事件定义

public class CacheInvalidatedEvent : EventData
{
  public string Key { get; }
  public Guid Id { get; }
  public CacheInvalidatedEvent(string key, Guid id) { Key = key; Id = id; }
}

发布 & 处理

await _distributedEventBus.PublishAsync(
  new CacheInvalidatedEvent(CacheKeyConsts.Product, productId));

public class CacheInvalidatedHandler : IDistributedEventHandler<CacheInvalidatedEvent>
{
  private readonly IDistributedCache _cache;
  public CacheInvalidatedHandler(IDistributedCache cache) => _cache = cache;

  public async Task HandleEventAsync(CacheInvalidatedEvent e)
  {
    await _cache.RemoveAsync(e.Key + ":" + e.Id);
  }
}
ctx.Services.AddTransient<
  IDistributedEventHandler<CacheInvalidatedEvent>,
  CacheInvalidatedHandler>();

9️⃣ 性能调优与监控 📈

OpenTelemetry + Grafana

services.AddOpenTelemetry()
  .WithTracing(b => b
    .AddAspNetCoreInstrumentation()
    .AddRedisInstrumentation()
    .AddConsoleExporter());
{
  "panels":[{"title":"Redis RTT","type":"graph","targets":[{"expr":"redis_command_duration_seconds_bucket"}]}]
}

埋点:缓存命中/未命中、锁竞争次数、异常重试日志。


🔟 单元测试示例 🧪

public class CacheTests : AbpIntegratedTest<MyAppModule>
{
  [Fact]
  public async Task Should_Remove_Cache_On_Invalidate_Event()
  {
    var bus   = ServiceProvider.GetRequiredService<IDistributedEventBus>();
    var cache = ServiceProvider.GetRequiredService<IDistributedCache>();
    var id    = Guid.NewGuid();
    var key   = CacheKeyConsts.Product + id;

    await cache.SetAsync(key, new ProductDto { Id = id });
    await bus.PublishAsync(new CacheInvalidatedEvent(CacheKeyConsts.Product, id));

    var val = await cache.GetAsync<ProductDto>(key);
    Assert.Null(val);
  }
}

欢迎点赞 ⭐、讨论交流!


网站公告

今日签到

点亮在社区的每一天
去签到