🚀🔧【实战】基于 ABP vNext 构建高可用 S7 协议采集平台(西门子 PLC 通信全流程)📊
一、背景与目标 🎯
在工业自动化项目中,西门子 S7 系列 PLC 广泛用于设备控制与数据采集。传统 OPC 通信方式配置繁琐、延迟高,难以胜任现代 IoT 场景。
目标:
- 构建跨平台、配置化的高可用通信平台;
- 实现多台 PLC 并发采集、统一缓存与错误容忍;
- 支持部署、监控、容器化与持续运行。
二、系统架构与技术栈 🏗️
系统架构概览流程图 🏗️
模块 | 技术选型 |
---|---|
框架 | ABP vNext (.NET 8) |
通信 | S7.NetPlus |
配置 | IOptionsSnapshot + reloadOnChange + 环境变量 |
重试 | Polly (Retry + CircuitBreaker) |
后台任务 | AbpBackgroundWorker |
健康检查 | ASP.NET Core HealthChecks |
日志 | Serilog + 结构化日志 |
容器部署 | Docker 多阶段构建 |
三、配置系统设计 ⚙️
1. appsettings.json
{
"PlcOptions": {
"IntervalSeconds": 5,
"Devices": [
{
"DeviceId": "PLC-1",
"CpuType": "S7300",
"Ip": "192.168.1.100",
"Rack": 0,
"Slot": 2,
"Address": "DB1.DBW0"
}
]
}
}
2. 配置类
public class PlcDeviceOptions
{
public string DeviceId { get; set; } = null!;
public string CpuType { get; set; } = "S7300";
public string Ip { get; set; } = null!;
public int Rack { get; set; }
public int Slot { get; set; }
public string Address { get; set; } = null!;
}
public class PlcOptions
{
public int IntervalSeconds { get; set; } = 5;
public List<PlcDeviceOptions> Devices { get; set; } = new();
}
3. 热加载与 DI 注册(Program.cs)
var builder = WebApplication.CreateBuilder(args);
// 配置源:JSON + 环境变量
builder.Configuration
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
.AddEnvironmentVariables();
// 注册配置
builder.Services.Configure<PlcOptions>(
builder.Configuration.GetSection("PlcOptions"));
// 注册核心服务
builder.Services.AddSingleton<PlcConnectionManager>();
builder.Services.AddPolicyRegistry()
.Add("PlcRetry",
Policy.Handle<Exception>()
.RetryAsync(3, onRetry: (ex, cnt) =>
builder.Logging.CreateLogger("Polly")
.LogWarning(ex, "第{Attempt}次重试失败", cnt))
.CircuitBreakerAsync(
handledEventsAllowedBeforeBreaking: 5,
durationOfBreak: TimeSpan.FromSeconds(30),
onBreak: (ex, ts) =>
builder.Logging.CreateLogger("Polly")
.LogError(ex, "熔断开启,持续{Break}s", ts.TotalSeconds),
onReset: () =>
builder.Logging.CreateLogger("Polly")
.LogInformation("熔断恢复")));
// 注册 ABP Worker 和 HealthChecks
builder.Services.AddBackgroundWorker<PlcPollingWorker>();
builder.Services.AddHealthChecks()
.AddCheck<PlcHealthCheck>("plc_check");
var app = builder.Build();
// 映射健康检查端点
app.MapHealthChecks("/health");
// 优雅停机:释放连接池
app.Lifetime.ApplicationStopping.Register(async () =>
{
await app.Services.GetRequiredService<PlcConnectionManager>()
.DisposeAsync();
});
app.Run();
四、线程安全通信连接池设计 🔒
public class PlcConnectionManager : IAsyncDisposable
{
// key → (信号量, 已打开的 Plc 实例)
private readonly ConcurrentDictionary<string, (SemaphoreSlim Lock, Plc Plc)> _connections
= new();
/// <summary>
/// 租借一个 PLC 实例(线程安全)
/// </summary>
public async Task<Plc> RentAsync(PlcDeviceOptions opt)
{
var key = opt.DeviceId;
var entry = _connections.GetOrAdd(key, _ =>
{
var plc = new Plc(
Enum.Parse<CpuType>(opt.CpuType, ignoreCase: true),
opt.Ip, opt.Rack, opt.Slot);
try
{
plc.Open();
}
catch
{
// 打开失败,移除池中该项
_connections.TryRemove(key, out _);
throw;
}
return (new SemaphoreSlim(1, 1), plc);
});
// 等待获取信号量
await entry.Lock.WaitAsync();
return entry.Plc;
}
/// <summary>
/// 归还租借的实例
/// </summary>
public void Return(string deviceId)
{
if (_connections.TryGetValue(deviceId, out var entry))
{
entry.Lock.Release();
}
}
/// <summary>
/// 优雅释放所有资源
/// </summary>
public async ValueTask DisposeAsync()
{
foreach (var kv in _connections.Values)
{
var sema = kv.Lock;
await sema.WaitAsync(); // 确保没有并发占用
kv.Plc.Close();
sema.Release();
sema.Dispose();
}
}
/// <summary>
/// 获取所有断线设备列表
/// </summary>
public IEnumerable<string> GetDisconnectedDevices() =>
_connections
.Where(kv => !kv.Value.Plc.IsConnected)
.Select(kv => kv.Key);
}
连接池租借/归还流程图 🔒
五、后台采集 Worker 实现 🤖
public class PlcPollingWorker : AsyncPeriodicBackgroundWorkerBase
{
private readonly IOptionsSnapshot<PlcOptions> _options;
private readonly PlcConnectionManager _connMgr;
private readonly ILogger<PlcPollingWorker> _logger;
private readonly IDistributedCache<string> _cache;
private readonly AsyncPolicy _retry;
public PlcPollingWorker(
AbpTimer timer,
IOptionsSnapshot<PlcOptions> options,
PlcConnectionManager connMgr,
IDistributedCache<string> cache,
IPolicyRegistry<string> policyRegistry,
ILogger<PlcPollingWorker> logger
) : base(timer)
{
_options = options;
_connMgr = connMgr;
_cache = cache;
_retry = policyRegistry.Get<AsyncPolicy>("PlcRetry");
_logger = logger;
Timer.PeriodTimeSpan = TimeSpan.FromSeconds(_options.Value.IntervalSeconds);
}
protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext ctx)
{
foreach (var dev in _options.Value.Devices)
{
using var scope = _logger.BeginScope("Device:{DeviceId}", dev.DeviceId);
Plc plc = null!;
try
{
plc = await _connMgr.RentAsync(dev);
// 异步执行阻塞调用
var val = await _retry.ExecuteAsync(() =>
Task.Run(() => (short)plc.Read(dev.Address)));
await _cache.SetAsync(
$"Plc:{dev.DeviceId}:Value",
val.ToString(),
new DistributedCacheEntryOptions
{
SlidingExpiration = TimeSpan.FromSeconds(30)
});
_logger.LogInformation("{Device} OK - {Val}", dev.DeviceId, val);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Device {DeviceId} 读取失败", dev.DeviceId);
}
finally
{
if (plc is not null)
{
_connMgr.Return(dev.DeviceId);
}
}
}
}
}
Worker 周期执行流程图 🤖
六、重试与熔断策略注入 🔄
// 在 Program.cs 中已注册:
builder.Services.AddPolicyRegistry()
.Add("PlcRetry",
Policy.Handle<Exception>()
.RetryAsync(3, onRetry: (ex, cnt) =>
logger.LogWarning(ex, "第{Attempt}次重试失败", cnt))
.CircuitBreakerAsync(
handledEventsAllowedBeforeBreaking: 5,
durationOfBreak: TimeSpan.FromSeconds(30),
onBreak: (ex, ts) =>
logger.LogError(ex, "熔断开启,持续{Break}s", ts.TotalSeconds),
onReset: () =>
logger.LogInformation("熔断已恢复")));
重试与熔断策略流程图 🔄
七、健康检查与指标上报 📈
public class PlcHealthCheck : IHealthCheck
{
private readonly PlcConnectionManager _manager;
public PlcHealthCheck(PlcConnectionManager manager) =>
_manager = manager;
public Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext ctx,
CancellationToken ct = default)
{
var down = _manager.GetDisconnectedDevices().ToList();
return Task.FromResult(
down.Any()
? HealthCheckResult.Unhealthy(
$"失联设备: {string.Join(", ", down)}")
: HealthCheckResult.Healthy("所有 PLC 均连接正常"));
}
}
- 已通过
app.MapHealthChecks("/health")
暴露 - 可集成 Prometheus/OpenTelemetry 暴露
/metrics
八、Docker 容器化部署建议 🐳
# 构建阶段
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY . .
RUN dotnet publish -c Release -o /app
# 运行阶段
FROM mcr.microsoft.com/dotnet/aspnet:8.0
WORKDIR /app
COPY --from=build /app ./
# 安装健康探针依赖并清理
RUN apt-get update \
&& apt-get install -y --no-install-recommends curl \
&& rm -rf /var/lib/apt/lists/*
HEALTHCHECK --interval=10s \
CMD curl --fail http://localhost:5000/health || exit 1
ENTRYPOINT ["dotnet", "S7Reader.HttpApi.Host.dll"]
Docker 多阶段构建流程图 🐳
九、总结与最佳实践清单 📝
分类 | 实践建议 |
---|---|
配置管理 | IOptionsSnapshot + reloadOnChange + 环境变量,实现热更新 |
DI 注册 | AddSingleton() AddBackgroundWorker() AddHealthChecks()/MapHealthChecks |
连接池设计 | 租借/归还模式 + 异常清理 IAsyncDisposable 优雅关闭 |
异步与容错 | Task.Run 封装阻塞调用 Polly Retry + CircuitBreaker 日志 onBreak/onReset |
后台调度 | ABP Worker 框架周期执行 外层全捕异常,保持服务持续运行 |
健康监控 | 精准检测每台设备状态 HealthChecks + Prometheus/OpenTelemetry |
容器部署 | Docker 多阶段构建 无冗余依赖镜像 标准 HEALTHCHECK |
优雅停机 | ApplicationStopping.ReleaseAsync → DisposeAsync |
日志可视化 | Serilog 结构化 + BeginScope “设备ID”上下文 |