ABP VNext + Webhook:订阅与异步回调

发布于:2025-05-25 ⋅ 阅读:(22) ⋅ 点赞:(0)

🚀 ABP VNext + Webhook:订阅与异步回调



🎯 一、背景切入:如何优雅地支持第三方回调?

在现代分布式系统中,Webhook 是实现系统解耦和异步通知的重要手段,广泛用于支付通知、审核结果返回、消息推送等场景。但在实践中,我们需要同时解决以下挑战:

  • 🔐 安全防护:如何防止伪造请求?
  • 🔄 幂等控制:如何避免重复处理同一事件?
  • ⚙️ 失败重试:如何确保最终一致性,并避免无限重试?
  • 💼 多厂商 & 多通道:如何优雅地支持不同支付/消息通道?
  • 📊 可观测 & 可运维:如何快速诊断、监控并手动补偿?

🏗 二、系统架构设计

支付厂商
🔐 签名验证
🔄 幂等校验
💾 日志持久化
💾 日志持久化
🔄 重试调度
📈 指标埋点
📈 指标埋点
📊 可视化
微信支付
支付宝
Stripe
Webhook 接收中心
签名验证服务
幂等校验服务
WebhookLog 存储
后台重试调度中心
Prometheus
Grafana

🔍 三、核心能力实现

3.1 🔐 签名验证(防伪造)

接口定义
/// <summary>
/// Contract for checking the signature that accompanies an incoming webhook request.
/// </summary>
public interface ISignatureVerifier
{
    /// <summary>Name of the HTTP header that carries the signature.</summary>
    string HeaderName { get; }

    /// <summary>
    /// Returns the signing secret for <paramref name="provider"/>
    /// (intended to come from a secure configuration store).
    /// </summary>
    /// <param name="provider">Provider whose secret is requested.</param>
    string GetSecret(string provider);

    /// <summary>
    /// Checks whether <paramref name="signature"/> is acceptable for <paramref name="payload"/>.
    /// </summary>
    /// <param name="payload">Request body the signature was computed over.</param>
    /// <param name="signature">Signature value supplied by the caller.</param>
    /// <returns><c>true</c> when the signature is accepted; otherwise <c>false</c>.</returns>
    bool Verify(string payload, string signature);
}
实现示例(Wxpay)
/// <summary>
/// <see cref="ISignatureVerifier"/> for Wxpay callbacks. The expected signature is the
/// lowercase hex HMAC-SHA256 of the UTF-8 payload, keyed with the configured secret.
/// </summary>
public class WxSignatureVerifier : ISignatureVerifier, ITransientDependency
{
    // Provider key used to look up this verifier's secret.
    private const string ProviderName = "Wxpay";

    private readonly IDynamicParameterStore _paramStore;

    /// <summary>HTTP header that carries the signature.</summary>
    public string HeaderName { get; } = "X-Wxpay-Signature";

    public WxSignatureVerifier(IDynamicParameterStore paramStore)
        => _paramStore = paramStore;

    /// <summary>Reads the secret stored under <c>Webhook:Secret:{provider}</c>.</summary>
    /// <param name="provider">Provider whose secret is requested.</param>
    /// <returns>The non-empty secret.</returns>
    /// <exception cref="BusinessException">No secret (or an empty one) is configured.</exception>
    public string GetSecret(string provider)
    {
        // NOTE(review): this blocks on an async call because the interface method is
        // synchronous; an async variant of the interface would avoid tying up a thread.
        var secret = _paramStore.GetOrNullAsync($"Webhook:Secret:{provider}")
                                .GetAwaiter().GetResult();

        // An empty key would make the HMAC trivially forgeable, so treat it as "not configured".
        if (string.IsNullOrEmpty(secret))
            throw new BusinessException("未配置签名 Secret");

        return secret;
    }

    /// <summary>Verifies <paramref name="signature"/> against the HMAC of <paramref name="payload"/>.</summary>
    /// <param name="payload">Raw request body.</param>
    /// <param name="signature">Hex signature supplied by the caller; letter case is ignored.</param>
    /// <returns><c>true</c> when the signature matches; <c>false</c> otherwise, including for null or empty input.</returns>
    /// <exception cref="BusinessException">No secret is configured.</exception>
    public bool Verify(string payload, string signature)
    {
        // Missing input can never be valid; reject it instead of throwing.
        if (payload is null || string.IsNullOrEmpty(signature))
            return false;

        var expected = ComputeHmac(payload, GetSecret(ProviderName));
        return ConstantTimeEquals(expected, signature);
    }

    // Lowercase hex HMAC-SHA256 of data, keyed with key (both UTF-8 encoded).
    private static string ComputeHmac(string data, string key)
    {
        using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key));
        return Convert.ToHexString(hmac.ComputeHash(Encoding.UTF8.GetBytes(data)))
                      .ToLowerInvariant();
    }

    // Compares the expected lowercase hex digest with the supplied one without
    // short-circuiting on the first differing byte.
    private static bool ConstantTimeEquals(string a, string b)
    {
        // Hex digits are case-insensitive, so normalise the supplied value to the
        // lowercase form produced by ComputeHmac before comparing.
        var expectedBytes = Encoding.UTF8.GetBytes(a);
        var suppliedBytes = Encoding.UTF8.GetBytes(b.ToLowerInvariant());
        return CryptographicOperations.FixedTimeEquals(expectedBytes, suppliedBytes);
    }
}

3.2 🔄 幂等控制(防重复处理)

接口与实现
/// <summary>
/// De-duplicates webhook events by their event id.
/// </summary>
public interface IIdempotencyService
{
    /// <summary>
    /// Runs <paramref name="handler"/> for the event identified by <paramref name="eventId"/>.
    /// </summary>
    /// <param name="eventId">Identifier of the webhook event.</param>
    /// <param name="handler">Business logic to execute for the event.</param>
    /// <returns>
    /// A flag describing the outcome. The webhook controller reports <c>true</c> as "OK"
    /// and <c>false</c> as "Duplicate".
    /// </returns>
    Task<bool> TryProcessAsync(string eventId, Func<Task> handler);

    /// <summary>Reports whether the event has already been recorded as processed.</summary>
    /// <param name="eventId">Identifier of the webhook event.</param>
    Task<bool> IsProcessedAsync(string eventId);
}
/// <summary>
/// <see cref="IIdempotencyService"/> backed by a distributed cache (processed markers)
/// and a distributed lock (serialises concurrent deliveries of the same event).
/// </summary>
public class IdempotencyService : IIdempotencyService, ITransientDependency
{
    // How long a caller waits for the per-event lock before giving up.
    private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);

    // Lifetime of the "processed" marker; a redelivery after this window is handled again.
    private static readonly TimeSpan ProcessedMarkerTtl = TimeSpan.FromHours(2);

    private readonly IDistributedCache        _cache;
    private readonly IDistributedLockProvider _lockProvider;

    public IdempotencyService(
        IDistributedCache cache,
        IDistributedLockProvider lockProvider)
    {
        _cache        = cache;
        _lockProvider = lockProvider;
    }

    /// <summary>Reports whether a processed marker exists for <paramref name="eventId"/>.</summary>
    public async Task<bool> IsProcessedAsync(string eventId)
        => await _cache.GetStringAsync(Key(eventId)) != null;

    /// <summary>
    /// Runs <paramref name="handler"/> at most once per event while the processed marker lives.
    /// </summary>
    /// <param name="eventId">Identifier of the webhook event.</param>
    /// <param name="handler">Business logic to run for a first-time event.</param>
    /// <returns>
    /// <c>true</c> when <paramref name="handler"/> ran in this call; <c>false</c> when the event
    /// was already processed or the per-event lock could not be acquired in time.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public async Task<bool> TryProcessAsync(string eventId, Func<Task> handler)
    {
        if (handler is null)
            throw new ArgumentNullException(nameof(handler));

        var locker = _lockProvider.Create($"webhook:lock:{eventId}");
        using var handle = await locker.TryAcquireAsync(LockTimeout);
        if (handle == null)
            return false; // another delivery of this event holds the lock

        // Report a duplicate as false: the controller maps false to "Duplicate",
        // so returning true here would make every redelivery look like a first delivery.
        if (await IsProcessedAsync(eventId))
            return false;

        // If the handler throws, no marker is written and the event can be retried.
        await handler.Invoke();

        await _cache.SetStringAsync(
            Key(eventId),
            "1",
            new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = ProcessedMarkerTtl
            });
        return true;
    }

    // Cache key of the processed marker for an event.
    private static string Key(string id) => $"webhook:processed:{id}";
}

🛠️ 3.3 多厂商处理策略

接口
/// <summary>
/// Provider-specific handler for payment webhook callbacks.
/// </summary>
public interface IPaymentWebhookHandler : ITransientDependency
{
    /// <summary>
    /// Processes one webhook delivery.
    /// </summary>
    /// <param name="payload">Request body of the callback.</param>
    /// <param name="headers">Request headers of the callback.</param>
    /// <returns>The outcome of handling the callback.</returns>
    Task<WebhookResult> HandleAsync(string payload, IDictionary<string, string> headers);

    /// <summary>Provider name this handler is selected by.</summary>
    string Provider { get; }
}
策略工厂
/// <summary>
/// Selects the <see cref="IPaymentWebhookHandler"/> registered for a provider name.
/// </summary>
public class WebhookHandlerFactory : ITransientDependency
{
    private readonly IEnumerable<IPaymentWebhookHandler> _handlers;

    public WebhookHandlerFactory(IEnumerable<IPaymentWebhookHandler> handlers)
        => _handlers = handlers;

    /// <summary>
    /// Returns the first handler whose <see cref="IPaymentWebhookHandler.Provider"/> equals
    /// <paramref name="provider"/>, ignoring case.
    /// </summary>
    /// <param name="provider">Provider name, e.g. taken from the request route.</param>
    /// <exception cref="BusinessException">No handler is registered for the provider.</exception>
    /// <remarks>
    /// Virtual so the lookup can be overridden or substituted; proxy-based mocking
    /// libraries can only intercept virtual members of a concrete class.
    /// </remarks>
    public virtual IPaymentWebhookHandler Get(string provider)
    {
        // A handler with a null Provider is skipped instead of crashing the lookup.
        var match = _handlers.FirstOrDefault(h =>
            h.Provider != null &&
            h.Provider.Equals(provider, StringComparison.OrdinalIgnoreCase));

        return match ?? throw new BusinessException($"不支持的厂商:{provider}");
    }
}

🔁 四、关键流程图

4.1 请求处理流程

POST /api/webhooks/payments/:provider
失败
成功
第一次
重复
客户端请求
Controller
读取原始 Body
🔐 签名校验
返回 401 Unauthorized
提取 eventId
🔄 TryProcessAsync
执行业务 & 持久化日志
返回 Duplicate
返回 OK
📈 记录 Prometheus 指标
结束

4.2 重试工作流程

成功
失败
Yes
No
定时触发 Worker
查询 Failed & RetryCount<10
遍历日志
获取对应 Handler
尝试处理
设置 Success
RetryCount++
RetryCount>5?
设置 Dead + 告警
指数退避等待
更新日志
处理下一个
完成

🛠️ 3.5 接收控制器(统一入口)

/// <summary>
/// Single entry point for payment webhooks: verifies the signature, de-duplicates by
/// event id, dispatches to the provider-specific handler and records the outcome.
/// </summary>
[Route("api/webhooks/payments")]
public class WebhookController : AbpController
{
    private readonly ISignatureVerifier    _verifier;
    private readonly WebhookHandlerFactory _factory;
    private readonly IIdempotencyService   _idem;
    private readonly IRepository<WebhookLog, Guid> _logRepo;

    public WebhookController(
        ISignatureVerifier verifier,
        WebhookHandlerFactory factory,
        IIdempotencyService idem,
        IRepository<WebhookLog, Guid> logRepo)
    {
        _verifier = verifier;
        _factory  = factory;
        _idem     = idem;
        _logRepo  = logRepo;
    }

    /// <summary>Receives a webhook callback for <paramref name="provider"/>.</summary>
    /// <param name="provider">Provider name from the route; selects the handler.</param>
    /// <returns>
    /// 401 when the signature is missing or invalid, 400 when the body has no usable
    /// <c>eventId</c>, otherwise 200 with message "OK" or "Duplicate".
    /// </returns>
    [HttpPost("{provider}")]
    public async Task<IActionResult> HandleAsync(string provider)
    {
        // 1. Read the raw body; the signature is checked against exactly this text.
        using var sr = new StreamReader(Request.Body);
        var payload  = await sr.ReadToEndAsync();

        // 2. Signature check.
        // NOTE(review): a single ISignatureVerifier is injected for every provider, so all
        // routes are checked with the same header name and secret — confirm this is intended.
        var signature = Request.Headers[_verifier.HeaderName].FirstOrDefault();
        if (string.IsNullOrEmpty(signature) || !_verifier.Verify(payload, signature))
            return Unauthorized(new { code = 1001, message = "Invalid signature" });

        // 3. Extract the event id; a missing, null or blank id is rejected so that such
        //    events cannot all collapse onto one idempotency key.
        if (!TryReadEventId(payload, out var eventId))
            return BadRequest(new { code = 1002, message = "Invalid payload" });

        // HTTP header names are case-insensitive; keep that property in the copy handed to handlers.
        var headers = Request.Headers.ToDictionary(
            h => h.Key,
            h => h.Value.FirstOrDefault(),
            StringComparer.OrdinalIgnoreCase);

        // 4. Idempotency gate + business handling.
        // NOTE(review): the key is the bare event id, so equal ids from different providers
        // would be treated as the same event — confirm ids are globally unique.
        var success = await _idem.TryProcessAsync(eventId, async () =>
        {
            var handler = _factory.Get(provider);
            var result  = await handler.HandleAsync(payload, headers);

            // 5. Persist the delivery and its outcome.
            await _logRepo.InsertAsync(new WebhookLog
            {
                Provider   = provider,
                Payload    = payload,
                EventId    = eventId,
                RetryCount = 0,
                Status     = result.Success
                             ? WebhookStatus.Success
                             : WebhookStatus.Failed
            });
        });

        // 6. Metrics.
        Metrics.WebhookProcessed
               .WithLabels(provider, success ? "ok" : "duplicate")
               .Inc();

        return Ok(new { code = 0, message = success ? "OK" : "Duplicate" });
    }

    // Parses payload as a JSON object and reads its "eventId" member.
    // Returns false when the body is not a JSON object or the id is absent, null or blank.
    private static bool TryReadEventId(string payload, out string eventId)
    {
        eventId = null;
        try
        {
            var token = JObject.Parse(payload)["eventId"];
            if (token == null || token.Type == JTokenType.Null)
                return false;

            eventId = token.ToString();
            return !string.IsNullOrWhiteSpace(eventId);
        }
        catch (Newtonsoft.Json.JsonException)
        {
            // Only malformed JSON is an "invalid payload"; other failures should surface.
            return false;
        }
    }
}

📈 五、DevOps & 监控

1. Prometheus 指标

   /// <summary>Prometheus metrics shared by the webhook pipeline.</summary>
   public static class Metrics
   {
       /// <summary>
       /// Counter <c>webhook_processed_total</c>, labelled by <c>provider</c> and <c>status</c>.
       /// </summary>
       public static readonly Counter WebhookProcessed =
           // Fully qualified on purpose: inside this class the bare name "Metrics"
           // binds to this class, which has no CreateCounter member.
           Prometheus.Metrics.CreateCounter(
             "webhook_processed_total",
             "Webhook 处理总数",
             new CounterConfiguration {
               LabelNames = new [] { "provider", "status" }
             });
   }
   // Startup wiring for metrics. The first call targets the service collection,
   // the other two the application pipeline, so they belong to different startup phases.
   // NOTE(review): AddPrometheusMetrics is not defined in this article — confirm which package provides it.
   services.AddPrometheusMetrics();
   app.UseMetricServer();    // /metrics
   app.UseHttpMetrics();     // HTTP request metrics

2. 健康检查

   // Registers three named health checks: "redis", "sql" and "webhook_receive".
   // NOTE(review): RedisHealthCheck, CustomWebhookHealthCheck and connStr are not shown here — confirm they exist.
   services.AddHealthChecks()
       .AddCheck<RedisHealthCheck>("redis")
       .AddSqlServer(connStr, name: "sql")
       .AddCheck<CustomWebhookHealthCheck>("webhook_receive");
   // Serves the aggregated health status at /health.
   app.UseHealthChecks("/health");
3. 容器部署(docker-compose.yml)
   # Local stack: the webhook API plus the Redis and SQL Server instances it depends on.
   version: '3.8'
   services:
     api:
       image: yourrepo/webhook-api:latest
       ports:
         - "5000:80"
       # Start the backing stores before the API so its health checks can reach them.
       depends_on:
         - redis
         - db
       healthcheck:
         # NOTE(review): requires curl to be present in the API image — confirm.
         test: ["CMD", "curl", "-f", "http://localhost/health"]
         interval: 30s
         retries: 3
     redis:
       image: redis:6
     db:
       image: mcr.microsoft.com/mssql/server:2019-latest
       environment:
         - ACCEPT_EULA=Y
         # Overridable from the shell or an .env file; the sample value is only a fallback.
         - SA_PASSWORD=${SA_PASSWORD:-Your_password123}

✅ 六、测试

6.1 单元测试

6.1.1 签名校验测试(xUnit)
/// <summary>Unit tests for <see cref="WxSignatureVerifier"/>.</summary>
public class SignatureVerifierTests
{
    private const string Secret = "test-secret";

    private readonly ISignatureVerifier _verifier;

    public SignatureVerifierTests()
    {
        // Parameter store stub that returns a fixed secret for the Wxpay key.
        var paramStore = A.Fake<IDynamicParameterStore>();
        A.CallTo(() => paramStore.GetOrNullAsync("Webhook:Secret:Wxpay"))
         .Returns(Task.FromResult<string>(Secret));

        _verifier = new WxSignatureVerifier(paramStore);
    }

    [Theory]
    [InlineData("payload")]
    [InlineData("{\"eventId\":\"evt-1\"}")]
    public void Verify_ValidSignature_ReturnsTrue(string payload)
    {
        // The signature is computed here rather than hard-coded, so it is really
        // the HMAC of this payload under the stubbed secret.
        var signature = Sign(payload, Secret);

        var result = _verifier.Verify(payload, signature);

        Assert.True(result);
    }

    [Fact]
    public void Verify_InvalidSignature_ReturnsFalse()
    {
        var result = _verifier.Verify("payload", "bad-signature");

        Assert.False(result);
    }

    [Fact]
    public void Verify_WrongSignatureOfCorrectLength_ReturnsFalse()
    {
        // 64 hex characters, i.e. the same length as a real HMAC-SHA256 digest.
        var result = _verifier.Verify("payload", new string('0', 64));

        Assert.False(result);
    }

    [Fact]
    public void Verify_TamperedPayload_ReturnsFalse()
    {
        var signature = Sign("payload", Secret);

        var result = _verifier.Verify("payload-tampered", signature);

        Assert.False(result);
    }

    // Lowercase hex HMAC-SHA256, mirroring the format the verifier expects.
    private static string Sign(string payload, string secret)
    {
        using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(secret));
        return Convert.ToHexString(hmac.ComputeHash(Encoding.UTF8.GetBytes(payload)))
                      .ToLowerInvariant();
    }
}
6.1.2 幂等服务并发安全测试
/// <summary>Concurrency test for <see cref="IdempotencyService"/>.</summary>
public class IdempotencyServiceTests
{
    [Fact]
    public async Task TryProcessAsync_FirstConcurrency_OnlyOnceExecuted()
    {
        // Arrange: in-memory cache plus a lock provider (assumed to be implemented elsewhere).
        var cacheOptions = new OptionsWrapper<MemoryDistributedCacheOptions>(new MemoryDistributedCacheOptions());
        var cache = new MemoryDistributedCache(cacheOptions);
        var lockProvider = new DefaultDistributedLockProvider();
        var service = new IdempotencyService(cache, lockProvider);

        var invocations = 0;
        async Task CountingHandler()
        {
            await Task.Delay(50);
            Interlocked.Increment(ref invocations);
        }

        // Act: start ten calls for the same event id, then wait for all of them.
        var pending = new Task<bool>[10];
        for (var i = 0; i < pending.Length; i++)
        {
            pending[i] = service.TryProcessAsync("evt-1", CountingHandler);
        }

        await Task.WhenAll(pending);

        // Assert: the handler ran exactly once.
        Assert.Equal(1, invocations);
    }
}
6.1.3 重试 Worker 测试
/// <summary>Tests for the retry worker's dead-letter behaviour.</summary>
public class WebhookRetryWorkerTests
{
    [Fact]
    public async Task DoWorkAsync_FailedLogs_ExponentialBackoffAndDeadLetter()
    {
        // Arrange: one failed log that has already been retried five times.
        var logs = new List<WebhookLog>
        {
            new WebhookLog { Id = Guid.NewGuid(), Provider="Wxpay", Payload="p", EventId="1", RetryCount=5, Status=WebhookStatus.Failed }
        };
        var repo = new InMemoryRepository<WebhookLog, Guid>(logs);

        // Use a real factory around a handler that always throws. Faking the concrete
        // factory would require its Get method to be interceptable by the mocking library.
        var factory = new WebhookHandlerFactory(
            new IPaymentWebhookHandler[] { new FailingHandler() });

        var worker = new WebhookRetryWorker(repo, factory);
        using var cts = new CancellationTokenSource();

        // Act
        await worker.DoWorkAsync(cts.Token);

        // Assert: the sixth failure moves the log to Dead.
        var updated = logs.Single();
        Assert.Equal(WebhookStatus.Dead, updated.Status);
        Assert.Equal(6, updated.RetryCount);
    }

    // Handler stub whose HandleAsync always throws.
    private class FailingHandler : IPaymentWebhookHandler
    {
        public string Provider => "Wxpay";
        public Task<WebhookResult> HandleAsync(string payload, IDictionary<string, string> headers)
            => throw new Exception("fail");
    }
}

6.2 集成测试(Testcontainers)

/// <summary>
/// End-to-end test of the webhook endpoint against containerised Redis and SQL Server.
/// </summary>
public class WebhookIntegrationTests : IAsyncLifetime
{
    private const string Endpoint        = "/api/webhooks/payments/Wxpay";
    private const string SignatureHeader = "X-Wxpay-Signature";

    private readonly TestcontainerDatabase _redisContainer;
    private readonly TestcontainerDatabase _sqlContainer;

    public WebhookIntegrationTests()
    {
        // The builder needs the concrete container type for each database.
        _redisContainer = new TestcontainersBuilder<RedisTestcontainer>()
            .WithDatabase(new RedisTestcontainerConfiguration())
            .Build();

        _sqlContainer = new TestcontainersBuilder<MsSqlTestcontainer>()
            .WithDatabase(new MsSqlTestcontainerConfiguration
            {
                Password = "Your_password123"
            })
            .Build();
    }

    public async Task InitializeAsync()
    {
        await _redisContainer.StartAsync();
        await _sqlContainer.StartAsync();
        // IConfiguration can be built dynamically here and the TestServer started.
    }

    public async Task DisposeAsync()
    {
        await _redisContainer.DisposeAsync();
        await _sqlContainer.DisposeAsync();
    }

    [Fact]
    public async Task FullWebhookFlow_ReturnsOk()
    {
        // Call the API through the TestServer, pointed at the containers.
        var client = TestWebApplicationFactory.CreateClient(new Dictionary<string, string>
        {
            ["ConnectionStrings:Redis"] = _redisContainer.ConnectionString,
            ["ConnectionStrings:Default"] = _sqlContainer.ConnectionString
        });

        // NOTE(review): assumes the test host is configured with "test-secret" as the Wxpay secret — confirm.
        var payload   = "{\"eventId\":\"evt-100\",\"data\":{}}";
        var signature = ComputeTestSignature(payload, "test-secret");

        // First delivery is processed.
        using var first = await PostSignedAsync(client, payload, signature);
        Assert.Equal(HttpStatusCode.OK, first.StatusCode);

        // Second delivery of the same event should be reported as Duplicate.
        using var second = await PostSignedAsync(client, payload, signature);
        var json = await second.Content.ReadAsStringAsync();
        Assert.Contains("Duplicate", json);
    }

    // Posts the payload with its signature header; without the header the endpoint answers 401.
    private static async Task<HttpResponseMessage> PostSignedAsync(
        HttpClient client, string payload, string signature)
    {
        using var request = new HttpRequestMessage(HttpMethod.Post, Endpoint)
        {
            Content = new StringContent(payload, Encoding.UTF8, "application/json")
        };
        request.Headers.Add(SignatureHeader, signature);
        return await client.SendAsync(request);
    }

    // Lowercase hex HMAC-SHA256 of the payload, matching the verifier's format.
    private static string ComputeTestSignature(string payload, string secret)
    {
        using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(secret));
        return Convert.ToHexString(hmac.ComputeHash(Encoding.UTF8.GetBytes(payload)))
                      .ToLowerInvariant();
    }
}

说明

  • 使用 DotNet.Testcontainers 启动 Redis 和 SQL Server;
  • 通过 TestWebApplicationFactory 启动完整 ASP.NET Core 应用;
  • 验证首次处理和幂等结果。


网站公告

今日签到

点亮在社区的每一天
去签到