.NET 8 创建 TCP 服务接收数据,并通过 WebSocket 广播

发布于:2025-06-28 ⋅ 阅读:(17) ⋅ 点赞:(0)

注册 TCP 服务器,并注册 WebSocket 中间件

using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.WebSockets;

var builder = WebApplication.CreateBuilder(args);

// Register the TCP server as a singleton and hand that same instance to the host as a
// hosted service, so the instance the host starts is the one the container resolves.
builder.Services.AddSingleton<TcpServer>();
builder.Services.AddHostedService(sp => sp.GetRequiredService<TcpServer>());

// Register the WebSocket connection registry. It has to be WebSocketManagement: that is the
// type TcpServer's constructor takes and the middleware below resolves. Registering
// ASP.NET Core's own WebSocketManager type instead leaves WebSocketManagement unresolvable.
builder.Services.AddSingleton<WebSocketManagement>();

// Listen address used for HTTP and therefore for WebSocket upgrade requests.
builder.WebHost.UseUrls("http://*:5000");

var app = builder.Build();

// WebSocket middleware: every WebSocket upgrade request, whatever its path, is accepted and
// handed to WebSocketManagement; all other requests continue down the pipeline.
app.UseWebSockets();
app.Use(async (context, next) =>
{
    if (context.WebSockets.IsWebSocketRequest)
    {
        var webSocketManager = context.RequestServices.GetRequiredService<WebSocketManagement>();
        var webSocket = await context.WebSockets.AcceptWebSocketAsync();

        // The request must stay in this call for as long as the socket is in use;
        // returning from the middleware ends the connection.
        await webSocketManager.HandleWebSocketConnectionAsync(webSocket);
    }
    else
    {
        await next(context);
    }
});

app.Run();

TCP 服务实现

/// <summary>
/// Background service that accepts instrument connections on TCP port 8081, parses the
/// 14-byte data packets they send, and forwards each parsed value to the WebSocket client
/// registered under the packet's device id.
/// </summary>
public class TcpServer : BackgroundService
{
    private readonly WebSocketManagement _webSocketManager;
    private const int Port = 8081;
    private const int PacketSize = 14;
    private const int CheckSumSize = 2;
    private TcpListener? _listener;

    /// <summary>Creates the server.</summary>
    /// <param name="webSocketManager">Registry used to push parsed values to WebSocket clients.</param>
    public TcpServer(WebSocketManagement webSocketManager)
    {
        _webSocketManager = webSocketManager;
    }

    /// <summary>
    /// Starts the listener and accepts clients until the host requests shutdown.
    /// Each accepted client is handled concurrently by <see cref="HandleClientAsync"/>.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _listener = new TcpListener(IPAddress.Any, Port);
        _listener.Start();
        Console.WriteLine($"TCP server started on port {Port}");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var client = await _listener.AcceptTcpClientAsync(stoppingToken);

                    // Not awaited on purpose: the accept loop must keep running while this
                    // client is served. HandleClientAsync catches its own exceptions.
                    _ = HandleClientAsync(client, stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    // Normal exit when the service is stopping.
                    break;
                }
            }
        }
        finally
        {
            _listener.Stop();
            Console.WriteLine("TCP server stopped");
        }
    }

    /// <summary>
    /// Serves one client: reads data, parses it, and writes back a response when the parser
    /// produces one. Returns when the peer disconnects, the service is stopping, or an error occurs.
    /// </summary>
    /// <param name="client">The accepted connection; disposed when this method returns.</param>
    /// <param name="ct">Cancels pending reads and writes when the service stops.</param>
    private async Task HandleClientAsync(TcpClient client, CancellationToken ct)
    {
        var clientId = Guid.NewGuid().ToString();
        Console.WriteLine($"Client connected: {clientId}");
        using (client)
        {
            byte[] buffer = new byte[1024];

            try
            {
                var stream = client.GetStream();

                while (!ct.IsCancellationRequested)
                {
                    int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, ct);
                    if (bytesRead == 0)
                    {
                        // A zero-length read means the peer closed the connection. Without this
                        // exit the loop would spin forever on reads that complete immediately.
                        Console.WriteLine($"Client {clientId} connection closed");
                        return;
                    }

                    // NOTE(review): each read is treated as exactly one packet. TCP is a byte
                    // stream, so a packet split across reads (or two packets in one read) is
                    // rejected by ParseData's length check. Add framing if instruments send
                    // packets back to back.
                    byte[] receivedData = buffer[..bytesRead];
                    Log.Information($"收到 {bytesRead} bytes 来自仪器.");
                    Console.WriteLine($"收到 {bytesRead} bytes 来自仪器.");

                    // Parse the data and build the reply.
                    var result = await ParseData(receivedData);
                    if (result.success)
                    {
                        Log.Information(result.message);
                        Console.WriteLine(result.message);

                        // Only report a response as sent when one was actually written.
                        if (result.response.Length > 0)
                        {
                            await stream.WriteAsync(result.response, 0, result.response.Length, ct);
                            Log.Information($"响应发送.");
                            Console.WriteLine("响应发送.");
                        }
                    }
                    else
                    {
                        Log.Information($"Error: {result.message}");
                        Console.WriteLine($"Error: {result.message}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // The service is stopping; leave quietly.
            }
            catch (IOException ex)
            {
                Console.WriteLine($"Client {clientId} connection error: {ex.Message}");
            }
            catch (ObjectDisposedException)
            {
                Console.WriteLine($"Client {clientId} connection closed");
            }
            catch (Exception ex)
            {
                // Interpolate instead of passing ex.Message as a format string: braces in the
                // message would otherwise throw, and the client id would never be printed.
                Console.WriteLine($"Error processing client {clientId}: {ex.Message}");
            }
        }
    }

    // Hemodialysis packet constants.
    private const int ZL_PacketLength = PacketSize;
    public const byte ZL_START_CODE_UPLOAD = 0x55; // Start code for data uploaded by the sphygmomanometer.
    private static readonly byte[] ZL_Header = { 0x55, 0xAA };

    // Data identifier -> (JSON name, unit). Built once instead of on every packet.
    private static readonly Dictionary<byte, (string name, string unit)> ZL_DataInfo = new()
    {
        { 0x01, ("dehydration", "L") },                 // dehydration
        { 0x02, ("currentDehydration", "L") },          // current dehydration
        { 0x03, ("dehydrationSpeed", "L/h") },          // dehydration speed
        { 0x04, ("bloodPumpFlow", "ml/min") },          // blood pump flow
        { 0x05, ("auxiliaryPump", "") },                // auxiliary pump
        { 0x06, ("syringe", "ml/h") },                  // syringe
        { 0x07, ("dialysateFlow", "") },                // dialysate flow
        { 0x08, ("dialysateTemperature", "°C") },       // dialysate temperature
        { 0x09, ("dialysateConductivity", "mS/cm") },   // dialysate conductivity
        { 0x0A, ("venousPressure", "") },               // venous pressure
        { 0x0B, ("transmembranePressure", "") },        // transmembrane pressure
        { 0x0C, ("dialyzedTime", "min") },              // elapsed dialysis time
        { 0x0D, ("remainingTime", "min") },             // remaining dialysis time
        { 0x0E, ("arterialPressure", "") },             // arterial pressure
        { 0x0F, ("sphygmomanometerHigh", "") },         // sphygmomanometer reading, high pressure
        { 0x10, ("sphygmomanometerLow", "") },          // sphygmomanometer reading, low pressure
        { 0x11, ("heartRate", "bpm") }                  // heart rate
    };

    /// <summary>
    /// Validates the packet length and the 0x55 0xAA header, then hands the packet to
    /// <see cref="HemodialysisZLMonitor"/>.
    /// </summary>
    /// <param name="buffer">The bytes received in one read.</param>
    /// <returns>
    /// Success flag, response bytes to write back (currently always empty), and a message
    /// describing the failure when <c>success</c> is false.
    /// </returns>
    private async Task<(bool success, byte[] response, string message)> ParseData(byte[] buffer)
    {
        bool hasValidHeader = buffer.Length == ZL_PacketLength
            && buffer[0] == ZL_Header[0]
            && buffer[1] == ZL_Header[1];
        if (!hasValidHeader)
        {
            return (false, Array.Empty<byte>(), "前导码错误");
        }

        var result = await HemodialysisZLMonitor(buffer);
        return (result.success, Array.Empty<byte>(), result.message);
    }

    /// <summary>
    /// Verifies the checksum of a 14-byte packet, decodes its fields, prints them to the
    /// console, and pushes the decoded name/value to the WebSocket client registered under
    /// the packet's device id (decimal string).
    /// </summary>
    /// <param name="buffer">A packet that already passed the length and header check.</param>
    /// <returns>
    /// <c>(true, "")</c> when the packet was decoded and forwarded; <c>(false, reason)</c>
    /// when the checksum does not match.
    /// </returns>
    private async Task<(bool success, string message)> HemodialysisZLMonitor(byte[] buffer)
    {
        // The checksum is the sum of every byte before the trailing checksum field.
        int payloadLength = ZL_PacketLength - CheckSumSize;
        int sum = 0;
        for (int i = 0; i < payloadLength; i++)
        {
            sum += buffer[i];
        }
        ushort calculatedChecksum = (ushort)sum;

        // Checksum carried in the packet, big-endian.
        ushort packetChecksum = (ushort)((buffer[payloadLength] << 8) | buffer[payloadLength + 1]);
        if (calculatedChecksum != packetChecksum)
        {
            // Report through the result tuple so the caller logs the error and keeps the
            // connection open; throwing here would drop the instrument connection.
            return (false, $"Checksum mismatch: calculated 0x{calculatedChecksum:X4}, received 0x{packetChecksum:X4}");
        }

        byte dataId = buffer[9];
        var result = new ZL_ParsedData
        {
            DeviceType = buffer[2],
            DeviceId = (uint)((buffer[3] << 16) | (buffer[4] << 8) | buffer[5]),
            HasOtherAlarm = buffer[8] != 0,
            DataIdentifier = dataId
        };

        // Byte 6 is either an operation mode (10..17) or the data weight.
        byte modeWeight = buffer[6];
        if (modeWeight >= 10 && modeWeight <= 17)
        {
            result.OperationMode = GetModeName(modeWeight);
        }
        else
        {
            result.DataWeight = modeWeight;
        }

        result.AlarmFlags = ParseAlarmFlags(buffer[7]);

        // Two-byte data value, big-endian.
        ushort rawValue = (ushort)((buffer[10] << 8) | buffer[11]);

        // Venous, transmembrane and arterial pressure are signed: reinterpret the 16 bits as
        // two's complement and widen to int so a negative reading stays negative.
        bool isSigned = dataId is 0x0A or 0x0B or 0x0E;
        int dataValue = isSigned ? unchecked((short)rawValue) : (int)rawValue;

        result.DataValue = ApplyDataWeight(dataValue, result.DataWeight, result.DataIdentifier);
        (result.DataName, result.Unit) = GetDataInfo(result.DataIdentifier);

        Console.WriteLine("Parsing successful!");
        Console.WriteLine($"设备类型: 0x{result.DeviceType:X2}");
        // Decimal first (this is the key WebSocket clients subscribe with), hex in parentheses.
        Console.WriteLine($"设备id: {result.DeviceId} (0x{result.DeviceId:X6})");
        Console.WriteLine($"运行模式: {result.OperationMode ?? "N/A"}");
        Console.WriteLine($"数据权值: {result.DataWeight}");
        Console.WriteLine($"报警标识: [漏血BloodLeak: {result.AlarmFlags.BloodLeak}, " +
                          $"液位LiquidLevel: {result.AlarmFlags.LiquidLevel}, " +
                          $"气泡Bubble: {result.AlarmFlags.Bubble}, " +
                          $"动脉压ArterialPressure: {result.AlarmFlags.ArterialPressure}, " +
                          $"跨膜压TransmembranePressure: {result.AlarmFlags.TransmembranePressure}, " +
                          $"静脉压VenousPressure: {result.AlarmFlags.VenousPressure}, " +
                          $"温度Temperature: {result.AlarmFlags.Temperature}, " +
                          $"电导Conductivity: {result.AlarmFlags.Conductivity}]");
        Console.WriteLine($"其他报警: {result.HasOtherAlarm}");
        Console.WriteLine($"数据标识: 0x{result.DataIdentifier:X2} ({result.DataName})");
        Console.WriteLine($"数据值: {result.DataValue} {result.Unit}");

        WebSocketSend webSocketSend = new WebSocketSend { Name = result.DataName, Value = result.DataValue };
        await _webSocketManager.BroadcastAsync(Convert.ToString(result.DeviceId), webSocketSend);
        return (true, "");
    }

    /// <summary>Expands the alarm byte into one boolean per bit (bit 0 = blood leak ... bit 7 = conductivity).</summary>
    /// <param name="flagByte">The alarm flag byte of the packet.</param>
    private ZL_AlarmFlags ParseAlarmFlags(byte flagByte)
    {
        return new ZL_AlarmFlags
        {
            BloodLeak = (flagByte & 0x01) != 0,
            LiquidLevel = (flagByte & 0x02) != 0,
            Bubble = (flagByte & 0x04) != 0,
            ArterialPressure = (flagByte & 0x08) != 0,
            TransmembranePressure = (flagByte & 0x10) != 0,
            VenousPressure = (flagByte & 0x20) != 0,
            Temperature = (flagByte & 0x40) != 0,
            Conductivity = (flagByte & 0x80) != 0
        };
    }

    /// <summary>
    /// Maps an operation-mode code to its name. Codes without an entry — including 11, which
    /// is inside the 10..17 range the caller treats as a mode — yield "Unknown".
    /// </summary>
    /// <param name="mode">The mode code taken from byte 6 of the packet.</param>
    private string GetModeName(byte mode)
    {
        return mode switch
        {
            10 => "Dialysis",
            12 => "LowSuper",
            13 => "SingleSuper",
            14 => "BloodReturn",
            15 => "Precharge",
            16 => "SelfTest",
            17 => "Disinfection",
            _ => "Unknown"
        };
    }

    /// <summary>
    /// Scales a raw value by its weight: weights 1..4 divide by 10^weight, every other
    /// weight leaves the value as an integer.
    /// </summary>
    /// <param name="rawValue">The decoded value; already sign-extended for signed identifiers.</param>
    /// <param name="weight">Number of decimal places encoded in the packet.</param>
    /// <param name="dataId">The data identifier of the packet.</param>
    private double ApplyDataWeight(int rawValue, int weight, byte dataId)
    {
        // Conductivity (0x09) with a weight above 4 is shown as an integer.
        if (dataId == 0x09 && weight > 4)
        {
            return rawValue;
        }

        return weight switch
        {
            0 or 15 => rawValue,                               // integer
            > 0 and <= 4 => rawValue / Math.Pow(10, weight),   // decimal places
            _ => rawValue                                      // anything else: integer
        };
    }

    /// <summary>Looks up the name and unit for a data identifier; unknown identifiers yield ("Unknown", "").</summary>
    /// <param name="dataId">The data identifier of the packet.</param>
    private (string name, string unit) GetDataInfo(byte dataId)
    {
        return ZL_DataInfo.TryGetValue(dataId, out var info)
            ? info
            : ("Unknown", "");
    }
}

WebSocket服务

/// <summary>
/// Registry of connected WebSocket clients keyed by the device id each client sends as its
/// first text message. Used to push parsed instrument data to the client subscribed to a device.
/// </summary>
public class WebSocketManagement
{
    private readonly ConcurrentDictionary<string, WebSocket> _sockets = new();

    /// <summary>
    /// Serves one accepted WebSocket until it closes. If the first message is text, its
    /// trimmed content is used as the key under which the socket is registered (replacing
    /// any socket previously registered under that key).
    /// </summary>
    /// <param name="webSocket">The accepted socket.</param>
    public async Task HandleWebSocketConnectionAsync(WebSocket webSocket)
    {
        var buffer = new byte[1024 * 4];
        string? registeredId = null;
        bool isFirstMessage = true;

        try
        {
            // Keep receiving instead of polling State: a close frame from the client is only
            // observed by a receive, so without this loop a disconnect goes unnoticed.
            while (webSocket.State == WebSocketState.Open)
            {
                var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);

                if (result.MessageType == WebSocketMessageType.Close)
                {
                    // Complete the close handshake the client started.
                    await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
                    break;
                }

                if (isFirstMessage && result.MessageType == WebSocketMessageType.Text)
                {
                    var deviceId = Encoding.UTF8.GetString(buffer, 0, result.Count);
                    registeredId = deviceId.Trim();
                    _sockets[registeredId] = webSocket;

                    Console.WriteLine("socket消息:" + deviceId);
                }

                // Later messages are read and discarded; only the first one can register.
                isFirstMessage = false;
            }
        }
        catch (WebSocketException ex)
        {
            // The connection dropped without a close handshake.
            Console.WriteLine($"WebSocket connection error: {ex.Message}");
        }
        finally
        {
            // Remove the entry only if it still maps to this socket, so a newer connection
            // that registered the same id is not evicted.
            if (registeredId is not null)
            {
                _sockets.TryRemove(new KeyValuePair<string, WebSocket>(registeredId, webSocket));
            }
        }
    }

    /// <summary>
    /// Serializes <paramref name="data"/> to JSON and sends it as one text message to the
    /// socket registered under <paramref name="socketId"/>. Does nothing when no socket is
    /// registered; unregisters the socket when it is no longer open or the send fails.
    /// </summary>
    /// <param name="socketId">The key the target client registered with.</param>
    /// <param name="data">The object to serialize and send.</param>
    public async Task BroadcastAsync(string socketId, dynamic data)
    {
        if (!_sockets.TryGetValue(socketId, out WebSocket? socket))
        {
            return;
        }

        if (socket.State != WebSocketState.Open)
        {
            _sockets.TryRemove(new KeyValuePair<string, WebSocket>(socketId, socket));
            return;
        }

        // Cast to object so the call binds at compile time rather than through the dynamic binder.
        string json = JsonConvert.SerializeObject((object)data);
        byte[] buffer = Encoding.UTF8.GetBytes(json);

        try
        {
            await socket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);
        }
        catch (Exception ex) when (ex is WebSocketException or ObjectDisposedException)
        {
            // A broken subscriber must not fail the caller (the instrument's TCP handler):
            // drop the dead socket and carry on.
            _sockets.TryRemove(new KeyValuePair<string, WebSocket>(socketId, socket));
            Console.WriteLine($"WebSocket send to {socketId} failed: {ex.Message}");
        }
    }
}

测试数据发送接收

# PowerShell
# Sends one 14-byte sample packet to the TCP server on localhost:8081.
# Layout as read by the parser above: bytes 0-1 header (0x55 0xAA), byte 2 device type,
# bytes 3-5 device id, byte 6 mode/weight, byte 7 alarm flags, byte 8 other-alarm flag,
# byte 9 data identifier, bytes 10-11 value, bytes 12-13 big-endian sum of bytes 0-11.
[byte[]]$packet = 0x55, 0xAA, 0x00, 0x26, 0x35, 0xB2, 0x00, 0x00, 0x01, 0x0E, 0x00, 0x00, 0x02, 0x1B
$tcp = [System.Net.Sockets.TcpClient]::new('localhost', 8081)
$tcp.GetStream().Write($packet, 0, $packet.Length)
$tcp.Close()

(原文此处为一张截图,图片未能保留。)

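使用 WebSocket 测试工具(任选其一):

浏览器开发者工具

https://websocketking.com/

连接 ws://localhost:5000 后,先发送一条文本消息,内容为要订阅的设备 ID(十进制字符串;上面的测试数据中设备 ID 为 0x2635B2,即 2504114)。之后该设备每个解析成功的数据包都会以 JSON 文本推送到这个连接。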