【实战】基于 ABP vNext 构建高可用 S7 协议采集平台(西门子 PLC 通信全流程)

发布于:2025-05-14 ⋅ 阅读:(7) ⋅ 点赞:(0)

🚀🔧【实战】基于 ABP vNext 构建高可用 S7 协议采集平台(西门子 PLC 通信全流程)📊



一、背景与目标 🎯

在工业自动化项目中,西门子 S7 系列 PLC 广泛用于设备控制与数据采集。传统 OPC 通信方式配置繁琐、延迟高,难以胜任现代 IoT 场景。

目标:

  • 构建跨平台、配置化的高可用通信平台;
  • 实现多台 PLC 并发采集、统一缓存与错误容忍;
  • 支持部署、监控、容器化与持续运行。

二、系统架构与技术栈 🏗️

系统架构概览流程图 🏗️
基础设施层
业务层
API 层
S7.NetPlus PLC 连接
Redis 缓存
Prometheus/ELK
PlcPollingWorker
PlcConnectionManager
IDistributedCache
HealthChecks
HTTP 接口
Worker 任务调度
模块 技术选型
框架 ABP vNext (.NET 8)
通信 S7.NetPlus
配置 IOptionsSnapshot + reloadOnChange + 环境变量
重试 Polly (Retry + CircuitBreaker)
后台任务 AbpBackgroundWorker
健康检查 ASP.NET Core HealthChecks
日志 Serilog + 结构化日志
容器部署 Docker 多阶段构建

三、配置系统设计 ⚙️

1. appsettings.json
{
  "PlcOptions": {
    "IntervalSeconds": 5,
    "Devices": [
      {
        "DeviceId": "PLC-1",
        "CpuType": "S7300",
        "Ip": "192.168.1.100",
        "Rack": 0,
        "Slot": 2,
        "Address": "DB1.DBW0"
      }
    ]
  }
}
2. 配置类
public class PlcDeviceOptions
{
    public string DeviceId { get; set; } = null!;
    public string CpuType   { get; set; } = "S7300";
    public string Ip        { get; set; } = null!;
    public int    Rack      { get; set; }
    public int    Slot      { get; set; }
    public string Address   { get; set; } = null!;
}

public class PlcOptions
{
    public int                  IntervalSeconds { get; set; } = 5;
    public List<PlcDeviceOptions> Devices       { get; set; } = new();
}
3. 热加载与 DI 注册(Program.cs)
var builder = WebApplication.CreateBuilder(args);

// 配置源:JSON + 环境变量
builder.Configuration
    .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
    .AddEnvironmentVariables();

// 注册配置
builder.Services.Configure<PlcOptions>(
    builder.Configuration.GetSection("PlcOptions"));

// 注册核心服务
builder.Services.AddSingleton<PlcConnectionManager>();
builder.Services.AddPolicyRegistry()
    .Add("PlcRetry",
        Policy.Handle<Exception>()
            .RetryAsync(3, onRetry: (ex, cnt) =>
                builder.Logging.CreateLogger("Polly")
                   .LogWarning(ex, "第{Attempt}次重试失败", cnt))
            .CircuitBreakerAsync(
                handledEventsAllowedBeforeBreaking: 5,
                durationOfBreak: TimeSpan.FromSeconds(30),
                onBreak: (ex, ts) =>
                    builder.Logging.CreateLogger("Polly")
                       .LogError(ex, "熔断开启,持续{Break}s", ts.TotalSeconds),
                onReset: () =>
                    builder.Logging.CreateLogger("Polly")
                       .LogInformation("熔断恢复")));

// 注册 ABP Worker 和 HealthChecks
builder.Services.AddBackgroundWorker<PlcPollingWorker>();
builder.Services.AddHealthChecks()
    .AddCheck<PlcHealthCheck>("plc_check");

var app = builder.Build();

// 映射健康检查端点
app.MapHealthChecks("/health");

// 优雅停机:释放连接池
app.Lifetime.ApplicationStopping.Register(async () =>
{
    await app.Services.GetRequiredService<PlcConnectionManager>()
             .DisposeAsync();
});

app.Run();

四、线程安全通信连接池设计 🔒

public class PlcConnectionManager : IAsyncDisposable
{
    // key → (信号量, 已打开的 Plc 实例)
    private readonly ConcurrentDictionary<string, (SemaphoreSlim Lock, Plc Plc)> _connections 
        = new();

    /// <summary>
    /// 租借一个 PLC 实例(线程安全)
    /// </summary>
    public async Task<Plc> RentAsync(PlcDeviceOptions opt)
    {
        var key = opt.DeviceId;
        var entry = _connections.GetOrAdd(key, _ =>
        {
            var plc = new Plc(
                Enum.Parse<CpuType>(opt.CpuType, ignoreCase: true),
                opt.Ip, opt.Rack, opt.Slot);

            try
            {
                plc.Open();
            }
            catch
            {
                // 打开失败,移除池中该项
                _connections.TryRemove(key, out _);
                throw;
            }

            return (new SemaphoreSlim(1, 1), plc);
        });

        // 等待获取信号量
        await entry.Lock.WaitAsync();
        return entry.Plc;
    }

    /// <summary>
    /// 归还租借的实例
    /// </summary>
    public void Return(string deviceId)
    {
        if (_connections.TryGetValue(deviceId, out var entry))
        {
            entry.Lock.Release();
        }
    }

    /// <summary>
    /// 优雅释放所有资源
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        foreach (var kv in _connections.Values)
        {
            var sema = kv.Lock;
            await sema.WaitAsync();       // 确保没有并发占用
            kv.Plc.Close();
            sema.Release();
            sema.Dispose();
        }
    }

    /// <summary>
    /// 获取所有断线设备列表
    /// </summary>
    public IEnumerable<string> GetDisconnectedDevices() =>
        _connections
            .Where(kv => !kv.Value.Plc.IsConnected)
            .Select(kv => kv.Key);
}
连接池租借/归还流程图 🔒
开始 RentAsync(Device)
池中存在?
获取 (Semaphore, Plc)
创建新 Plc 实例并 Open()
存入 ConcurrentDictionary
Await Semaphore.WaitAsync()
返回 Plc 给调用方
调用完成后执行 Return(DeviceId)
Semaphore.Release()
流程结束

五、后台采集 Worker 实现 🤖

public class PlcPollingWorker : AsyncPeriodicBackgroundWorkerBase
{
    private readonly IOptionsSnapshot<PlcOptions> _options;
    private readonly PlcConnectionManager          _connMgr;
    private readonly ILogger<PlcPollingWorker>     _logger;
    private readonly IDistributedCache<string>     _cache;
    private readonly AsyncPolicy                   _retry;

    public PlcPollingWorker(
        AbpTimer                       timer,
        IOptionsSnapshot<PlcOptions>   options,
        PlcConnectionManager           connMgr,
        IDistributedCache<string>      cache,
        IPolicyRegistry<string>        policyRegistry,
        ILogger<PlcPollingWorker>      logger
    ) : base(timer)
    {
        _options = options;
        _connMgr  = connMgr;
        _cache    = cache;
        _retry    = policyRegistry.Get<AsyncPolicy>("PlcRetry");
        _logger   = logger;
        Timer.PeriodTimeSpan = TimeSpan.FromSeconds(_options.Value.IntervalSeconds);
    }

    protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext ctx)
    {
        foreach (var dev in _options.Value.Devices)
        {
            using var scope = _logger.BeginScope("Device:{DeviceId}", dev.DeviceId);
            Plc plc = null!;
            try
            {
                plc = await _connMgr.RentAsync(dev);
                // 异步执行阻塞调用
                var val = await _retry.ExecuteAsync(() =>
                   Task.Run(() => (short)plc.Read(dev.Address)));

                await _cache.SetAsync(
                   $"Plc:{dev.DeviceId}:Value",
                   val.ToString(),
                   new DistributedCacheEntryOptions
                   {
                       SlidingExpiration = TimeSpan.FromSeconds(30)
                   });

                _logger.LogInformation("{Device} OK - {Val}", dev.DeviceId, val);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Device {DeviceId} 读取失败", dev.DeviceId);
            }
            finally
            {
                if (plc is not null)
                {
                    _connMgr.Return(dev.DeviceId);
                }
            }
        }
    }
}
Worker 周期执行流程图 🤖
Timer PlcPollingWorker PlcConnectionManager PollyRetryPolicy IDistributedCache ILogger 定时触发 DoWorkAsync() RentAsync(dev) 返回 plc 实例 ExecuteAsync(Read(dev.Address)) 返回 value SetAsync(key, value) LogInformation(...) Return(dev) loop [遍历每台设备] LogWarning(...) alt [出现异常] Timer PlcPollingWorker PlcConnectionManager PollyRetryPolicy IDistributedCache ILogger

六、重试与熔断策略注入 🔄

// 在 Program.cs 中已注册:
builder.Services.AddPolicyRegistry()
    .Add("PlcRetry",
        Policy.Handle<Exception>()
            .RetryAsync(3, onRetry: (ex, cnt) =>
                logger.LogWarning(ex, "第{Attempt}次重试失败", cnt))
            .CircuitBreakerAsync(
                handledEventsAllowedBeforeBreaking: 5,
                durationOfBreak: TimeSpan.FromSeconds(30),
                onBreak: (ex, ts) =>
                    logger.LogError(ex, "熔断开启,持续{Break}s", ts.TotalSeconds),
                onReset: () =>
                    logger.LogInformation("熔断已恢复")));
重试与熔断策略流程图 🔄
成功
失败
失败
失败
重试下一次
RetryAsync 第1次
返回结果
RetryAsync 第2次
RetryAsync 第3次
CircuitBreaker 计数 +1
失败次数 ≥5?
熔断开启: 30s
拒绝所有后续调用
onReset 恢复后重置计数

七、健康检查与指标上报 📈

public class PlcHealthCheck : IHealthCheck
{
    private readonly PlcConnectionManager _manager;

    public PlcHealthCheck(PlcConnectionManager manager) =>
        _manager = manager;

    public Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext ctx,
        CancellationToken    ct = default)
    {
        var down = _manager.GetDisconnectedDevices().ToList();
        return Task.FromResult(
            down.Any()
                ? HealthCheckResult.Unhealthy(
                    $"失联设备: {string.Join(", ", down)}")
                : HealthCheckResult.Healthy("所有 PLC 均连接正常"));
    }
}
  • 已通过 app.MapHealthChecks("/health") 暴露
  • 可集成 Prometheus/OpenTelemetry 暴露 /metrics

八、Docker 容器化部署建议 🐳

# 构建阶段
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY . .
RUN dotnet publish -c Release -o /app

# 运行阶段
FROM mcr.microsoft.com/dotnet/aspnet:8.0
WORKDIR /app
COPY --from=build /app ./

# 安装健康探针依赖并清理
RUN apt-get update \
 && apt-get install -y --no-install-recommends curl \
 && rm -rf /var/lib/apt/lists/*

HEALTHCHECK --interval=10s \
  CMD curl --fail http://localhost:5000/health || exit 1

ENTRYPOINT ["dotnet", "S7Reader.HttpApi.Host.dll"]
Docker 多阶段构建流程图 🐳
Runtime 阶段
COPY /app
FROM aspnet:8.0
apt-get install curl
HEALTHCHECK
ENTRYPOINT dotnet S7Reader.HttpApi.Host.dll
Build 阶段
dotnet restore
checkout 源码
dotnet build
dotnet publish -o /app

九、总结与最佳实践清单 📝

分类 实践建议
配置管理 IOptionsSnapshot + reloadOnChange + 环境变量,实现热更新
DI 注册 AddSingleton()
AddBackgroundWorker()
AddHealthChecks()/MapHealthChecks
连接池设计 租借/归还模式 + 异常清理
IAsyncDisposable 优雅关闭
异步与容错 Task.Run 封装阻塞调用
Polly Retry + CircuitBreaker
日志 onBreak/onReset
后台调度 ABP Worker 框架周期执行
外层全捕异常,保持服务持续运行
健康监控 精准检测每台设备状态
HealthChecks + Prometheus/OpenTelemetry
容器部署 Docker 多阶段构建
无冗余依赖镜像
标准 HEALTHCHECK
优雅停机 ApplicationStopping.ReleaseAsync → DisposeAsync
日志可视化 Serilog 结构化 + BeginScope “设备ID”上下文

十、参考资料 📚



网站公告

今日签到

点亮在社区的每一天
去签到