ABP vNext+ WebRTC DataChannel 低延迟传感推送

发布于:2025-08-16 ⋅ 阅读:(14) ⋅ 点赞:(0)

ABP vNext + WebRTC DataChannel 低延迟传感推送 🚀



开篇导读

使用 WebRTC DataChannel 在浏览器侧实现毫秒级传感推送,ABP 仅做信令网关。关键:SCTP 部分可靠性、GCC 拥塞控制、ICE Restart 自愈、TURN(UDP/TCP/TLS) 保达、全链路可观测。附最小可运行 Demo + Compose + QoS 采样脚本


1. 交付物与目录结构 📦

交付物

  1. ABP Signaling 模块:JWT、多租户、SignalR Hub、端点/方法级限流
  2. 前端 Demo:Producer / Consumer 两页,双 DataChannel(sensors/control
  3. coturn 最小安全配置(UDP/TCP/TLS),时间戳凭据(HMAC)
  4. QoS 自适应与断线自愈脚本(getStats + 控制通道心跳 + ICE Restart)
  5. 压测与验收脚本 + 指标看板(前端/后端),SignalR 水平扩展指引

目录

webrtc-rt-quality/
  signaling/        # ABP 模块(SignalR Hub)
  web-demo/         # producer.html / consumer.html
  deploy/
    docker-compose.yml
    turn.env
    certs/
  scripts/          # k6 / node 采样器 / grafana json

2. 背景与目标 🛠️

  • 工业/IoT 质量监测:端到端 p95 100–200ms,可丢包、可穿企业网/NAT
  • 选择 DataChannel:浏览器原生、SCTP 乱序/分片/部分可靠、配合 GCC 拥塞控制、P2P 优先
  • 非主题:业务数据不走 SignalR/gRPC;SignalR 仅用于信令(Offer/Answer/ICE)

3. 总体架构与时序 🔄

以下是信令交互的时序图,展示从信令到数据通道建立的过程:

Producer(Browser) ABP Signaling Hub Consumer(Browser) coturn(UDP/TCP/TLS) Offer(JWT, TenantId) Offer Answer Answer ICE candidates ICE candidates STUN/TURN 收集候选, 优先直连, 不通则走 TURN DataChannel(sensors/control) via SCTP/DTLS sensors: unordered + no-retrans control: ordered + limited-retrans Handling TURN fallback (proxy) DataChannel(sensors/control) via TURN relay alt [No direct connection] Producer(Browser) ABP Signaling Hub Consumer(Browser) coturn(UDP/TCP/TLS)

4. ABP 信令网关(只做信令) 🗣️

依赖:[DependsOn(typeof(AbpAspNetCoreSignalRModule))]

模块与中间件

// signaling/SignalingModule.cs
[DependsOn(typeof(AbpAspNetCoreSignalRModule))]
public class SignalingModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext ctx)
    {
        var services = ctx.Services;

        services.AddAuthentication().AddJwtBearer(/* Authority/Audience... */);
        services.AddAuthorization();
        services.AddSignalR();

        // ASP.NET Core Rate Limiting:端点/方法双策略
        services.AddRateLimiter(o =>
        {
            o.AddFixedWindowLimiter("negotiate", opt =>
            {
                opt.Window = TimeSpan.FromSeconds(1);
                opt.PermitLimit = 5; opt.QueueLimit = 0;
            });
            o.AddConcurrencyLimiter("hub-methods", opt =>
            {
                opt.PermitLimit = 50; opt.QueueLimit = 0;
            });
        });

        // 多租户:Header/域名解析 -> ICurrentTenant
        // 可实现 ITenantResolveContributor,从 X-Tenant-Id 解析并校验与 JWT 一致
    }

    public override void OnApplicationInitialization(ApplicationInitializationContext ctx)
    {
        var app = ctx.GetApplicationBuilder();
        app.UseAuthentication();
        app.UseAuthorization();
        app.UseRateLimiter();

        app.MapHub<SignalingHub>("/hub")
           .RequireRateLimiting("negotiate")
           .RequireRateLimiting("hub-methods");
    }
}

Hub 骨架

// signaling/SignalingHub.cs
[Authorize]
public sealed class SignalingHub : Hub
{
    public async Task Join(string room)
    {
        // 校验:room 与 JWT 中的租户/权限绑定
        await Groups.AddToGroupAsync(Context.ConnectionId, room);
    }

    public Task Offer(string room, string sdp)  =>
        Clients.OthersInGroup(room).SendAsync("offer", sdp);

    public Task Answer(string room, string sdp) =>
        Clients.OthersInGroup(room).SendAsync("answer", sdp);

    public Task Ice(string room, string cand)   =>
        Clients.OthersInGroup(room).SendAsync("ice", cand);
}

可用性:多实例部署请启用 Redis backplane 或接入 Azure SignalR Service,保证组播在节点间同步。


5. STUN/TURN 与部署(UDP/TCP/TLS 全开 + 时间戳凭据) 🔐

为什么需要 TURNS(443/TCP+TLS)
企业网常只放行 443/TCP,需 TURNS(TLS);coturn 支持时间戳凭据(HMAC),避免静态账户泄漏。

生产小贴士

  • TURN 在 NAT/防火墙后:配置 external-ip,开放中继端口范围(如 min-port/max-port
  • 容器化部署建议 host 网络(Linux)以便大范围 UDP 端口使用;演示环境可映射小段端口

docker-compose(精简示例)

# deploy/docker-compose.yml
services:
  coturn:
    image: coturn/coturn
    restart: unless-stopped
    network_mode: host   # 推荐生产(Linux);否则请映射端口范围
    command:
      - -n
      - --realm=myorg
      - --fingerprint
      - --lt-cred-mech
      - --use-auth-secret
      - --static-auth-secret=${TURN_SECRET}
      - --no-cli
      - --total-quota=100
      - --min-port=49160
      - --max-port=49200
      - --external-ip=${PUBLIC_IP}/${PRIVATE_IP}
      # TURNS:
      - --cert=/certs/fullchain.pem
      - --pkey=/certs/privkey.pem
    env_file: turn.env
    volumes:
      - ./certs:/certs:ro
# 若无法使用 host 网络,改为端口映射:
#   ports:
#     - "3478:3478/udp"
#     - "3478:3478/tcp"
#     - "5349:5349/tcp"
#     - "49160-49200:49160-49200/udp"

时间戳凭据服务端(C# Minimal API)

// signaling/TurnCredController.cs
app.MapGet("/api/turn-cred", (HttpContext http) =>
{
    // 校验来访者身份(JWT)与租户
    var userId = http.User.Identity?.Name ?? "anon";
    var ttl = TimeSpan.FromMinutes(10);
    var unix = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + (long)ttl.TotalSeconds;
    var username = $"{unix}:{userId}";

    var secret = Environment.GetEnvironmentVariable("TURN_SECRET")!;
    using var hmac = new System.Security.Cryptography.HMACSHA1(
        System.Text.Encoding.UTF8.GetBytes(secret));
    var hash = hmac.ComputeHash(System.Text.Encoding.UTF8.GetBytes(username));
    var credential = Convert.ToBase64String(hash);

    return Results.Ok(new { username, credential, ttlSeconds = (int)ttl.TotalSeconds,
                            urls = new[]{
                              "stun:stun.l.google.com:19302",
                              "turn:turn.myorg:3478?transport=udp",
                              "turn:turn.myorg:3478?transport=tcp",
                              "turns:turn.myorg:5349"
                            }});
});

6. 浏览器端:Peer/双通道/消息规约(含背压) 🌐

SignalR(正确的非 ESM 引用)

<script src="https://cdn.jsdelivr.net/npm/@microsoft/signalr@8/dist/browser/signalr.min.js"></script>
<script>
  const hub = new signalR.HubConnectionBuilder()
    .withUrl("/hub", { accessTokenFactory: () => localStorage.getItem("jwt") })
    .withAutomaticReconnect()
    .build();

  (async () => {
    await hub.start();
    // ...
  })();
</script>

Peer 与 ICE(支持“强制走 TURN”)

// 拉取短期 TURN 凭据
const cred = await fetch('/api/turn-cred').then(r => r.json());

// 直连优先(默认策略)
const pc = new RTCPeerConnection({ iceServers: cred.urls.map(u => ({urls: u})) });

// 如需强制经 TURN(例如只能走 443/TCP):
const relayOnly = new RTCPeerConnection({
  iceTransportPolicy: 'relay',
  iceServers: cred.urls.map(u => ({urls: u, username: cred.username, credential: cred.credential}))
});

双通道与消息规约

const sensors = pc.createDataChannel("sensors", { ordered: false, maxRetransmits: 0 });
const control = pc.createDataChannel("control", { ordered: true,  maxRetransmits: 5 });
sensors.binaryType = "arraybuffer";

// 建议单消息 ≤ 16KB(减少 SCTP 分片/HOL 影响,跨端更稳)
const MAX_MSG = 16 * 1024;

// 发送侧背压:避免一次性堆积导致时延抖动
sensors.bufferedAmountLowThreshold = 64 * 1024;
async function sendFrame(payload /* ArrayBuffer */) {
  if (payload.byteLength > MAX_MSG) {
    // 应用层切片
    for (let off = 0; off < payload.byteLength; off += MAX_MSG) {
      await sendFrame(payload.slice(off, Math.min(off + MAX_MSG, payload.byteLength)));
    }
    return;
  }
  if (sensors.bufferedAmount > sensors.bufferedAmountLowThreshold) {
    await new Promise(res => sensors.onbufferedamountlow = res);
  }
  sensors.send(payload);
}

最小信令(Producer 侧)

await hub.invoke("Join", ROOM);
pc.onicecandidate = e => e.candidate && hub.invoke("Ice", ROOM, JSON.stringify(e.candidate));
const offer = await pc.createOffer();              // 初始协商
await pc.setLocalDescription(offer);
await hub.invoke("Offer", ROOM, JSON.stringify(offer));

hub.on("answer", async sdp => await pc.setRemoteDescription(JSON.parse(sdp)));
hub.on("ice",    async cand=> await pc.addIceCandidate(JSON.parse(cand)));

  // demo: 50Hz 推送 8KB 虚拟数据
  const MAX_MSG = 16*1024;
  sensors.bufferedAmountLowThreshold = 64*1024;
  setInterval(async ()=>{
    const payload = new Uint8Array(8*1024).buffer;
    if (payload.byteLength <= MAX_MSG) {
      if (sensors.bufferedAmount > sensors.bufferedAmountLowThreshold)
        await new Promise(res => sensors.onbufferedamountlow = res);
      sensors.send(payload);
    }
  }, 20);

Consumer 侧

await hub.invoke("Join", ROOM);
pc.onicecandidate = e => e.candidate && hub.invoke("Ice", ROOM, JSON.stringify(e.candidate));
hub.on("offer", async sdp => {
  await pc.setRemoteDescription(JSON.parse(sdp));
  const answer = await pc.createAnswer();
  await pc.setLocalDescription(answer);
  await hub.invoke("Answer", ROOM, JSON.stringify(answer));
});

  // 简化 stats 面板
  setInterval(async()=>{
    const st = await pc.getStats(); let out= [];
    st.forEach(s => { if (s.type==='transport' && s.bytesSent) out.push(`${s.bytesSent}/${s.bytesReceived}`); });
    document.getElementById('stats').textContent = out.join('\n');
  }, 1000);

7. QoS 采样与自适应(跨浏览器回退) 📊

1)getStats() 优先从“选中候选对/传输层”取关键指标

async function sampleQoS(pc) {
  const st = await pc.getStats();
  let rtt, availOut, txBytes = 0, rxBytes = 0;

  st.forEach(s => {
    // 传输与候选对
    if (s.type === 'transport' && s.selectedCandidatePairId) {
      const p = st.get(s.selectedCandidatePairId);
      rtt = p?.currentRoundTripTime ?? rtt;
      availOut = p?.availableOutgoingBitrate ?? availOut;
      txBytes += p?.bytesSent ?? 0;
      rxBytes += p?.bytesReceived ?? 0;
    }
    // 某些浏览器的 data-channel stats 可用性不一致,故只作为补充
    if (s.type === 'data-channel' && s.label === 'sensors') {
      txBytes += s?.bytesSent ?? 0;
      rxBytes += s?.bytesReceived ?? 0;
    }
  });
  return { rtt, availOut, txBytes, rxBytes, ts: performance.now() };
}

2)控制通道心跳(RTT 回退)

// 控制通道上做心跳,校准 RTT
function startHeartBeat(dc, onRtt) {
  setInterval(() => {
    const t0 = performance.now();
    dc.send(JSON.stringify({ type: "ping", t0 }));
  }, 2000);
  dc.onmessage = e => {
    const msg = JSON.parse(e.data);
    if (msg.type === "ping") dc.send(JSON.stringify({ type:"pong", t0: msg.t0 }));
    if (msg.type === "pong") onRtt(performance.now() - msg.t0);
  };
}

3)自适应(滑窗 + 回滞)

const win = [];
function trend(val){ win.push(val); if (win.length>5) win.shift(); return win; }

function dialDown() { /* 50Hz→25Hz、合帧、降精度,见业务实现 */ }
function dialUp()   { /* 逐步回升,避免抖动 */ }

async function loopAdapt(pc) {
  let last = await sampleQoS(pc);
  setInterval(async () => {
    const now = await sampleQoS(pc);
    const dt = (now.ts - last.ts)/1000;
    const outBps = ((now.txBytes - last.txBytes) * 8) / dt;
    const rtt = now.rtt ?? last.rtt;     // 浏览器不支持时用心跳回退
    trend({ rtt, outBps, avail: now.availOut });

    const worse = win.slice(-3).some(x => x.rtt>0.25 || (x.avail && now.avail && x.avail < 0.6*now.avail));
    const better= win.length===5 && win.every(x=> (x.rtt ?? 0) < 0.15);
    if (worse) dialDown(); else if (better) dialUp();

    last = now;
  }, 1000);
}

8. 断线自愈(只保留一种 ICE Restart 写法) 🔄

推荐:仅使用 createOffer({ iceRestart: true }) 触发 ICE 重启,逻辑更直观。

pc.addEventListener('iceconnectionstatechange', async () => {
  const s = pc.iceConnectionState;
  if (s === 'failed' || s === 'disconnected') {
    // 观望小窗(例如 3~5s)后执行重启
    setTimeout(async () => {
      if (pc.iceConnectionState === 'failed' || pc.iceConnectionState === 'disconnected') {
        const offer = await pc.createOffer({ iceRestart: true });
        await pc.setLocalDescription(offer);
        await hub.invoke("Offer", ROOM, JSON.stringify(offer));
      }
    }, 3000);
  }
});

关键指标

  • 断线/小时、平均恢复时长、TURN/TURNS 命中率、TURNS 退化次数

9. 安全与配额 🔒

  • JWT:短有效期(60–120s),绑定 Tenant/Room/Role,后端校验 Header 与 Token 的租户一致性
  • 限流:对 /hub 端点与 Hub 方法 同时限流(固定窗口 + 并发限制)
  • 日志:SDP/ICE 仅存 哈希;默认脱敏(User/Room/Candidate 等敏感字段)
  • XSS/CSRF

:静态页使用 Content-Security-Policy;Hub 鉴权走 Bearer Token,不暴露 Cookie


10. 可观测性(Tracing / Metrics / Logs) 📈

后端(.NET)

static readonly ActivitySource Act = new("Acme.Signaling");

app.Use(async (ctx, next) => {
  using var act = Act.StartActivity("HubRequest");
  act?.SetTag("tenant", ctx.Request.Headers["X-Tenant-Id"].ToString());
  act?.SetTag("room", ctx.Request.Query["room"].ToString());
  await next();
});

前端面板

  • getStats() 抽样(1s):rtt/availOut/bytes*、通道队列长度、丢弃帧数、ICE 状态
  • 记录 QoS 调整事件(降档/升档/原因)

11. 与 gRPC / GraphQL Subscriptions 的边界

方案 优点 局限 适用
WebRTC DataChannel 浏览器原生、极低延迟、P2P 优先 需 TURN/TLS、前端实现复杂 端边直连、超低延迟传感
gRPC 双向流 服务端统一治理/审计 浏览器需 gRPC-Web/代理、延迟略高 服务中心化
GraphQL Subscriptions 模型/权限统一、前端生态好 延迟与抖动高于 DataChannel 看板/业务消息

12. 基准与验收 🧪

场景

  • 候选:直连 host/srflx vs TURN(UDP/TCP/TLS)
  • 频率:20/50/100Hz;地区:同城/跨区

验收阈值

  • 首包(Offer→Answer→datachannel.open)≤ 800ms
  • 稳态 p95 ≤ 150ms
  • 重连恢复 ≤ 2s
  • 丢包 < 2% 时可用

工具

  • 后端:k6 压 /hub/negotiate 与凭据接口
  • 前端:采样器导出 CSV/JSON;Grafana 看板(示例 JSON 附 scripts/

k6 示例(scripts/k6-neg.js)

import http from 'k6/http';
import { sleep } from 'k6';
export const options = { vus: 50, duration: '1m' };
export default function () {
  http.get('https://your-host/api/turn-cred');
  sleep(1);
}

13. Demo 与 Compose 🎮

1)启动 TURN(或 TURNS)

cd deploy
export TURN_SECRET=ChangeMeBase64
export PUBLIC_IP=203.0.113.10
export PRIVATE_IP=10.0.0.10
docker compose up -d coturn

2)启动 ABP 信令网关(本地 https 或加反代)

cd signaling
dotnet run

3)打开两个页面(同机或跨机)

  • web-demo/producer.htmlweb-demo/consumer.html
  • 输入相同 ROOM,点击 Connect
  • 观察 QoS 面板与日志(重连/降档事件)

14. 常见坑与排障 ⚠️

  • 只放行 443/TCP:用 iceTransportPolicy:'relay' + turns:443;证书与域名一致
  • 消息卡顿/突刺:保证单消息 ≤ 16KB,应用层切片;发送侧加入 bufferedAmount 背压
  • TURN 在 NAT 后:配置 external-ip,开放(或映射)中继端口范围;容器优先 host 网络
  • 统计不准:优先读取 selected candidate pair/transport 指标;不足时用控制通道心跳回退
  • 多实例不互通:加 Redis backplaneAzure SignalR,保证组播转发

附:web-demo 最小页面片段(可直接放 Nginx 或本地静态服务)

producer.html(节选)

<!doctype html><meta charset="utf-8"/>
<input id="room" placeholder="room"/><button id="connect">Connect</button>
<script src="https://cdn.jsdelivr.net/npm/@microsoft/signalr@8/dist/browser/signalr.min.js"></script>
<script>
const ROOM = document.getElementById('room').value || 'demo';
const hub = new signalR.HubConnectionBuilder().withUrl("/hub",{
  accessTokenFactory: ()=>localStorage.getItem("jwt")
}).withAutomaticReconnect().build();

(async () => {
  await hub.start();
  const cred = await (await fetch('/api/turn-cred')).json();
  const pc = new RTCPeerConnection({ iceServers: cred.urls.map(u=>({urls:u, username:cred.username, credential:cred.credential})) });

  const sensors = pc.createDataChannel("sensors", { ordered:false, maxRetransmits:0 });
  sensors.binaryType = "arraybuffer";
  pc.onicecandidate = e => e.candidate && hub.invoke("Ice", ROOM, JSON.stringify(e.candidate));

  await hub.invoke("Join", ROOM);
  const offer = await pc.createOffer();
  await pc.setLocalDescription(offer);
  await hub.invoke("Offer", ROOM, JSON.stringify(offer));

  hub.on("answer", async sdp => await pc.setRemoteDescription(JSON.parse(sdp)));
  hub.on("ice",    async cand=> await pc.addIceCandidate(JSON.parse(cand)));

  // demo: 50Hz 推送 8KB 虚拟数据
  const MAX_MSG = 16*1024;
  sensors.bufferedAmountLowThreshold = 64*1024;
  setInterval(async ()=>{
    const payload = new Uint8Array(8*1024).buffer;
    if (payload.byteLength <= MAX_MSG) {
      if (sensors.bufferedAmount > sensors.bufferedAmountLowThreshold)
        await new Promise(res => sensors.onbufferedamountlow = res);
      sensors.send(payload);
    }
  }, 20);
})();
</script>

consumer.html(节选)

<!doctype html><meta charset="utf-8"/>
<input id="room" placeholder="room"/><button id="connect">Connect</button>
<pre id="stats"></pre>
<script src="https://cdn.jsdelivr.net/npm/@microsoft/signalr@8/dist/browser/signalr.min.js"></script>
<script>
const ROOM = document.getElementById('room').value || 'demo';
const hub = new signalR.HubConnectionBuilder().withUrl("/hub",{
  accessTokenFactory: ()=>localStorage.getItem("jwt")
}).withAutomaticReconnect().build();

(async () => {
  await hub.start();
  const cred = await (await fetch('/api/turn-cred')).json();
  const pc = new RTCPeerConnection({ iceServers: cred.urls.map(u=>({urls:u, username:cred.username, credential:cred.credential})) });

  pc.onicecandidate = e => e.candidate && hub.invoke("Ice", ROOM, JSON.stringify(e.candidate));
  pc.ondatachannel = ev => {
    if (ev.channel.label === 'sensors') {
      ev.channel.onmessage = e => {/* 渲染/计数 */};
    }
  };

  await hub.invoke("Join", ROOM);
  hub.on("offer", async sdp => {
    await pc.setRemoteDescription(JSON.parse(sdp));
    const answer = await pc.createAnswer();
    await pc.setLocalDescription(answer);
    await hub.invoke("Answer", ROOM, JSON.stringify(answer));
  });

  // 简化 stats 面板
  setInterval(async()=>{
    const st = await pc.getStats(); let out= [];
    st.forEach(s => { if (s.type==='transport' && s.bytesSent) out.push(`${s.bytesSent}/${s.bytesReceived}`); });
    document.getElementById('stats').textContent = out.join('\n');
  }, 1000);
})();
</script>

网站公告

今日签到

点亮在社区的每一天
去签到