Differential Privacy for Operational Metrics: DP Counters and Noise Budgets in ABP

Published: 2025-09-04



0. TL;DR 🚀

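  • 👤 Privacy unit: choose user level or device level. On the write path, aggregate by privacy unit first, then clip each unit's contribution to an upper bound K.
  • ➕ Counts/sums → Laplace (scale b = Δ/ε); 📈 ratios/means → Gaussian (for production, calibrate σ with the analytic Gaussian mechanism; the examples use the classic bound as a conservative default).
  • 🧩 Post-processing does not weaken privacy: results may be truncated at zero, rounded, bucketed, smoothed, or otherwise constrained for business needs.
  • 📒 Budget ledger: accumulate ε/δ per "tenant × metric × window start (UTC)"; keys align strictly with windows; consumption is atomic via Lua; TTL = seconds remaining in the window; overspend is hard-blocked.
  • 🔒 Idempotent publishing: a periodic job publishes the previous window (UTC) at the end of each window; a Redis SET NX "released" lock guarantees a single publish (optionally backed by a DB upsert).
  • 🧪 Gradual rollout and A/B: gate by tenant/feature flag; expand based on MAPE / P95 error thresholds; roll back at any time.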

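📈 One diagram (write → publish → budget → noise)

[Flowchart] Write side: events/instrumentation → aggregate by privacy unit → clip K → store.
[Flowchart] Publish side (per window): worker timer fires → PrevWindow(UTC) → released lock (SET NX) → aggregate(ws, we) → distinct ≥ MinUnits? (no: log suppression & exit) → TryConsume, Lua atomic budget (no: log overspend & exit) → add noise (Laplace/Gaussian) → post-process (non-negative/rounding) → publish (upsert) & audit. RelKey TTL = seconds remaining in the window.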

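1. Background and Scope 🧭

  • 🔐 Versus encryption/RLS: encryption and RLS decide who may see the true values; differential privacy (DP) bounds how much can leak once a result is published.
  • ✅ Applicable to: DAU/retention, feature call counts, error rates, funnel traffic, and other aggregate metrics; ⛔ not for strongly consistent billing or per-individual reconciliation.
  • ⚠️ Risks and countermeasures: collusion, cross-window differencing, sliding-window reconstruction → privacy budget, minimum-sample threshold, publish rate limiting, audit trail.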

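2. DP Engineering Conventions 📐

  • 📏 Sensitivity Δ: the maximum contribution of a single privacy unit within one window (after clipping to K).

  • 🔊 Laplace: b = Δ/ε; ❄️ Gaussian: σ = f(ε, δ, Δ) (for production, use the analytic Gaussian mechanism or calibrate via OpenDP; the examples use the classic bound).

  • 🧮 Composition/accounting

    • Laplace: ε adds linearly (basic composition).
    • Gaussian: accumulate RDP over a grid of α, then for a given δ derive ε*(δ) = min_α {RDP(α) + ln(1/δ)/(α-1)} and display "used/remaining".
  • 🧯 Post-processing does not harm privacy: result-level operations such as non-negative truncation, rounding, bucketing, and smoothing do not weaken the DP guarantee.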
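
The accounting rule above can be illustrated with a small Python sketch. It assumes the standard RDP curve of the Gaussian mechanism, RDP(α) = α·Δ²/(2σ²), and the conversion formula quoted in the bullet; the α grid `ALPHAS` and all function names are hypothetical choices for illustration, not part of the Abp.DpMetrics module.

```python
import math

# Hypothetical alpha grid; a production accountant would likely use a denser one.
ALPHAS = [1.5, 2, 3, 4, 6, 8, 16, 32, 64]

def classic_gaussian_sigma(delta_f, eps, delta):
    """Classic bound: sigma = sqrt(2 ln(1.25/delta)) * delta_f / eps (stated for eps < 1)."""
    return math.sqrt(2.0 * math.log(1.25 / delta)) * delta_f / eps

def gaussian_rdp(alpha, delta_f, sigma):
    """RDP of the Gaussian mechanism at order alpha."""
    return alpha * delta_f ** 2 / (2.0 * sigma ** 2)

def rdp_to_eps(rdp_by_alpha, delta):
    """eps*(delta) = min over alpha of RDP(alpha) + ln(1/delta) / (alpha - 1)."""
    return min(r + math.log(1.0 / delta) / (a - 1.0) for a, r in rdp_by_alpha.items())

def compose_gaussian_releases(sigmas, delta_f, delta):
    """Sum RDP per alpha across releases, then convert once to (eps, delta)."""
    total = {a: 0.0 for a in ALPHAS}
    for sigma in sigmas:
        for a in ALPHAS:
            total[a] += gaussian_rdp(a, delta_f, sigma)
    return rdp_to_eps(total, delta)

if __name__ == "__main__":
    sigma = classic_gaussian_sigma(delta_f=1.0, eps=0.5, delta=1e-6)
    print("sigma:", sigma)
    print("eps after 1 release :", compose_gaussian_releases([sigma], 1.0, 1e-6))
    print("eps after 4 releases:", compose_gaussian_releases([sigma] * 4, 1.0, 1e-6))
```

With these inputs, four releases compose to noticeably less than the 4 × 0.5 that linear addition of ε would report, which is the reason for tracking RDP instead of ε for Gaussian metrics.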


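3. Target Architecture (ABP Integration) 🏗️

Abp.DpMetrics module (application layer)

  • IDpCounter.Increment(tenant, metric, unitId, amount=1): write path; aggregate by privacy unit → clip to K → persist.
  • DpPublishWorker: periodic publishing; at each window end it publishes the previous window (UTC): idempotency lock → budget check → noise → post-processing → audit record.
  • IPrivacyAccountant (Redis/DB implementation): stores ε/δ usage and caps per (tenant, metric, windowStartUtc); provides an atomic Lua TryConsume.
  • IBudgetPolicyProvider: configurable budget caps (ε/δ cap), deliverable per tenant/metric/window.
  • IReleaseLog: stores noise parameters, ε/δ, mechanism, true-value hash, seed hash, and approval ticket.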
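
As a rough illustration of the write path (aggregate by privacy unit, then clip to K), here is an in-memory Python sketch. `ClippedCounter` and its methods are hypothetical names; a real implementation would persist to the repository, and the sketch assumes non-negative `amount` values.

```python
from collections import defaultdict

class ClippedCounter:
    """In-memory stand-in for the write path: per-unit totals capped at clip_k."""

    def __init__(self, clip_k):
        self.clip_k = clip_k
        # (tenant, metric, window_start, unit_id) -> clipped contribution
        self._per_unit = defaultdict(int)

    def increment(self, tenant, metric, window_start, unit_id, amount=1):
        key = (tenant, metric, window_start, unit_id)
        # Clip at write time so no unit ever contributes more than K per window.
        self._per_unit[key] = min(self.clip_k, self._per_unit[key] + amount)

    def aggregate(self, tenant, metric, window_start):
        """Return (clipped total, number of distinct units) for one window."""
        total, units = 0, 0
        for (t, m, w, _unit), value in self._per_unit.items():
            if (t, m, w) == (tenant, metric, window_start):
                total += value
                units += 1
        return total, units

if __name__ == "__main__":
    counter = ClippedCounter(clip_k=3)
    for _ in range(10):                      # a heavy user: 10 events, capped at 3
        counter.increment("t1", "calls", 0, "user-a")
    counter.increment("t1", "calls", 0, "user-b")
    print(counter.aggregate("t1", "calls", 0))
```

Because every unit is capped at K, the sensitivity Δ of the windowed count is K, which is the value the publish side feeds into b = Δ/ε.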

Component diagram

[Diagram] ABP App · Abp.DpMetrics module (IDpCounter, DpPublishWorker, IBudgetPolicyProvider, Redis PrivacyAccountant) · Redis · Repository (DB) · Feature Flags/Gray rollout

4. Data/Configuration Model 🗂️

MetricDefinition
    string Name
    TimeSpan Window
    PrivacyUnit: User|Device
    Mechanism: Laplace|Gaussian
    double SensitivityDelta
    int ClipKPerUnit
    double Epsilon
    double? Delta
    int MinDistinctUnits
    string TenantId
    int? Seed // test environments only

BudgetPolicy
    string TenantId
    string Metric
    TimeSpan Window
    double EpsilonCap
    double? DeltaCap
    int MinDistinctUnits
    int MaxPublishesPerWindow

ReleaseRecord
    Guid Id
    string TenantId
    string Metric
    DateTimeOffset WindowStart, WindowEnd // UTC
    double EpsilonUsed
    double? DeltaUsed
    string Mechanism
    long TrueValueHash
    double NoisyValue
    string SeedHash
    string Approver

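5. Key Implementation 🧩

The code is a minimal template that highlights the conventions and the key edge cases; for production, swap in analytic Gaussian calibration, complete the RDP accounting, centralize metric thresholds in configuration, and so on.

5.1 Unified interfaces (accounting keyed by window start)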

public interface IDpCounter {
    Task IncrementAsync(string tenantId, string metric, string privacyUnitId, int amount = 1);
}

public interface IPrivacyAccountant {
    Task<bool> TryConsumeAsync(
        string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window,
        double epsilon, double? delta);

    Task<(double usedEps, double? usedDelta, double capEps, double? capDelta)>
        GetUsageAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window);
}

public interface IBudgetPolicyProvider {
    (double capEps, double? capDel) GetCaps(string tenantId, string metric, TimeSpan window);
}

public enum DpMechanism { Laplace, Gaussian }

5.2 Random source and noise (open-interval sampling + cached normal sample)

public interface IRandomSource { double NextUnit(); }

public sealed class CryptoRandom : IRandomSource {
    private readonly System.Security.Cryptography.RandomNumberGenerator _rng
        = System.Security.Cryptography.RandomNumberGenerator.Create();
    public double NextUnit(){
        Span<byte> b = stackalloc byte[8]; _rng.GetBytes(b);
        ulong u = BitConverter.ToUInt64(b);
        return (u >> 11) * (1.0 / (1UL << 53)); // [0,1)
    }
}

public static class RandUtil {
    public static double Open01(IRandomSource r) {
        double u; do { u = r.NextUnit(); } while (u <= double.Epsilon || u >= 1.0 - double.Epsilon);
        return u; // (0,1)
    }
}

public sealed class GaussianSampler {
    private readonly IRandomSource _r; private bool _has; private double _cache;
    public GaussianSampler(IRandomSource r){ _r = r; }
    public double Next(){
        if (_has){ _has=false; return _cache; }
        var u1 = RandUtil.Open01(_r); var u2 = RandUtil.Open01(_r);
        var mag = Math.Sqrt(-2.0 * Math.Log(u1));
        var z0 = mag * Math.Cos(2.0 * Math.PI * u2);
        var z1 = mag * Math.Sin(2.0 * Math.PI * u2);
        _cache = z1; _has = true; return z0; // Box–Muller yields two samples; cache the second
    }
}

public static class DpNoise {
    public static double Laplace(double x, double b, IRandomSource r){
        double u = RandUtil.Open01(r) - 0.5; // (-0.5,0.5)
        double noise = -b * Math.Sign(u) * Math.Log(1 - 2 * Math.Abs(u)); // inverse-CDF sampling
        return Math.Max(0, x + noise); // post-processing: clamp to non-negative
    }
    public static double ClassicGaussianSigma(double deltaF, double eps, double delta){
        return Math.Sqrt(2 * Math.Log(1.25 / delta)) * deltaF / eps; // classic bound (stated for eps < 1)
    }
}

5.3 Window alignment (previous window, UTC) and budget key

static (DateTimeOffset Start, DateTimeOffset End) PrevWindowUtc(IClock clock, TimeSpan w){
    DateTimeOffset now = clock.Now.ToUniversalTime(); // IClock.Now is a DateTime; convert before taking Unix seconds
    long sec = (long)w.TotalSeconds;
    long end = (now.ToUnixTimeSeconds() / sec) * sec; // start of the current window = end of the previous window (UTC)
    long start = end - sec; // previous window [start, end)
    return (DateTimeOffset.FromUnixTimeSeconds(start), DateTimeOffset.FromUnixTimeSeconds(end));
}
static string BudgetKey(string tenant, string metric, TimeSpan w, DateTimeOffset windowStartUtc){
    return $"dp:budget:{tenant}:{metric}:{(int)w.TotalSeconds}:{windowStartUtc.ToUnixTimeSeconds()}";
}
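
For a quick sanity check of the alignment arithmetic, the same floor-to-window computation can be sketched in Python. The function names mirror the C# helpers above but are otherwise illustrative; the sketch assumes the window length is a whole number of seconds.

```python
from datetime import datetime, timezone

def prev_window_utc(now, window_seconds):
    """Return (start, end) of the previous window as Unix seconds, end exclusive."""
    now_s = int(now.timestamp())
    end = (now_s // window_seconds) * window_seconds  # start of the current window
    start = end - window_seconds
    return start, end

def budget_key(tenant, metric, window_seconds, window_start):
    """Same key layout as BudgetKey above: keyed by window length and window start."""
    return f"dp:budget:{tenant}:{metric}:{window_seconds}:{window_start}"

if __name__ == "__main__":
    now = datetime(2025, 9, 4, 10, 17, 30, tzinfo=timezone.utc)
    start, end = prev_window_utc(now, 3600)
    print(datetime.fromtimestamp(start, timezone.utc),
          datetime.fromtimestamp(end, timezone.utc))
    print(budget_key("t1", "dau", 3600, start))
```

At 10:17:30 UTC with an hourly window, the previous window is [09:00, 10:00); every worker tick between 10:00 and 11:00 resolves to the same key, which is what lets the ledger and the released lock deduplicate.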

5.4 Redis accounting (atomic Lua consumption + TTL aligned to "seconds remaining in the window" + configurable caps)

public sealed class RedisPrivacyAccountant : IPrivacyAccountant {
    private readonly IConnectionMultiplexer _redis; private readonly IBudgetPolicyProvider _caps;
    public RedisPrivacyAccountant(IConnectionMultiplexer redis, IBudgetPolicyProvider caps){ _redis = redis; _caps = caps; }

    private const string LUA = @"
    local key     = KEYS[1]
    local epsUse  = tonumber(ARGV[1])
    local delUse  = (ARGV[2] ~= '' and tonumber(ARGV[2]) or nil)
    local capEps  = tonumber(ARGV[3])
    local capDel  = (ARGV[4] ~= '' and tonumber(ARGV[4]) or nil)
    local ttlSec  = tonumber(ARGV[5])

    local used = redis.call('GET', key)
    local usedEps, usedDel = 0, 0
    if used then
      local i = string.find(used, '|')
      usedEps = tonumber(string.sub(used, 1, i-1))
      usedDel = tonumber(string.sub(used, i+1))
    end

    if usedEps + epsUse > capEps then return 0 end
    if capDel and delUse and (usedDel + delUse > capDel) then return 0 end

    usedEps = usedEps + epsUse
    usedDel = usedDel + (delUse or 0)
    redis.call('SET', key, usedEps .. '|' .. usedDel)
    if ttlSec > 0 then redis.call('EXPIRE', key, ttlSec) end
    return 1
    ";

    public async Task<bool> TryConsumeAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window, double epsilon, double? delta){
        var db = _redis.GetDatabase();
        var key = BudgetKey(tenantId, metric, window, windowStartUtc);
        var (capEps, capDel) = _caps.GetCaps(tenantId, metric, window);
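        var nowUtc = DateTimeOffset.UtcNow;
        var we = windowStartUtc + window;
        // The worker consumes budget for the previous window, so `we` may already be in the past.
        // Keep the ledger until the following window ends; otherwise the key would expire after 1 s.
        int ttlSec = (int)Math.Max(1, (we + window - nowUtc).TotalSeconds);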
        var result = (long)await db.ScriptEvaluateAsync(
            LUA,
            new RedisKey[]{ key },
            new RedisValue[]{
                epsilon,
                delta.HasValue ? (RedisValue)delta.Value : RedisValue.EmptyString,
                capEps,
                capDel.HasValue ? (RedisValue)capDel.Value : RedisValue.EmptyString,
                ttlSec
            });
        return result == 1L;
    }

    public async Task<(double, double?, double, double?)> GetUsageAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window){
        var db = _redis.GetDatabase();
        var key = BudgetKey(tenantId, metric, window, windowStartUtc);
        var s = await db.StringGetAsync(key);
        double usedEps = 0, usedDelRaw = 0;
        if (!s.IsNullOrEmpty){
            var parts = ((string)s!).Split('|');
            usedEps   = double.Parse(parts[0], System.Globalization.CultureInfo.InvariantCulture);
            usedDelRaw= double.Parse(parts[1], System.Globalization.CultureInfo.InvariantCulture);
        }
        var (capEps, capDel) = _caps.GetCaps(tenantId, metric, window);
        double? usedDel = capDel.HasValue ? usedDelRaw : (double?)null;
        return (usedEps, usedDel, capEps, capDel);
    }
}
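
The check-then-add semantics of the Lua script can be exercised without Redis. The following Python sketch is an in-memory stand-in in which a `threading.Lock` plays the role of Lua's atomic execution; `InMemoryAccountant` is a hypothetical name, and TTL handling is left out.

```python
import threading

class InMemoryAccountant:
    """Stand-in for the Redis ledger: atomic 'check caps, then add' per budget key."""

    def __init__(self, cap_eps, cap_delta=None):
        self.cap_eps = cap_eps
        self.cap_delta = cap_delta
        self._used = {}                 # key -> (used_eps, used_delta)
        self._lock = threading.Lock()   # plays the role of Lua's atomic execution

    def try_consume(self, key, eps, delta=None):
        with self._lock:
            used_eps, used_delta = self._used.get(key, (0.0, 0.0))
            if used_eps + eps > self.cap_eps:
                return False
            if (self.cap_delta is not None and delta is not None
                    and used_delta + delta > self.cap_delta):
                return False
            self._used[key] = (used_eps + eps, used_delta + (delta or 0.0))
            return True

    def usage(self, key):
        with self._lock:
            return self._used.get(key, (0.0, 0.0))

if __name__ == "__main__":
    acct = InMemoryAccountant(cap_eps=1.0)
    results = []
    threads = [threading.Thread(
        target=lambda: results.append(acct.try_consume("t1:dau:0", 0.25)))
        for _ in range(20)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(sum(results), "of 20 requests fit under the cap;", acct.usage("t1:dau:0"))
```

Without the lock (or, in Redis, without Lua), two publishers could both read the same usage and both pass the cap check, which is the race the script is there to prevent.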

5.5 Publish worker (previous-window publishing + idempotency lock + small-sample suppression + budget check + noise)

public sealed class DpPublishWorker : AsyncPeriodicBackgroundWorkerBase, ITransientDependency {
    private readonly IClock _clock; private readonly IDpRepository _repo;
    private readonly IPrivacyAccountant _acct; private readonly IRandomSource _rng = new CryptoRandom();
    private readonly GaussianSampler _gau; private readonly IConnectionMultiplexer _redis;

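    // AsyncPeriodicBackgroundWorkerBase expects an AbpAsyncTimer and an IServiceScopeFactory.
    public DpPublishWorker(AbpAsyncTimer timer, IServiceScopeFactory scopeFactory, IClock clock, IDpRepository repo, IPrivacyAccountant acct, IConnectionMultiplexer redis)
        : base(timer, scopeFactory){ _clock = clock; _repo = repo; _acct = acct; _gau = new GaussianSampler(_rng); _redis = redis; Timer.Period = 60_000; }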

    protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext context){
        var ct = context.CancellationToken;
        foreach (var def in await _repo.ListMetricDefinitionsAsync()){
            ct.ThrowIfCancellationRequested();
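            var (ws, we) = PrevWindowUtc(_clock, def.Window); // publish the previous window (UTC)

            // Idempotent publish lock: only one instance may publish this window.
            var db = _redis.GetDatabase();
            // `we` is already in the past here, so hold the lock until the following window ends;
            // otherwise it expires after 1 s and the next tick would publish the same window again.
            int ttlSec = (int)Math.Max(1, (we + def.Window - _clock.Now.ToUniversalTime()).TotalSeconds);
            var relKey = $"dp:released:{def.TenantId}:{def.Name}:{(int)def.Window.TotalSeconds}:{ws.ToUnixTimeSeconds()}";
            bool acquired = await db.StringSetAsync(relKey, "1", TimeSpan.FromSeconds(ttlSec), when: When.NotExists);
            if (!acquired) continue;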

            var (trueCount, distinctUnits) = await _repo.AggregateAsync(def, ws, we);
            if (distinctUnits < def.MinDistinctUnits){ await _repo.LogSuppressedAsync(def, ws, we); continue; }

            var ok = await _acct.TryConsumeAsync(def.TenantId, def.Name, ws, def.Window, def.Epsilon, def.Delta);
            if (!ok){ await _repo.LogBudgetExceededAsync(def, ws, we); continue; }

            double noisy;
            if (def.Mechanism == DpMechanism.Laplace){
                var b = def.SensitivityDelta / def.Epsilon;
                noisy = DpNoise.Laplace(trueCount, b, _rng);
            } else {
                if (!def.Delta.HasValue) { await _repo.LogErrorAsync(def, ws, we, "Gaussian requires delta"); continue; }
                var sigma = DpNoise.ClassicGaussianSigma(def.SensitivityDelta, def.Epsilon, def.Delta.Value);
                noisy = Math.Max(0, trueCount + _gau.Next() * sigma);
            }

            await _repo.PublishAsync(def, ws, we, noisy, Hash64Stable(trueCount), def.Epsilon, def.Delta, def.Mechanism.ToString());
        }
    }
    // Hash64Stable (a stable 64-bit hash of the true value) is not shown in this template.
}
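
The order of checks in the worker (lock, suppression, budget, noise, post-processing) can be traced in a compact Python sketch. Here a `set` stands in for the Redis SET NX lock and a `dict` for the budget ledger; `publish_once` and its parameters are hypothetical, and only the Laplace branch is shown.

```python
import math
import random

def laplace_noise(b, rng):
    """Inverse-CDF Laplace sample; redraw if u hits -0.5 to avoid log(0)."""
    u = rng.random() - 0.5
    while abs(u) >= 0.5:
        u = rng.random() - 0.5
    return -b * math.copysign(1.0, u) * math.log(1.0 - 2.0 * abs(u))

def publish_once(released, budget, key, true_count, distinct_units,
                 min_units, eps, eps_cap, sensitivity, rng):
    """One publish attempt for one window key; returns (status, noisy_value_or_None)."""
    if key in released:                      # SET NX failed: someone already handled it
        return ("skipped", None)
    released.add(key)
    if distinct_units < min_units:           # small-sample suppression
        return ("suppressed", None)
    if budget.get(key, 0.0) + eps > eps_cap: # hard block on overspend
        return ("budget_exceeded", None)
    budget[key] = budget.get(key, 0.0) + eps
    noisy = true_count + laplace_noise(sensitivity / eps, rng)
    return ("published", max(0.0, noisy))    # post-processing: non-negative

if __name__ == "__main__":
    released, budget, rng = set(), {}, random.Random(7)
    args = dict(true_count=1000, distinct_units=200, min_units=50,
                eps=0.5, eps_cap=1.0, sensitivity=1.0, rng=rng)
    print(publish_once(released, budget, "t1:dau:3600:0", **args))
    print(publish_once(released, budget, "t1:dau:3600:0", **args))  # second tick
```

Note that, as in the C# worker, the lock is taken before the suppression and budget checks, so a suppressed window is not retried on later ticks.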

5.6 Ratio/mean metrics (noise the numerator and denominator separately + small-sample suppression)

// Pseudocode: same window, same budget; or split ε between numerator and denominator by importance.
public async Task<double?> ReleaseRatioAsync(MetricDefinition numDef, MetricDefinition denDef, DateTimeOffset ws, DateTimeOffset we){
    var (num, nu) = await _repo.AggregateAsync(numDef, ws, we);
    var (den, du) = await _repo.AggregateAsync(denDef, ws, we);
    // Small-sample suppression: each side must meet its own threshold.
    if (nu < numDef.MinDistinctUnits || du < denDef.MinDistinctUnits) return null;

    // Budget check and noise for numerator and denominator separately (omitted; same as counts).
    var noisyNum = /* Laplace/Gaussian */ 0.0;
    var noisyDen = /* Laplace/Gaussian */ 1.0;
    if (noisyDen <= 0) return null; // guard against a non-positive denominator blowing up the ratio

    var ratio = noisyNum / noisyDen;
    return Math.Clamp(ratio, 0, 1);
}
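
To make the elided noise step concrete, here is a hedged Python sketch that splits a total ε between numerator and denominator and applies Laplace noise to each. The Laplace choice, the `num_share` parameter, and the function names are assumptions for illustration; a Gaussian variant would follow the same structure.

```python
import math
import random

def laplace_noise(b, rng):
    """Inverse-CDF Laplace sample; redraw if u hits -0.5 to avoid log(0)."""
    u = rng.random() - 0.5
    while abs(u) >= 0.5:
        u = rng.random() - 0.5
    return -b * math.copysign(1.0, u) * math.log(1.0 - 2.0 * abs(u))

def release_ratio(num, den, num_units, den_units, min_units,
                  eps_total, sensitivity, rng, num_share=0.5):
    """Noisy ratio in [0, 1], or None when suppressed."""
    if num_units < min_units or den_units < min_units:
        return None                          # small-sample suppression
    eps_num = eps_total * num_share          # basic composition: the two parts sum to eps_total
    eps_den = eps_total - eps_num
    noisy_num = num + laplace_noise(sensitivity / eps_num, rng)
    noisy_den = den + laplace_noise(sensitivity / eps_den, rng)
    if noisy_den <= 0:
        return None                          # avoid dividing by a non-positive denominator
    return min(1.0, max(0.0, noisy_num / noisy_den))

if __name__ == "__main__":
    rng = random.Random(1)
    # Error rate: 300 failing calls out of 10000, eps = 1.0 split evenly.
    print(release_ratio(300, 10000, 300, 5000, 50, 1.0, 1.0, rng))
    print(release_ratio(5, 20, 5, 20, 50, 1.0, 1.0, rng))  # suppressed
```

The ratio of two noisy values is biased when the denominator is small relative to its noise scale, which is why the small-sample threshold matters more here than for plain counts.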

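6. Gradual Rollout and Rollback 🎛️

  • 🔀 Feature flags: enable "DP-protected metrics" per tenant. A numeric feature value can store the rollout percentage, which callers use to split traffic themselves; alternatively, do percentage-based splitting at the gateway.
  • 🧪 A/B validation: monitor MAPE / P95; if the thresholds are met → expand; if not → roll back to "true values, internal only".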
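
One possible shape for the expand/rollback gate is sketched below in Python. The threshold values, the nearest-rank P95 definition, and the function name `rollout_decision` are all assumptions; the article does not prescribe specific numbers.

```python
def rollout_decision(true_values, published_values, mape_max, p95_max):
    """Return 'expand' when both MAPE and P95 absolute error are within thresholds."""
    abs_errors = [abs(p - t) for t, p in zip(true_values, published_values)]
    # MAPE with the same max(1, n) guard used in the evaluation script.
    mape = sum(e / max(1, t) for e, t in zip(abs_errors, true_values)) / len(abs_errors)
    # Nearest-rank 95th percentile, computed with integer arithmetic.
    rank = (95 * len(abs_errors) + 99) // 100
    p95 = sorted(abs_errors)[rank - 1]
    decision = "expand" if (mape <= mape_max and p95 <= p95_max) else "rollback"
    return decision, mape, p95

if __name__ == "__main__":
    truth = [100] * 20
    published = [101] * 19 + [200]   # one outlier window
    print(rollout_decision(truth, published, mape_max=0.10, p95_max=5))
    print(rollout_decision(truth, published, mape_max=0.05, p95_max=5))
```

Checking both metrics together guards against a single outlier window (which moves MAPE) as well as a broad shift in error (which moves P95).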

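7. Visualization and Audit 📊

  • 📉 Dashboards: ε/δ consumption curves, remaining budget, publish heatmap, counts of small-sample suppressions and overspends, distribution of failure reasons (ε exceeded / δ exceeded).
  • 🧾 Audit: tenant, metric, window (UTC), true-value hash, noise parameters, ε/δ, mechanism, seedHash, approval ticket number.
  • 🆘 Operations: hard block on overspend, rate limiting, anomaly alerts (late-night spikes, cross-tenant access).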

⏱️ Window and TTL alignment

[Gantt diagram] Window/TTL alignment (Unix seconds), section "Window Keys": previous window, current window, budget key TTL, released lock TTL.

8. Evaluation Script 🧪

# eval_dp_counter.py (Laplace error curves)
import numpy as np

def laplace_noise(b, size=1, rng=None):
    rng = rng or np.random.default_rng(2025)
    u = rng.random(size=size)
    u = np.clip(u, np.finfo(float).eps, 1-np.finfo(float).eps) - 0.5  # (-0.5,0.5)
    return -b * np.sign(u) * np.log(1 - 2 * np.abs(u))

def eval_mape_p95(true_counts, epsilon, delta_f=1.0, trials=2000):
    b = delta_f / epsilon
    rng = np.random.default_rng(2025)  # one generator per run, so each n gets fresh draws
    rows = []
    for n in true_counts:
        preds = np.clip(n + laplace_noise(b, trials, rng=rng), 0, None)
        mape = np.mean(np.abs(preds - n) / max(1, n))
        p95 = np.percentile(np.abs(preds - n), 95)
        rows.append((n, mape, p95))
    return rows

if __name__ == '__main__':
    Ns = [50, 100, 300, 1000, 3000, 10000]
    for eps in [0.3, 0.5, 1.0, 2.0]:
        print(eps, eval_mape_p95(Ns, eps))

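Closed-form reference: for Laplace noise, E(|noise|) = b and P95(|noise|) = b·ln 20 ≈ 2.996·b. Both are useful for dashboard thresholds and for a first estimate of business impact.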
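
These two figures follow from P(|noise| > t) = exp(-t/b). A short Python sketch turns them into a planning helper; `min_count_for_relative_error` is a hypothetical helper that ignores the effect of the non-negative clamp, which only matters for very small counts.

```python
import math

def laplace_abs_mean(b):
    """E|X| for Laplace(0, b)."""
    return b

def laplace_abs_quantile(b, q):
    """q-quantile of |X|: solve 1 - exp(-t/b) = q for t."""
    return -b * math.log(1.0 - q)

def min_count_for_relative_error(eps, delta_f, rel_error, q=0.95):
    """Smallest true count n for which the q-quantile of |noise| is at most rel_error * n."""
    b = delta_f / eps
    return math.ceil(laplace_abs_quantile(b, q) / rel_error)

if __name__ == "__main__":
    b = 1.0 / 0.5                      # delta_f = 1, eps = 0.5
    print("E|noise|   :", laplace_abs_mean(b))
    print("P95|noise| :", laplace_abs_quantile(b, 0.95))
    print("min n for 5% P95 error:", min_count_for_relative_error(0.5, 1.0, 0.05))
```

With ε = 0.5 and Δ = 1, a count needs to be around 120 or more before the P95 relative error drops to 5%, which gives a first guess for the MinDistinctUnits threshold of a metric.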


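9. Rollout Steps ✅

  1. Inventory the metrics → classify each as "public" or "internal review";
  2. Choose the privacy unit and the clipping bound K;
  3. Set budgets for the public set (per-window ε/δ caps, minimum sample, publish frequency), delivered as configuration by IBudgetPolicyProvider;
  4. Integrate Abp.DpMetrics; enable Redis and the background worker;
  5. Enable the publish idempotency lock and atomic Lua budget consumption;
  6. Gradual rollout: set tenant features and split ratios;
  7. A/B: track MAPE/P95 and budget curves on dashboards; expand when the thresholds are met;
  8. Drills: budget exhaustion / denial of service / rollback SOP;
  9. Compliance: export audit logs and budget-ledger snapshots (UTC).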

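10. Suggested Repository Layout 🧱

/src/Abp.DpMetrics/           // module source (interfaces/mechanisms/accounting/worker)
/src/Abp.DpMetrics.Tests/     // xUnit: Laplace error, concurrent atomic budget consumption, window boundaries, idempotency lock
/tools/eval/                  // evaluation scripts
/tools/dashboard/             // ε ledger & error-curve dashboard templates
/docker/docker-compose.yml    // Redis (optional)

docker-compose (optional)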

version: "3.9"
services:
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
    command: ["redis-server","--appendonly","yes"]

appsettings.json (fragment)

{
  "Redis": { "Configuration": "localhost:6379" },
  "DistributedCache": { "KeyPrefix": "demo:dp:" }
}

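11. Common Pitfalls 🧨

  • Mislabeling "analytic Gaussian": if analytic calibration is not wired in, label the mechanism "classic Gaussian bound".
  • Budget key not aligned to the window: a TTL-based sliding window drifts out of step with daily/hourly dashboards; always key on windowStartUtc, with TTL = seconds remaining in the window (for a window that has already closed, keep the key at least until the next window ends, or it expires before the next worker tick).
  • Previous-window publishing & idempotency: publishing floor(now, w) directly causes missed or duplicate publishes around boundaries; use the released lock or a DB upsert.
  • Budget atomicity: concurrent publishers need atomic Lua or a distributed lock; Lua is recommended.
  • Random source and open interval: avoid the log(0) boundary; Box–Muller yields two samples, so cache the second.
  • Ratio/mean suppression: suppress the result when the sample is small or the denominator is too small; apply a bias correction where necessary.
  • Stable audit hash: fix the serialization precision and encoding so the hash is consistent across platforms.
  • UTC consistency: use UTC for windows, keys, and audit timestamps to avoid time-zone and daylight-saving effects.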
